Implémentation de la HA

This commit is contained in:
Mickael BOURNEUF 2025-02-25 19:17:20 +01:00
parent c69ec9d70c
commit ceaa0d592e
8 changed files with 310 additions and 186 deletions

View File

@ -1,20 +1,7 @@
package events
import (
"context"
"encoding/json"
"encoding/xml"
"log"
"strings"
"time"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/schema"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"libvirt.org/go/libvirt"
pb "deevirt.fr/compute/pkg/api/proto"
)
type Qemu struct {
@ -28,7 +15,7 @@ type Qemu struct {
}
func QemuEvents(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainQemuMonitorEvent) {
var desc schema.Domain
/*var desc schema.Domain
config, _ := config.New()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@ -61,5 +48,5 @@ func QemuEvents(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainQemu
client.Event(ctx, &pb.DomainEventRequest{
Event: e,
})
})*/
}

View File

@ -0,0 +1,18 @@
package libvirt
import (
"fmt"
go_libvirt "libvirt.org/go/libvirt"
)
func New(address string, tls bool) (*go_libvirt.Connect, error) {
var libvirt_uri string
if tls {
libvirt_uri = fmt.Sprintf("qemu+tls://%s/system", address)
} else {
libvirt_uri = fmt.Sprintf("qemu+tcp://%s/system", address)
}
return go_libvirt.NewConnect(libvirt_uri)
}

View File

@ -30,7 +30,7 @@ func (n *RaftNode) init() {
defer etcd.Close()
for key, value := range etcd_client.GetNodes(etcd, n.Store.conf.ClusterID) {
nodes[key] = NodeStoreInfo{
nodes[key] = &NodeStoreInfo{
IpManagement: value.IpManagement,
Scoring: 0,
}
@ -75,7 +75,7 @@ func (n *RaftNode) init() {
// Fonction pour surveiller et afficher les changements d'état
func (n *RaftNode) WatchStateChanges() {
sched, _ := NewScheduler(n.Store)
worker, _ := NewWorker(n.Store)
for obs := range n.StateCh {
switch evt := obs.Data.(type) {
@ -87,13 +87,12 @@ func (n *RaftNode) WatchStateChanges() {
n.init()
}
// On attend une seconde avant de démarrer le scheduler
// On attend une seconde avant de démarrer le worker
time.Sleep(1 * time.Second)
go sched.Start()
worker.Start()
} else {
sched.Stop()
worker.Stop()
}
log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt)

View File

@ -1,144 +0,0 @@
package raft
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
prom_api "github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"go.uber.org/zap"
"deevirt.fr/compute/pkg/config"
)
type Scheduler struct {
ctx context.Context
cancel context.CancelFunc
cancelled bool
store *Store
config *config.Config
log *zap.Logger
}
func NewScheduler(r *Store) (*Scheduler, error) {
config, _ := config.New()
ctx, cancel := context.WithCancel(context.Background())
logger, _ := zap.NewProduction()
s := &Scheduler{
ctx: ctx,
cancel: cancel,
cancelled: true,
store: r,
config: config,
log: logger,
}
return s, nil
}
func (w *Scheduler) api() (v1.API, error) {
client, err := prom_api.NewClient(prom_api.Config{
Address: "http://172.16.9.161:9090",
})
if err != nil {
w.log.Error("Prometheus HS")
return nil, nil
}
return v1.NewAPI(client), nil
}
func (w *Scheduler) Start() {
go func() {
// On synchronise l'état des hotes
for {
select {
case <-w.ctx.Done():
fmt.Println("🛑 Worker arrêté !")
return
default:
fmt.Println("🔄 Controle périodique en cours...")
w.Alerts()
/*for _, t := range w.checkHA() {
w.restartDomain(t)
}*/
time.Sleep(1 * time.Minute)
}
}
}()
}
func (w *Scheduler) Stop() {
if !w.cancelled {
w.cancel()
w.cancelled = true
}
}
/*
On récupère les alertes
*/
func (w *Scheduler) Alerts() {
api, err := w.api()
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// On controle l'état du cluster
query := fmt.Sprintf("ALERTS_FOR_STATE{cluster_id=\"%s\", type=\"deevirt_default\"}\n", w.config.ClusterID)
alerts, _, err := api.Query(ctx, query, time.Now())
if err != nil {
log.Fatalf("Erreur lors de la récupération des alertes filtrées: %v", err)
}
if alerts.Type() == model.ValVector {
for _, alert := range alerts.(model.Vector) {
if alert.Metric["severity"] == "critical" {
// En situation critique, on abandonne toutes les actions
return
}
}
}
query = fmt.Sprintf("ALERTS_FOR_STATE{cluster_id=\"%s\", type=\"deevirt_node_default\"}\n", w.config.ClusterID)
alerts, _, err = api.Query(ctx, query, time.Now())
if err != nil {
log.Fatalf("Erreur lors de la récupération des alertes filtrées: %v", err)
}
if alerts.Type() == model.ValVector {
for _, alert := range alerts.(model.Vector) {
println(alert.Metric["node_id"])
t, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", alert.Metric["node_id"]), LsOptions{
Recursive: false,
Data: true,
})
for k, v := range t {
var n DomainStore
json.Unmarshal(v, &n)
fmt.Printf("On relance la VM %s\n", k)
fmt.Printf("%v\n", n.State)
}
log.Printf("%v\n", alert)
}
}
}

View File

@ -1,19 +1,20 @@
package raft
type NodeStore map[string]NodeStoreInfo
type NodeStore map[string]*NodeStoreInfo
type NodeStoreInfo struct {
IpManagement string
Alive bool
Scoring int
}
type SchemaDomain struct {
Config string `json:"config"`
State int `json:"state"`
}
type DomainStore struct {
Config string `json:"config"`
State int `json:"state"`
Migrate bool `json:"Migrate"`
}
type SchemaDomain struct {
Config string `json:"config"`
State int `json:"state"`
}

259
pkg/api/raft/worker.go Normal file
View File

@ -0,0 +1,259 @@
package raft
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"time"
"go.uber.org/zap"
go_libvirt "libvirt.org/go/libvirt"
"deevirt.fr/compute/pkg/api/libvirt"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/scheduler"
)
type Worker struct {
ctx context.Context
cancel context.CancelFunc
cancelled bool
store *Store
config *config.Config
nodes NodeStore
log *zap.SugaredLogger
}
func NewWorker(r *Store) (*Worker, error) {
config, _ := config.New()
ctx, cancel := context.WithCancel(context.Background())
logger, _ := zap.NewProduction()
s := &Worker{
ctx: ctx,
cancel: cancel,
cancelled: true,
store: r,
config: config,
log: logger.Sugar(),
}
return s, nil
}
func (w *Worker) Start() {
go func() {
for {
select {
case <-w.ctx.Done():
log.Println("🛑 Worker arrêté !")
return
default:
log.Println("🔄 Contrôle périodique en cours...")
// Contrôler la connexion avec libvirt
w.handleLibvirtControl()
// Gérer les alertes du cluster
if err := w.handleClusterAlerts(); err != nil {
log.Printf("Erreur lors du traitement des alertes du cluster : %v", err)
}
// Gérer les alertes des nœuds
if err := w.handleNodeAlerts(); err != nil {
log.Printf("Erreur lors du traitement des alertes des nœuds : %v", err)
}
// Attendre 1 minute avant de recommencer
time.Sleep(1 * time.Minute)
}
}
}()
}
func (w *Worker) Stop() {
if !w.cancelled {
w.cancel()
w.cancelled = true
}
}
/*
On controle périodiquement l'accessibilité à libvirt, indépendamment du programme moniteur.
Cette vérification assure un double controle pour la HA.
*/
func (w *Worker) handleLibvirtControl() {
var nodes NodeStore
cluster, err := w.store.Get("/etc/libvirt/cluster")
if err != nil {
w.log.Errorf("Erreur lors de la récupération des données de cluster: %v", err)
return
}
// Désérialisation des données du cluster
err = json.Unmarshal(cluster, &nodes)
if err != nil {
w.log.Errorf("Erreur lors de la désérialisation des données de cluster: %v", err)
return
}
for _, conf := range nodes {
// Créer une connexion à libvirt
c, err := libvirt.New(conf.IpManagement, w.store.conf.LibvirtTLS)
if err != nil {
w.log.Warnf("Impossible de créer la connexion libvirt pour %s: %v", conf.IpManagement, err)
conf.Alive = false
continue // Passer à l'élément suivant
}
defer c.Close() // Assurer la fermeture de la connexion à la fin
// Vérifier si le noeud est vivant
alive, err := c.IsAlive()
if err != nil {
w.log.Warnf("Erreur lors de la vérification de la vie de %s: %v", conf.IpManagement, err)
conf.Alive = false
} else {
conf.Alive = alive
}
}
// Mise à jour des nodes dans le store
w.nodes = nodes
nodesUpdate, err := json.Marshal(nodes)
if err != nil {
w.log.Errorf("Erreur lors de la sérialisation des données de cluster: %v", err)
return
}
// Sauvegarder les données mises à jour
err = w.store.Set("/etc/libvirt/cluster", nodesUpdate)
if err != nil {
w.log.Errorf("Erreur lors de la mise à jour du store: %v", err)
}
}
func (w *Worker) handleClusterAlerts() error {
sched, err := scheduler.New()
if err != nil {
return fmt.Errorf("impossible de créer un scheduler : %v", err)
}
alertCluster, err := sched.GetAlertCluster()
if err != nil {
return fmt.Errorf("impossible de récupérer les alertes du cluster : %v", err)
}
for _, alert := range alertCluster {
if alert.Severity == "critical" {
// Ignorer les alertes critiques
continue
}
}
return nil
}
func (w *Worker) handleNodeAlerts() error {
sched, err := scheduler.New()
if err != nil {
return fmt.Errorf("impossible de créer un scheduler : %v", err)
}
alertsNode, err := sched.GetAlertNodes()
if err != nil {
return fmt.Errorf("impossible de récupérer les alertes des nœuds : %v", err)
}
for _, alert := range alertsNode {
switch alert.Event {
case "state":
// Déclencher le HA
w.handleHAExecution(alert.NodeID)
case "cpu":
// Répartir sur un nœud ayant moins de charge CPU
w.handleCPULoadBalance(alert.NodeID)
case "memory":
// Répartir sur un nœud ayant moins de charge mémoire
w.handleMemoryLoadBalance(alert.NodeID)
}
}
return nil
}
func (w *Worker) handleHAExecution(nodeID string) {
if w.nodes[nodeID].Alive {
// L'information n'est pas cohérente, on ne fait rien
w.log.Infof("L'agent moniteur ne répond pas, mais le noeud reste fonctionnel")
return
}
var cluster NodeStore
c, _ := w.store.Get("/etc/libvirt/cluster")
json.Unmarshal(c, &cluster)
listDoms, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", nodeID), LsOptions{
Recursive: false,
Data: true,
})
s, err := scheduler.New()
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
res, err := s.GetTopNode(4)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
}
mapSize := len(res)
for domId := range listDoms {
randomINode := rand.Intn(mapSize)
currentDom, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
var cDomStore DomainStore
json.Unmarshal(currentDom, &cDomStore)
c, err := libvirt.New(cluster[res[randomINode].NodeID].IpManagement, w.store.conf.LibvirtTLS)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
continue
}
newDom, err := c.DomainDefineXML(cDomStore.Config)
if err != nil {
w.log.Errorf("Error create config %s on the node %s", domId, res[randomINode].NodeID, domId)
continue
} else {
w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId), currentDom)
w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
var dStore DomainStore
data, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId))
json.Unmarshal(data, &dStore)
if go_libvirt.DomainState(dStore.State) == go_libvirt.DOMAIN_RUNNING {
err = newDom.Create()
if err != nil {
w.log.Errorf("Error start domain %s on the node %s", domId, res[randomINode].NodeID, domId)
newDom.Undefine()
}
}
}
w.log.Infof("HA of %s to %s for domain %s", nodeID, res[randomINode].NodeID, domId)
}
//_, err = c.DomainCreateXML(cDomStore.Config, go_libvirt.DOMAIN_START_VALIDATE)
}
func (w *Worker) handleCPULoadBalance(nodeID string) {
// Implémentation de la répartition de charge CPU
w.log.Infof("Répartition de la charge CPU pour le nœud: %s", nodeID)
}
func (w *Worker) handleMemoryLoadBalance(nodeID string) {
// Implémentation de la répartition de charge mémoire
w.log.Infof("Répartition de la charge mémoire pour le nœud: %s", nodeID)
}

View File

@ -8,6 +8,19 @@ import (
"github.com/prometheus/common/model"
)
type AlertsCluster struct {
Severity string
Event string
Score int
}
type AlertsNode struct {
NodeID string
Event string
Severity string
Score int
}
func (s *Scheduler) GetAlertCluster() ([]AlertsCluster, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
@ -22,7 +35,8 @@ func (s *Scheduler) GetAlertCluster() ([]AlertsCluster, error) {
data := []AlertsCluster{}
for _, res := range res.(model.Vector) {
data = append(data, AlertsCluster{
Severity: string(res.Metric["Severity"]),
Event: string(res.Metric["event"]),
Severity: string(res.Metric["severity"]),
Score: int(res.Value),
})
}
@ -44,8 +58,9 @@ func (s *Scheduler) GetAlertNodes() ([]AlertsNode, error) {
data := []AlertsNode{}
for _, res := range res.(model.Vector) {
data = append(data, AlertsNode{
NodeID: string(res.Metric["NodeID"]),
Severity: string(res.Metric["Severity"]),
Event: string(res.Metric["event"]),
NodeID: string(res.Metric["node_id"]),
Severity: string(res.Metric["severity"]),
Score: int(res.Value),
})
}

View File

@ -25,17 +25,6 @@ type TopNode struct {
Score int
}
type AlertsCluster struct {
Severity string
Score int
}
type AlertsNode struct {
NodeID string
Severity string
Score int
}
func New() (*Scheduler, error) {
config, _ := config.New()
@ -70,7 +59,7 @@ func (s *Scheduler) GetTopNode(number int) ([]TopNode, error) {
+
(1 - sum(libvirt_node_memory_usage_bytes{cluster_id="%s"} / libvirt_node_memory_total_bytes) by (node_id)) * 0.7
) * 100
) by (node_id))
) by (node_id) > 30)
`, number, s.Config.ClusterID, s.Config.ClusterID)
api, _ := prom.New()