From ceaa0d592ee28be54f3905437b485d1148ad2799 Mon Sep 17 00:00:00 2001 From: Mickael BOURNEUF Date: Tue, 25 Feb 2025 19:17:20 +0100 Subject: [PATCH] =?UTF-8?q?Impl=C3=A9mentation=20de=20la=20HA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/monitor/events/qemu.go | 17 +-- pkg/api/libvirt/libvirt.go | 18 +++ pkg/api/raft/node.go | 11 +- pkg/api/raft/scheduler.go | 144 --------------------- pkg/api/raft/schema.go | 13 +- pkg/api/raft/worker.go | 259 +++++++++++++++++++++++++++++++++++++ pkg/scheduler/ha.go | 21 ++- pkg/scheduler/scheduler.go | 13 +- 8 files changed, 310 insertions(+), 186 deletions(-) create mode 100644 pkg/api/libvirt/libvirt.go delete mode 100644 pkg/api/raft/scheduler.go create mode 100644 pkg/api/raft/worker.go diff --git a/cmd/monitor/events/qemu.go b/cmd/monitor/events/qemu.go index 7874a27..e231698 100644 --- a/cmd/monitor/events/qemu.go +++ b/cmd/monitor/events/qemu.go @@ -1,20 +1,7 @@ package events import ( - "context" - "encoding/json" - "encoding/xml" - "log" - "strings" - "time" - - "deevirt.fr/compute/pkg/config" - "deevirt.fr/compute/pkg/schema" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "libvirt.org/go/libvirt" - - pb "deevirt.fr/compute/pkg/api/proto" ) type Qemu struct { @@ -28,7 +15,7 @@ type Qemu struct { } func QemuEvents(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainQemuMonitorEvent) { - var desc schema.Domain + /*var desc schema.Domain config, _ := config.New() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -61,5 +48,5 @@ func QemuEvents(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainQemu client.Event(ctx, &pb.DomainEventRequest{ Event: e, - }) + })*/ } diff --git a/pkg/api/libvirt/libvirt.go b/pkg/api/libvirt/libvirt.go new file mode 100644 index 0000000..70d840d --- /dev/null +++ b/pkg/api/libvirt/libvirt.go @@ -0,0 +1,18 @@ +package libvirt + +import ( + "fmt" + + go_libvirt "libvirt.org/go/libvirt" +) + +func New(address string, tls bool) (*go_libvirt.Connect, error) { + var libvirt_uri string + if tls { + libvirt_uri = fmt.Sprintf("qemu+tls://%s/system", address) + } else { + libvirt_uri = fmt.Sprintf("qemu+tcp://%s/system", address) + } + + return go_libvirt.NewConnect(libvirt_uri) +} diff --git a/pkg/api/raft/node.go b/pkg/api/raft/node.go index be0a290..b936379 100644 --- a/pkg/api/raft/node.go +++ b/pkg/api/raft/node.go @@ -30,7 +30,7 @@ func (n *RaftNode) init() { defer etcd.Close() for key, value := range etcd_client.GetNodes(etcd, n.Store.conf.ClusterID) { - nodes[key] = NodeStoreInfo{ + nodes[key] = &NodeStoreInfo{ IpManagement: value.IpManagement, Scoring: 0, } @@ -75,7 +75,7 @@ func (n *RaftNode) init() { // Fonction pour surveiller et afficher les changements d'état func (n *RaftNode) WatchStateChanges() { - sched, _ := NewScheduler(n.Store) + worker, _ := NewWorker(n.Store) for obs := range n.StateCh { switch evt := obs.Data.(type) { @@ -87,13 +87,12 @@ func (n *RaftNode) WatchStateChanges() { n.init() } - // On attend une seconde avant de démarrer le scheduler + // On attend une seconde avant de démarrer le worker time.Sleep(1 * time.Second) - go sched.Start() - + worker.Start() } else { - sched.Stop() + worker.Stop() } log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt) diff --git a/pkg/api/raft/scheduler.go b/pkg/api/raft/scheduler.go deleted file mode 100644 index 58ab1a6..0000000 --- a/pkg/api/raft/scheduler.go +++ /dev/null @@ -1,144 +0,0 @@ -package raft - -import ( - "context" - "encoding/json" - "fmt" - "log" - "time" - - prom_api "github.com/prometheus/client_golang/api" - v1 "github.com/prometheus/client_golang/api/prometheus/v1" - "github.com/prometheus/common/model" - "go.uber.org/zap" - - "deevirt.fr/compute/pkg/config" -) - -type Scheduler struct { - ctx context.Context - cancel context.CancelFunc - cancelled bool - - store *Store - - config *config.Config - log *zap.Logger -} - -func NewScheduler(r *Store) (*Scheduler, error) { - config, _ := config.New() - ctx, cancel := context.WithCancel(context.Background()) - - logger, _ := zap.NewProduction() - - s := &Scheduler{ - ctx: ctx, - cancel: cancel, - cancelled: true, - - store: r, - - config: config, - log: logger, - } - - return s, nil -} - -func (w *Scheduler) api() (v1.API, error) { - client, err := prom_api.NewClient(prom_api.Config{ - Address: "http://172.16.9.161:9090", - }) - if err != nil { - w.log.Error("Prometheus HS") - return nil, nil - } - - return v1.NewAPI(client), nil -} - -func (w *Scheduler) Start() { - go func() { - // On synchronise l'état des hotes - - for { - select { - case <-w.ctx.Done(): - fmt.Println("🛑 Worker arrêté !") - return - default: - fmt.Println("🔄 Controle périodique en cours...") - w.Alerts() - /*for _, t := range w.checkHA() { - w.restartDomain(t) - }*/ - - time.Sleep(1 * time.Minute) - } - } - }() -} - -func (w *Scheduler) Stop() { - if !w.cancelled { - w.cancel() - w.cancelled = true - } -} - -/* -On récupère les alertes -*/ -func (w *Scheduler) Alerts() { - api, err := w.api() - if err != nil { - return - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // On controle l'état du cluster - query := fmt.Sprintf("ALERTS_FOR_STATE{cluster_id=\"%s\", type=\"deevirt_default\"}\n", w.config.ClusterID) - alerts, _, err := api.Query(ctx, query, time.Now()) - if err != nil { - log.Fatalf("Erreur lors de la récupération des alertes filtrées: %v", err) - } - - if alerts.Type() == model.ValVector { - for _, alert := range alerts.(model.Vector) { - if alert.Metric["severity"] == "critical" { - // En situation critique, on abandonne toutes les actions - return - } - } - } - - query = fmt.Sprintf("ALERTS_FOR_STATE{cluster_id=\"%s\", type=\"deevirt_node_default\"}\n", w.config.ClusterID) - alerts, _, err = api.Query(ctx, query, time.Now()) - if err != nil { - log.Fatalf("Erreur lors de la récupération des alertes filtrées: %v", err) - } - - if alerts.Type() == model.ValVector { - for _, alert := range alerts.(model.Vector) { - println(alert.Metric["node_id"]) - t, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", alert.Metric["node_id"]), LsOptions{ - Recursive: false, - Data: true, - }) - - for k, v := range t { - var n DomainStore - json.Unmarshal(v, &n) - - fmt.Printf("On relance la VM %s\n", k) - - fmt.Printf("%v\n", n.State) - } - - log.Printf("%v\n", alert) - } - } -} diff --git a/pkg/api/raft/schema.go b/pkg/api/raft/schema.go index 0582ca7..2760395 100644 --- a/pkg/api/raft/schema.go +++ b/pkg/api/raft/schema.go @@ -1,19 +1,20 @@ package raft -type NodeStore map[string]NodeStoreInfo +type NodeStore map[string]*NodeStoreInfo type NodeStoreInfo struct { IpManagement string + Alive bool Scoring int } -type SchemaDomain struct { - Config string `json:"config"` - State int `json:"state"` -} - type DomainStore struct { Config string `json:"config"` State int `json:"state"` Migrate bool `json:"Migrate"` } + +type SchemaDomain struct { + Config string `json:"config"` + State int `json:"state"` +} diff --git a/pkg/api/raft/worker.go b/pkg/api/raft/worker.go new file mode 100644 index 0000000..66b6624 --- /dev/null +++ b/pkg/api/raft/worker.go @@ -0,0 +1,259 @@ +package raft + +import ( + "context" + "encoding/json" + "fmt" + "log" + "math/rand" + "time" + + "go.uber.org/zap" + go_libvirt "libvirt.org/go/libvirt" + + "deevirt.fr/compute/pkg/api/libvirt" + "deevirt.fr/compute/pkg/config" + "deevirt.fr/compute/pkg/scheduler" +) + +type Worker struct { + ctx context.Context + cancel context.CancelFunc + cancelled bool + + store *Store + + config *config.Config + nodes NodeStore + log *zap.SugaredLogger +} + +func NewWorker(r *Store) (*Worker, error) { + config, _ := config.New() + ctx, cancel := context.WithCancel(context.Background()) + + logger, _ := zap.NewProduction() + + s := &Worker{ + ctx: ctx, + cancel: cancel, + cancelled: true, + + store: r, + + config: config, + log: logger.Sugar(), + } + + return s, nil +} + +func (w *Worker) Start() { + go func() { + for { + select { + case <-w.ctx.Done(): + log.Println("🛑 Worker arrêté !") + return + default: + log.Println("🔄 Contrôle périodique en cours...") + + // Contrôler la connexion avec libvirt + w.handleLibvirtControl() + + // Gérer les alertes du cluster + if err := w.handleClusterAlerts(); err != nil { + log.Printf("Erreur lors du traitement des alertes du cluster : %v", err) + } + + // Gérer les alertes des nœuds + if err := w.handleNodeAlerts(); err != nil { + log.Printf("Erreur lors du traitement des alertes des nœuds : %v", err) + } + + // Attendre 1 minute avant de recommencer + time.Sleep(1 * time.Minute) + } + } + }() +} + +func (w *Worker) Stop() { + if !w.cancelled { + w.cancel() + w.cancelled = true + } +} + +/* +On controle périodiquement l'accessibilité à libvirt, indépendamment du programme moniteur. +Cette vérification assure un double controle pour la HA. +*/ +func (w *Worker) handleLibvirtControl() { + var nodes NodeStore + cluster, err := w.store.Get("/etc/libvirt/cluster") + if err != nil { + w.log.Errorf("Erreur lors de la récupération des données de cluster: %v", err) + return + } + + // Désérialisation des données du cluster + err = json.Unmarshal(cluster, &nodes) + if err != nil { + w.log.Errorf("Erreur lors de la désérialisation des données de cluster: %v", err) + return + } + + for _, conf := range nodes { + // Créer une connexion à libvirt + c, err := libvirt.New(conf.IpManagement, w.store.conf.LibvirtTLS) + if err != nil { + w.log.Warnf("Impossible de créer la connexion libvirt pour %s: %v", conf.IpManagement, err) + conf.Alive = false + continue // Passer à l'élément suivant + } + defer c.Close() // Assurer la fermeture de la connexion à la fin + + // Vérifier si le noeud est vivant + alive, err := c.IsAlive() + if err != nil { + w.log.Warnf("Erreur lors de la vérification de la vie de %s: %v", conf.IpManagement, err) + conf.Alive = false + } else { + conf.Alive = alive + } + } + + // Mise à jour des nodes dans le store + w.nodes = nodes + nodesUpdate, err := json.Marshal(nodes) + if err != nil { + w.log.Errorf("Erreur lors de la sérialisation des données de cluster: %v", err) + return + } + + // Sauvegarder les données mises à jour + err = w.store.Set("/etc/libvirt/cluster", nodesUpdate) + if err != nil { + w.log.Errorf("Erreur lors de la mise à jour du store: %v", err) + } +} + +func (w *Worker) handleClusterAlerts() error { + sched, err := scheduler.New() + if err != nil { + return fmt.Errorf("impossible de créer un scheduler : %v", err) + } + + alertCluster, err := sched.GetAlertCluster() + if err != nil { + return fmt.Errorf("impossible de récupérer les alertes du cluster : %v", err) + } + + for _, alert := range alertCluster { + if alert.Severity == "critical" { + // Ignorer les alertes critiques + continue + } + } + return nil +} + +func (w *Worker) handleNodeAlerts() error { + sched, err := scheduler.New() + if err != nil { + return fmt.Errorf("impossible de créer un scheduler : %v", err) + } + + alertsNode, err := sched.GetAlertNodes() + if err != nil { + return fmt.Errorf("impossible de récupérer les alertes des nœuds : %v", err) + } + + for _, alert := range alertsNode { + switch alert.Event { + case "state": + // Déclencher le HA + w.handleHAExecution(alert.NodeID) + case "cpu": + // Répartir sur un nœud ayant moins de charge CPU + w.handleCPULoadBalance(alert.NodeID) + case "memory": + // Répartir sur un nœud ayant moins de charge mémoire + w.handleMemoryLoadBalance(alert.NodeID) + } + } + return nil +} + +func (w *Worker) handleHAExecution(nodeID string) { + if w.nodes[nodeID].Alive { + // L'information n'est pas cohérente, on ne fait rien + w.log.Infof("L'agent moniteur ne répond pas, mais le noeud reste fonctionnel") + return + } + var cluster NodeStore + c, _ := w.store.Get("/etc/libvirt/cluster") + json.Unmarshal(c, &cluster) + + listDoms, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", nodeID), LsOptions{ + Recursive: false, + Data: true, + }) + + s, err := scheduler.New() + if err != nil { + w.log.Errorf("Connexion error to libvirt %v", err) + } + + res, err := s.GetTopNode(4) + if err != nil { + w.log.Errorf("Connexion error to libvirt %v", err) + } + mapSize := len(res) + + for domId := range listDoms { + randomINode := rand.Intn(mapSize) + currentDom, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId)) + var cDomStore DomainStore + json.Unmarshal(currentDom, &cDomStore) + c, err := libvirt.New(cluster[res[randomINode].NodeID].IpManagement, w.store.conf.LibvirtTLS) + if err != nil { + w.log.Errorf("Connexion error to libvirt %v", err) + continue + } + newDom, err := c.DomainDefineXML(cDomStore.Config) + if err != nil { + w.log.Errorf("Error create config %s on the node %s", domId, res[randomINode].NodeID, domId) + continue + } else { + w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId), currentDom) + w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId)) + + var dStore DomainStore + data, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId)) + json.Unmarshal(data, &dStore) + + if go_libvirt.DomainState(dStore.State) == go_libvirt.DOMAIN_RUNNING { + err = newDom.Create() + if err != nil { + w.log.Errorf("Error start domain %s on the node %s", domId, res[randomINode].NodeID, domId) + newDom.Undefine() + } + } + } + + w.log.Infof("HA of %s to %s for domain %s", nodeID, res[randomINode].NodeID, domId) + } + //_, err = c.DomainCreateXML(cDomStore.Config, go_libvirt.DOMAIN_START_VALIDATE) +} + +func (w *Worker) handleCPULoadBalance(nodeID string) { + // Implémentation de la répartition de charge CPU + w.log.Infof("Répartition de la charge CPU pour le nœud: %s", nodeID) +} + +func (w *Worker) handleMemoryLoadBalance(nodeID string) { + // Implémentation de la répartition de charge mémoire + w.log.Infof("Répartition de la charge mémoire pour le nœud: %s", nodeID) +} diff --git a/pkg/scheduler/ha.go b/pkg/scheduler/ha.go index 45d7804..4e05096 100644 --- a/pkg/scheduler/ha.go +++ b/pkg/scheduler/ha.go @@ -8,6 +8,19 @@ import ( "github.com/prometheus/common/model" ) +type AlertsCluster struct { + Severity string + Event string + Score int +} + +type AlertsNode struct { + NodeID string + Event string + Severity string + Score int +} + func (s *Scheduler) GetAlertCluster() ([]AlertsCluster, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -22,7 +35,8 @@ func (s *Scheduler) GetAlertCluster() ([]AlertsCluster, error) { data := []AlertsCluster{} for _, res := range res.(model.Vector) { data = append(data, AlertsCluster{ - Severity: string(res.Metric["Severity"]), + Event: string(res.Metric["event"]), + Severity: string(res.Metric["severity"]), Score: int(res.Value), }) } @@ -44,8 +58,9 @@ func (s *Scheduler) GetAlertNodes() ([]AlertsNode, error) { data := []AlertsNode{} for _, res := range res.(model.Vector) { data = append(data, AlertsNode{ - NodeID: string(res.Metric["NodeID"]), - Severity: string(res.Metric["Severity"]), + Event: string(res.Metric["event"]), + NodeID: string(res.Metric["node_id"]), + Severity: string(res.Metric["severity"]), Score: int(res.Value), }) } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 0ae385a..b1ea2c0 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -25,17 +25,6 @@ type TopNode struct { Score int } -type AlertsCluster struct { - Severity string - Score int -} - -type AlertsNode struct { - NodeID string - Severity string - Score int -} - func New() (*Scheduler, error) { config, _ := config.New() @@ -70,7 +59,7 @@ func (s *Scheduler) GetTopNode(number int) ([]TopNode, error) { + (1 - sum(libvirt_node_memory_usage_bytes{cluster_id="%s"} / libvirt_node_memory_total_bytes) by (node_id)) * 0.7 ) * 100 - ) by (node_id)) + ) by (node_id) > 30) `, number, s.Config.ClusterID, s.Config.ClusterID) api, _ := prom.New()