compute/pkg/scheduler/scheduler.go

package scheduler

import (
    "context"
    "fmt"
    "time"

    prom_api "github.com/prometheus/client_golang/api"
    v1 "github.com/prometheus/client_golang/api/prometheus/v1"
    "github.com/prometheus/common/model"
    "go.uber.org/zap"

    "deevirt.fr/compute/pkg/config"
)
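
// Scheduler runs the periodic high-availability loop: it checks the cluster
// for failed nodes (checkHA) and scores nodes with Prometheus metrics so that
// the domains of a failed node can be restarted elsewhere (restartDomain).
//
// Typical usage, as a minimal sketch:
//
//	s, err := scheduler.New()
//	if err != nil {
//		log.Fatal(err)
//	}
//	s.Start()
//	defer s.Stop()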
type Scheduler struct {
    ctx       context.Context
    cancel    context.CancelFunc
    cancelled bool
    config    *config.Config
    log       *zap.Logger
}
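
// New builds a Scheduler with its own cancellable context, the deevirt
// configuration and a production zap logger.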
func New() (*Scheduler, error) {
    cfg, err := config.NewConfig()
    if err != nil {
        return nil, fmt.Errorf("loading configuration: %w", err)
    }

    logger, err := zap.NewProduction()
    if err != nil {
        return nil, fmt.Errorf("creating logger: %w", err)
    }

    ctx, cancel := context.WithCancel(context.Background())

    return &Scheduler{
        ctx:       ctx,
        cancel:    cancel,
        cancelled: false,
        config:    cfg,
        log:       logger,
    }, nil
}
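
// api returns a Prometheus v1 API client used for the node scoring queries.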
func (w *Scheduler) api() (v1.API, error) {
    // TODO: the Prometheus address should come from the configuration rather
    // than being hard-coded.
    client, err := prom_api.NewClient(prom_api.Config{
        Address: "http://172.16.9.161:9090",
    })
    if err != nil {
        w.log.Error("failed to create Prometheus client", zap.Error(err))
        return nil, err
    }
    return v1.NewAPI(client), nil
}
// scoringNode holds the CPU and memory load of a single node.
type scoringNode struct {
    cpu    float64
    memory float64
}

// scoring maps a node identifier to its load figures.
type scoring struct {
    domain map[string]scoringNode
}
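
// restartDomain handles a node reported down by checkHA. For now it only
// queries Prometheus for a combined CPU/memory load score per node in the
// cluster; choosing a target node and restarting the domains is still to be
// done.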
func (w *Scheduler) restartDomain(domain nodeDown) {
    api, err := w.api()
    if err != nil {
        return
    }

    ctx, cancel := context.WithTimeout(w.ctx, 10*time.Second)
    defer cancel()

    // Per-node load score: mean of CPU usage (rate / threads) and memory usage
    // (used / total), expressed as a percentage.
    query := fmt.Sprintf(`
    100 * (
        (sum(rate(libvirt_node_cpu_time_seconds_total{cluster_id='%s'}[5m])) by (node_id) / sum(libvirt_node_cpu_threads{cluster_id='%s'}) by (node_id)
        +
        sum(libvirt_node_memory_usage_bytes{cluster_id='%s'}) by (node_id) / sum(libvirt_node_memory_total_bytes{cluster_id='%s'}) by (node_id))
        / 2
    )`, w.config.ClusterID, w.config.ClusterID, w.config.ClusterID, w.config.ClusterID)

    result, warnings, err := api.Query(ctx, query, time.Now(), v1.WithTimeout(5*time.Second))
    if err != nil {
        w.log.Error("Prometheus query failed", zap.Error(err))
        return
    }
    if len(warnings) > 0 {
        w.log.Warn("Prometheus query returned warnings", zap.Strings("warnings", warnings))
    }

    vector, ok := result.(model.Vector)
    if !ok {
        w.log.Error("unexpected Prometheus result type", zap.String("type", result.Type().String()))
        return
    }

    for _, sample := range vector {
        w.log.Info("node load score",
            zap.String("node_id", string(sample.Metric["node_id"])), zap.Float64("score", float64(sample.Value)))
    }
    // TODO: select the least-loaded node and restart the failed node's domains
    // there. An earlier draft queried CPU and memory metrics separately and
    // combined the scores in Go; the single PromQL query above now does that
    // server-side.
}
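
// Start launches a background worker that runs the HA check every minute
// until the scheduler context is cancelled.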
func (w *Scheduler) Start() {
    go func() {
        for {
            select {
            case <-w.ctx.Done():
                w.log.Info("scheduler worker stopped")
                return
            default:
                w.log.Info("running periodic HA check")
                for _, t := range w.checkHA() {
                    w.restartDomain(t)
                }
                time.Sleep(1 * time.Minute)
            }
        }
    }()
}
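
// Stop cancels the scheduler context, stopping the background worker; calling
// it more than once is harmless.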
func (w *Scheduler) Stop() {
    if !w.cancelled {
        w.cancel()
        w.cancelled = true
    }
}