From d5722d9c8bec725f1c1e7b5f69b890bc6a2b24ed Mon Sep 17 00:00:00 2001 From: Mickael BOURNEUF Date: Mon, 17 Feb 2025 21:21:24 +0100 Subject: [PATCH] =?UTF-8?q?Impl=C3=A9mentation=20d'une=20base=20de=20donn?= =?UTF-8?q?=C3=A9es=20de=20type,=20cl=C3=A9=3D>valeur?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/api/domain.go | 39 ++++++---- pkg/api/server.go | 14 ++-- pkg/raft/fsm.go | 98 ++++++++++++++++++++++++ pkg/raft/{node.go => store.go} | 134 ++++++++++++++++++++++++++------- 4 files changed, 236 insertions(+), 49 deletions(-) create mode 100644 pkg/raft/fsm.go rename pkg/raft/{node.go => store.go} (69%) diff --git a/pkg/api/domain.go b/pkg/api/domain.go index 047d25d..5727b03 100644 --- a/pkg/api/domain.go +++ b/pkg/api/domain.go @@ -2,22 +2,30 @@ package api import ( "context" - "regexp" - "strconv" - "time" "deevirt.fr/compute/pkg/api/proto" "deevirt.fr/compute/pkg/config" - clientv3 "go.etcd.io/etcd/client/v3" + "deevirt.fr/compute/pkg/raft" ) type Domain struct { Config *config.Config - Etcd *clientv3.Client + Store *raft.Store proto.UnimplementedDomainServer } func (d *Domain) List(ctx context.Context, in *proto.DomainListAllRequest) (*proto.DomainListAllResponse, error) { + test, _ := d.Store.Get("test") + print(test) + return &proto.DomainListAllResponse{}, nil +} + +func (d *Domain) Get(ctx context.Context, in *proto.DomainListRequest) (*proto.DomainListResponse, error) { + d.Store.Set("test", "test") + return &proto.DomainListResponse{}, nil +} + +/*func (d *Domain) List(ctx context.Context, in *proto.DomainListAllRequest) (*proto.DomainListAllResponse, error) { var domains = []*proto.DomainListResponse{} ctx_etcd, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -77,17 +85,18 @@ func (d *Domain) Create(ctx context.Context, in *proto.DomainCreateRequest) (*pr } defer conn.Close()*/ - /* +/* - async def Create(self, request, context): - yield domain_pb2.DomainCreateResponse(progress=40) - async with Libvirt() as libvirt: - if await libvirt.define(request.config.decode()): - yield domain_pb2.DomainCreateResponse(progress=100) - else: - context.set_code(grpc.StatusCode.ALREADY_EXISTS) + async def Create(self, request, context): + yield domain_pb2.DomainCreateResponse(progress=40) + async with Libvirt() as libvirt: + if await libvirt.define(request.config.decode()): + yield domain_pb2.DomainCreateResponse(progress=100) + else: + context.set_code(grpc.StatusCode.ALREADY_EXISTS) - */ +*/ - return &proto.DomainCreateResponse{}, nil +/*return &proto.DomainCreateResponse{}, nil } +*/ diff --git a/pkg/api/server.go b/pkg/api/server.go index 4e5b4cb..0ad6586 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -1,7 +1,6 @@ package api import ( - "context" "crypto/tls" "crypto/x509" "fmt" @@ -47,7 +46,7 @@ func createGRPCServer(conf *config.Config) *grpc.Server { } func Server() { - ctx := context.Background() + //ctx := context.Background() // Récupération de la configuration deevirt conf, err := config.New() @@ -60,16 +59,21 @@ func Server() { log.Fatalf("failed to listen: %v", err) } - r, tm, err := raft.New(ctx, conf, 4480) + r := raft.New(conf, 4480) + + tm, err := r.Open() if err != nil { log.Fatalf("failed to start raft: %v", err) } s := createGRPCServer(conf) - pb.RegisterDomainServer(s, nil) + pb.RegisterDomainServer(s, &Domain{ + Config: conf, + Store: r, + }) tm.Register(s) //leaderhealth.Setup(r, s, []string{"Example"}) - raft.Register(s, r) + raft.Register(s, r.Raft) reflection.Register(s) if err := s.Serve(sock); err != nil { log.Fatalf("failed to serve: %v", err) diff --git a/pkg/raft/fsm.go b/pkg/raft/fsm.go new file mode 100644 index 0000000..9aeaf59 --- /dev/null +++ b/pkg/raft/fsm.go @@ -0,0 +1,98 @@ +package raft + +import ( + "encoding/json" + "fmt" + "io" + + "github.com/hashicorp/raft" +) + +type Fsm Store + +// Apply applies a Raft log entry to the key-value store. +func (f *Fsm) Apply(l *raft.Log) interface{} { + var c command + if err := json.Unmarshal(l.Data, &c); err != nil { + panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error())) + } + + switch c.Op { + case "set": + return f.applySet(c.Key, c.Value) + case "delete": + return f.applyDelete(c.Key) + default: + panic(fmt.Sprintf("unrecognized command op: %s", c.Op)) + } +} + +// Snapshot returns a snapshot of the key-value store. +func (f *Fsm) Snapshot() (raft.FSMSnapshot, error) { + f.mu.Lock() + defer f.mu.Unlock() + + // Clone the map. + o := make(map[string]string) + for k, v := range f.m { + o[k] = v + } + return &fsmSnapshot{store: o}, nil +} + +// Restore stores the key-value store to a previous state. +func (f *Fsm) Restore(rc io.ReadCloser) error { + o := make(map[string]string) + if err := json.NewDecoder(rc).Decode(&o); err != nil { + return err + } + + // Set the state from the snapshot, no lock required according to + // Hashicorp docs. + f.m = o + return nil +} + +func (f *Fsm) applySet(key, value string) interface{} { + f.mu.Lock() + defer f.mu.Unlock() + f.m[key] = value + return nil +} + +func (f *Fsm) applyDelete(key string) interface{} { + f.mu.Lock() + defer f.mu.Unlock() + delete(f.m, key) + return nil +} + +type fsmSnapshot struct { + store map[string]string +} + +func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error { + err := func() error { + // Encode data. + b, err := json.Marshal(f.store) + if err != nil { + return err + } + + // Write data to sink. + if _, err := sink.Write(b); err != nil { + return err + } + + // Close the sink. + return sink.Close() + }() + + if err != nil { + sink.Cancel() + } + + return err +} + +func (f *fsmSnapshot) Release() {} diff --git a/pkg/raft/node.go b/pkg/raft/store.go similarity index 69% rename from pkg/raft/node.go rename to pkg/raft/store.go index 62aea9f..631c27f 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/store.go @@ -1,13 +1,15 @@ package raft import ( - "context" "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "log" "os" "path/filepath" + "sync" + "time" transport "deevirt.fr/compute/pkg/raft/transport" "github.com/hashicorp/raft" @@ -56,27 +58,59 @@ func getTLSCredentials(conf *config.Config) credentials.TransportCredentials { return creds } -func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *transport.Manager, error) { +const ( + retainSnapshotCount = 2 + raftTimeout = 10 * time.Second +) + +type command struct { + Op string `json:"op,omitempty"` + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` +} + +type Store struct { + mu sync.Mutex + conf *config.Config // Configuration générale + port int // Port de communication (identique au serveur GRPC) + + m map[string]string // The key-value store for the system. + + Raft *raft.Raft // The consensus mechanism + + logger *log.Logger +} + +func New(conf *config.Config, port int) *Store { + return &Store{ + conf: conf, + port: port, + m: make(map[string]string), + logger: log.New(os.Stderr, "[store] ", log.LstdFlags), + } +} + +func (s *Store) Open() (*transport.Manager, error) { // Création du répertoire - baseDir := filepath.Join("/var/lib/deevirt/mgr/", conf.NodeID) + baseDir := filepath.Join("/var/lib/deevirt/mgr/", s.conf.NodeID) err := os.MkdirAll(baseDir, 0740) if err != nil { - return nil, nil, err + return nil, err } // Récupération des Noeuds ID - etcd, err := etcd_client.New(conf.EtcdURI) + etcd, err := etcd_client.New(s.conf.EtcdURI) if err != nil { - return nil, nil, err + return nil, err } defer etcd.Close() peers := []raft.Server{} - for key, value := range etcd_client.GetNodes(etcd, conf.ClusterID) { + for key, value := range etcd_client.GetNodes(etcd, s.conf.ClusterID) { var p string - for _, peer := range conf.Manager.Peers { + for _, peer := range s.conf.Manager.Peers { if peer == value.IpManagement { p = peer } @@ -85,45 +119,41 @@ func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *trans if p != "" { peers = append(peers, raft.Server{ ID: raft.ServerID(key), - Address: raft.ServerAddress(fmt.Sprintf("%s:%d", p, port)), + Address: raft.ServerAddress(fmt.Sprintf("%s:%d", p, s.port)), }) } } c := raft.DefaultConfig() - c.LocalID = raft.ServerID(conf.NodeID) + c.LocalID = raft.ServerID(s.conf.NodeID) ldb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat")) if err != nil { - return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err) + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err) } - /*sdb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat")) - if err != nil { - return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err) - }*/ - fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr) if err != nil { - return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) + return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) } dialOption := []grpc.DialOption{} - if conf.Manager.TlsKey != "" { - dialOption = append(dialOption, grpc.WithTransportCredentials(getTLSCredentials(conf))) + if s.conf.Manager.TlsKey != "" { + dialOption = append(dialOption, grpc.WithTransportCredentials(getTLSCredentials(s.conf))) } - tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), dialOption) + tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", s.conf.AddressPrivate, s.port)), dialOption) - r, err := raft.NewRaft(c, nil, ldb, ldb, fss, tm.Transport()) + r, err := raft.NewRaft(c, (*Fsm)(s), ldb, ldb, fss, tm.Transport()) if err != nil { - return nil, nil, fmt.Errorf("raft.NewRaft: %v", err) + return nil, fmt.Errorf("raft.NewRaft: %v", err) } + s.Raft = r - s, err := scheduler.New() + sched, err := scheduler.New() if err != nil { - return nil, nil, fmt.Errorf("scheduler: %v", err) + return nil, fmt.Errorf("scheduler: %v", err) } // Observer pour surveiller les changements d'état @@ -132,16 +162,16 @@ func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *trans node := &RaftNode{ Raft: r, - NodeID: conf.NodeID, + NodeID: s.conf.NodeID, StateCh: stateCh, - scheduler: s, + scheduler: sched, } go node.watchStateChanges() hasState, _ := checkIfStateExists(ldb) - if conf.Manager.Peers[0] == conf.AddressPrivate && !hasState { + if s.conf.Manager.Peers[0] == s.conf.AddressPrivate && !hasState { println("Démarrage du bootstrap ! ") cfg := raft.Configuration{ @@ -149,11 +179,57 @@ func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *trans } f := r.BootstrapCluster(cfg) if err := f.Error(); err != nil { - return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) + return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) } } - return r, tm, nil + return tm, nil +} + +// Get returns the value for the given key. +func (s *Store) Get(key string) (string, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.m[key], nil +} + +// Set sets the value for the given key. +func (s *Store) Set(key, value string) error { + if s.Raft.State() != raft.Leader { + return fmt.Errorf("not leader") + } + + c := &command{ + Op: "set", + Key: key, + Value: value, + } + b, err := json.Marshal(c) + if err != nil { + return err + } + + f := s.Raft.Apply(b, raftTimeout) + return f.Error() +} + +// Delete deletes the given key. +func (s *Store) Delete(key string) error { + if s.Raft.State() != raft.Leader { + return fmt.Errorf("not leader") + } + + c := &command{ + Op: "delete", + Key: key, + } + b, err := json.Marshal(c) + if err != nil { + return err + } + + f := s.Raft.Apply(b, raftTimeout) + return f.Error() } // Vérifie si l'état Raft existe déjà