From 25edbabf04d6410130cfcc093a52926c805299bc Mon Sep 17 00:00:00 2001
From: Mickael BOURNEUF
Date: Sun, 2 Mar 2025 13:16:25 +0100
Subject: [PATCH] Refactoring complete
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 cmd/mgr/domain/events.go            | 129 +++++++--------------------
 cmd/mgr/main.go                     |  23 +++--
 cmd/mgr/node/node.go                |  19 +---
 cmd/mgr/{worker/node.go => raft.go} |  46 ++++++----
 cmd/mgr/worker/worker.go            | 133 +++++++++-------------------
 cmd/monitor/events/qemu.go          |  13 +--
 pkg/etcd/client.go                  |   2 +-
 pkg/proto/node.pb.go                |  40 +++++----
 pkg/proto/node.proto                |   3 +-
 pkg/raft/fsm.go                     |  45 +++++-----
 pkg/raft/store.go                   |  17 +++-
 pkg/schema/domain.go                |   7 +-
 pkg/schema/node.go                  |   4 +-
 13 files changed, 197 insertions(+), 284 deletions(-)
 rename cmd/mgr/{worker/node.go => raft.go} (79%)

diff --git a/cmd/mgr/domain/events.go b/cmd/mgr/domain/events.go
index ce95dbb..0fe2eb6 100644
--- a/cmd/mgr/domain/events.go
+++ b/cmd/mgr/domain/events.go
@@ -3,8 +3,11 @@ package domain
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 
+	"deevirt.fr/compute/pkg/amqp"
 	"deevirt.fr/compute/pkg/proto"
+	"deevirt.fr/compute/pkg/schema"
 	"libvirt.org/go/libvirt"
 )
 
@@ -13,29 +16,36 @@ type EventsDetail map[string]string
 func (d *Domain) domainEventLifecycle(nodeId string, domainId string, state int64, event *libvirt.DomainEventLifecycle) {
 	d.Logger.Sugar().Infof("%s => %s: Evènement %v", nodeId, domainId, event)
 
-	/*domStore := schema.Domain{}
-	domData, err := d.Store.Get(fmt.Sprintf("/etc/libvirt/domain/%s", domainId))
-	if err != nil || len(domData) == 0 {
-		d.Logger.Sugar().Errorf("Critique !!, la VM %s n'existe pas ou comporte une erreur importante !", domainId)
-	}
-	json.Unmarshal(domData, &domStore)
-
 	switch event.Event {
 	case libvirt.DOMAIN_EVENT_DEFINED:
+		domStore := schema.Domain{}
+		domData, err := d.Store.Get(fmt.Sprintf("/domain/%s", domainId))
+		if err != nil || len(domData) == 0 {
+			d.Logger.Sugar().Errorf("Critical: VM %s does not exist or contains a serious error!", domainId)
+		}
+		json.Unmarshal(domData, &domStore)
+
+		domNode := schema.DomainNode{}
+		domNodeJson, err := d.Store.Get(fmt.Sprintf("/domain/%s/node", domainId))
+		if err != nil || len(domNodeJson) == 0 {
+			d.Logger.Sugar().Errorf("Critical: VM %s does not exist or contains a serious error!", domainId)
+		}
+		json.Unmarshal(domNodeJson, &domNode)
+
+		// Node change!
-		oldNodeId := strings.Clone(domStore.NodeId)
-		dom2node, _ := json.Marshal(deevirt_schema.DomainToNode{
-			State: int(state),
-		})
-		d.Store.Set(fmt.Sprintf("/etc/libvirt/domain/qemu/%s/%s", nodeId, domainId), dom2node)
+		newdomNode := schema.DomainNode{}
+		newdomNode.NodeId = nodeId
+		dom2node, _ := json.Marshal(newdomNode)
 
-		domStore.NodeId = nodeId
-		dom2store, _ := json.Marshal(domStore)
-		d.Store.Set(fmt.Sprintf("/etc/libvirt/domain/%s", domainId), dom2store)
+		d.Store.Set(fmt.Sprintf("/domain/%s/node", domainId), dom2node)
 
-		println(oldNodeId)
+		domAttachment := schema.DomainAttachment{}
+		domAttachment.State = int(state)
+		domAttachmentJson, _ := json.Marshal(domAttachment)
+		d.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeId, domainId), domAttachmentJson)
+		d.Store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", domNode.NodeId, domainId))
 
-		d.Store.Delete(fmt.Sprintf("/etc/libvirt/domain/qemu/%s/%s", oldNodeId, domainId))
+		return
 	case libvirt.DOMAIN_EVENT_STARTED:
 		switch event.Detail {
 		case int(libvirt.DOMAIN_EVENT_STARTED_MIGRATED):
@@ -57,15 +67,15 @@ func (d *Domain) domainEventLifecycle(nodeId string, domainId string, state int6
 	}
 
 	// MAJ de l'état
-	nodeData, _ := d.Store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeId, domainId))
-	domNodeData := deevirt_schema.DomainToNode{}
-	json.Unmarshal(nodeData, &domNodeData)
-
+	domNodeData := schema.DomainAttachment{}
 	domNodeData.State = int(state)
 	dom2node, _ := json.Marshal(domNodeData)
-	d.Store.Set(fmt.Sprintf("/etc/libvirt/domain/qemu/%s/%s", nodeId, domainId), dom2node)
+	d.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeId, domainId), dom2node)
 
 	// AMQP - On envoi l'évènement brut
+	desc := schema.DomainXML{}
+	a, _ := amqp.NewAMQP()
+
 	e, _ := json.Marshal(struct {
 		Type  string
 		State int
@@ -74,18 +84,12 @@ func (d *Domain) domainEventLifecycle(nodeId string, domainId string, state int6
 		State: int(state),
 	})
 
-	desc := schema.Domain{}
-	err = xml.Unmarshal([]byte(domStore.Config), &desc)
-	if err != nil {
-		log.Fatalln(err)
-	}
-
-	a, _ := amqp.NewAMQP()
 	a.Publisher("vmcenter",
 		"events."+desc.Metadata.DeevirtInstance.DeevirtCompanyID+
 			"."+desc.Metadata.DeevirtInstance.DeevirtDatacenterID+
 			"."+domainId, e)
-	defer a.Close()*/
+	defer a.Close()
 }
 
 func (d *Domain) Event(ctx context.Context, req *proto.DomainEventRequest) (*proto.DomainEventResponse, error) {
@@ -97,72 +101,5 @@ func (d *Domain) Event(ctx context.Context, req *proto.DomainEventRequest) (*pro
 		}
 	}
 
-	/*events := Events{
-		NodeID:   req.DomainId,
-		DomainID: req.DomainId,
-		Event:    e,
-	}*/
-
-	/*var events Events
-
-	err := json.Unmarshal(req.Event, &events)
-	if err != nil {
-		fmt.Println("Erreur lors du décodage JSON:", err)
-	}
-
-	var edetail EventsDetail
-
-	if events.Event.Event == "MIGRATION" {
-		err = json.Unmarshal([]byte(events.Event.Details), &edetail)
-		if err != nil {
-			fmt.Println("Erreur lors du décodage JSON:", err)
-		}
-
-		if edetail["status"] == "setup" {
-			r, _ := d.Store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID))
-
-			if r != nil {
-				var j raft.DomainStore
-				json.Unmarshal(r, &j)
-				j.Migrate = true
-
-				new, _ := json.Marshal(j)
-				d.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID), new)
-			} else {
-				new, _ := json.Marshal(raft.DomainStore{
-					Config:  string(events.Config),
-					State:   events.State,
-					Migrate: false,
-				})
-				d.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID), new)
-			}
-
-			fmt.Printf("%s => %v\n", events.NodeID, edetail)
-		} else if edetail["status"] == "completed" {
edetail["status"] == "completed" { - r, _ := d.Store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID)) - - if r != nil { - var j raft.DomainStore - json.Unmarshal(r, &j) - - if j.Migrate { - d.Store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID)) - } - } - - fmt.Printf("%s => %v\n", events.NodeID, edetail) - } - } - - // AMQP - On envoi l'évènement brut - /*e, _ := json.Marshal(events.Event) - a, _ := amqp.NewAMQP() - a.Publisher("vmcenter", - "events."+events.CompanyID+ - "."+events.DatacenterID+ - "."+events.DomainID, - e) - defer a.Close()*/ - return &proto.DomainEventResponse{}, nil } diff --git a/cmd/mgr/main.go b/cmd/mgr/main.go index e6601c7..4a257ad 100644 --- a/cmd/mgr/main.go +++ b/cmd/mgr/main.go @@ -17,7 +17,6 @@ import ( "deevirt.fr/compute/cmd/mgr/domain" "deevirt.fr/compute/cmd/mgr/node" - "deevirt.fr/compute/cmd/mgr/worker" "deevirt.fr/compute/pkg/config" pb "deevirt.fr/compute/pkg/proto" @@ -77,17 +76,25 @@ func main() { stateCh := make(chan raft_hashicorp.Observation, 1) // Canal de type raft.Observation s.Raft.RegisterObserver(raft_hashicorp.NewObserver(stateCh, true, nil)) - nodes := &worker.RaftNode{ - Bootstrap: false, - Store: s, - NodeID: conf.NodeID, - StateCh: stateCh, + nodes := &RaftNode{ + NodeID: conf.NodeID, + Conf: conf, + Store: s, + StateCh: stateCh, + isReady: false, } go nodes.WatchStateChanges() - // On temporise 5 secondes, le temps de laisser la reprise des logs - time.Sleep(5 * time.Second) + for { + if nodes.isReady { + break + } + + time.Sleep(100 * time.Millisecond) + } + + println("On démarre le serveur GRPC") server := createGRPCServer(conf) pb.RegisterNodeServer(server, &node.Node{ Config: conf, diff --git a/cmd/mgr/node/node.go b/cmd/mgr/node/node.go index 3f03b1f..5644028 100644 --- a/cmd/mgr/node/node.go +++ b/cmd/mgr/node/node.go @@ -1,15 +1,12 @@ package node import ( - "encoding/json" - "fmt" "io" "log" "deevirt.fr/compute/pkg/config" "deevirt.fr/compute/pkg/proto" "deevirt.fr/compute/pkg/raft" - "deevirt.fr/compute/pkg/schema" ) type Node struct { @@ -19,8 +16,6 @@ type Node struct { } func (n *Node) Alive(stream proto.Node_AliveServer) error { - println("Alive") - for { req, err := stream.Recv() if err == io.EOF || err != nil { @@ -28,18 +23,10 @@ func (n *Node) Alive(stream proto.Node_AliveServer) error { return nil } - log.Printf("Received heartbeat: %v", req) - - cluster := schema.NodeStore{} - println("on reçit une demande") - res, _ := n.Store.Get("/etc/libvirt/cluster") - json.Unmarshal(res, &cluster) - fmt.Printf("%v\n", res) - cluster[n.Config.NodeID].LastUpdate = req.Timestamp - - d, _ := json.Marshal(cluster) - n.Store.Set("/etc/libvirt/cluster", d) + log.Printf("Received heartbeat: %s", req.NodeId) + // On envoi les noeuds + res, _ := n.Store.Get("/nodes") stream.Send(&proto.NodeAliveQemuResponse{ Nodes: res, }) diff --git a/cmd/mgr/worker/node.go b/cmd/mgr/raft.go similarity index 79% rename from cmd/mgr/worker/node.go rename to cmd/mgr/raft.go index 7653bc8..8f4788d 100644 --- a/cmd/mgr/worker/node.go +++ b/cmd/mgr/raft.go @@ -1,6 +1,7 @@ -package worker +package main import ( + "context" "encoding/base64" "encoding/json" "fmt" @@ -8,9 +9,11 @@ import ( "time" raft_hashicorp "github.com/hashicorp/raft" + clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/protobuf/types/known/timestamppb" "libvirt.org/go/libvirt" + "deevirt.fr/compute/cmd/mgr/worker" "deevirt.fr/compute/pkg/config" etcd_client "deevirt.fr/compute/pkg/etcd" "deevirt.fr/compute/pkg/raft" @@ -19,25 
@@ -19,25 +22,29 @@ import (
 )
 
 type RaftNode struct {
-	Bootstrap bool
-	Conf      *config.Config
-	Store     *raft.Store
-	NodeID    string
-	StateCh   chan raft_hashicorp.Observation
+	NodeID  string
+	Conf    *config.Config
+	Store   *raft.Store
+	StateCh chan raft_hashicorp.Observation
+	isReady bool
 }
 
 func (n *RaftNode) init() {
 	println("bootstrap :")
-	nodes := make(schema.NodeStore)
+	nodes := make(schema.Node)
 
 	// Récupération des Noeuds ID
 	etcd, _ := etcd_client.New(n.Conf.EtcdURI)
 	defer etcd.Close()
 
+	// Wipe all cluster data from etcd
+	ctx := context.Background()
+	etcd.Delete(ctx, fmt.Sprintf("/deevirt/cluster/%s/", n.Conf.ClusterID), clientv3.WithPrefix())
+
 	for key, value := range etcd_client.GetNodes(etcd, n.Conf.ClusterID) {
 		var libvirt_uri string
 
-		nodes[key] = &schema.NodeStoreInfo{
+		nodes[key] = &schema.NodeConfig{
 			IpManagement: value.IpManagement,
 		}
@@ -71,7 +78,6 @@ func (n *RaftNode) init() {
 				// On enregistre le noeud
 				domainStateStore, _ := json.Marshal(schema.DomainNode{
 					NodeId: key,
-					State:  int(state),
 				})
 
 				n.Store.Set(fmt.Sprintf("/domain/%s/node", uuid), domainStateStore)
@@ -79,21 +85,22 @@ func (n *RaftNode) init() {
 				currentTime := time.Now()
 				newTime := currentTime.Add(3600 * time.Second) // On ajoute 3600 secondes pour permettre au moniteur de se synchroniser
 
-				DomainLibvirtStore, _ := json.Marshal(schema.DomainLock{
-					LifeCycle: int(state),
-					Expiry:    timestamppb.New(newTime),
+				DomainLibvirtStore, _ := json.Marshal(schema.DomainAttachment{
+					State:  int(state),
+					Expiry: timestamppb.New(newTime),
 				})
 
 				n.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", key, uuid), DomainLibvirtStore)
 			}
 		}
 
+	// Persist the node configuration
 	jNodes, _ := json.Marshal(nodes)
-	n.Store.Set("/cluster", jNodes)
+	n.Store.Set("/nodes", jNodes)
 }
 
 // Fonction pour surveiller et afficher les changements d'état
 func (n *RaftNode) WatchStateChanges() {
-	worker, _ := NewWorker(n.Store)
+	work, _ := worker.NewWorker(n.Store)
 
 	for obs := range n.StateCh {
 		switch evt := obs.Data.(type) {
@@ -101,7 +108,8 @@ func (n *RaftNode) WatchStateChanges() {
 			log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt)
 
 			if evt == raft_hashicorp.Leader {
-				if n.Bootstrap {
+				println(n.Store.Raft.LastIndex())
+				if n.Store.Raft.LastIndex() == 1 {
 					n.init()
 				}
 
@@ -110,10 +118,14 @@ func (n *RaftNode) WatchStateChanges() {
 				if err := barrier.Error(); err != nil {
 					return
 				}
+				n.isReady = true
+
+				// Wait 30 seconds before starting the worker
+				time.Sleep(30 * time.Second)
 
 				log.Println("Démarrage du worker !")
-				worker.Start()
+				work.Start()
 			} else {
-				worker.Stop()
+				work.Stop()
 			}
 
 		case raft_hashicorp.LeaderObservation:
diff --git a/cmd/mgr/worker/worker.go b/cmd/mgr/worker/worker.go
index 685ca98..fcaef4a 100644
--- a/cmd/mgr/worker/worker.go
+++ b/cmd/mgr/worker/worker.go
@@ -2,6 +2,7 @@ package worker
 
 import (
 	"context"
+	"encoding/base64"
 	"encoding/json"
 	"fmt"
 	"log"
@@ -26,7 +27,7 @@ type Worker struct {
 	store  *raft.Store
 	config *config.Config
 
-	nodes schema.NodeStore
+	nodes schema.Node
 
 	log *zap.SugaredLogger
 }
@@ -60,9 +61,6 @@ func (w *Worker) Start() {
 		default:
 			log.Println("🔄 Contrôle périodique en cours...")
 
-			// Contrôler la connexion avec libvirt
-			w.handleLibvirtControl()
-
 			// Gérer les alertes du cluster
 			if err := w.handleClusterAlerts(); err != nil {
 				log.Printf("Erreur lors du traitement des alertes du cluster : %v", err)
@@ -129,60 +127,6 @@ func (w *Worker) Migrate(wg *sync.WaitGroup, nodeID string, newNodeID string, do
 	w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domainID))*/
 }
 
-/*
-On controle périodiquement l'accessibilité à libvirt, indépendamment du programme moniteur.
-Cette vérification assure un double controle pour la HA.
-*/
-func (w *Worker) handleLibvirtControl() {
-	var nodes schema.NodeStore
-	cluster, err := w.store.Get("/etc/libvirt/cluster")
-	if err != nil {
-		w.log.Errorf("Erreur lors de la récupération des données de cluster: %v", err)
-		return
-	}
-
-	// Désérialisation des données du cluster
-	err = json.Unmarshal(cluster, &nodes)
-	if err != nil {
-		w.log.Errorf("Erreur lors de la désérialisation des données de cluster: %v", err)
-		return
-	}
-
-	for _, conf := range nodes {
-		// Créer une connexion à libvirt
-		c, err := libvirt.New(conf.IpManagement, w.config.LibvirtTLS)
-		if err != nil {
-			w.log.Warnf("Impossible de créer la connexion libvirt pour %s: %v", conf.IpManagement, err)
-			//conf.Alive = false
-			continue // Passer à l'élément suivant
-		}
-		defer c.Close() // Assurer la fermeture de la connexion à la fin
-
-		// Vérifier si le noeud est vivant
-		//alive, err := c.IsAlive()
-		if err != nil {
-			w.log.Warnf("Erreur lors de la vérification de la vie de %s: %v", conf.IpManagement, err)
-			//conf.Alive = false
-		} else {
-			//conf.Alive = alive
-		}
-	}
-
-	// Mise à jour des nodes dans le store
-	w.nodes = nodes
-	nodesUpdate, err := json.Marshal(nodes)
-	if err != nil {
-		w.log.Errorf("Erreur lors de la sérialisation des données de cluster: %v", err)
-		return
-	}
-
-	// Sauvegarder les données mises à jour
-	err = w.store.Set("/etc/libvirt/cluster", nodesUpdate)
-	if err != nil {
-		w.log.Errorf("Erreur lors de la mise à jour du store: %v", err)
-	}
-}
-
 func (w *Worker) handleClusterAlerts() error {
 	sched, err := scheduler.New()
 	if err != nil {
@@ -231,65 +175,72 @@ func (w *Worker) handleNodeAlerts() error {
 }
 
 func (w *Worker) handleHAExecution(nodeID string) {
-	/*if w.nodes[nodeID].Alive {
-		// L'information n'est pas cohérente, on ne fait rien
-		w.log.Infof("L'agent moniteur ne répond pas, mais le noeud reste fonctionnel")
-		return
-	}
-
-	var cluster NodeStore
-	c, _ := w.store.Get("/etc/libvirt/cluster")
+	var cluster schema.Node
+	c, _ := w.store.Get("/nodes")
 	json.Unmarshal(c, &cluster)
 
-	listDoms, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", nodeID), LsOptions{
-		Recursive: false,
-		Data:      true,
-	})
+	// Check whether the node can still reach libvirt
+	_, err := libvirt.New(cluster[nodeID].IpManagement, w.config.LibvirtTLS)
+	if err == nil {
+		w.log.Warnf("Node %s still reaches libvirt, aborting", nodeID)
+		return
+	}
 
 	s, err := scheduler.New()
 	if err != nil {
 		w.log.Errorf("Connexion error to libvirt %v", err)
+		return
 	}
 
 	res, err := s.GetTopNode(4)
 	if err != nil {
 		w.log.Errorf("Connexion error to libvirt %v", err)
+		return
 	}
 
 	mapSize := len(res)
 
+	listDoms, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", nodeID), raft.LsOptions{
+		Recursive: false,
+		Data:      true,
+	})
+
 	for domId := range listDoms {
 		randomINode := rand.Intn(mapSize)
-		currentDom, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
-		var cDomStore DomainStore
-		json.Unmarshal(currentDom, &cDomStore)
 
-		c, err := libvirt.New(cluster[res[randomINode].NodeID].IpManagement, w.store.conf.LibvirtTLS)
+		// Fetch the domain configuration
+		currentDom, _ := w.store.Get(fmt.Sprintf("/domain/%s", domId))
+		println(currentDom)
+		dom := schema.Domain{}
+		json.Unmarshal(currentDom, &dom)
+
+		currentDomAttachmentJson, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
+		currentDomAttachment := schema.DomainAttachment{}
+		json.Unmarshal(currentDomAttachmentJson, &currentDomAttachment)
+
+		// Connect to the new node
+		newConn, err := libvirt.New(cluster[res[randomINode].NodeID].IpManagement, w.config.LibvirtTLS)
 		if err != nil {
 			w.log.Errorf("Connexion error to libvirt %v", err)
 			continue
 		}
-		newDom, err := c.DomainDefineXML(cDomStore.Config)
+
+		config, _ := base64.StdEncoding.DecodeString(dom.Config)
+		newDom, err := newConn.DomainDefineXML(string(config))
 		if err != nil {
-			w.log.Errorf("Error create config %s on the node %s", domId, res[randomINode].NodeID, domId)
-			continue
-		} else {
-			w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId), currentDom)
-			w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
+			w.log.Errorf("Failed to define the configuration on node %s: %v", res[randomINode].NodeID, err)
+			return
+		}
 
-			var dStore DomainStore
-			data, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId))
-			json.Unmarshal(data, &dStore)
-
-			if go_libvirt.DomainState(dStore.State) == go_libvirt.DOMAIN_RUNNING {
-				err = newDom.Create()
-				if err != nil {
-					w.log.Errorf("Error start domain %s on the node %s", domId, res[randomINode].NodeID, domId)
-					newDom.Undefine()
-				}
+		if currentDomAttachment.State == 1 {
+			err = newDom.Create()
+			if err != nil {
+				w.log.Errorf("Error starting domain %s on node %s", domId, res[randomINode].NodeID)
+				return
 			}
 		}
 
 		w.log.Infof("HA of %s to %s for domain %s", nodeID, res[randomINode].NodeID, domId)
-	}*/
-
-	//_, err = c.DomainCreateXML(cDomStore.Config, go_libvirt.DOMAIN_START_VALIDATE)
+	}
 }
 
 func (w *Worker) handleCPULoadBalance(nodeID string) {
diff --git a/cmd/monitor/events/qemu.go b/cmd/monitor/events/qemu.go
index d0cd736..4961383 100644
--- a/cmd/monitor/events/qemu.go
+++ b/cmd/monitor/events/qemu.go
@@ -14,16 +14,16 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 	go_libvirt "libvirt.org/go/libvirt"
 
-	"deevirt.fr/compute/pkg/api/libvirt"
-	pb "deevirt.fr/compute/pkg/api/proto"
 	"deevirt.fr/compute/pkg/config"
+	"deevirt.fr/compute/pkg/libvirt"
+	pb "deevirt.fr/compute/pkg/proto"
 	"deevirt.fr/compute/pkg/schema"
 )
 
 type qemu struct {
 	clientVirt *go_libvirt.Connect
 	config     *config.Config
-	nodes      schema.NodeStore
+	nodes      schema.Node
 }
 
 func NewQemu(c *go_libvirt.Connect) qemu {
@@ -67,6 +67,8 @@ func (q qemu) stonith(ctx context.Context) {
 				for _, dom := range doms {
 					dom.Destroy()
 				}
+
+				return
 			}
 		}
 	}
@@ -115,7 +117,7 @@ func (q qemu) heartbeat() {
 				log.Println("🔌 Connexion fermée par le serveur")
 				break
 			} else {
-				nodeStore := schema.NodeStore{}
+				nodeStore := schema.Node{}
 				json.Unmarshal(resp.Nodes, &nodeStore)
 				q.nodes = nodeStore
 			}
@@ -124,6 +126,7 @@ func (q qemu) heartbeat() {
 
 	for {
 		req := &pb.NodeAliveRequest{
+			NodeId:    q.config.NodeID,
 			Timestamp: timestamppb.New(time.Now()),
 		}
 
@@ -145,7 +148,7 @@ func (q qemu) heartbeat() {
 
 		}
 
-		time.Sleep(100 * time.Millisecond)
+		time.Sleep(1 * time.Second)
 	}
 }
diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go
index bb1e2c0..4d2e7ec 100644
--- a/pkg/etcd/client.go
+++ b/pkg/etcd/client.go
@@ -30,7 +30,7 @@ func GetNodes(c *clientv3.Client, cluster_id string) map[string]Node {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
 
-	resp, _ := c.Get(ctx, "/cluster/"+cluster_id)
+	resp, _ := c.Get(ctx, "/deevirt/cluster/"+cluster_id)
 
 	var nodes map[string]Node
diff --git a/pkg/proto/node.pb.go b/pkg/proto/node.pb.go
index eea0a77..c26736a 100644
--- a/pkg/proto/node.pb.go
+++ b/pkg/proto/node.pb.go
@@ -26,7 +26,8 @@ type NodeAliveRequest struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
+	NodeId    string                 `protobuf:"bytes,1,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"`
+	Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
 }
 
 func (x *NodeAliveRequest) Reset() {
@@ -61,6 +62,13 @@ func (*NodeAliveRequest) Descriptor() ([]byte, []int) {
 	return file_proto_node_proto_rawDescGZIP(), []int{0}
 }
 
+func (x *NodeAliveRequest) GetNodeId() string {
+	if x != nil {
+		return x.NodeId
+	}
+	return ""
+}
+
 func (x *NodeAliveRequest) GetTimestamp() *timestamppb.Timestamp {
 	if x != nil {
 		return x.Timestamp
@@ -121,21 +129,23 @@ var file_proto_node_proto_rawDesc = []byte{
 	0x0a, 0x10, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x70, 0x72, 0x6f,
 	0x74, 0x6f, 0x12, 0x07, 0x64, 0x65, 0x65, 0x76, 0x69, 0x72, 0x74, 0x1a, 0x1f, 0x67, 0x6f, 0x6f,
 	0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d,
-	0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4c, 0x0a, 0x10,
+	0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x65, 0x0a, 0x10,
 	0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
-	0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20,
-	0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
-	0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52,
-	0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x2d, 0x0a, 0x15, 0x4e, 0x6f,
-	0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x51, 0x65, 0x6d, 0x75, 0x52, 0x65, 0x73, 0x70, 0x6f,
-	0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01,
-	0x28, 0x0c, 0x52, 0x05, 0x6e, 0x6f, 0x64, 0x65, 0x73, 0x32, 0x50, 0x0a, 0x04, 0x4e, 0x6f, 0x64,
-	0x65, 0x12, 0x48, 0x0a, 0x05, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x65, 0x65,
-	0x76, 0x69, 0x72, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x65,
-	0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x64, 0x65, 0x65, 0x76, 0x69, 0x72, 0x74, 0x2e,
-	0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x51, 0x65, 0x6d, 0x75, 0x52, 0x65, 0x73,
-	0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e,
-	0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x12, 0x17, 0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
+	0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d,
+	0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67,
+	0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54,
+	0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74,
+	0x61, 0x6d, 0x70, 0x22, 0x2d, 0x0a, 0x15, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65,
+	0x51, 0x65, 0x6d, 0x75, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05,
+	0x6e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x6e, 0x6f, 0x64,
+	0x65, 0x73, 0x32, 0x50, 0x0a, 0x04, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x48, 0x0a, 0x05, 0x41, 0x6c,
0x12, 0x48, 0x0a, 0x05, 0x41, 0x6c, + 0x69, 0x76, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x65, 0x65, 0x76, 0x69, 0x72, 0x74, 0x2e, 0x4e, 0x6f, + 0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, + 0x2e, 0x64, 0x65, 0x65, 0x76, 0x69, 0x72, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69, + 0x76, 0x65, 0x51, 0x65, 0x6d, 0x75, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x28, 0x01, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/proto/node.proto b/pkg/proto/node.proto index 06555d2..36f7529 100644 --- a/pkg/proto/node.proto +++ b/pkg/proto/node.proto @@ -10,7 +10,8 @@ service Node { } message NodeAliveRequest { - google.protobuf.Timestamp timestamp = 1; + string node_id = 1; + google.protobuf.Timestamp timestamp = 2; } message NodeAliveQemuResponse { diff --git a/pkg/raft/fsm.go b/pkg/raft/fsm.go index c7718f4..0f7de25 100644 --- a/pkg/raft/fsm.go +++ b/pkg/raft/fsm.go @@ -5,14 +5,14 @@ import ( "encoding/json" "fmt" "io" - "regexp" "time" "github.com/hashicorp/raft" - clientv3 "go.etcd.io/etcd/client/v3" ) -type FSM struct { +type FSM Store + +/*type FSM struct { store *Store client *clientv3.Client } @@ -30,7 +30,7 @@ func NewFSM(endpoints []string, store *Store) (*FSM, error) { store: store, client: client, }, nil -} +}*/ // Apply applies a Raft log entry to the key-value store. func (f *FSM) Apply(l *raft.Log) interface{} { @@ -51,18 +51,13 @@ func (f *FSM) Apply(l *raft.Log) interface{} { } // On réplique sur etcd si ce n'est pas une reprise des logs et si le noeud est leader - if l.Index > f.store.lastIndex && f.store.Raft.State() == raft.Leader { - regex := regexp.MustCompile(`^/domain`) - match := regex.MatchString(c.Key) - + if l.Index > f.lastIndex && f.Raft.State() == raft.Leader { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - if match { - switch c.Op { - case "set": - f.client.Put(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.store.conf.ClusterID, c.Key), string(c.Value)) - case "delete": - f.client.Delete(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.store.conf.ClusterID, c.Key)) - } + switch c.Op { + case "set": + f.etcd.Put(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.conf.ClusterID, c.Key), string(c.Value)) + case "delete": + f.etcd.Delete(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.conf.ClusterID, c.Key)) } defer cancel() } @@ -75,12 +70,12 @@ func (f *FSM) Apply(l *raft.Log) interface{} { // Snapshot returns a snapshot of the key-value store. func (f *FSM) Snapshot() (raft.FSMSnapshot, error) { - f.store.mu.Lock() - defer f.store.mu.Unlock() + f.mu.Lock() + defer f.mu.Unlock() // Clone the map. o := make(map[string][]byte) - for k, v := range f.store.m { + for k, v := range f.m { o[k] = v } @@ -97,21 +92,21 @@ func (f *FSM) Restore(rc io.ReadCloser) error { // Set the state from the snapshot, no lock required according to // Hashicorp docs. 
-	f.store.m = o
+	f.m = o
 
 	return nil
 }
 
 func (f *FSM) applySet(key string, value []byte) interface{} {
-	f.store.mu.Lock()
-	defer f.store.mu.Unlock()
-	f.store.m[key] = value
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.m[key] = value
 
 	return nil
 }
 
 func (f *FSM) applyDelete(key string) interface{} {
-	f.store.mu.Lock()
-	defer f.store.mu.Unlock()
-	delete(f.store.m, key)
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	delete(f.m, key)
 
 	return nil
 }
diff --git a/pkg/raft/store.go b/pkg/raft/store.go
index 278338c..f208986 100644
--- a/pkg/raft/store.go
+++ b/pkg/raft/store.go
@@ -17,6 +17,7 @@ import (
 	"github.com/hashicorp/raft"
 	raftboltdb "github.com/hashicorp/raft-boltdb/v2"
 	raftwal "github.com/hashicorp/raft-wal"
+	clientv3 "go.etcd.io/etcd/client/v3"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 
@@ -48,6 +49,7 @@ type command struct {
 type Store struct {
 	mu   sync.Mutex
 	conf *config.Config // Configuration générale
+	etcd *clientv3.Client
 
 	m map[string][]byte // The key-value store for the system.
 
@@ -88,8 +90,17 @@ func getTLSCredentials(conf *config.Config) credentials.TransportCredentials {
 }
 
 func New(conf *config.Config) *Store {
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   strings.Split(conf.EtcdURI, ","),
+		DialTimeout: 5 * time.Second,
+	})
+	if err != nil {
+		return nil
+	}
+
 	return &Store{
 		conf:   conf,
+		etcd:   client,
 		m:      make(map[string][]byte),
 		logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
 	}
@@ -146,12 +157,12 @@ func (s *Store) Open() (*Store, *transport.Manager, error) {
 
 	tm := transport.New(raft.ServerAddress(s.conf.AddressPrivate), dialOption)
 
-	fsm, err := NewFSM(strings.Split(s.conf.EtcdURI, ","), s)
+	/*fsm, err := NewFSM(strings.Split(s.conf.EtcdURI, ","), s)
 	if err != nil {
 		log.Fatalf("%v", err)
-	}
+	}*/
 
-	r, err := raft.NewRaft(c, fsm, logStore, stableStore, fss, tm.Transport())
+	r, err := raft.NewRaft(c, (*FSM)(s), logStore, stableStore, fss, tm.Transport())
 	if err != nil {
 		return nil, nil, fmt.Errorf("raft.NewRaft: %v", err)
 	}
diff --git a/pkg/schema/domain.go b/pkg/schema/domain.go
index 1f7757f..27a6cba 100644
--- a/pkg/schema/domain.go
+++ b/pkg/schema/domain.go
@@ -16,7 +16,6 @@ type Domain struct {
 */
 type DomainNode struct {
 	NodeId string `json:"nodeID"` // NodeID Owner
-	State  int    `json:"state"`  // Son etat persistant
 }
 
 /*
@@ -29,9 +28,9 @@ type DomainAgent struct {
 /*
 /etc/libvirt/{type}/{node_id}/{domain_id}
*/
-type DomainLock struct {
-	LifeCycle int                    `json:"lifeycle"` // Son etat réel
-	Expiry    *timestamppb.Timestamp `json:"expiry"`   // Date d'expiration du verouillage
+type DomainAttachment struct {
+	State  int                    `json:"state"`  // Actual state
+	Expiry *timestamppb.Timestamp `json:"expiry"` // Lock expiry date
 }
 
 // Other
diff --git a/pkg/schema/node.go b/pkg/schema/node.go
index 469191e..d8bb805 100644
--- a/pkg/schema/node.go
+++ b/pkg/schema/node.go
@@ -2,9 +2,9 @@ package schema
 
 import "google.golang.org/protobuf/types/known/timestamppb"
 
-type NodeStore map[string]*NodeStoreInfo
+type Node map[string]*NodeConfig
 
-type NodeStoreInfo struct {
+type NodeConfig struct {
 	LastUpdate   *timestamppb.Timestamp
 	IpManagement string
 }