package raft import ( "context" "fmt" "log" "os" "path/filepath" transport "github.com/Jille/raft-grpc-transport" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "google.golang.org/grpc" "deevirt.fr/compute/pkg/config" etcd_client "deevirt.fr/compute/pkg/etcd" "deevirt.fr/compute/pkg/scheduler" ) type RaftNode struct { Raft *raft.Raft NodeID string StateCh chan raft.Observation scheduler *scheduler.Scheduler } type Peers struct { Id string Address string } func New(ctx context.Context, port int) (*raft.Raft, *transport.Manager, error) { // Récupération de la configuration deevirt conf, err := config.New() if err != nil { return nil, nil, err } // Création du répertoire baseDir := filepath.Join("/var/lib/deevirt/mgr/", conf.NodeID) err = os.MkdirAll(baseDir, 0740) if err != nil { return nil, nil, err } // Récupération des Noeuds ID etcd, err := etcd_client.New(conf.EtcdURI) if err != nil { return nil, nil, err } defer etcd.Close() peers := []raft.Server{} for key, value := range etcd_client.GetNodes(etcd, conf.ClusterID) { var p string for _, peer := range conf.Manager.Peers { if peer == value.IpManagement { p = peer } } if p != "" { peers = append(peers, raft.Server{ ID: raft.ServerID(key), Address: raft.ServerAddress(fmt.Sprintf("%s:%d", p, port)), }) } } c := raft.DefaultConfig() c.LocalID = raft.ServerID(conf.NodeID) ldb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat")) if err != nil { return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err) } sdb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat")) if err != nil { return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err) } fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr) if err != nil { return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) } tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), []grpc.DialOption{grpc.WithInsecure()}) r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport()) if err != nil { return nil, nil, fmt.Errorf("raft.NewRaft: %v", err) } s, err := scheduler.New() if err != nil { return nil, nil, fmt.Errorf("scheduler: %v", err) } // Observer pour surveiller les changements d'état stateCh := make(chan raft.Observation, 1) // Canal de type raft.Observation r.RegisterObserver(raft.NewObserver(stateCh, true, nil)) node := &RaftNode{ Raft: r, NodeID: conf.NodeID, StateCh: stateCh, scheduler: s, } go node.watchStateChanges() hasState, _ := checkIfStateExists(ldb) if conf.Manager.Peers[0] == conf.AddressPrivate && !hasState { println("Démarrage du bootstrap ! ") cfg := raft.Configuration{ Servers: peers, } f := r.BootstrapCluster(cfg) if err := f.Error(); err != nil { return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) } } return r, tm, nil } // Vérifie si l'état Raft existe déjà func checkIfStateExists(logStore *raftboltdb.BoltStore) (bool, error) { // Vérifier les logs Raft firstIndex, err := logStore.FirstIndex() if err != nil { return false, err } if firstIndex > 0 { return true, nil } return false, nil } // Fonction pour surveiller et afficher les changements d'état func (n *RaftNode) watchStateChanges() { for obs := range n.StateCh { switch evt := obs.Data.(type) { case raft.RaftState: if evt == raft.Leader { go n.scheduler.Start() log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt) } else { n.scheduler.Stop() } log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt) case raft.LeaderObservation: log.Println("[ÉVÉNEMENT] Le leader est", evt.LeaderID) case raft.PeerObservation: if n.Raft.State() == raft.Leader { peerID := evt.Peer.ID peerAddr := evt.Peer.Address log.Println("[NOUVEAU NŒUD] Détection de", peerID, "à", peerAddr) log.Println("[ACTION] Ajout automatique en tant que voter...") future := n.Raft.AddVoter(peerID, peerAddr, 0, 0) if err := future.Error(); err != nil { log.Println("[ERREUR] Impossible d'ajouter", peerID, ":", err) } else { log.Println("[SUCCÈS] Voter ajouté :", peerID) } } case raft.FailedHeartbeatObservation: log.Println("[ÉVÉNEMENT] Perte de connexion avec un nœud :", evt.PeerID) default: log.Println("[ÉVÉNEMENT] Autre événement :", evt) } } }