package events import ( "context" "encoding/json" "encoding/xml" "fmt" "log" "strings" "time" "deevirt.fr/compute/pkg/amqp" "deevirt.fr/compute/pkg/config" "deevirt.fr/compute/pkg/schema" clientv3 "go.etcd.io/etcd/client/v3" "libvirt.org/go/libvirt" ) func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifecycle) { var detail, event string config, _ := config.NewConfig() domainID, _ := d.GetUUIDString() etcd, err := clientv3.New(clientv3.Config{ Endpoints: strings.Split(config.EtcdURI, ","), DialTimeout: 5 * time.Second, }) if err != nil { log.Fatalf("Error connexion to etcd: %v", err) } defer etcd.Close() switch e.Event { case libvirt.DOMAIN_EVENT_DEFINED: event = "defined" ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) etcd.Put(ctx, "/cluster/"+config.ClusterID+"/host/"+config.NodeID+"/qemu/"+domainID, "") cancel() switch libvirt.DomainEventDefinedDetailType(e.Detail) { case libvirt.DOMAIN_EVENT_DEFINED_ADDED: detail = "added" case libvirt.DOMAIN_EVENT_DEFINED_UPDATED: detail = "updated" case libvirt.DOMAIN_EVENT_DEFINED_RENAMED: detail = "renamed" case libvirt.DOMAIN_EVENT_DEFINED_FROM_SNAPSHOT: detail = "snapshot" default: detail = "unknown" } case libvirt.DOMAIN_EVENT_UNDEFINED: event = "undefined" ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) etcd.Delete(ctx, "/cluster/"+config.ClusterID+"/host/"+config.NodeID+"/qemu/"+domainID) cancel() switch libvirt.DomainEventUndefinedDetailType(e.Detail) { case libvirt.DOMAIN_EVENT_UNDEFINED_REMOVED: detail = "removed" case libvirt.DOMAIN_EVENT_UNDEFINED_RENAMED: detail = "renamed" default: detail = "unknown" } case libvirt.DOMAIN_EVENT_STARTED: event = "started" ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "2") cancel() switch libvirt.DomainEventStartedDetailType(e.Detail) { case libvirt.DOMAIN_EVENT_STARTED_BOOTED: detail = "booted" case libvirt.DOMAIN_EVENT_STARTED_MIGRATED: detail = "migrated" case libvirt.DOMAIN_EVENT_STARTED_RESTORED: detail = "restored" case libvirt.DOMAIN_EVENT_STARTED_FROM_SNAPSHOT: detail = "snapshot" case libvirt.DOMAIN_EVENT_STARTED_WAKEUP: detail = "wakeup" default: detail = "unknown" } case libvirt.DOMAIN_EVENT_SUSPENDED: event = "suspended" ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "3") cancel() switch libvirt.DomainEventSuspendedDetailType(e.Detail) { case libvirt.DOMAIN_EVENT_SUSPENDED_PAUSED: detail = "paused" case libvirt.DOMAIN_EVENT_SUSPENDED_MIGRATED: detail = "migrated" case libvirt.DOMAIN_EVENT_SUSPENDED_IOERROR: detail = "I/O error" case libvirt.DOMAIN_EVENT_SUSPENDED_WATCHDOG: detail = "watchdog" case libvirt.DOMAIN_EVENT_SUSPENDED_RESTORED: detail = "restored" case libvirt.DOMAIN_EVENT_SUSPENDED_FROM_SNAPSHOT: detail = "snapshot" case libvirt.DOMAIN_EVENT_SUSPENDED_API_ERROR: detail = "api error" case libvirt.DOMAIN_EVENT_SUSPENDED_POSTCOPY: detail = "postcopy" case libvirt.DOMAIN_EVENT_SUSPENDED_POSTCOPY_FAILED: detail = "postcopy failed" default: detail = "unknown" } case libvirt.DOMAIN_EVENT_RESUMED: event = "resumed" ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "4") cancel() switch libvirt.DomainEventResumedDetailType(e.Detail) { case libvirt.DOMAIN_EVENT_RESUMED_UNPAUSED: detail = "unpaused" case libvirt.DOMAIN_EVENT_RESUMED_MIGRATED: detail = "migrated" case libvirt.DOMAIN_EVENT_RESUMED_FROM_SNAPSHOT: detail = "snapshot" case libvirt.DOMAIN_EVENT_RESUMED_POSTCOPY: detail = "postcopy" case libvirt.DOMAIN_EVENT_RESUMED_POSTCOPY_FAILED: detail = "postcopy failed" default: detail = "unknown" } case libvirt.DOMAIN_EVENT_STOPPED: event = "stopped" ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "5") cancel() switch libvirt.DomainEventStoppedDetailType(e.Detail) { case libvirt.DOMAIN_EVENT_STOPPED_SHUTDOWN: detail = "shutdown" case libvirt.DOMAIN_EVENT_STOPPED_DESTROYED: detail = "destroyed" case libvirt.DOMAIN_EVENT_STOPPED_CRASHED: detail = "crashed" case libvirt.DOMAIN_EVENT_STOPPED_MIGRATED: detail = "migrated" case libvirt.DOMAIN_EVENT_STOPPED_SAVED: detail = "saved" case libvirt.DOMAIN_EVENT_STOPPED_FAILED: detail = "failed" case libvirt.DOMAIN_EVENT_STOPPED_FROM_SNAPSHOT: detail = "snapshot" default: detail = "unknown" } case libvirt.DOMAIN_EVENT_SHUTDOWN: event = "shutdown" ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "6") cancel() switch libvirt.DomainEventShutdownDetailType(e.Detail) { case libvirt.DOMAIN_EVENT_SHUTDOWN_FINISHED: detail = "finished" case libvirt.DOMAIN_EVENT_SHUTDOWN_GUEST: detail = "guest" case libvirt.DOMAIN_EVENT_SHUTDOWN_HOST: detail = "host" default: detail = "unknown" } default: event = "unknown" } // Send event for all clients if e.Event != libvirt.DOMAIN_EVENT_DEFINED|libvirt.DOMAIN_EVENT_UNDEFINED { xmlDesc, err := d.GetXMLDesc(0) if err != nil { log.Fatalln(err) } var desc schema.Domain err = xml.Unmarshal([]byte(xmlDesc), &desc) if err != nil { log.Fatalln(err) } state, _ := json.Marshal(&schema.DomainStateJSON{ CompanyID: desc.Metadata.DeevirtInstance.DeevirtCompanyID, DatacenterID: desc.Metadata.DeevirtInstance.DeevirtDatacenterID, DomainID: domainID, State: int64(e.Event), }) a, _ := amqp.NewAMQP() a.Publisher("vmcenter", "events."+desc.Metadata.DeevirtInstance.DeevirtCompanyID+ "."+desc.Metadata.DeevirtInstance.DeevirtDatacenterID+ "."+domainID, state) a.Close() } fmt.Printf("Domain event=%q detail=%q\n", event, detail) }