Refactoring complete

Mickael BOURNEUF 2025-03-02 13:16:25 +01:00
parent e7ec8897dd
commit 25edbabf04
13 changed files with 197 additions and 284 deletions

View File

@@ -3,8 +3,11 @@ package domain
import (
"context"
"encoding/json"
"fmt"
"deevirt.fr/compute/pkg/amqp"
"deevirt.fr/compute/pkg/proto"
"deevirt.fr/compute/pkg/schema"
"libvirt.org/go/libvirt"
)
@@ -13,29 +16,36 @@ type EventsDetail map[string]string
func (d *Domain) domainEventLifecycle(nodeId string, domainId string, state int64, event *libvirt.DomainEventLifecycle) {
d.Logger.Sugar().Infof("%s => %s: event %v", nodeId, domainId, event)
/*domStore := schema.Domain{}
domData, err := d.Store.Get(fmt.Sprintf("/etc/libvirt/domain/%s", domainId))
if err != nil || len(domData) == 0 {
d.Logger.Sugar().Errorf("Critical! VM %s does not exist or has a serious error!", domainId)
}
json.Unmarshal(domData, &domStore)
switch event.Event {
case libvirt.DOMAIN_EVENT_DEFINED:
domStore := schema.Domain{}
domData, err := d.Store.Get(fmt.Sprintf("/domain/%s", domainId))
if err != nil || len(domData) == 0 {
d.Logger.Sugar().Errorf("Critical! VM %s does not exist or has a serious error!", domainId)
}
json.Unmarshal(domData, &domStore)
domNode := schema.DomainNode{}
domNodeJson, err := d.Store.Get(fmt.Sprintf("/domain/%s/node", domainId))
if err != nil || len(domData) == 0 {
d.Logger.Sugar().Errorf("Critical! VM %s does not exist or has a serious error!", domainId)
}
json.Unmarshal(domNodeJson, &domNode)
// Node change!
oldNodeId := strings.Clone(domStore.NodeId)
dom2node, _ := json.Marshal(deevirt_schema.DomainToNode{
State: int(state),
})
d.Store.Set(fmt.Sprintf("/etc/libvirt/domain/qemu/%s/%s", nodeId, domainId), dom2node)
newdomNode := schema.DomainNode{}
newdomNode.NodeId = nodeId
dom2node, _ := json.Marshal(newdomNode)
domStore.NodeId = nodeId
dom2store, _ := json.Marshal(domStore)
d.Store.Set(fmt.Sprintf("/etc/libvirt/domain/%s", domainId), dom2store)
d.Store.Set(fmt.Sprintf("/domain/%s/node", domainId), dom2node)
println(oldNodeId)
domAttachment := schema.DomainAttachment{}
domAttachment.State = int(state)
domAttachmentJson, _ := json.Marshal(domAttachment)
d.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeId, domainId), domAttachmentJson)
d.Store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", domNode.NodeId, domainId))
d.Store.Delete(fmt.Sprintf("/etc/libvirt/domain/qemu/%s/%s", oldNodeId, domainId))
return
case libvirt.DOMAIN_EVENT_STARTED:
switch event.Detail {
case int(libvirt.DOMAIN_EVENT_STARTED_MIGRATED):
@@ -57,15 +67,15 @@ func (d *Domain) domainEventLifecycle(nodeId string, domainId string, state int6
}
// Update the state
nodeData, _ := d.Store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeId, domainId))
domNodeData := deevirt_schema.DomainToNode{}
json.Unmarshal(nodeData, &domNodeData)
domNodeData := schema.DomainAttachment{}
domNodeData.State = int(state)
dom2node, _ := json.Marshal(domNodeData)
d.Store.Set(fmt.Sprintf("/etc/libvirt/domain/qemu/%s/%s", nodeId, domainId), dom2node)
d.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeId, domainId), dom2node)
// AMQP - publish the raw event
desc := schema.DomainXML{}
a, _ := amqp.NewAMQP()
e, _ := json.Marshal(struct {
Type string
State int
@@ -74,18 +84,12 @@ func (d *Domain) domainEventLifecycle(nodeId string, domainId string, state int6
State: int(state),
})
desc := schema.Domain{}
err = xml.Unmarshal([]byte(domStore.Config), &desc)
if err != nil {
log.Fatalln(err)
}
a, _ := amqp.NewAMQP()
a.Publisher("vmcenter",
"events."+desc.Metadata.DeevirtInstance.DeevirtCompanyID+
"."+desc.Metadata.DeevirtInstance.DeevirtDatacenterID+
"."+domainId,
e)
defer a.Close()*/
defer a.Close()
}
func (d *Domain) Event(ctx context.Context, req *proto.DomainEventRequest) (*proto.DomainEventResponse, error) {
@@ -97,72 +101,5 @@ func (d *Domain) Event(ctx context.Context, req *proto.DomainEventRequest) (*pro
}
}
/*events := Events{
NodeID: req.DomainId,
DomainID: req.DomainId,
Event: e,
}*/
/*var events Events
err := json.Unmarshal(req.Event, &events)
if err != nil {
fmt.Println("Erreur lors du décodage JSON:", err)
}
var edetail EventsDetail
if events.Event.Event == "MIGRATION" {
err = json.Unmarshal([]byte(events.Event.Details), &edetail)
if err != nil {
fmt.Println("Erreur lors du décodage JSON:", err)
}
if edetail["status"] == "setup" {
r, _ := d.Store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID))
if r != nil {
var j raft.DomainStore
json.Unmarshal(r, &j)
j.Migrate = true
new, _ := json.Marshal(j)
d.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID), new)
} else {
new, _ := json.Marshal(raft.DomainStore{
Config: string(events.Config),
State: events.State,
Migrate: false,
})
d.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID), new)
}
fmt.Printf("%s => %v\n", events.NodeID, edetail)
} else if edetail["status"] == "completed" {
r, _ := d.Store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID))
if r != nil {
var j raft.DomainStore
json.Unmarshal(r, &j)
if j.Migrate {
d.Store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", events.NodeID, events.DomainID))
}
}
fmt.Printf("%s => %v\n", events.NodeID, edetail)
}
}
// AMQP - publish the raw event
/*e, _ := json.Marshal(events.Event)
a, _ := amqp.NewAMQP()
a.Publisher("vmcenter",
"events."+events.CompanyID+
"."+events.DatacenterID+
"."+events.DomainID,
e)
defer a.Close()*/
return &proto.DomainEventResponse{}, nil
}
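
The net effect of this first diff is a new store layout: the domain's owning node now lives under /domain/{domain_id}/node as a schema.DomainNode, while the per-node runtime state stays under /etc/libvirt/qemu/{node_id}/{domain_id} as a schema.DomainAttachment. A minimal sketch of that write path, assuming the imports already shown in this file (recordPlacement is a hypothetical helper, not part of the commit):

// Hypothetical helper illustrating the key layout this refactor settles on.
func (d *Domain) recordPlacement(nodeId string, domainId string, state int64) {
	// /domain/{domain_id}/node -> which node owns the domain
	domNode, _ := json.Marshal(schema.DomainNode{NodeId: nodeId})
	d.Store.Set(fmt.Sprintf("/domain/%s/node", domainId), domNode)

	// /etc/libvirt/qemu/{node_id}/{domain_id} -> the node-local attachment state
	att, _ := json.Marshal(schema.DomainAttachment{State: int(state)})
	d.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeId, domainId), att)
}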

View File

@@ -17,7 +17,6 @@ import (
"deevirt.fr/compute/cmd/mgr/domain"
"deevirt.fr/compute/cmd/mgr/node"
"deevirt.fr/compute/cmd/mgr/worker"
"deevirt.fr/compute/pkg/config"
pb "deevirt.fr/compute/pkg/proto"
@@ -77,17 +76,25 @@ func main() {
stateCh := make(chan raft_hashicorp.Observation, 1) // Canal de type raft.Observation
s.Raft.RegisterObserver(raft_hashicorp.NewObserver(stateCh, true, nil))
nodes := &worker.RaftNode{
Bootstrap: false,
Store: s,
NodeID: conf.NodeID,
StateCh: stateCh,
nodes := &RaftNode{
NodeID: conf.NodeID,
Conf: conf,
Store: s,
StateCh: stateCh,
isReady: false,
}
go nodes.WatchStateChanges()
// Wait 5 seconds to let log replay catch up
time.Sleep(5 * time.Second)
for {
if nodes.isReady {
break
}
time.Sleep(100 * time.Millisecond)
}
println("On démarre le serveur GRPC")
server := createGRPCServer(conf)
pb.RegisterNodeServer(server, &node.Node{
Config: conf,

View File

@@ -1,15 +1,12 @@
package node
import (
"encoding/json"
"fmt"
"io"
"log"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/proto"
"deevirt.fr/compute/pkg/raft"
"deevirt.fr/compute/pkg/schema"
)
type Node struct {
@@ -19,8 +16,6 @@ type Node struct {
}
func (n *Node) Alive(stream proto.Node_AliveServer) error {
println("Alive")
for {
req, err := stream.Recv()
if err == io.EOF || err != nil {
@@ -28,18 +23,10 @@ func (n *Node) Alive(stream proto.Node_AliveServer) error {
return nil
}
log.Printf("Received heartbeat: %v", req)
cluster := schema.NodeStore{}
println("on reçit une demande")
res, _ := n.Store.Get("/etc/libvirt/cluster")
json.Unmarshal(res, &cluster)
fmt.Printf("%v\n", res)
cluster[n.Config.NodeID].LastUpdate = req.Timestamp
d, _ := json.Marshal(cluster)
n.Store.Set("/etc/libvirt/cluster", d)
log.Printf("Received heartbeat: %s", req.NodeId)
// Send the nodes
res, _ := n.Store.Get("/nodes")
stream.Send(&proto.NodeAliveQemuResponse{
Nodes: res,
})

View File

@@ -1,6 +1,7 @@
package worker
package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
@@ -8,9 +9,11 @@ import (
"time"
raft_hashicorp "github.com/hashicorp/raft"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/types/known/timestamppb"
"libvirt.org/go/libvirt"
"deevirt.fr/compute/cmd/mgr/worker"
"deevirt.fr/compute/pkg/config"
etcd_client "deevirt.fr/compute/pkg/etcd"
"deevirt.fr/compute/pkg/raft"
@@ -19,25 +22,29 @@ import (
)
type RaftNode struct {
Bootstrap bool
Conf *config.Config
Store *raft.Store
NodeID string
StateCh chan raft_hashicorp.Observation
NodeID string
Conf *config.Config
Store *raft.Store
StateCh chan raft_hashicorp.Observation
isReady bool
}
func (n *RaftNode) init() {
println("bootstrap :")
nodes := make(schema.NodeStore)
nodes := make(schema.Node)
// Fetch the node IDs
etcd, _ := etcd_client.New(n.Conf.EtcdURI)
defer etcd.Close()
// Wipe all data from etcd
ctx := context.Background()
etcd.Delete(ctx, fmt.Sprintf("/deevirt/cluster/%s/", n.Conf.ClusterID), clientv3.WithPrefix())
for key, value := range etcd_client.GetNodes(etcd, n.Conf.ClusterID) {
var libvirt_uri string
nodes[key] = &schema.NodeStoreInfo{
nodes[key] = &schema.NodeConfig{
IpManagement: value.IpManagement,
}
@@ -71,7 +78,6 @@ func (n *RaftNode) init() {
// Record the node
domainStateStore, _ := json.Marshal(schema.DomainNode{
NodeId: key,
State: int(state),
})
n.Store.Set(fmt.Sprintf("/domain/%s/node", uuid), domainStateStore)
@@ -79,21 +85,22 @@ func (n *RaftNode) init() {
currentTime := time.Now()
newTime := currentTime.Add(3600 * time.Second) // add 3600 seconds so the monitor has time to synchronize
DomainLibvirtStore, _ := json.Marshal(schema.DomainLock{
LifeCycle: int(state),
Expiry: timestamppb.New(newTime),
DomainLibvirtStore, _ := json.Marshal(schema.DomainAttachment{
State: int(state),
Expiry: timestamppb.New(newTime),
})
n.Store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", key, uuid), DomainLibvirtStore)
}
}
// Persist the node configuration
jNodes, _ := json.Marshal(nodes)
n.Store.Set("/cluster", jNodes)
n.Store.Set("/nodes", jNodes)
}
// Watch and report Raft state changes
func (n *RaftNode) WatchStateChanges() {
worker, _ := NewWorker(n.Store)
work, _ := worker.NewWorker(n.Store)
for obs := range n.StateCh {
switch evt := obs.Data.(type) {
@@ -101,7 +108,8 @@ func (n *RaftNode) WatchStateChanges() {
log.Println("[ÉVÉNEMENT] Changement d'état Raft :", evt)
if evt == raft_hashicorp.Leader {
if n.Bootstrap {
println(n.Store.Raft.LastIndex())
if n.Store.Raft.LastIndex() == 1 {
n.init()
}
@@ -110,10 +118,14 @@ func (n *RaftNode) WatchStateChanges() {
if err := barrier.Error(); err != nil {
return
}
n.isReady = true
// Wait 30 seconds before starting the worker
time.Sleep(30 * time.Second)
log.Println("Démarrage du worker !")
worker.Start()
work.Start()
} else {
worker.Stop()
work.Stop()
}
case raft_hashicorp.LeaderObservation:

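The observer loop above gates readiness on a Raft barrier. A reduced sketch of that gate (hashicorp/raft API; the timeout value is an assumption, the diff does not show it):

// Reduced sketch of the leader gate in WatchStateChanges.
if evt == raft_hashicorp.Leader {
	// Barrier blocks until every preceding log entry is applied to the FSM,
	// so the in-memory store is current before we advertise readiness.
	barrier := n.Store.Raft.Barrier(10 * time.Second) // timeout assumed
	if err := barrier.Error(); err != nil {
		return // lost leadership or timed out; stay not-ready
	}
	n.isReady = true
	work.Start()
} else {
	work.Stop()
}
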
View File

@@ -2,6 +2,7 @@ package worker
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
@ -26,7 +27,7 @@ type Worker struct {
store *raft.Store
config *config.Config
nodes schema.NodeStore
nodes schema.Node
log *zap.SugaredLogger
}
@@ -60,9 +61,6 @@ func (w *Worker) Start() {
default:
log.Println("🔄 Contrôle périodique en cours...")
// Contrôler la connexion avec libvirt
w.handleLibvirtControl()
// Gérer les alertes du cluster
if err := w.handleClusterAlerts(); err != nil {
log.Printf("Erreur lors du traitement des alertes du cluster : %v", err)
@@ -129,60 +127,6 @@ func (w *Worker) Migrate(wg *sync.WaitGroup, nodeID string, newNodeID string, do
w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domainID))*/
}
/*
Periodically check that libvirt is reachable, independently of the monitor program.
This check provides a second layer of control for HA.
*/
func (w *Worker) handleLibvirtControl() {
var nodes schema.NodeStore
cluster, err := w.store.Get("/etc/libvirt/cluster")
if err != nil {
w.log.Errorf("Error retrieving cluster data: %v", err)
return
}
// Deserialize the cluster data
err = json.Unmarshal(cluster, &nodes)
if err != nil {
w.log.Errorf("Error deserializing cluster data: %v", err)
return
}
for _, conf := range nodes {
// Open a libvirt connection
c, err := libvirt.New(conf.IpManagement, w.config.LibvirtTLS)
if err != nil {
w.log.Warnf("Unable to open a libvirt connection to %s: %v", conf.IpManagement, err)
//conf.Alive = false
continue // move on to the next node
}
defer c.Close() // ensure the connection is closed at the end
// Check whether the node is alive
//alive, err := c.IsAlive()
if err != nil {
w.log.Warnf("Error checking liveness of %s: %v", conf.IpManagement, err)
//conf.Alive = false
} else {
//conf.Alive = alive
}
}
// Update the nodes in the store
w.nodes = nodes
nodesUpdate, err := json.Marshal(nodes)
if err != nil {
w.log.Errorf("Error serializing cluster data: %v", err)
return
}
// Save the updated data
err = w.store.Set("/etc/libvirt/cluster", nodesUpdate)
if err != nil {
w.log.Errorf("Error updating the store: %v", err)
}
}
func (w *Worker) handleClusterAlerts() error {
sched, err := scheduler.New()
if err != nil {
@@ -231,65 +175,72 @@ func (w *Worker) handleNodeAlerts() error {
}
func (w *Worker) handleHAExecution(nodeID string) {
/*if w.nodes[nodeID].Alive {
// The data is inconsistent, do nothing
w.log.Infof("The monitor agent is not responding, but the node is still up")
return
}
var cluster NodeStore
c, _ := w.store.Get("/etc/libvirt/cluster")
var cluster schema.Node
c, _ := w.store.Get("/nodes")
json.Unmarshal(c, &cluster)
listDoms, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", nodeID), LsOptions{
Recursive: false,
Data: true,
})
// Check libvirt reachability
_, err := libvirt.New(cluster[nodeID].IpManagement, w.config.LibvirtTLS)
if err == nil {
w.log.Warnf("Le noeud %s accède toujours à libvirt, on avorte", nodeID)
return
}
s, err := scheduler.New()
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
return
}
res, err := s.GetTopNode(4)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
return
}
mapSize := len(res)
listDoms, _ := w.store.Ls(fmt.Sprintf("/etc/libvirt/qemu/%s", nodeID), raft.LsOptions{
Recursive: false,
Data: true,
})
for domId := range listDoms {
randomINode := rand.Intn(mapSize)
currentDom, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
var cDomStore DomainStore
json.Unmarshal(currentDom, &cDomStore)
c, err := libvirt.New(cluster[res[randomINode].NodeID].IpManagement, w.store.conf.LibvirtTLS)
// Fetch the configuration
currentDom, _ := w.store.Get(fmt.Sprintf("/domain/%s", domId))
println(currentDom)
dom := schema.Domain{}
json.Unmarshal(currentDom, &dom)
currentDomAttachmentJson, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
currentDomAttachment := schema.DomainAttachment{}
json.Unmarshal(currentDomAttachmentJson, &currentDomAttachment)
// Connect to the new node
newConn, err := libvirt.New(cluster[res[randomINode].NodeID].IpManagement, w.config.LibvirtTLS)
if err != nil {
w.log.Errorf("Connexion error to libvirt %v", err)
continue
}
newDom, err := c.DomainDefineXML(cDomStore.Config)
config, _ := base64.StdEncoding.DecodeString(dom.Config)
newDom, err := newConn.DomainDefineXML(string(config))
if err != nil {
w.log.Errorf("Error create config %s on the node %s", domId, res[randomINode].NodeID, domId)
continue
} else {
w.store.Set(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId), currentDom)
w.store.Delete(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", nodeID, domId))
w.log.Errorf("Oups, impossible de définir la configuration sur le noeud %s, %v", randomINode, err)
return
}
var dStore DomainStore
data, _ := w.store.Get(fmt.Sprintf("/etc/libvirt/qemu/%s/%s", res[randomINode].NodeID, domId))
json.Unmarshal(data, &dStore)
if go_libvirt.DomainState(dStore.State) == go_libvirt.DOMAIN_RUNNING {
err = newDom.Create()
if err != nil {
w.log.Errorf("Error start domain %s on the node %s", domId, res[randomINode].NodeID, domId)
newDom.Undefine()
}
if currentDomAttachment.State == 1 {
err = newDom.Create()
if err != nil {
w.log.Errorf("Error start domain %s on the node %s", domId, res[randomINode].NodeID, domId)
return
}
}
w.log.Infof("HA of %s to %s for domain %s", nodeID, res[randomINode].NodeID, domId)
}*/
//_, err = c.DomainCreateXML(cDomStore.Config, go_libvirt.DOMAIN_START_VALIDATE)
}
}
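
Even though the HA body above is still commented out, the rewritten flow is clear: pull the domain config from /domain/{id}, define it on a scheduler-picked node, and boot it if its attachment state says it was running. A self-contained sketch of that re-homing step (rehome is a hypothetical helper; it assumes pkg/libvirt.New hands back a *go_libvirt.Connect and that State == 1 means running):

import (
	"encoding/base64"

	go_libvirt "libvirt.org/go/libvirt"

	"deevirt.fr/compute/pkg/schema"
)

// Hypothetical helper condensing the commented-out HA path above.
func rehome(conn *go_libvirt.Connect, dom schema.Domain, att schema.DomainAttachment) error {
	// The stored config is base64-encoded XML, per the decode step in the diff.
	xmlCfg, err := base64.StdEncoding.DecodeString(dom.Config)
	if err != nil {
		return err
	}
	newDom, err := conn.DomainDefineXML(string(xmlCfg))
	if err != nil {
		return err
	}
	if att.State == 1 { // assumed: 1 == running
		return newDom.Create() // start the freshly defined domain
	}
	return nil
}
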
func (w *Worker) handleCPULoadBalance(nodeID string) {

View File

@@ -14,16 +14,16 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
go_libvirt "libvirt.org/go/libvirt"
"deevirt.fr/compute/pkg/api/libvirt"
pb "deevirt.fr/compute/pkg/api/proto"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/libvirt"
pb "deevirt.fr/compute/pkg/proto"
"deevirt.fr/compute/pkg/schema"
)
type qemu struct {
clientVirt *go_libvirt.Connect
config *config.Config
nodes schema.NodeStore
nodes schema.Node
}
func NewQemu(c *go_libvirt.Connect) qemu {
@@ -67,6 +67,8 @@ func (q qemu) stonith(ctx context.Context) {
for _, dom := range doms {
dom.Destroy()
}
return
}
}
}
@@ -115,7 +117,7 @@ func (q qemu) heartbeat() {
log.Println("🔌 Connexion fermée par le serveur")
break
} else {
nodeStore := schema.NodeStore{}
nodeStore := schema.Node{}
json.Unmarshal(resp.Nodes, &nodeStore)
q.nodes = nodeStore
}
@@ -124,6 +126,7 @@ func (q qemu) heartbeat() {
for {
req := &pb.NodeAliveRequest{
NodeId: q.config.NodeID,
Timestamp: timestamppb.New(time.Now()),
}
@@ -145,7 +148,7 @@ func (q qemu) heartbeat() {
}
time.Sleep(100 * time.Millisecond)
time.Sleep(1 * time.Second)
}
}
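
With the proto change further down, each heartbeat now identifies its sender, and the send interval relaxes from 100 ms to one second. The send half of heartbeat(), isolated into a hypothetical helper (error handling is an assumption):

import (
	"log"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"

	pb "deevirt.fr/compute/pkg/proto"
)

// Hypothetical helper isolating the send loop of heartbeat().
func sendHeartbeats(stream pb.Node_AliveClient, nodeID string) {
	for {
		req := &pb.NodeAliveRequest{
			NodeId:    nodeID,
			Timestamp: timestamppb.New(time.Now()),
		}
		if err := stream.Send(req); err != nil {
			log.Printf("heartbeat send failed: %v", err) // assumed handling
			return
		}
		time.Sleep(1 * time.Second) // interval raised from 100 ms in this commit
	}
}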

View File

@@ -30,7 +30,7 @@ func GetNodes(c *clientv3.Client, cluster_id string) map[string]Node {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, _ := c.Get(ctx, "/cluster/"+cluster_id)
resp, _ := c.Get(ctx, "/deevirt/cluster/"+cluster_id)
var nodes map[string]Node

View File

@@ -26,7 +26,8 @@ type NodeAliveRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
NodeId string `protobuf:"bytes,1,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"`
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
}
func (x *NodeAliveRequest) Reset() {
@@ -61,6 +62,13 @@ func (*NodeAliveRequest) Descriptor() ([]byte, []int) {
return file_proto_node_proto_rawDescGZIP(), []int{0}
}
func (x *NodeAliveRequest) GetNodeId() string {
if x != nil {
return x.NodeId
}
return ""
}
func (x *NodeAliveRequest) GetTimestamp() *timestamppb.Timestamp {
if x != nil {
return x.Timestamp
@@ -121,21 +129,23 @@ var file_proto_node_proto_rawDesc = []byte{
0x0a, 0x10, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x07, 0x64, 0x65, 0x65, 0x76, 0x69, 0x72, 0x74, 0x1a, 0x1f, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4c, 0x0a, 0x10,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x65, 0x0a, 0x10,
0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52,
0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x2d, 0x0a, 0x15, 0x4e, 0x6f,
0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x51, 0x65, 0x6d, 0x75, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x05, 0x6e, 0x6f, 0x64, 0x65, 0x73, 0x32, 0x50, 0x0a, 0x04, 0x4e, 0x6f, 0x64,
0x65, 0x12, 0x48, 0x0a, 0x05, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x65, 0x65,
0x76, 0x69, 0x72, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x64, 0x65, 0x65, 0x76, 0x69, 0x72, 0x74, 0x2e,
0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x51, 0x65, 0x6d, 0x75, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x12, 0x17, 0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74,
0x61, 0x6d, 0x70, 0x22, 0x2d, 0x0a, 0x15, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65,
0x51, 0x65, 0x6d, 0x75, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05,
0x6e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x6e, 0x6f, 0x64,
0x65, 0x73, 0x32, 0x50, 0x0a, 0x04, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x48, 0x0a, 0x05, 0x41, 0x6c,
0x69, 0x76, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x65, 0x65, 0x76, 0x69, 0x72, 0x74, 0x2e, 0x4e, 0x6f,
0x64, 0x65, 0x41, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e,
0x2e, 0x64, 0x65, 0x65, 0x76, 0x69, 0x72, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x69,
0x76, 0x65, 0x51, 0x65, 0x6d, 0x75, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
0x28, 0x01, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@@ -10,7 +10,8 @@ service Node {
}
message NodeAliveRequest {
google.protobuf.Timestamp timestamp = 1;
string node_id = 1;
google.protobuf.Timestamp timestamp = 2;
}
message NodeAliveQemuResponse {

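Note that timestamp moves from tag 1 to tag 2 to make room for node_id; renumbering an existing field is not wire-compatible, so old and new peers have to be deployed together. Reading the new field on the server side looks like this (sketch using the generated getters from the regenerated node.pb.go above):

// Sketch: consuming the extended request inside the Alive handler.
func logHeartbeat(req *pb.NodeAliveRequest) {
	log.Printf("heartbeat from %s at %s", req.GetNodeId(), req.GetTimestamp().AsTime())
}
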
View File

@@ -5,14 +5,14 @@ import (
"encoding/json"
"fmt"
"io"
"regexp"
"time"
"github.com/hashicorp/raft"
clientv3 "go.etcd.io/etcd/client/v3"
)
type FSM struct {
type FSM Store
/*type FSM struct {
store *Store
client *clientv3.Client
}
@@ -30,7 +30,7 @@ func NewFSM(endpoints []string, store *Store) (*FSM, error) {
store: store,
client: client,
}, nil
}
}*/
// Apply applies a Raft log entry to the key-value store.
func (f *FSM) Apply(l *raft.Log) interface{} {
@@ -51,18 +51,13 @@ func (f *FSM) Apply(l *raft.Log) interface{} {
}
// Replicate to etcd if this is not a log replay and the node is the leader
if l.Index > f.store.lastIndex && f.store.Raft.State() == raft.Leader {
regex := regexp.MustCompile(`^/domain`)
match := regex.MatchString(c.Key)
if l.Index > f.lastIndex && f.Raft.State() == raft.Leader {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if match {
switch c.Op {
case "set":
f.client.Put(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.store.conf.ClusterID, c.Key), string(c.Value))
case "delete":
f.client.Delete(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.store.conf.ClusterID, c.Key))
}
switch c.Op {
case "set":
f.etcd.Put(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.conf.ClusterID, c.Key), string(c.Value))
case "delete":
f.etcd.Delete(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.conf.ClusterID, c.Key))
}
defer cancel()
}
@@ -75,12 +70,12 @@ func (f *FSM) Apply(l *raft.Log) interface{} {
// Snapshot returns a snapshot of the key-value store.
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
f.store.mu.Lock()
defer f.store.mu.Unlock()
f.mu.Lock()
defer f.mu.Unlock()
// Clone the map.
o := make(map[string][]byte)
for k, v := range f.store.m {
for k, v := range f.m {
o[k] = v
}
@@ -97,21 +92,21 @@ func (f *FSM) Restore(rc io.ReadCloser) error {
// Set the state from the snapshot, no lock required according to
// Hashicorp docs.
f.store.m = o
f.m = o
return nil
}
func (f *FSM) applySet(key string, value []byte) interface{} {
f.store.mu.Lock()
defer f.store.mu.Unlock()
f.store.m[key] = value
f.mu.Lock()
defer f.mu.Unlock()
f.m[key] = value
return nil
}
func (f *FSM) applyDelete(key string) interface{} {
f.store.mu.Lock()
defer f.store.mu.Unlock()
delete(f.store.m, key)
f.mu.Lock()
defer f.mu.Unlock()
delete(f.m, key)
return nil
}
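
The FSM rewrite above drops the wrapper struct in favor of a named type over Store, so raft.NewRaft receives (*FSM)(s) and every method reads the shared fields (m, mu, Raft, etcd, conf) directly. A self-contained illustration of that conversion pattern (simplified types, not the real Store):

package main

import (
	"fmt"
	"sync"
)

type Store struct {
	mu sync.Mutex
	m  map[string][]byte
}

// FSM is a second method set over the exact same data, as in the diff.
type FSM Store

func (f *FSM) applySet(key string, value []byte) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.m[key] = value
}

func main() {
	s := &Store{m: make(map[string][]byte)}
	(*FSM)(s).applySet("/nodes", []byte(`{}`)) // write through the FSM view
	fmt.Println(len(s.m))                      // the Store view sees it: prints 1
}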

View File

@@ -17,6 +17,7 @@ import (
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
raftwal "github.com/hashicorp/raft-wal"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@@ -48,6 +49,7 @@ type command struct {
type Store struct {
mu sync.Mutex
conf *config.Config // General configuration
etcd *clientv3.Client
m map[string][]byte // The key-value store for the system.
@@ -88,8 +90,17 @@ func getTLSCredentials(conf *config.Config) credentials.TransportCredentials {
}
func New(conf *config.Config) *Store {
client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(conf.EtcdURI, ","),
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil
}
return &Store{
conf: conf,
etcd: client,
m: make(map[string][]byte),
logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
}
@@ -146,12 +157,12 @@ func (s *Store) Open() (*Store, *transport.Manager, error) {
tm := transport.New(raft.ServerAddress(s.conf.AddressPrivate), dialOption)
fsm, err := NewFSM(strings.Split(s.conf.EtcdURI, ","), s)
/*fsm, err := NewFSM(strings.Split(s.conf.EtcdURI, ","), s)
if err != nil {
log.Fatalf("%v", err)
}
}*/
r, err := raft.NewRaft(c, fsm, logStore, stableStore, fss, tm.Transport())
r, err := raft.NewRaft(c, (*FSM)(s), logStore, stableStore, fss, tm.Transport())
if err != nil {
return nil, nil, fmt.Errorf("raft.NewRaft: %v", err)
}

View File

@@ -16,7 +16,6 @@ type Domain struct {
*/
type DomainNode struct {
NodeId string `json:"nodeID"` // ID of the owning node
State int `json:"state"` // its persisted state
}
/*
@@ -29,9 +28,9 @@ type DomainAgent struct {
/*
/etc/libvirt/{type}/{node_id}/{domain_id}
*/
type DomainLock struct {
LifeCycle int `json:"lifeycle"` // its actual state
Expiry *timestamppb.Timestamp `json:"expiry"` // lock expiry timestamp
type DomainAttachment struct {
State int `json:"state"` // its actual state
Expiry *timestamppb.Timestamp `json:"expiry"` // lock expiry timestamp
}
// Other

View File

@@ -2,9 +2,9 @@ package schema
import "google.golang.org/protobuf/types/known/timestamppb"
type NodeStore map[string]*NodeStoreInfo
type Node map[string]*NodeConfig
type NodeStoreInfo struct {
type NodeConfig struct {
LastUpdate *timestamppb.Timestamp
IpManagement string
}
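
Finally, the renamed map type in use, matching the bootstrap path earlier in the commit (node ID and address are placeholders):

nodes := make(schema.Node)
nodes["node-1"] = &schema.NodeConfig{IpManagement: "10.0.0.1"} // placeholder ID/address
jNodes, _ := json.Marshal(nodes)
n.Store.Set("/nodes", jNodes) // stored under /nodes, as in init()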