// Package worker runs a background loop that polls the metrics scheduler for
// cluster and node alerts and reacts to node alerts with HA restarts or load
// balancing.
package worker

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"

	"go.uber.org/zap"

	"deevirt.fr/compute/pkg/config"
	"deevirt.fr/compute/pkg/libvirt"
	scheduler "deevirt.fr/compute/pkg/metrics"
	"deevirt.fr/compute/pkg/raft"
	"deevirt.fr/compute/pkg/schema"
)

const (
	// checkInterval is the pause between two alert-handling passes.
	checkInterval = 1 * time.Minute

	// topCandidates is how many nodes/domains are requested from the scheduler
	// when looking for migration targets or domains to move.
	topCandidates = 4

	// domainStateRunning is the stored attachment state for which a recovered
	// domain is started again after being defined.
	// NOTE(review): presumably libvirt's VIR_DOMAIN_RUNNING (1) — confirm.
	domainStateRunning = 1
)

// Worker periodically inspects cluster and node alerts and triggers the
// matching corrective action.
type Worker struct {
	ctx       context.Context
	cancel    context.CancelFunc
	cancelled bool
	store     *raft.Store
	config    *config.Config
	nodes     schema.Node
	log       *zap.SugaredLogger
}

// NewWorker builds a Worker bound to the given raft store.
//
// It returns an error when the configuration cannot be loaded or the logger
// cannot be created, instead of handing back a Worker that would dereference
// a nil config or logger later on.
func NewWorker(r *raft.Store) (*Worker, error) {
	cfg, err := config.New()
	if err != nil {
		return nil, fmt.Errorf("loading configuration: %w", err)
	}

	logger, err := zap.NewProduction()
	if err != nil {
		return nil, fmt.Errorf("creating logger: %w", err)
	}

	// The context is created last so that no cancel function is leaked on the
	// error paths above.
	ctx, cancel := context.WithCancel(context.Background())

	// cancelled is left at its zero value (false): the worker has not been
	// stopped yet, so the first Stop call must really cancel the context.
	return &Worker{
		ctx:    ctx,
		cancel: cancel,
		store:  r,
		config: cfg,
		log:    logger.Sugar(),
	}, nil
}

// Start launches the supervision loop in its own goroutine. The loop runs one
// pass immediately, then one pass every checkInterval, until Stop is called.
func (w *Worker) Start() {
	go func() {
		for {
			// Do not start a new pass once the worker has been stopped.
			select {
			case <-w.ctx.Done():
				log.Println("🛑 Worker arrêté !")
				return
			default:
			}

			w.runChecks()

			// Wait for the next pass, but wake up at once on cancellation
			// rather than sleeping through it.
			select {
			case <-w.ctx.Done():
				log.Println("🛑 Worker arrêté !")
				return
			case <-time.After(checkInterval):
			}
		}
	}()
}

// runChecks performs one supervision pass: cluster alerts, then node alerts.
// A failure in one step is logged and does not prevent the other.
func (w *Worker) runChecks() {
	log.Println("🔄 Contrôle périodique en cours...")

	if err := w.handleClusterAlerts(); err != nil {
		log.Printf("Erreur lors du traitement des alertes du cluster : %v", err)
	}

	if err := w.handleNodeAlerts(); err != nil {
		log.Printf("Erreur lors du traitement des alertes des nœuds : %v", err)
	}
}

// Stop cancels the worker context, which ends the loop started by Start.
// Calling Stop more than once is harmless.
func (w *Worker) Stop() {
	if w.cancelled {
		return
	}
	w.cancel()
	w.cancelled = true
}

// Migrate is the entry point for moving domainID from nodeID to newNodeID.
//
// The migration itself is currently disabled: the method only signals wg so
// that callers waiting on the group are released. The previous implementation
// is kept below for reference.
func (w *Worker) Migrate(wg *sync.WaitGroup, nodeID string, newNodeID string, domainID string) {
	defer wg.Done()

	/*
		c, err := libvirt.New(w.nodes[nodeID].IpManagement, w.store.conf.LibvirtTLS)
		if err != nil {
			w.log.Infof("Connexion error to libvirt %v", err.Error())
			return
		}
		defer c.Close()

		dom, err := c.LookupDomainByUUIDString(domainID)
		if err != nil {
			w.log.Infof("Connexion error to libvirt %v", err.Error())
			return
		}

		c_new, err := libvirt.New(w.nodes[newNodeID].IpManagement, w.store.conf.LibvirtTLS)
		if err != nil {
			w.log.Infof("Connexion error to libvirt %v", err.Error())
			return
		}
		defer c_new.Close()

		new_dom, err := dom.Migrate(c_new, go_libvirt.MIGRATE_LIVE|go_libvirt.MIGRATE_PERSIST_DEST|go_libvirt.MIGRATE_UNDEFINE_SOURCE, "", "", 0)
		if err != nil {
			w.log.Infof("Migration error %v", err.Error())
			return
		}

		newDomConfig, _ := new_dom.GetXMLDesc(go_libvirt.DOMAIN_XML_INACTIVE)
		newDomState, _, _ := new_dom.GetState()

		new, _ := json.Marshal(deevirt_schema.DomainStore{
			Config:  newDomConfig,
			State:   int(newDomState),
			Migrate: false,
		})

		w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", newNodeID, domainID), new)
		w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domainID))
	*/
}

// handleClusterAlerts fetches the cluster-level alerts from the scheduler.
// No action is taken on them yet; only retrieval errors are reported.
func (w *Worker) handleClusterAlerts() error {
	sched, err := scheduler.New()
	if err != nil {
		return fmt.Errorf("impossible de créer un scheduler : %w", err)
	}

	alertCluster, err := sched.GetAlertCluster()
	if err != nil {
		return fmt.Errorf("impossible de récupérer les alertes du cluster : %w", err)
	}

	for _, alert := range alertCluster {
		if alert.Severity == "critical" {
			// Critical alerts are skipped for now.
			continue
		}
		// TODO: no handling is implemented for the remaining alerts either.
	}

	return nil
}

// handleNodeAlerts fetches the node-level alerts from the scheduler and
// dispatches each one according to its event type:
//   - "state":  HA recovery of the node's domains
//   - "cpu":    CPU load balancing
//   - "memory": memory load balancing
//
// Alerts with any other event type are ignored.
func (w *Worker) handleNodeAlerts() error {
	sched, err := scheduler.New()
	if err != nil {
		return fmt.Errorf("impossible de créer un scheduler : %w", err)
	}

	alertsNode, err := sched.GetAlertNodes()
	if err != nil {
		return fmt.Errorf("impossible de récupérer les alertes des nœuds : %w", err)
	}

	for _, alert := range alertsNode {
		switch alert.Event {
		case "state":
			w.handleHAExecution(alert.NodeID)
		case "cpu":
			w.handleCPULoadBalance(alert.NodeID)
		case "memory":
			w.handleMemoryLoadBalance(alert.NodeID)
		}
	}

	return nil
}

// loadCluster reads and decodes the node registry stored under "/nodes".
func (w *Worker) loadCluster() (schema.Node, error) {
	raw, err := w.store.Get("/nodes")
	if err != nil {
		return nil, fmt.Errorf("reading /nodes from the store: %w", err)
	}

	var cluster schema.Node
	if err := json.Unmarshal(raw, &cluster); err != nil {
		return nil, fmt.Errorf("decoding /nodes: %w", err)
	}

	return cluster, nil
}

// handleHAExecution recovers the domains attached to nodeID on other nodes.
//
// The recovery is aborted when libvirt on nodeID still answers, when the node
// registry cannot be read (an unreadable registry must not be mistaken for a
// dead node), or when no other node is available as a target. Each domain is
// defined on a randomly chosen candidate node and started if its stored state
// was running. A failure on one domain is logged and the remaining domains
// are still processed.
func (w *Worker) handleHAExecution(nodeID string) {
	cluster, err := w.loadCluster()
	if err != nil {
		w.log.Errorf("HA of %s aborted: %v", nodeID, err)
		return
	}

	node, ok := cluster[nodeID]
	if !ok {
		w.log.Errorf("HA of %s aborted: node is not registered in the cluster", nodeID)
		return
	}

	// Check whether libvirt is still reachable on the node: if it is, the
	// node is not considered down and nothing must be restarted elsewhere.
	if conn, err := libvirt.New(node.IpManagement, w.config.LibvirtTLS); err == nil {
		conn.Close() // the probe connection is not needed any further
		w.log.Warnf("Le noeud %s accède toujours à libvirt, on avorte", nodeID)
		return
	}

	sched, err := scheduler.New()
	if err != nil {
		w.log.Errorf("HA of %s aborted, cannot create scheduler: %v", nodeID, err)
		return
	}

	top, err := sched.GetTopNode(topCandidates)
	if err != nil {
		w.log.Errorf("HA of %s aborted, cannot get candidate nodes: %v", nodeID, err)
		return
	}

	// Keep only usable targets: never the failed node itself, and only nodes
	// whose management address is known.
	targets := make([]string, 0, len(top))
	for _, candidate := range top {
		if candidate.NodeID == nodeID {
			continue
		}
		if _, known := cluster[candidate.NodeID]; !known {
			continue
		}
		targets = append(targets, candidate.NodeID)
	}
	if len(targets) == 0 {
		w.log.Warnf("HA of %s aborted: no candidate node available", nodeID)
		return
	}

	listDoms, err := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", nodeID), raft.LsOptions{
		Recursive: false,
		Data:      true,
	})
	if err != nil {
		w.log.Errorf("HA of %s aborted, cannot list its domains: %v", nodeID, err)
		return
	}

	for domID := range listDoms {
		target := targets[rand.Intn(len(targets))]

		recovered := w.recoverDomain(
			cluster,
			target,
			fmt.Sprintf("/domain/%s", domID),
			fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domID),
		)
		if !recovered {
			continue
		}

		w.log.Infof("HA of %s to %s for domain %s", nodeID, target, domID)
	}
}

// recoverDomain defines, on node targetID, the domain whose configuration is
// stored under domainKey, and starts it when the attachment stored under
// attachmentKey records a running state. It logs the reason of any failure
// and reports whether the domain was recovered.
func (w *Worker) recoverDomain(cluster schema.Node, targetID, domainKey, attachmentKey string) bool {
	// Load the domain configuration.
	rawDom, err := w.store.Get(domainKey)
	if err != nil {
		w.log.Errorf("Cannot read %s from the store: %v", domainKey, err)
		return false
	}
	var dom schema.Domain
	if err := json.Unmarshal(rawDom, &dom); err != nil {
		w.log.Errorf("Cannot decode %s: %v", domainKey, err)
		return false
	}

	// Load the attachment, which carries the last known state of the domain.
	rawAttachment, err := w.store.Get(attachmentKey)
	if err != nil {
		w.log.Errorf("Cannot read %s from the store: %v", attachmentKey, err)
		return false
	}
	var attachment schema.DomainAttachment
	if err := json.Unmarshal(rawAttachment, &attachment); err != nil {
		w.log.Errorf("Cannot decode %s: %v", attachmentKey, err)
		return false
	}

	// The configuration is stored base64-encoded.
	xml, err := base64.StdEncoding.DecodeString(dom.Config)
	if err != nil {
		w.log.Errorf("Cannot decode the configuration stored in %s: %v", domainKey, err)
		return false
	}

	// Connect to the target node; the connection is released when this
	// domain has been handled.
	conn, err := libvirt.New(cluster[targetID].IpManagement, w.config.LibvirtTLS)
	if err != nil {
		w.log.Errorf("Connexion error to libvirt %v", err)
		return false
	}
	defer conn.Close()

	newDom, err := conn.DomainDefineXML(string(xml))
	if err != nil {
		w.log.Errorf("Oups, impossible de définir la configuration sur le noeud %s, %v", targetID, err)
		return false
	}

	if attachment.State == domainStateRunning {
		if err := newDom.Create(); err != nil {
			w.log.Errorf("Error start domain %s on the node %s: %v", domainKey, targetID, err)
			return false
		}
	}

	return true
}

// handleCPULoadBalance spreads the most CPU-hungry domains of nodeID over
// other nodes. Candidate targets come from the scheduler's CPU ranking; the
// source node is never used as a target. Nothing is done when the scheduler
// cannot be queried or when no target is available.
func (w *Worker) handleCPULoadBalance(nodeID string) {
	w.log.Infof("Répartition de la charge CPU pour le nœud: %s", nodeID)

	sched, err := scheduler.New()
	if err != nil {
		w.log.Errorf("CPU load balancing of %s aborted, cannot create scheduler: %v", nodeID, err)
		return
	}

	top, err := sched.GetTopNodeCPU(topCandidates)
	if err != nil {
		w.log.Errorf("CPU load balancing of %s aborted, cannot get candidate nodes: %v", nodeID, err)
		return
	}

	targets := make([]string, 0, len(top))
	for _, candidate := range top {
		if candidate.NodeID != nodeID {
			targets = append(targets, candidate.NodeID)
		}
	}
	if len(targets) == 0 {
		w.log.Warnf("CPU load balancing of %s aborted: no candidate node available", nodeID)
		return
	}

	// Look up the domains using the most CPU on the node.
	doms, err := sched.GetTopDomainCPUUse(nodeID, topCandidates)
	if err != nil {
		w.log.Errorf("CPU load balancing of %s aborted, cannot get its domains: %v", nodeID, err)
		return
	}

	var wg sync.WaitGroup
	for _, dom := range doms {
		wg.Add(1)
		go w.Migrate(&wg, dom.NodeID, targets[rand.Intn(len(targets))], dom.DomainID)
	}
	wg.Wait()
}

// handleMemoryLoadBalance spreads the most memory-hungry domains of nodeID
// over other nodes. Candidate targets come from the scheduler's memory
// ranking; the source node is never used as a target. Nothing is done when
// the scheduler cannot be queried or when no target is available.
func (w *Worker) handleMemoryLoadBalance(nodeID string) {
	w.log.Infof("Répartition de la charge mémoire pour le nœud: %s", nodeID)

	sched, err := scheduler.New()
	if err != nil {
		w.log.Errorf("Memory load balancing of %s aborted, cannot create scheduler: %v", nodeID, err)
		return
	}

	top, err := sched.GetTopNodeMemory(topCandidates)
	if err != nil {
		w.log.Errorf("Memory load balancing of %s aborted, cannot get candidate nodes: %v", nodeID, err)
		return
	}

	targets := make([]string, 0, len(top))
	for _, candidate := range top {
		if candidate.NodeID != nodeID {
			targets = append(targets, candidate.NodeID)
		}
	}
	if len(targets) == 0 {
		w.log.Warnf("Memory load balancing of %s aborted: no candidate node available", nodeID)
		return
	}

	// Look up the domains using the most memory on the node.
	doms, err := sched.GetTopDomainMemoryUse(nodeID, topCandidates)
	if err != nil {
		w.log.Errorf("Memory load balancing of %s aborted, cannot get its domains: %v", nodeID, err)
		return
	}

	var wg sync.WaitGroup
	for _, dom := range doms {
		wg.Add(1)
		go w.Migrate(&wg, dom.NodeID, targets[rand.Intn(len(targets))], dom.DomainID)
	}
	wg.Wait()
}