diff --git a/cmd/compute_qemu/events/events.go b/cmd/compute_qemu/events/events.go new file mode 100644 index 0000000..c6076f6 --- /dev/null +++ b/cmd/compute_qemu/events/events.go @@ -0,0 +1,240 @@ +package events + +import ( + "context" + "encoding/json" + "encoding/xml" + "fmt" + "log" + "strings" + "time" + + "deevirt.fr/compute/pkg/amqp" + "deevirt.fr/compute/pkg/config" + "deevirt.fr/compute/pkg/schema" + clientv3 "go.etcd.io/etcd/client/v3" + "libvirt.org/go/libvirt" +) + +func AgentLifecycle(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventAgentLifecycle) { + println(event.State) + println(event.Reason) + +} + +func Graphics(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventGraphics) { + println(event.String()) +} + +func JobCompleted(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventJobCompleted) { + println(e.Info.DataRemaining) +} + +func MigrationIteration(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventMigrationIteration) { + println(e.Iteration) +} + +func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifecycle) { + var detail, event string + config, _ := config.NewConfig() + domainID, _ := d.GetUUIDString() + + etcd, err := clientv3.New(clientv3.Config{ + Endpoints: strings.Split(config.EtcdURI, ","), + DialTimeout: 5 * time.Second, + }) + if err != nil { + log.Fatalf("Error connexion to etcd: %v", err) + } + defer etcd.Close() + + switch e.Event { + case libvirt.DOMAIN_EVENT_DEFINED: + event = "defined" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + etcd.Put(ctx, "/cluster/"+config.ClusterID+"/host/"+config.NodeID+"/qemu/"+domainID, "") + cancel() + + switch libvirt.DomainEventDefinedDetailType(e.Detail) { + case libvirt.DOMAIN_EVENT_DEFINED_ADDED: + detail = "added" + case libvirt.DOMAIN_EVENT_DEFINED_UPDATED: + detail = "updated" + case libvirt.DOMAIN_EVENT_DEFINED_RENAMED: + detail = "renamed" + case libvirt.DOMAIN_EVENT_DEFINED_FROM_SNAPSHOT: + detail = "snapshot" + default: + detail = "unknown" + } + + case libvirt.DOMAIN_EVENT_UNDEFINED: + event = "undefined" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + etcd.Delete(ctx, "/cluster/"+config.ClusterID+"/host/"+config.NodeID+"/qemu/"+domainID) + cancel() + + switch libvirt.DomainEventUndefinedDetailType(e.Detail) { + case libvirt.DOMAIN_EVENT_UNDEFINED_REMOVED: + detail = "removed" + case libvirt.DOMAIN_EVENT_UNDEFINED_RENAMED: + detail = "renamed" + default: + detail = "unknown" + } + + case libvirt.DOMAIN_EVENT_STARTED: + event = "started" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "2") + cancel() + + switch libvirt.DomainEventStartedDetailType(e.Detail) { + case libvirt.DOMAIN_EVENT_STARTED_BOOTED: + detail = "booted" + case libvirt.DOMAIN_EVENT_STARTED_MIGRATED: + detail = "migrated" + case libvirt.DOMAIN_EVENT_STARTED_RESTORED: + detail = "restored" + case libvirt.DOMAIN_EVENT_STARTED_FROM_SNAPSHOT: + detail = "snapshot" + case libvirt.DOMAIN_EVENT_STARTED_WAKEUP: + detail = "wakeup" + default: + detail = "unknown" + } + + case libvirt.DOMAIN_EVENT_SUSPENDED: + event = "suspended" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "3") + cancel() + + switch libvirt.DomainEventSuspendedDetailType(e.Detail) { + case libvirt.DOMAIN_EVENT_SUSPENDED_PAUSED: + detail = "paused" + case libvirt.DOMAIN_EVENT_SUSPENDED_MIGRATED: + detail = "migrated" + case libvirt.DOMAIN_EVENT_SUSPENDED_IOERROR: + detail = "I/O error" + case libvirt.DOMAIN_EVENT_SUSPENDED_WATCHDOG: + detail = "watchdog" + case libvirt.DOMAIN_EVENT_SUSPENDED_RESTORED: + detail = "restored" + case libvirt.DOMAIN_EVENT_SUSPENDED_FROM_SNAPSHOT: + detail = "snapshot" + case libvirt.DOMAIN_EVENT_SUSPENDED_API_ERROR: + detail = "api error" + case libvirt.DOMAIN_EVENT_SUSPENDED_POSTCOPY: + detail = "postcopy" + case libvirt.DOMAIN_EVENT_SUSPENDED_POSTCOPY_FAILED: + detail = "postcopy failed" + default: + detail = "unknown" + } + + case libvirt.DOMAIN_EVENT_RESUMED: + event = "resumed" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "4") + cancel() + + switch libvirt.DomainEventResumedDetailType(e.Detail) { + case libvirt.DOMAIN_EVENT_RESUMED_UNPAUSED: + detail = "unpaused" + case libvirt.DOMAIN_EVENT_RESUMED_MIGRATED: + detail = "migrated" + case libvirt.DOMAIN_EVENT_RESUMED_FROM_SNAPSHOT: + detail = "snapshot" + case libvirt.DOMAIN_EVENT_RESUMED_POSTCOPY: + detail = "postcopy" + case libvirt.DOMAIN_EVENT_RESUMED_POSTCOPY_FAILED: + detail = "postcopy failed" + default: + detail = "unknown" + } + + case libvirt.DOMAIN_EVENT_STOPPED: + event = "stopped" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "5") + cancel() + + switch libvirt.DomainEventStoppedDetailType(e.Detail) { + case libvirt.DOMAIN_EVENT_STOPPED_SHUTDOWN: + detail = "shutdown" + case libvirt.DOMAIN_EVENT_STOPPED_DESTROYED: + detail = "destroyed" + case libvirt.DOMAIN_EVENT_STOPPED_CRASHED: + detail = "crashed" + case libvirt.DOMAIN_EVENT_STOPPED_MIGRATED: + detail = "migrated" + case libvirt.DOMAIN_EVENT_STOPPED_SAVED: + detail = "saved" + case libvirt.DOMAIN_EVENT_STOPPED_FAILED: + detail = "failed" + case libvirt.DOMAIN_EVENT_STOPPED_FROM_SNAPSHOT: + detail = "snapshot" + default: + detail = "unknown" + } + + case libvirt.DOMAIN_EVENT_SHUTDOWN: + event = "shutdown" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "6") + cancel() + + switch libvirt.DomainEventShutdownDetailType(e.Detail) { + case libvirt.DOMAIN_EVENT_SHUTDOWN_FINISHED: + detail = "finished" + case libvirt.DOMAIN_EVENT_SHUTDOWN_GUEST: + detail = "guest" + case libvirt.DOMAIN_EVENT_SHUTDOWN_HOST: + detail = "host" + default: + detail = "unknown" + } + + default: + event = "unknown" + } + + // Send event for all clients + if e.Event != libvirt.DOMAIN_EVENT_DEFINED|libvirt.DOMAIN_EVENT_UNDEFINED { + xmlDesc, err := d.GetXMLDesc(0) + if err != nil { + log.Fatalln(err) + } + var desc schema.Domain + err = xml.Unmarshal([]byte(xmlDesc), &desc) + if err != nil { + log.Fatalln(err) + } + + state, _ := json.Marshal(&schema.DomainStateJSON{ + CompanyID: desc.Metadata.DeevirtInstance.DeevirtCompanyID, + DatacenterID: desc.Metadata.DeevirtInstance.DeevirtDatacenterID, + DomainID: domainID, + State: int64(e.Event), + }) + + a, _ := amqp.NewAMQP() + a.Publisher("vmcenter", + "events."+desc.Metadata.DeevirtInstance.DeevirtCompanyID+ + "."+desc.Metadata.DeevirtInstance.DeevirtDatacenterID+ + "."+domainID, + state) + a.Close() + } + + fmt.Printf("Domain event=%q detail=%q\n", event, detail) +} + +func Reboot(c *libvirt.Connect, d *libvirt.Domain) { + +} + +func Watchdog(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventWatchdog) { + println(event.String()) +}