compute/cmd/mgr/raft.go

155 lines
4.2 KiB
Go

package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"time"
raft_hashicorp "github.com/hashicorp/raft"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/types/known/timestamppb"
"libvirt.org/go/libvirt"
"deevirt.fr/compute/cmd/mgr/worker"
"deevirt.fr/compute/pkg/config"
etcd_client "deevirt.fr/compute/pkg/etcd"
"deevirt.fr/compute/pkg/raft"
"deevirt.fr/compute/pkg/schema"
//"deevirt.fr/compute/pkg/scheduler"
)
type RaftNode struct {
NodeID string
Conf *config.Config
Store *raft.Store
StateCh chan raft_hashicorp.Observation
isReady bool
}
func (n *RaftNode) init() {
println("bootstrap :")
nodes := make(schema.Node)
// Récupération des Noeuds ID
etcd, _ := etcd_client.New(n.Conf.EtcdURI)
defer etcd.Close()
// On supprime toutes les informations dans etcd
ctx := context.Background()
etcd.Delete(ctx, fmt.Sprintf("/deevirt/cluster/%s/", n.Conf.ClusterID), clientv3.WithPrefix())
for key, value := range etcd_client.GetNodes(etcd, n.Conf.ClusterID) {
var libvirt_uri string
nodes[key] = &schema.NodeConfig{
IpManagement: value.IpManagement,
}
if n.Conf.LibvirtTLS {
libvirt_uri = fmt.Sprintf("qemu+tls://%s/system", value.IpManagement)
} else {
libvirt_uri = fmt.Sprintf("qemu+tcp://%s/system", value.IpManagement)
}
c, err := libvirt.NewConnect(libvirt_uri)
if err != nil {
log.Fatalf("Erreur %v", err)
}
defer c.Close()
// On récupère la liste des domaines.
getDomains, _ := c.ListAllDomains(libvirt.CONNECT_LIST_DOMAINS_PERSISTENT)
for _, domain := range getDomains {
conf, _ := domain.GetXMLDesc(libvirt.DOMAIN_XML_INACTIVE)
uuid, _ := domain.GetUUIDString()
state, _, _ := domain.GetState()
// On enregistre la configuration
domainStore, _ := json.Marshal(schema.Domain{
Type: "qemu",
Config: base64.StdEncoding.EncodeToString([]byte(conf)),
})
n.Store.Set(fmt.Sprintf("/domain/%s", uuid), domainStore)
// On enregistre le noeud
domainStateStore, _ := json.Marshal(schema.DomainNode{
NodeId: key,
})
n.Store.Set(fmt.Sprintf("/domain/%s/node", uuid), domainStateStore)
// On associe au noeud
currentTime := time.Now()
newTime := currentTime.Add(3600 * time.Second) // On ajoute 3600 secondes pour permettre au moniteur de se synchroniser
DomainLibvirtStore, _ := json.Marshal(schema.DomainAttachment{
State: int(state),
Expiry: timestamppb.New(newTime),
})
n.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", key, uuid), DomainLibvirtStore)
}
}
// On enregistre la configuration des noeuds
jNodes, _ := json.Marshal(nodes)
n.Store.Set("/nodes", jNodes)
}
// Fonction pour surveiller et afficher les changements d'état
func (n *RaftNode) WatchStateChanges() {
work, _ := worker.NewWorker(n.Store)
for obs := range n.StateCh {
switch evt := obs.Data.(type) {
case raft_hashicorp.RaftState:
log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt)
if evt == raft_hashicorp.Leader {
println(n.Store.Raft.LastIndex())
if n.Store.Raft.LastIndex() == 1 {
n.init()
}
// On attend que les logs soient synchronisés !
barrier := n.Store.Raft.Barrier(10 * time.Second)
if err := barrier.Error(); err != nil {
return
}
n.isReady = true
// On attend 30 secondes avant de démarrer le worker
time.Sleep(30 * time.Second)
log.Println("Démarrage du worker !")
work.Start()
} else {
work.Stop()
}
case raft_hashicorp.LeaderObservation:
log.Println("[ÉVÉNEMENT] Le leader est", evt.LeaderID)
case raft_hashicorp.PeerObservation:
if n.Store.Raft.State() == raft_hashicorp.Leader {
peerID := evt.Peer.ID
peerAddr := evt.Peer.Address
log.Println("[NOUVEAU NŒUD] Détection de", peerID, "à", peerAddr)
log.Println("[ACTION] Ajout automatique en tant que voter...")
future := n.Store.Raft.AddVoter(peerID, peerAddr, 0, 0)
if err := future.Error(); err != nil {
log.Println("[ERREUR] Impossible d'ajouter", peerID, ":", err)
} else {
log.Println("[SUCCÈS] Voter ajouté :", peerID)
}
}
case raft_hashicorp.FailedHeartbeatObservation:
log.Println("[ÉVÉNEMENT] Perte de connexion avec un nœud :", evt.PeerID)
default:
log.Println("[ÉVÉNEMENT] Autre événement :", evt)
}
}
}