diff --git a/pkg/scheduler/ha.go b/pkg/scheduler/ha.go
new file mode 100644
index 0000000..45b79ec
--- /dev/null
+++ b/pkg/scheduler/ha.go
@@ -0,0 +1,101 @@
+package scheduler
+
+import (
+	"context"
+	"regexp"
+	"strings"
+	"time"
+
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.uber.org/zap"
+)
+
+// nodeDown describes a node whose exporter no longer reports up, together
+// with the IDs of the domains that were running on it.
+type nodeDown struct {
+	nodeID  string
+	domains []string
+}
+
+// checkHA queries Prometheus for the `up` metric of every node of the cluster
+// and, for each node considered down, lists its domains from etcd.
+func (w *Scheduler) checkHA() []nodeDown {
+	s := []nodeDown{}
+
+	etcd, err := clientv3.New(clientv3.Config{
+		Endpoints:   strings.Split(w.config.EtcdURI, ","),
+		DialTimeout: 5 * time.Second,
+	})
+	if err != nil {
+		// A transient etcd outage must not kill the scheduler loop.
+		w.log.Error("error connecting to etcd", zap.Error(err))
+		return s
+	}
+	defer etcd.Close()
+
+	r := v1.Range{
+		Start: time.Now().Add(-time.Minute),
+		End:   time.Now(),
+		Step:  2 * time.Minute,
+	}
+
+	api, err := w.api()
+	if err != nil {
+		return s
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	result, warnings, err := api.QueryRange(ctx, "up{cluster_id='"+w.config.ClusterID+"'}", r, v1.WithTimeout(5*time.Second))
+	if err != nil {
+		w.log.Error("error querying Prometheus", zap.Error(err))
+		return s
+	}
+	if len(warnings) > 0 {
+		w.log.Warn("Prometheus warnings", zap.Strings("warnings", warnings))
+	}
+
+	matrix, ok := result.(model.Matrix)
+	if !ok {
+		return s
+	}
+
+	re := regexp.MustCompile(`qemu/(?P<domainID>[a-zA-Z0-9-]+)`)
+
+	for _, stream := range matrix {
+		nodeID := string(stream.Metric["node_id"])
+		domains := []string{}
+
+		if len(stream.Values) == 0 {
+			continue
+		}
+
+		// up == 0 means the node is no longer scraped: it is considered down
+		// and its domains have to be restarted elsewhere.
+		state := int(stream.Values[0].Value)
+		if state == 0 {
+			resp, err := etcd.Get(ctx, "/cluster/"+w.config.ClusterID+"/host/"+nodeID+"/qemu/", clientv3.WithPrefix(), clientv3.WithKeysOnly())
+			if err != nil {
+				w.log.Error("error listing domains in etcd", zap.Error(err))
+				continue
+			}
+
+			for _, kv := range resp.Kvs {
+				matches := re.FindStringSubmatch(string(kv.Key))
+				if matches != nil {
+					domains = append(domains, matches[re.SubexpIndex("domainID")])
+				}
+			}
+
+			s = append(s, nodeDown{
+				nodeID:  nodeID,
+				domains: domains,
+			})
+		}
+	}
+
+	return s
+}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
new file mode 100644
index 0000000..ff55e47
--- /dev/null
+++ b/pkg/scheduler/scheduler.go
@@ -0,0 +1,135 @@
+package scheduler
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"deevirt.fr/compute/pkg/config"
+	prom_api "github.com/prometheus/client_golang/api"
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
+	"go.uber.org/zap"
+)
+
+type Scheduler struct {
+	ctx       context.Context
+	cancel    context.CancelFunc
+	cancelled bool
+
+	config *config.Config
+	log    *zap.Logger
+}
+
+func New() (*Scheduler, error) {
+	cfg, err := config.NewConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	logger, err := zap.NewProduction()
+	if err != nil {
+		return nil, err
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	s := &Scheduler{
+		ctx:       ctx,
+		cancel:    cancel,
+		cancelled: false,
+
+		config: cfg,
+		log:    logger,
+	}
+
+	return s, nil
+}
+
+func (w *Scheduler) api() (v1.API, error) {
+	// NOTE: the Prometheus address is hardcoded for now.
+	client, err := prom_api.NewClient(prom_api.Config{
+		Address: "http://172.16.9.161:9090",
+	})
+	if err != nil {
+		w.log.Error("failed to create Prometheus client", zap.Error(err))
+		return nil, err
+	}
+
+	return v1.NewAPI(client), nil
+}
+
+type scoringNode struct {
+	cpu    float64
+	memory float64
+}
+
+type scoring struct {
+	domain map[string]scoringNode
+}
+
+// restartDomain scores the remaining nodes of the cluster so that the domains
+// of a down node can be restarted on the least loaded one. Placement itself is
+// not implemented yet: the combined score is only printed.
+func (w *Scheduler) restartDomain(domain nodeDown) {
+	api, err := w.api()
+	if err != nil {
+		return
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	// Combined load score per node: average of the CPU usage ratio and the
+	// memory usage ratio, expressed as a percentage.
+	query := fmt.Sprintf(`
+	100 * (
+	  (sum(rate(libvirt_node_cpu_time_seconds_total{cluster_id='%s'}[5m])) by (node_id) / sum(libvirt_node_cpu_threads{cluster_id='%s'}) by (node_id)
+	  +
+	  sum(libvirt_node_memory_usage_bytes{cluster_id='%s'}) by (node_id) / sum(libvirt_node_memory_total_bytes{cluster_id='%s'}) by (node_id))
+	  / 2
+	)`, w.config.ClusterID, w.config.ClusterID, w.config.ClusterID, w.config.ClusterID)
+
+	result, warnings, err := api.Query(ctx, query, time.Now(), v1.WithTimeout(5*time.Second))
+	if err != nil {
+		w.log.Error("error querying Prometheus", zap.Error(err))
+		return
+	}
+	if len(warnings) > 0 {
+		w.log.Warn("Prometheus warnings", zap.Strings("warnings", warnings))
+	}
+
+	vector, ok := result.(model.Vector)
+	if !ok {
+		return
+	}
+	for _, sample := range vector {
+		println(sample.Value.String())
+	}
+}
+
+func (w *Scheduler) Start() {
+	go func() {
+		for {
+			select {
+			case <-w.ctx.Done():
+				fmt.Println("🛑 Worker stopped!")
+				return
+			default:
+				fmt.Println("🔄 Running periodic check...")
+				for _, t := range w.checkHA() {
+					w.restartDomain(t)
+				}
+
+				time.Sleep(1 * time.Minute)
+			}
+		}
+	}()
+}
+
+func (w *Scheduler) Stop() {
+	if !w.cancelled {
+		w.cancel()
+		w.cancelled = true
+	}
+}
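For reference, a minimal sketch of how this package could be wired into a daemon entrypoint. The cmd layout and the signal handling below are assumptions, not part of this diff; only New, Start and Stop come from the code above.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"deevirt.fr/compute/pkg/scheduler" // assumed import path for this package
)

func main() {
	// Build the scheduler (loads config and logger) and start its periodic loop.
	s, err := scheduler.New()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to create scheduler: %v\n", err)
		os.Exit(1)
	}
	s.Start()

	// Block until SIGINT/SIGTERM, then cancel the worker context.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	s.Stop()
}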