package main import ( "context" "encoding/base64" "encoding/json" "fmt" "log" "time" raft_hashicorp "github.com/hashicorp/raft" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/protobuf/types/known/timestamppb" "libvirt.org/go/libvirt" "deevirt.fr/compute/cmd/mgr/worker" "deevirt.fr/compute/pkg/config" etcd_client "deevirt.fr/compute/pkg/etcd" "deevirt.fr/compute/pkg/raft" "deevirt.fr/compute/pkg/schema" //"deevirt.fr/compute/pkg/scheduler" ) type RaftNode struct { NodeID string Conf *config.Config Store *raft.Store StateCh chan raft_hashicorp.Observation isReady bool } func (n *RaftNode) init() { println("bootstrap :") nodes := make(schema.Node) // Récupération des Noeuds ID etcd, _ := etcd_client.New(n.Conf.EtcdURI) defer etcd.Close() // On supprime toutes les informations dans etcd ctx := context.Background() etcd.Delete(ctx, fmt.Sprintf("/deevirt/cluster/%s/", n.Conf.ClusterID), clientv3.WithPrefix()) for key, value := range etcd_client.GetNodes(etcd, n.Conf.ClusterID) { var libvirt_uri string nodes[key] = &schema.NodeConfig{ IpManagement: value.IpManagement, } if n.Conf.LibvirtTLS { libvirt_uri = fmt.Sprintf("qemu+tls://%s/system", value.IpManagement) } else { libvirt_uri = fmt.Sprintf("qemu+tcp://%s/system", value.IpManagement) } c, err := libvirt.NewConnect(libvirt_uri) if err != nil { log.Fatalf("Erreur %v", err) } defer c.Close() // On récupère la liste des domaines. getDomains, _ := c.ListAllDomains(libvirt.CONNECT_LIST_DOMAINS_PERSISTENT) for _, domain := range getDomains { conf, _ := domain.GetXMLDesc(libvirt.DOMAIN_XML_INACTIVE) uuid, _ := domain.GetUUIDString() state, _, _ := domain.GetState() // On enregistre la configuration domainStore, _ := json.Marshal(schema.Domain{ Type: "qemu", Config: base64.StdEncoding.EncodeToString([]byte(conf)), }) n.Store.Set(fmt.Sprintf("/domain/%s", uuid), domainStore) // On enregistre le noeud domainStateStore, _ := json.Marshal(schema.DomainNode{ NodeId: key, }) n.Store.Set(fmt.Sprintf("/domain/%s/node", uuid), domainStateStore) // On associe au noeud currentTime := time.Now() newTime := currentTime.Add(3600 * time.Second) // On ajoute 3600 secondes pour permettre au moniteur de se synchroniser DomainLibvirtStore, _ := json.Marshal(schema.DomainAttachment{ State: int(state), Expiry: timestamppb.New(newTime), }) n.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", key, uuid), DomainLibvirtStore) } } // On enregistre la configuration des noeuds jNodes, _ := json.Marshal(nodes) n.Store.Set("/nodes", jNodes) } // Fonction pour surveiller et afficher les changements d'état func (n *RaftNode) WatchStateChanges() { work, _ := worker.NewWorker(n.Store) for obs := range n.StateCh { switch evt := obs.Data.(type) { case raft_hashicorp.RaftState: log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt) if evt == raft_hashicorp.Leader { println(n.Store.Raft.LastIndex()) if n.Store.Raft.LastIndex() == 1 { n.init() } // On attend que les logs soient synchronisés ! barrier := n.Store.Raft.Barrier(10 * time.Second) if err := barrier.Error(); err != nil { return } n.isReady = true // On attend 30 secondes avant de démarrer le worker time.Sleep(30 * time.Second) log.Println("Démarrage du worker !") work.Start() } else { work.Stop() } case raft_hashicorp.LeaderObservation: log.Println("[ÉVÉNEMENT] Le leader est", evt.LeaderID) case raft_hashicorp.PeerObservation: if n.Store.Raft.State() == raft_hashicorp.Leader { peerID := evt.Peer.ID peerAddr := evt.Peer.Address log.Println("[NOUVEAU NŒUD] Détection de", peerID, "à", peerAddr) log.Println("[ACTION] Ajout automatique en tant que voter...") future := n.Store.Raft.AddVoter(peerID, peerAddr, 0, 0) if err := future.Error(); err != nil { log.Println("[ERREUR] Impossible d'ajouter", peerID, ":", err) } else { log.Println("[SUCCÈS] Voter ajouté :", peerID) } } case raft_hashicorp.FailedHeartbeatObservation: log.Println("[ÉVÉNEMENT] Perte de connexion avec un nœud :", evt.PeerID) default: log.Println("[ÉVÉNEMENT] Autre événement :", evt) } } }