diff --git a/cmd/compute_mgr/main.go b/cmd/compute_mgr/main.go index a427739..4836c1e 100644 --- a/cmd/compute_mgr/main.go +++ b/cmd/compute_mgr/main.go @@ -6,11 +6,17 @@ import ( "fmt" "log" "net" + "os" + "time" pb "deevirt.fr/compute/cmd/compute_mgr/proto" "deevirt.fr/compute/cmd/compute_mgr/server/raft" "google.golang.org/grpc" "google.golang.org/grpc/reflection" + + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" ) var ( @@ -18,6 +24,66 @@ var ( raftId = flag.String("raft_id", "", "Node id used by Raft") ) +type UP struct { + instance string + status string +} + +func main0() { + client, err := api.NewClient(api.Config{ + Address: "http://172.16.9.161:9090", + }) + if err != nil { + fmt.Printf("Error creating client: %v\n", err) + os.Exit(1) + } + + v1api := v1.NewAPI(client) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + r := v1.Range{ + Start: time.Now().Add(-time.Minute), + End: time.Now(), + Step: 2 * time.Minute, + } + result, warnings, err := v1api.QueryRange(ctx, "up{job='f242b4bb-b6d0-415f-b3f9-9e9d439532b5'}", r, v1.WithTimeout(5*time.Second)) + if err != nil { + fmt.Printf("Error querying Prometheus: %v\n", err) + os.Exit(1) + } + if len(warnings) > 0 { + fmt.Printf("Warnings: %v\n", warnings) + } + //fmt.Printf("Result:\n%v\n", result) + + // Convertir le résultat en Matrix + matrix, ok := result.(model.Matrix) + if !ok { + fmt.Println("Erreur : le résultat n'est pas de type Matrix") + return + } + + for _, stream := range matrix { + var instance = "" + + for key, value := range stream.Metric { + if key == "node_id" { + //test.instance = string(value) + instance = string(value) + } + + } + + println(instance) + println(stream.Values[0].Value.String()) + + /*for _, pair := range stream.Values { + println(pair.Value.String()) + }*/ + } +} + func main() { flag.Parse() diff --git a/cmd/compute_mgr/server/raft/node.go b/cmd/compute_mgr/server/raft/node.go index e4d5025..b6a2b8c 100644 --- a/cmd/compute_mgr/server/raft/node.go +++ b/cmd/compute_mgr/server/raft/node.go @@ -9,6 +9,7 @@ import ( "path/filepath" "time" + "deevirt.fr/compute/pkg/scheduler" transport "github.com/Jille/raft-grpc-transport" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" @@ -24,7 +25,7 @@ type RaftNode struct { Raft *raft.Raft NodeID string StateCh chan raft.Observation - Scheduler Worker + scheduler *scheduler.Scheduler } type Worker struct { @@ -96,19 +97,25 @@ func NewRaft(ctx context.Context, myID, myAddress string) (*raft.Raft, *transpor return nil, nil, fmt.Errorf("raft.NewRaft: %v", err) } + s, err := scheduler.New() + if err != nil { + return nil, nil, fmt.Errorf("scheduler: %v", err) + } + // Observer pour surveiller les changements d'état stateCh := make(chan raft.Observation, 1) // Canal de type raft.Observation r.RegisterObserver(raft.NewObserver(stateCh, true, nil)) node := &RaftNode{ - Raft: r, - NodeID: myID, - StateCh: stateCh, + Raft: r, + NodeID: myID, + StateCh: stateCh, + scheduler: s, } go node.watchStateChanges() - // Vérification si des logs ou snapshots existent + // 🔍 Vérification si des logs ou snapshots existent hasState, _ := checkIfStateExists(ldb, fss) println(myAddress) @@ -120,14 +127,14 @@ func NewRaft(ctx context.Context, myID, myAddress string) (*raft.Raft, *transpor ID: raft.ServerID(myID), Address: raft.ServerAddress(myAddress), }, - { + /*{ ID: raft.ServerID("nodeB"), Address: raft.ServerAddress("localhost:50053"), }, { ID: raft.ServerID("nodeC"), Address: raft.ServerAddress("localhost:50054"), - }, + },*/ }, } f := r.BootstrapCluster(cfg) @@ -135,6 +142,7 @@ func NewRaft(ctx context.Context, myID, myAddress string) (*raft.Raft, *transpor return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) } } + return r, tm, nil } @@ -143,10 +151,10 @@ func (w *Worker) Start() { for { select { case <-w.ctx.Done(): - fmt.Println("Worker arrêté !") + fmt.Println("🛑 Worker arrêté !") return default: - fmt.Println("Worker en cours...") + fmt.Println("🔄 Worker en cours...") time.Sleep(1 * time.Second) } } @@ -158,7 +166,7 @@ func (w *Worker) Stop() { w.cancel() // Annuler le contexte w.cancelled = true // Marquer comme annulé } else { - fmt.Println("Cancel déjà appelé, Worker déjà arrêté.") + fmt.Println("❗ Cancel déjà appelé, Worker déjà arrêté.") } } @@ -173,23 +181,11 @@ func (n *RaftNode) watchStateChanges() { case raft.RaftState: if evt == raft.Leader { - ctx, cancel := context.WithCancel(context.Background()) - - worker := Worker{ctx: ctx, cancel: cancel, cancelled: true} - n.Scheduler = worker - go n.Scheduler.Start() + go n.scheduler.Start() log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt) } else { - if n.Scheduler.ctx != nil { - n.Scheduler.cancel() - n.Scheduler = Worker{} - } - println() - /*if !n.Scheduler.IsCancelled() { - println("pas bon ça !") - n.Scheduler.cancel() - }*/ + n.scheduler.Stop() } log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt)