compute/pkg/api/raft/worker.go

359 lines
9.0 KiB
Go

package raft
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"sync"
"time"
"go.uber.org/zap"
go_libvirt "libvirt.org/go/libvirt"
"deevirt.fr/compute/pkg/api/libvirt"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/scheduler"
)
type Worker struct {
ctx context.Context
cancel context.CancelFunc
cancelled bool
store *Store
config *config.Config
nodes NodeStore
log *zap.SugaredLogger
}
func NewWorker(r *Store) (*Worker, error) {
config, _ := config.New()
ctx, cancel := context.WithCancel(context.Background())
logger, _ := zap.NewProduction()
s := &Worker{
ctx: ctx,
cancel: cancel,
cancelled: true,
store: r,
config: config,
log: logger.Sugar(),
}
return s, nil
}
func (w *Worker) Start() {
go func() {
for {
select {
case <-w.ctx.Done():
log.Println("🛑 Worker arrêté !")
return
default:
log.Println("🔄 Contrôle périodique en cours...")
// Contrôler la connexion avec libvirt
w.handleLibvirtControl()
// Gérer les alertes du cluster
if err := w.handleClusterAlerts(); err != nil {
log.Printf("Erreur lors du traitement des alertes du cluster : %v", err)
}
// Gérer les alertes des nœuds
if err := w.handleNodeAlerts(); err != nil {
log.Printf("Erreur lors du traitement des alertes des nœuds : %v", err)
}
// Attendre 1 minute avant de recommencer
time.Sleep(1 * time.Minute)
}
}
}()
}
func (w *Worker) Stop() {
if !w.cancelled {
w.cancel()
w.cancelled = true
}
}
func (w *Worker) Migrate(wg *sync.WaitGroup, nodeID string, newNodeID string, domainID string) {
defer wg.Done()
c, err := libvirt.New(w.nodes[nodeID].IpManagement, w.store.conf.LibvirtTLS)
if err != nil {
w.log.Infof("Connexion error to libvirt %v", err.Error())
return
}
defer c.Close()
dom, err := c.LookupDomainByUUIDString(domainID)
if err != nil {
w.log.Infof("Connexion error to libvirt %v", err.Error())
return
}
c_new, err := libvirt.New(w.nodes[newNodeID].IpManagement, w.store.conf.LibvirtTLS)
if err != nil {
w.log.Infof("Connexion error to libvirt %v", err.Error())
return
}
defer c_new.Close()
new_dom, err := dom.Migrate(c_new, go_libvirt.MIGRATE_LIVE|go_libvirt.MIGRATE_PERSIST_DEST|go_libvirt.MIGRATE_UNDEFINE_SOURCE, "", "", 0)
if err != nil {
w.log.Infof("Migration error %v", err.Error())
return
}
newDomConfig, _ := new_dom.GetXMLDesc(go_libvirt.DOMAIN_XML_INACTIVE)
newDomState, _, _ := new_dom.GetState()
new, _ := json.Marshal(DomainStore{
Config: newDomConfig,
State: int(newDomState),
Migrate: false,
})
w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", newNodeID, domainID), new)
w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domainID))
}
/*
On controle périodiquement l'accessibilité à libvirt, indépendamment du programme moniteur.
Cette vérification assure un double controle pour la HA.
*/
func (w *Worker) handleLibvirtControl() {
var nodes NodeStore
cluster, err := w.store.Get("/etc/libvirt/cluster")
if err != nil {
w.log.Errorf("Erreur lors de la récupération des données de cluster: %v", err)
return
}
// Désérialisation des données du cluster
err = json.Unmarshal(cluster, &nodes)
if err != nil {
w.log.Errorf("Erreur lors de la désérialisation des données de cluster: %v", err)
return
}
for _, conf := range nodes {
// Créer une connexion à libvirt
c, err := libvirt.New(conf.IpManagement, w.store.conf.LibvirtTLS)
if err != nil {
w.log.Warnf("Impossible de créer la connexion libvirt pour %s: %v", conf.IpManagement, err)
conf.Alive = false
continue // Passer à l'élément suivant
}
defer c.Close() // Assurer la fermeture de la connexion à la fin
// Vérifier si le noeud est vivant
alive, err := c.IsAlive()
if err != nil {
w.log.Warnf("Erreur lors de la vérification de la vie de %s: %v", conf.IpManagement, err)
conf.Alive = false
} else {
conf.Alive = alive
}
}
// Mise à jour des nodes dans le store
w.nodes = nodes
nodesUpdate, err := json.Marshal(nodes)
if err != nil {
w.log.Errorf("Erreur lors de la sérialisation des données de cluster: %v", err)
return
}
// Sauvegarder les données mises à jour
err = w.store.Set("/etc/libvirt/cluster", nodesUpdate)
if err != nil {
w.log.Errorf("Erreur lors de la mise à jour du store: %v", err)
}
}
func (w *Worker) handleClusterAlerts() error {
sched, err := scheduler.New()
if err != nil {
return fmt.Errorf("impossible de créer un scheduler : %v", err)
}
alertCluster, err := sched.GetAlertCluster()
if err != nil {
return fmt.Errorf("impossible de récupérer les alertes du cluster : %v", err)
}
for _, alert := range alertCluster {
if alert.Severity == "critical" {
// Ignorer les alertes critiques
continue
}
}
return nil
}
func (w *Worker) handleNodeAlerts() error {
sched, err := scheduler.New()
if err != nil {
return fmt.Errorf("impossible de créer un scheduler : %v", err)
}
alertsNode, err := sched.GetAlertNodes()
if err != nil {
return fmt.Errorf("impossible de récupérer les alertes des nœuds : %v", err)
}
for _, alert := range alertsNode {
switch alert.Event {
case "state":
// Déclencher le HA
w.handleHAExecution(alert.NodeID)
case "cpu":
// Répartir sur un nœud ayant moins de charge CPU
w.handleCPULoadBalance(alert.NodeID)
case "memory":
// Répartir sur un nœud ayant moins de charge mémoire
w.handleMemoryLoadBalance(alert.NodeID)
}
}
return nil
}
func (w *Worker) handleHAExecution(nodeID string) {
if w.nodes[nodeID].Alive {
// L'information n'est pas cohérente, on ne fait rien
w.log.Infof("L'agent moniteur ne répond pas, mais le noeud reste fonctionnel")
return
}
var cluster NodeStore
c, _ := w.store.Get("/etc/libvirt/cluster")
json.Unmarshal(c, &cluster)
listDoms, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", nodeID), LsOptions{
Recursive: false,
Data: true,
})
s, err := scheduler.New()
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
res, err := s.GetTopNode(4)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
mapSize := len(res)
for domId := range listDoms {
randomINode := rand.Intn(mapSize)
currentDom, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
var cDomStore DomainStore
json.Unmarshal(currentDom, &cDomStore)
c, err := libvirt.New(cluster[res[randomINode].NodeID].IpManagement, w.store.conf.LibvirtTLS)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
continue
}
newDom, err := c.DomainDefineXML(cDomStore.Config)
if err != nil {
w.log.Errorf("Error create config %s on the node %s", domId, res[randomINode].NodeID, domId)
continue
} else {
w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId), currentDom)
w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
var dStore DomainStore
data, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId))
json.Unmarshal(data, &dStore)
if go_libvirt.DomainState(dStore.State) == go_libvirt.DOMAIN_RUNNING {
err = newDom.Create()
if err != nil {
w.log.Errorf("Error start domain %s on the node %s", domId, res[randomINode].NodeID, domId)
newDom.Undefine()
}
}
}
w.log.Infof("HA of %s to %s for domain %s", nodeID, res[randomINode].NodeID, domId)
}
//_, err = c.DomainCreateXML(cDomStore.Config, go_libvirt.DOMAIN_START_VALIDATE)
}
func (w *Worker) handleCPULoadBalance(nodeID string) {
var wg sync.WaitGroup
w.log.Infof("Répartition de la charge CPU pour le nœud: %s", nodeID)
// On recherche les plus grosses VM mémoires sur le Noeud
s, err := scheduler.New()
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
res, err := s.GetTopNodeCPU(4)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
mapSize := len(res)
doms, err := s.GetTopDomainCPUUse(nodeID, 4)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
for _, dom := range doms {
println(dom.DomainID)
randomINode := rand.Intn(mapSize)
wg.Add(1)
go w.Migrate(&wg, dom.NodeID, res[randomINode].NodeID, dom.DomainID)
}
wg.Wait()
}
func (w *Worker) handleMemoryLoadBalance(nodeID string) {
// Implémentation de la répartition de charge mémoire
var wg sync.WaitGroup
w.log.Infof("Répartition de la charge CPU pour le nœud: %s", nodeID)
// On recherche les plus grosses VM mémoires sur le Noeud
s, err := scheduler.New()
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
res, err := s.GetTopNodeMemory(4)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
mapSize := len(res)
doms, err := s.GetTopDomainMemoryUse(nodeID, 4)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
for _, dom := range doms {
println(dom.DomainID)
randomINode := rand.Intn(mapSize)
wg.Add(1)
go w.Migrate(&wg, dom.NodeID, res[randomINode].NodeID, dom.DomainID)
}
wg.Wait()
}