package raft import ( "context" "encoding/json" "fmt" "log" "math/rand" "sync" "time" "go.uber.org/zap" "deevirt.fr/compute/pkg/api/libvirt" "deevirt.fr/compute/pkg/config" "deevirt.fr/compute/pkg/scheduler" deevirt_schema "deevirt.fr/compute/pkg/schema/deevirt" ) type Worker struct { ctx context.Context cancel context.CancelFunc cancelled bool store *Store config *config.Config nodes deevirt_schema.NodeStore log *zap.SugaredLogger } func NewWorker(r *Store) (*Worker, error) { config, _ := config.New() ctx, cancel := context.WithCancel(context.Background()) logger, _ := zap.NewProduction() s := &Worker{ ctx: ctx, cancel: cancel, cancelled: true, store: r, config: config, log: logger.Sugar(), } return s, nil } func (w *Worker) Start() { go func() { for { select { case <-w.ctx.Done(): log.Println("🛑 Worker arrêté !") return default: log.Println("🔄 Contrôle périodique en cours...") // Contrôler la connexion avec libvirt w.handleLibvirtControl() // Gérer les alertes du cluster if err := w.handleClusterAlerts(); err != nil { log.Printf("Erreur lors du traitement des alertes du cluster : %v", err) } // Gérer les alertes des nœuds if err := w.handleNodeAlerts(); err != nil { log.Printf("Erreur lors du traitement des alertes des nœuds : %v", err) } // Attendre 1 minute avant de recommencer time.Sleep(1 * time.Minute) } } }() } func (w *Worker) Stop() { if !w.cancelled { w.cancel() w.cancelled = true } } func (w *Worker) Migrate(wg *sync.WaitGroup, nodeID string, newNodeID string, domainID string) { defer wg.Done() /*c, err := libvirt.New(w.nodes[nodeID].IpManagement, w.store.conf.LibvirtTLS) if err != nil { w.log.Infof("Connexion error to libvirt %v", err.Error()) return } defer c.Close() dom, err := c.LookupDomainByUUIDString(domainID) if err != nil { w.log.Infof("Connexion error to libvirt %v", err.Error()) return } c_new, err := libvirt.New(w.nodes[newNodeID].IpManagement, w.store.conf.LibvirtTLS) if err != nil { w.log.Infof("Connexion error to libvirt %v", err.Error()) return } defer c_new.Close() new_dom, err := dom.Migrate(c_new, go_libvirt.MIGRATE_LIVE|go_libvirt.MIGRATE_PERSIST_DEST|go_libvirt.MIGRATE_UNDEFINE_SOURCE, "", "", 0) if err != nil { w.log.Infof("Migration error %v", err.Error()) return } newDomConfig, _ := new_dom.GetXMLDesc(go_libvirt.DOMAIN_XML_INACTIVE) newDomState, _, _ := new_dom.GetState() new, _ := json.Marshal(deevirt_schema.DomainStore{ Config: newDomConfig, State: int(newDomState), Migrate: false, }) w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", newNodeID, domainID), new) w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domainID))*/ } /* On controle périodiquement l'accessibilité à libvirt, indépendamment du programme moniteur. Cette vérification assure un double controle pour la HA. */ func (w *Worker) handleLibvirtControl() { var nodes deevirt_schema.NodeStore cluster, err := w.store.Get("/etc/libvirt/cluster") if err != nil { w.log.Errorf("Erreur lors de la récupération des données de cluster: %v", err) return } // Désérialisation des données du cluster err = json.Unmarshal(cluster, &nodes) if err != nil { w.log.Errorf("Erreur lors de la désérialisation des données de cluster: %v", err) return } for _, conf := range nodes { // Créer une connexion à libvirt c, err := libvirt.New(conf.IpManagement, w.store.conf.LibvirtTLS) if err != nil { w.log.Warnf("Impossible de créer la connexion libvirt pour %s: %v", conf.IpManagement, err) //conf.Alive = false continue // Passer à l'élément suivant } defer c.Close() // Assurer la fermeture de la connexion à la fin // Vérifier si le noeud est vivant //alive, err := c.IsAlive() if err != nil { w.log.Warnf("Erreur lors de la vérification de la vie de %s: %v", conf.IpManagement, err) //conf.Alive = false } else { //conf.Alive = alive } } // Mise à jour des nodes dans le store w.nodes = nodes nodesUpdate, err := json.Marshal(nodes) if err != nil { w.log.Errorf("Erreur lors de la sérialisation des données de cluster: %v", err) return } // Sauvegarder les données mises à jour err = w.store.Set("/etc/libvirt/cluster", nodesUpdate) if err != nil { w.log.Errorf("Erreur lors de la mise à jour du store: %v", err) } } func (w *Worker) handleClusterAlerts() error { sched, err := scheduler.New() if err != nil { return fmt.Errorf("impossible de créer un scheduler : %v", err) } alertCluster, err := sched.GetAlertCluster() if err != nil { return fmt.Errorf("impossible de récupérer les alertes du cluster : %v", err) } for _, alert := range alertCluster { if alert.Severity == "critical" { // Ignorer les alertes critiques continue } } return nil } func (w *Worker) handleNodeAlerts() error { sched, err := scheduler.New() if err != nil { return fmt.Errorf("impossible de créer un scheduler : %v", err) } alertsNode, err := sched.GetAlertNodes() if err != nil { return fmt.Errorf("impossible de récupérer les alertes des nœuds : %v", err) } for _, alert := range alertsNode { switch alert.Event { case "state": // Déclencher le HA w.handleHAExecution(alert.NodeID) case "cpu": // Répartir sur un nœud ayant moins de charge CPU w.handleCPULoadBalance(alert.NodeID) case "memory": // Répartir sur un nœud ayant moins de charge mémoire w.handleMemoryLoadBalance(alert.NodeID) } } return nil } func (w *Worker) handleHAExecution(nodeID string) { /*if w.nodes[nodeID].Alive { // L'information n'est pas cohérente, on ne fait rien w.log.Infof("L'agent moniteur ne répond pas, mais le noeud reste fonctionnel") return } var cluster NodeStore c, _ := w.store.Get("/etc/libvirt/cluster") json.Unmarshal(c, &cluster) listDoms, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", nodeID), LsOptions{ Recursive: false, Data: true, }) s, err := scheduler.New() if err != nil { w.log.Errorf("Connexion error to libvirt %v", err) } res, err := s.GetTopNode(4) if err != nil { w.log.Errorf("Connexion error to libvirt %v", err) } mapSize := len(res) for domId := range listDoms { randomINode := rand.Intn(mapSize) currentDom, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId)) var cDomStore DomainStore json.Unmarshal(currentDom, &cDomStore) c, err := libvirt.New(cluster[res[randomINode].NodeID].IpManagement, w.store.conf.LibvirtTLS) if err != nil { w.log.Errorf("Connexion error to libvirt %v", err) continue } newDom, err := c.DomainDefineXML(cDomStore.Config) if err != nil { w.log.Errorf("Error create config %s on the node %s", domId, res[randomINode].NodeID, domId) continue } else { w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId), currentDom) w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId)) var dStore DomainStore data, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId)) json.Unmarshal(data, &dStore) if go_libvirt.DomainState(dStore.State) == go_libvirt.DOMAIN_RUNNING { err = newDom.Create() if err != nil { w.log.Errorf("Error start domain %s on the node %s", domId, res[randomINode].NodeID, domId) newDom.Undefine() } } } w.log.Infof("HA of %s to %s for domain %s", nodeID, res[randomINode].NodeID, domId) }*/ //_, err = c.DomainCreateXML(cDomStore.Config, go_libvirt.DOMAIN_START_VALIDATE) } func (w *Worker) handleCPULoadBalance(nodeID string) { var wg sync.WaitGroup w.log.Infof("Répartition de la charge CPU pour le nœud: %s", nodeID) // On recherche les plus grosses VM mémoires sur le Noeud s, err := scheduler.New() if err != nil { w.log.Errorf("Connexion error to libvirt %v", err) } res, err := s.GetTopNodeCPU(4) if err != nil { w.log.Errorf("Connexion error to libvirt %v", err) } mapSize := len(res) doms, err := s.GetTopDomainCPUUse(nodeID, 4) if err != nil { w.log.Errorf("Connexion error to libvirt %v", err) } for _, dom := range doms { println(dom.DomainID) randomINode := rand.Intn(mapSize) wg.Add(1) go w.Migrate(&wg, dom.NodeID, res[randomINode].NodeID, dom.DomainID) } wg.Wait() } func (w *Worker) handleMemoryLoadBalance(nodeID string) { // Implémentation de la répartition de charge mémoire var wg sync.WaitGroup w.log.Infof("Répartition de la charge CPU pour le nœud: %s", nodeID) // On recherche les plus grosses VM mémoires sur le Noeud s, err := scheduler.New() if err != nil { w.log.Errorf("Connexion error to libvirt %v", err) } res, err := s.GetTopNodeMemory(4) if err != nil { w.log.Errorf("Connexion error to libvirt %v", err) } mapSize := len(res) doms, err := s.GetTopDomainMemoryUse(nodeID, 4) if err != nil { w.log.Errorf("Connexion error to libvirt %v", err) } for _, dom := range doms { println(dom.DomainID) randomINode := rand.Intn(mapSize) wg.Add(1) go w.Migrate(&wg, dom.NodeID, res[randomINode].NodeID, dom.DomainID) } wg.Wait() }