compute/pkg/raft/node.go

294 lines
7.4 KiB
Go

package raft
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"os"
"path/filepath"
transport "deevirt.fr/compute/pkg/raft/transport"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"deevirt.fr/compute/pkg/config"
etcd_client "deevirt.fr/compute/pkg/etcd"
"deevirt.fr/compute/pkg/scheduler"
)
// RaftNode ties a Raft instance to the scheduler whose lifecycle follows
// this node's leadership (see watchStateChanges).
type RaftNode struct {
	// Raft is the underlying hashicorp/raft instance.
	Raft *raft.Raft
	// NodeID is the identifier this node runs under (also used as the Raft
	// LocalID by New).
	NodeID string
	// StateCh carries the Raft observations consumed by watchStateChanges.
	StateCh chan raft.Observation

	// scheduler is started when this node becomes leader and stopped on any
	// other Raft state.
	scheduler *scheduler.Scheduler
}
// Peers pairs a peer identifier (Id) with its address (Address).
type Peers struct {
	Id, Address string
}
// getTLSCredentials builds gRPC transport credentials from the manager's
// certificate and private key (conf.Manager.TlsCert / conf.Manager.TlsKey).
//
// Any loading failure is fatal: the process exits through log.Fatalf.
//
// NOTE(review): the verification pool is filled from conf.Manager.TlsCert,
// i.e. the node's own certificate file, not from a dedicated CA bundle —
// confirm this is intended.
func getTLSCredentials(conf *config.Config) credentials.TransportCredentials {
	cert, err := tls.LoadX509KeyPair(conf.Manager.TlsCert, conf.Manager.TlsKey)
	if err != nil {
		log.Fatalf("Erreur chargement du certificat: %v", err)
	}

	// Pool used to verify the remote side's certificate.
	caCert, err := os.ReadFile(conf.Manager.TlsCert)
	if err != nil {
		log.Fatalf("Erreur chargement CA: %v", err)
	}
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(caCert) {
		// An empty pool would silently reject every peer once verification
		// is enabled, so treat it like the other loading errors.
		log.Fatalf("Erreur chargement CA: aucun certificat PEM valide dans %q", conf.Manager.TlsCert)
	}

	return credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		// crypto/tls consults RootCAs when a client verifies a server, and
		// ClientCAs when a server verifies clients. These credentials are
		// used for dialing, so RootCAs is the field that matters there;
		// ClientCAs is kept for any server-side use.
		RootCAs:   certPool,
		ClientCAs: certPool,
		// NOTE(review): InsecureSkipVerify makes the client accept any server
		// certificate and host name, so RootCAs is not consulted yet. Kept to
		// preserve current behavior; removing it requires peer certificates
		// that chain to the pool above and match the dialed addresses.
		InsecureSkipVerify: true,
	})
}
// New sets up this manager's Raft node and returns the Raft instance together
// with its gRPC transport manager.
//
// It:
//   - creates the state directory /var/lib/deevirt/mgr/<conf.NodeID>;
//   - builds the peer list from the nodes registered in etcd for
//     conf.ClusterID whose management IP is listed in conf.Manager.Peers
//     (ID = etcd key, address = "<ip>:<port>");
//   - opens a BoltDB store (logs.dat, used as both log and stable store) and
//     a file snapshot store retaining 3 snapshots;
//   - starts Raft on conf.AddressPrivate:port over the gRPC transport, with
//     TLS credentials when conf.Manager.TlsKey is set;
//   - starts watchStateChanges on a blocking observer channel;
//   - bootstraps the cluster with the peer list when this node is the first
//     entry of conf.Manager.Peers and no Raft state exists on disk yet.
//
// ctx is currently unused. When a step fails, the Raft instance, the observer
// goroutine and the BoltDB store opened so far are released before the error
// is returned.
func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *transport.Manager, error) {
	baseDir := filepath.Join("/var/lib/deevirt/mgr/", conf.NodeID)
	if err := os.MkdirAll(baseDir, 0740); err != nil {
		return nil, nil, err
	}

	// Resolve the peers: etcd nodes whose management IP is a configured
	// manager peer.
	etcd, err := etcd_client.New(conf.EtcdURI)
	if err != nil {
		return nil, nil, err
	}
	defer etcd.Close()

	var peers []raft.Server
	for key, node := range etcd_client.GetNodes(etcd, conf.ClusterID) {
		for _, peer := range conf.Manager.Peers {
			// Empty addresses never identify a peer.
			if peer != "" && peer == node.IpManagement {
				peers = append(peers, raft.Server{
					ID:      raft.ServerID(key),
					Address: raft.ServerAddress(fmt.Sprintf("%s:%d", peer, port)),
				})
				break
			}
		}
	}

	c := raft.DefaultConfig()
	c.LocalID = raft.ServerID(conf.NodeID)

	// A single BoltDB file serves as both LogStore and StableStore.
	logsPath := filepath.Join(baseDir, "logs.dat")
	ldb, err := raftboltdb.NewBoltStore(logsPath)
	if err != nil {
		return nil, nil, fmt.Errorf("boltdb.NewBoltStore(%q): %w", logsPath, err)
	}

	// Resources released if a later step fails.
	var (
		r        *raft.Raft
		observer *raft.Observer
		stateCh  chan raft.Observation
		started  bool
	)
	defer func() {
		if started {
			return
		}
		// Best-effort teardown: the error already being returned is the one
		// worth reporting, so cleanup errors are deliberately dropped.
		if r != nil {
			_ = r.Shutdown().Error()
		}
		if observer != nil {
			// Detach the observer before closing its channel so Raft can no
			// longer send on it; closing ends watchStateChanges.
			r.DeregisterObserver(observer)
			close(stateCh)
		}
		_ = ldb.Close()
	}()

	fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
	if err != nil {
		return nil, nil, fmt.Errorf("raft.NewFileSnapshotStore(%q, ...): %w", baseDir, err)
	}

	// raft.HasExistingState inspects the log store, the stable store and the
	// snapshot store, not only the first log index.
	hasState, err := raft.HasExistingState(ldb, ldb, fss)
	if err != nil {
		return nil, nil, fmt.Errorf("raft.HasExistingState: %w", err)
	}

	dialOption := []grpc.DialOption{}
	if conf.Manager.TlsKey != "" {
		dialOption = append(dialOption, grpc.WithTransportCredentials(getTLSCredentials(conf)))
	}
	// NOTE(review): tm is not released on the error paths below; its API is
	// not visible from this file.
	tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), dialOption)

	// NOTE(review): Raft runs without an FSM (nil); confirm that no command
	// log is ever applied through this instance.
	r, err = raft.NewRaft(c, nil, ldb, ldb, fss, tm.Transport())
	if err != nil {
		return nil, nil, fmt.Errorf("raft.NewRaft: %w", err)
	}

	s, err := scheduler.New()
	if err != nil {
		return nil, nil, fmt.Errorf("scheduler: %w", err)
	}

	// Watch Raft state changes with a blocking observer.
	stateCh = make(chan raft.Observation, 1)
	observer = raft.NewObserver(stateCh, true, nil)
	r.RegisterObserver(observer)
	node := &RaftNode{
		Raft:      r,
		NodeID:    conf.NodeID,
		StateCh:   stateCh,
		scheduler: s,
	}
	go node.watchStateChanges()

	// Only the first configured manager bootstraps, and only on a fresh node.
	if !hasState && len(conf.Manager.Peers) > 0 && conf.Manager.Peers[0] == conf.AddressPrivate {
		log.Println("Démarrage du bootstrap ! ")
		f := r.BootstrapCluster(raft.Configuration{Servers: peers})
		if err := f.Error(); err != nil {
			return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %w", err)
		}
	}

	started = true
	return r, tm, nil
}
// checkIfStateExists reports whether the given BoltDB log store already
// contains Raft log entries, i.e. whether its first index is non-zero.
// When the store cannot be read, it returns false together with the error.
func checkIfStateExists(logStore *raftboltdb.BoltStore) (bool, error) {
	first, err := logStore.FirstIndex()
	if err != nil {
		return false, err
	}
	return first > 0, nil
}
// watchStateChanges consumes Raft observations from n.StateCh until the
// channel is closed, and reacts to each one:
//   - raft.RaftState: starts the scheduler in a new goroutine when this node
//     becomes leader, stops it for any other state, then logs the new state;
//   - raft.LeaderObservation: logs the current leader ID;
//   - raft.PeerObservation: a removed peer is only logged; a peer reported
//     while this node is leader is added as a voter;
//   - raft.FailedHeartbeatObservation: logs the unreachable peer;
//   - anything else is logged as-is.
func (n *RaftNode) watchStateChanges() {
	for obs := range n.StateCh {
		switch evt := obs.Data.(type) {
		case raft.RaftState:
			if evt == raft.Leader {
				go n.scheduler.Start()
			} else {
				n.scheduler.Stop()
			}
			log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt)

		case raft.LeaderObservation:
			log.Println("[ÉVÉNEMENT] Le leader est", evt.LeaderID)

		case raft.PeerObservation:
			if evt.Removed {
				// Calling AddVoter here would undo the removal, so only
				// report it.
				log.Println("[ÉVÉNEMENT] Nœud retiré :", evt.Peer.ID)
				break
			}
			if n.Raft.State() != raft.Leader {
				break
			}
			peerID, peerAddr := evt.Peer.ID, evt.Peer.Address
			log.Println("[NOUVEAU NŒUD] Détection de", peerID, "à", peerAddr)
			log.Println("[ACTION] Ajout automatique en tant que voter...")
			// NOTE(review): this waits for the configuration change while the
			// observer loop is paused; if the channel is registered as a
			// blocking observer, Raft goroutines emitting further observations
			// wait meanwhile — confirm this cannot stall the cluster.
			if err := n.Raft.AddVoter(peerID, peerAddr, 0, 0).Error(); err != nil {
				log.Println("[ERREUR] Impossible d'ajouter", peerID, ":", err)
			} else {
				log.Println("[SUCCÈS] Voter ajouté :", peerID)
			}

		case raft.FailedHeartbeatObservation:
			log.Println("[ÉVÉNEMENT] Perte de connexion avec un nœud :", evt.PeerID)

		default:
			log.Println("[ÉVÉNEMENT] Autre événement :", evt)
		}
	}
}
/*func New(ctx context.Context, myID, myAddress string) (*raft.Raft, *transport.Manager, error) {
// Création du répertoire
baseDir := filepath.Join("/var/lib/deevirt/mgr/", myID)
err := os.MkdirAll(baseDir, 0740)
if err != nil {
return nil, nil, err
}
println(myAddress)
peers := []raft.Server{
{
ID: raft.ServerID("nodeA"),
Address: raft.ServerAddress("172.16.9.161:4410"),
},
{
ID: raft.ServerID("nodeB"),
Address: raft.ServerAddress("172.16.9.161:4411"),
},
}
c := raft.DefaultConfig()
c.LocalID = raft.ServerID(myID)
ldb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat"))
if err != nil {
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
}
sdb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat"))
if err != nil {
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
}
fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
if err != nil {
return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
}
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{grpc.WithTransportCredentials(getTLSCredentials())})
r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport())
if err != nil {
return nil, nil, fmt.Errorf("raft.NewRaft: %v", err)
}
s, err := scheduler.New()
if err != nil {
return nil, nil, fmt.Errorf("scheduler: %v", err)
}
// Observer pour surveiller les changements d'état
stateCh := make(chan raft.Observation, 1) // Canal de type raft.Observation
r.RegisterObserver(raft.NewObserver(stateCh, true, nil))
node := &RaftNode{
Raft: r,
NodeID: myID,
StateCh: stateCh,
scheduler: s,
}
go node.watchStateChanges()
hasState, _ := checkIfStateExists(ldb)
if myAddress == "172.16.9.161:4410" && !hasState {
println("Démarrage du bootstrap ! ")
cfg := raft.Configuration{
Servers: peers,
}
f := r.BootstrapCluster(cfg)
if err := f.Error(); err != nil {
return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
}
}
return r, tm, nil
}*/