diff --git a/pkg/api/raft/schema.go b/pkg/api/raft/schema.go
index 2760395..100b77d 100644
--- a/pkg/api/raft/schema.go
+++ b/pkg/api/raft/schema.go
@@ -18,3 +18,9 @@ type SchemaDomain struct {
 	Config string `json:"config"`
 	State  int    `json:"state"`
 }
+
+// DomainUsage is a per-domain resource usage sample used for metrics.
+type DomainUsage struct {
+	DomID string
+	Usage float64
+}
diff --git a/pkg/api/raft/worker.go b/pkg/api/raft/worker.go
index 66b6624..8cc1ac6 100644
--- a/pkg/api/raft/worker.go
+++ b/pkg/api/raft/worker.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"log"
 	"math/rand"
+	"sync"
 	"time"
 
 	"go.uber.org/zap"
@@ -85,6 +86,48 @@ func (w *Worker) Stop() {
 	}
 }
 
+// Migrate live-migrates the domain identified by domainID from nodeID to newNodeID,
+// then updates the store accordingly. It is meant to be run as a goroutine.
+func (w *Worker) Migrate(wg *sync.WaitGroup, nodeID string, newNodeID string, domainID string) {
+	defer wg.Done()
+
+	c, err := libvirt.New(w.nodes[nodeID].IpManagement, w.store.conf.LibvirtTLS)
+	if err != nil {
+		w.log.Errorf("libvirt connection error on source node %s: %v", nodeID, err)
+		return
+	}
+	defer c.Close()
+
+	dom, err := c.LookupDomainByUUIDString(domainID)
+	if err != nil {
+		w.log.Errorf("domain %s lookup error: %v", domainID, err)
+		return
+	}
+
+	cNew, err := libvirt.New(w.nodes[newNodeID].IpManagement, w.store.conf.LibvirtTLS)
+	if err != nil {
+		w.log.Errorf("libvirt connection error on target node %s: %v", newNodeID, err)
+		return
+	}
+	defer cNew.Close()
+
+	newDom, err := dom.Migrate(cNew, go_libvirt.MIGRATE_LIVE|go_libvirt.MIGRATE_PERSIST_DEST|go_libvirt.MIGRATE_UNDEFINE_SOURCE, "", "", 0)
+	if err != nil {
+		w.log.Errorf("migration error for domain %s: %v", domainID, err)
+		return
+	}
+
+	newDomConfig, _ := newDom.GetXMLDesc(go_libvirt.DOMAIN_XML_INACTIVE)
+	newDomState, _, _ := newDom.GetState()
+
+	payload, _ := json.Marshal(DomainStore{
+		Config:  newDomConfig,
+		State:   int(newDomState),
+		Migrate: false,
+	})
+
+	// Register the domain under its new node and remove it from the old one.
+	w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", newNodeID, domainID), payload)
+	w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domainID))
+}
+
 /* We periodically check that libvirt is reachable, independently of the monitor program.
 This check provides a second safeguard for HA.
@@ -249,11 +292,67 @@ func (w *Worker) handleHAExecution(nodeID string) {
 }
 
 func (w *Worker) handleCPULoadBalance(nodeID string) {
-	// CPU load balancing implementation
+	var wg sync.WaitGroup
+	w.log.Infof("CPU load balancing for node: %s", nodeID)
+
+	// Find the busiest domains (by CPU) on this node and candidate target nodes with spare CPU.
+	s, err := scheduler.New()
+	if err != nil {
+		w.log.Errorf("scheduler init error: %v", err)
+		return
+	}
+
+	res, err := s.GetTopNodeCPU(4)
+	if err != nil {
+		w.log.Errorf("error fetching candidate target nodes: %v", err)
+		return
+	}
+	mapSize := len(res)
+	if mapSize == 0 {
+		w.log.Infof("no target node with spare CPU, skipping load balancing")
+		return
+	}
+
+	doms, err := s.GetTopDomainCPUUse(nodeID, 4)
+	if err != nil {
+		w.log.Errorf("error fetching top CPU domains: %v", err)
+		return
+	}
+
+	for _, dom := range doms {
+		// Migrate each CPU-heavy domain to a randomly chosen target node.
+		randomINode := rand.Intn(mapSize)
+		wg.Add(1)
+		go w.Migrate(&wg, dom.NodeID, res[randomINode].NodeID, dom.DomainID)
+	}
+
+	wg.Wait()
 }
 
 func (w *Worker) handleMemoryLoadBalance(nodeID string) {
 	// Memory load balancing implementation
-	w.log.Infof("Memory load balancing for node: %s", nodeID)
+	var wg sync.WaitGroup
+
+	w.log.Infof("Memory load balancing for node: %s", nodeID)
+
+	// Find the domains using the most memory on this node and candidate target nodes with spare memory.
+	s, err := scheduler.New()
+	if err != nil {
+		w.log.Errorf("scheduler init error: %v", err)
+		return
+	}
+
+	res, err := s.GetTopNodeMemory(4)
+	if err != nil {
+		w.log.Errorf("error fetching candidate target nodes: %v", err)
+		return
+	}
+	mapSize := len(res)
+	if mapSize == 0 {
+		w.log.Infof("no target node with spare memory, skipping load balancing")
+		return
+	}
+
+	doms, err := s.GetTopDomainMemoryUse(nodeID, 4)
+	if err != nil {
+		w.log.Errorf("error fetching top memory domains: %v", err)
+		return
+	}
+
+	for _, dom := range doms {
+		// Migrate each memory-heavy domain to a randomly chosen target node.
+		randomINode := rand.Intn(mapSize)
+		wg.Add(1)
+		go w.Migrate(&wg, dom.NodeID, res[randomINode].NodeID, dom.DomainID)
+	}
+
+	wg.Wait()
 }
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index b1ea2c0..ebb249c 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -22,7 +22,13 @@ type Scheduler struct {
 
 type TopNode struct {
 	NodeID string
-	Score  int
+	Score  float64
+}
+
+// TopDomain identifies a domain on a node together with its resource usage score.
+type TopDomain struct {
+	NodeID   string
+	DomainID string
+	Score    float64
 }
 
 func New() (*Scheduler, error) {
@@ -59,7 +65,8 @@ func (s *Scheduler) GetTopNode(number int) ([]TopNode, error) {
 		+ (1 - sum(libvirt_node_memory_usage_bytes{cluster_id="%s"} / libvirt_node_memory_total_bytes) by (node_id)) * 0.7
 		) * 100
-		) by (node_id) > 30)
+		) by (node_id) > 30
+		and on(node_id) libvirt_up == 1)
 	`, number, s.Config.ClusterID, s.Config.ClusterID)
 
 	api, _ := prom.New()
@@ -72,7 +79,129 @@ func (s *Scheduler) GetTopNode(number int) ([]TopNode, error) {
 	for _, res := range res.(model.Vector) {
 		data = append(data, TopNode{
 			NodeID: string(res.Metric["node_id"]),
-			Score:  int(res.Value),
+			Score:  float64(res.Value),
 		})
 	}
 
 	return data, nil
 }
+
+// GetTopNodeCPU returns up to `number` nodes that are up and have more than 30% idle CPU,
+// i.e. candidate targets for CPU load balancing.
+func (s *Scheduler) GetTopNodeCPU(number int) ([]TopNode, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	/*query := fmt.Sprintf(`
+	topk(1, ( (1 - sum(rate(libvirt_domain_cpu_time_seconds_total{cluster_id="%s"}[5m]) / libvirt_domain_virtual_cpus)
+	by (node_id, domain_id)) * 100) > 30
+	and on(node_id, domain_id) libvirt_domain_state == 1)
+	`, s.Config.ClusterID)*/
+
+	query := fmt.Sprintf(`
+	topk(%d, ( (1 - sum(rate(libvirt_node_cpu_time_seconds_total{cluster_id="%s"}[5m]) / libvirt_node_cpu_threads)
+	by (node_id)) * 100) > 30
+	and on(node_id) libvirt_up == 1)
+	`, number, s.Config.ClusterID)
+
+	api, _ := prom.New()
+	res, _, err := api.Query(ctx, query, time.Now())
+	if err != nil {
+		return nil, fmt.Errorf("error querying top CPU nodes: %v", err)
+	}
+
+	data := []TopNode{}
+	for _, res := range res.(model.Vector) {
+		data = append(data, TopNode{
+			NodeID: string(res.Metric["node_id"]),
+			Score:  float64(res.Value),
+		})
+	}
+
+	return data, nil
+}
+
+// GetTopNodeMemory returns up to `number` nodes that are up and have more than 30% free memory,
+// i.e. candidate targets for memory load balancing.
+func (s *Scheduler) GetTopNodeMemory(number int) ([]TopNode, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	query := fmt.Sprintf(`
+	topk(%d, ( (1 - sum(libvirt_node_memory_usage_bytes{cluster_id="%s"} / libvirt_node_memory_total_bytes)
+	by (node_id)) * 100) > 30
+	and on(node_id) libvirt_up == 1)
+	`, number, s.Config.ClusterID)
+
+	api, _ := prom.New()
+	res, _, err := api.Query(ctx, query, time.Now())
+	if err != nil {
+		return nil, fmt.Errorf("error querying top memory nodes: %v", err)
+	}
+
+	data := []TopNode{}
+	for _, res := range res.(model.Vector) {
+		data = append(data, TopNode{
+			NodeID: string(res.Metric["node_id"]),
+			Score:  float64(res.Value),
+		})
+	}
+
+	return data, nil
+}
+
+// Domains
+
+// GetTopDomainCPUUse returns the `number` running domains with the highest CPU usage on the given node.
+func (s *Scheduler) GetTopDomainCPUUse(nodeID string, number int) ([]TopDomain, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	query := fmt.Sprintf(`
+	topk(%d, sum(
+	rate(libvirt_domain_cpu_time_seconds_total{node_id="%s"}[5m]) / libvirt_domain_virtual_cpus)
+	by (node_id, domain_id) * 100
+	and on(domain_id) libvirt_domain_state == 1 and on(node_id) libvirt_up == 1)
+	`, number, nodeID)
+
+	api, _ := prom.New()
+	res, _, err := api.Query(ctx, query, time.Now())
+	if err != nil {
+		return nil, fmt.Errorf("error querying top CPU domains: %v", err)
+	}
+
+	data := []TopDomain{}
+	for _, res := range res.(model.Vector) {
+		data = append(data, TopDomain{
+			NodeID:   string(res.Metric["node_id"]),
+			DomainID: string(res.Metric["domain_id"]),
+			Score:    float64(res.Value),
+		})
+	}
+
+	return data, nil
+}
+
+// GetTopDomainMemoryUse returns the `number` running domains using the most memory on the given node.
+func (s *Scheduler) GetTopDomainMemoryUse(nodeID string, number int) ([]TopDomain, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	query := fmt.Sprintf(`
+	topk(%d, sum(libvirt_domain_balloon_current_bytes{node_id="%s"})
+	by (node_id, domain_id)
+	and on(domain_id) libvirt_domain_state == 1 and on(node_id) libvirt_up == 1)
+	`, number, nodeID)
+
+	api, _ := prom.New()
+	res, _, err := api.Query(ctx, query, time.Now())
+	if err != nil {
+		return nil, fmt.Errorf("error querying top memory domains: %v", err)
+	}
+
+	data := []TopDomain{}
+	for _, res := range res.(model.Vector) {
+		data = append(data, TopDomain{
+			NodeID:   string(res.Metric["node_id"]),
+			DomainID: string(res.Metric["domain_id"]),
+			Score:    float64(res.Value),
+		})
+	}