// Package scheduler runs a background loop that, once a minute, calls checkHA
// and hands every returned entry to restartDomain.
package scheduler

import (
	"context"
	"fmt"
	"time"

	"deevirt.fr/compute/pkg/config"
	prom_api "github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
	"go.uber.org/zap"
)

const (
	// prometheusAddress is the Prometheus endpoint queried for node metrics.
	// NOTE(review): hard-coded; should presumably come from the configuration.
	prometheusAddress = "http://172.16.9.161:9090"

	// haCheckInterval is the pause between two consecutive control passes.
	haCheckInterval = 1 * time.Minute

	// promRequestTimeout bounds a whole Prometheus request on the client side.
	promRequestTimeout = 10 * time.Second

	// promQueryTimeout is the evaluation timeout sent along with the query.
	promQueryTimeout = 5 * time.Second
)

// Scheduler owns the periodic control loop and the resources it needs.
type Scheduler struct {
	ctx       context.Context
	cancel    context.CancelFunc
	cancelled bool // true once cancel has been called through Stop
	config    *config.Config
	log       *zap.Logger
}

// New loads the configuration, creates a production logger and returns a
// Scheduler that is ready to be started with Start.
//
// It returns an error when the configuration cannot be loaded or the logger
// cannot be created; in that case the returned Scheduler is nil.
func New() (*Scheduler, error) {
	cfg, err := config.NewConfig()
	if err != nil {
		return nil, fmt.Errorf("loading configuration: %w", err)
	}

	logger, err := zap.NewProduction()
	if err != nil {
		return nil, fmt.Errorf("creating logger: %w", err)
	}

	// The context is created last so that no cancel func is leaked on the
	// error paths above.
	ctx, cancel := context.WithCancel(context.Background())

	return &Scheduler{
		ctx:    ctx,
		cancel: cancel,
		// The context is live at this point. Starting with cancelled=true
		// would turn Stop into a no-op and make the loop unstoppable.
		cancelled: false,
		config:    cfg,
		log:       logger,
	}, nil
}

// api builds a Prometheus v1 API client pointing at prometheusAddress.
//
// On failure the error is logged and returned together with a nil API, so
// callers must not use the API value when err is non-nil.
func (w *Scheduler) api() (v1.API, error) {
	client, err := prom_api.NewClient(prom_api.Config{
		Address: prometheusAddress,
	})
	if err != nil {
		w.log.Error("Prometheus HS", zap.Error(err))
		return nil, err
	}

	return v1.NewAPI(client), nil
}

// scoringNode holds a CPU and a memory figure for one node.
// NOTE(review): not used yet in this file; presumably meant for node scoring.
type scoringNode struct {
	cpu    float64
	memory float64
}

// scoring maps a key to its scoringNode.
// NOTE(review): not used yet in this file; the meaning of the key is unconfirmed.
type scoring struct {
	domain map[string]scoringNode
}

// restartDomain queries Prometheus for a per-node load score of the configured
// cluster and logs one entry per node.
//
// The score is the mean of the CPU usage ratio (CPU time rate over 5m divided
// by the thread count) and the memory usage ratio, expressed as a percentage.
// The domain argument is not used yet: no restart is performed by this code.
func (w *Scheduler) restartDomain(domain nodeDown) {
	api, err := w.api()
	if err != nil {
		// Already logged by api().
		return
	}

	// Derive from the scheduler context so Stop also aborts in-flight queries.
	ctx, cancel := context.WithTimeout(w.ctx, promRequestTimeout)
	defer cancel()

	clusterID := w.config.ClusterID
	query := fmt.Sprintf(`
		100 * (
			(
				sum(rate(libvirt_node_cpu_time_seconds_total{cluster_id='%s'}[5m])) by (node_id)
				/ sum(libvirt_node_cpu_threads{cluster_id='%s'}) by (node_id)
				+ sum(libvirt_node_memory_usage_bytes{cluster_id='%s'}) by (node_id)
				/ sum(libvirt_node_memory_total_bytes{cluster_id='%s'}) by (node_id)
			) / 2
		)`, clusterID, clusterID, clusterID, clusterID)

	result, warnings, err := api.Query(ctx, query, time.Now(), v1.WithTimeout(promQueryTimeout))
	if err != nil {
		w.log.Error("node load score query failed", zap.Error(err))
		return
	}
	if len(warnings) > 0 {
		w.log.Warn("node load score query returned warnings", zap.Strings("warnings", []string(warnings)))
	}

	vector, ok := result.(model.Vector)
	if !ok {
		w.log.Error("unexpected Prometheus result type", zap.String("type", fmt.Sprintf("%T", result)))
		return
	}

	for _, sample := range vector {
		w.log.Info("node load score",
			zap.String("node_id", string(sample.Metric["node_id"])),
			zap.Float64("score_percent", float64(sample.Value)),
		)
	}

	// Earlier prototype, kept for reference (disabled).
	/*cpu, _, _ := api.Query(ctx, "rate(libvirt_node_cpu_time_seconds_total{cluster_id='"+w.config.ClusterID+"'}[5m]) * 100", time.Now(), v1.WithTimeout(5*time.Second))

	score.

	matrix, _ := cpu.(model.Vector)
	for _, stream := range matrix {
		total := 100
		for key, value := range stream.Metric {
			if key == "threads" {
				threads, _ := strconv.Atoi(string(value))
				total = threads * 100
				println(total)
			}
			//fmt.Printf("%s => %s\n", key, value)
		}

		usage := float64(stream.Value)
		p := usage / float64(total) * 100
		fmt.Printf("%.2f%%\n", p)
		//println(stream.Value.String())
	}

	memory_usage, _, _ := api.Query(ctx, "(libvirt_node_memory_usage_bytes{cluster_id='"+w.config.ClusterID+"'}/1024e2)", time.Now(), v1.WithTimeout(5*time.Second))
	memory_total, _, _ := api.Query(ctx, "(libvirt_node_memory_usage_bytes{cluster_id='"+w.config.ClusterID+"'}/1024e2)", time.Now(), v1.WithTimeout(5*time.Second))*/

	//fmt.Printf("==>%v\n", cpu)
	//fmt.Printf("%v\n", memory)
}

// Start launches the control loop in its own goroutine and returns at once.
//
// Each pass calls checkHA and passes every returned entry to restartDomain,
// then waits haCheckInterval. The goroutine exits as soon as the scheduler
// context is cancelled (see Stop), including while it is waiting.
func (w *Scheduler) Start() {
	go func() {
		defer fmt.Println("🛑 Worker arrêté !")

		for w.ctx.Err() == nil {
			fmt.Println("🔄 Controle périodique en cours...")
			for _, t := range w.checkHA() {
				w.restartDomain(t)
			}

			// Wait on a timer instead of time.Sleep so cancellation is
			// observed immediately rather than up to a minute later.
			pause := time.NewTimer(haCheckInterval)
			select {
			case <-w.ctx.Done():
				pause.Stop()
				return
			case <-pause.C:
			}
		}
	}()
}

// Stop cancels the scheduler context, which terminates the loop started by
// Start. Calling Stop more than once has no further effect.
func (w *Scheduler) Stop() {
	if w.cancelled {
		return
	}

	w.cancel()
	w.cancelled = true
}