Implementation of a key => value database

parent 43e5cdb63a
commit d5722d9c8b
@@ -2,22 +2,30 @@ package api

import (
	"context"
	"regexp"
	"strconv"
	"time"

	"deevirt.fr/compute/pkg/api/proto"
	"deevirt.fr/compute/pkg/config"
	clientv3 "go.etcd.io/etcd/client/v3"

	"deevirt.fr/compute/pkg/raft"
)

type Domain struct {
	Config *config.Config
	Etcd   *clientv3.Client
	Store  *raft.Store
	proto.UnimplementedDomainServer
}

func (d *Domain) List(ctx context.Context, in *proto.DomainListAllRequest) (*proto.DomainListAllResponse, error) {
	test, _ := d.Store.Get("test")
	print(test)
	return &proto.DomainListAllResponse{}, nil
}

func (d *Domain) Get(ctx context.Context, in *proto.DomainListRequest) (*proto.DomainListResponse, error) {
	d.Store.Set("test", "test")
	return &proto.DomainListResponse{}, nil
}

/*func (d *Domain) List(ctx context.Context, in *proto.DomainListAllRequest) (*proto.DomainListAllResponse, error) {
	var domains = []*proto.DomainListResponse{}

	ctx_etcd, cancel := context.WithTimeout(context.Background(), 5*time.Second)

@@ -77,17 +85,18 @@ func (d *Domain) Create(ctx context.Context, in *proto.DomainCreateRequest) (*pr
	}
	defer conn.Close()*/

/*
/*

async def Create(self, request, context):
	yield domain_pb2.DomainCreateResponse(progress=40)
	async with Libvirt() as libvirt:
		if await libvirt.define(request.config.decode()):
			yield domain_pb2.DomainCreateResponse(progress=100)
		else:
			context.set_code(grpc.StatusCode.ALREADY_EXISTS)

async def Create(self, request, context):
	yield domain_pb2.DomainCreateResponse(progress=40)
	async with Libvirt() as libvirt:
		if await libvirt.define(request.config.decode()):
			yield domain_pb2.DomainCreateResponse(progress=100)
		else:
			context.set_code(grpc.StatusCode.ALREADY_EXISTS)

*/
*/

	return &proto.DomainCreateResponse{}, nil
	/*return &proto.DomainCreateResponse{}, nil
}
*/
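For a quick end-to-end check of the new handlers, a hypothetical client could call Get (whose server-side handler issues Store.Set("test", "test"), currently dropping the error) and then List (whose handler reads the value back). The listen address and the generated NewDomainClient constructor name are assumptions based on the registered Domain service, not confirmed by this commit:

package main

import (
	"context"
	"log"
	"time"

	"deevirt.fr/compute/pkg/api/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Address assumed: Server() wires the gRPC server and Raft onto port 4480.
	conn, err := grpc.Dial("127.0.0.1:4480", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	c := proto.NewDomainClient(conn) // standard protoc-gen-go-grpc name, assumed
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Get triggers a replicated write; the write only takes effect on the leader.
	if _, err := c.Get(ctx, &proto.DomainListRequest{}); err != nil {
		log.Fatalf("Get: %v", err)
	}

	// List triggers Store.Get("test") and prints the replicated value server-side.
	if _, err := c.List(ctx, &proto.DomainListAllRequest{}); err != nil {
		log.Fatalf("List: %v", err)
	}
}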
@@ -1,7 +1,6 @@
package api

import (
-	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
@@ -47,7 +46,7 @@ func createGRPCServer(conf *config.Config) *grpc.Server {
}

func Server() {
-	ctx := context.Background()
+	//ctx := context.Background()

	// Retrieve the deevirt configuration
	conf, err := config.New()
@@ -60,16 +59,21 @@ func Server() {
		log.Fatalf("failed to listen: %v", err)
	}

-	r, tm, err := raft.New(ctx, conf, 4480)
+	r := raft.New(conf, 4480)
+
+	tm, err := r.Open()
	if err != nil {
		log.Fatalf("failed to start raft: %v", err)
	}

	s := createGRPCServer(conf)
-	pb.RegisterDomainServer(s, nil)
+	pb.RegisterDomainServer(s, &Domain{
+		Config: conf,
+		Store:  r,
+	})
	tm.Register(s)
	//leaderhealth.Setup(r, s, []string{"Example"})
-	raft.Register(s, r)
+	raft.Register(s, r.Raft)
	reflection.Register(s)
	if err := s.Serve(sock); err != nil {
		log.Fatalf("failed to serve: %v", err)
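A condensed recap of the new boot order with explanatory comments, assuming conf and sock are prepared as earlier in this file. The point of splitting construction (raft.New) from startup (r.Open) is that the Store can be handed to the gRPC handlers before Raft networking begins, and that Raft RPCs are multiplexed onto the same gRPC listener via the transport manager:

// Sketch only; error handling as in Server() above.
r := raft.New(conf, 4480) // build the Store: no disk or network I/O yet
tm, err := r.Open()       // open BoltDB, discover peers via etcd, start Raft
if err != nil {
	log.Fatalf("failed to start raft: %v", err)
}

s := createGRPCServer(conf)
pb.RegisterDomainServer(s, &Domain{Config: conf, Store: r}) // handlers share the Store
tm.Register(s)           // Raft replication RPCs ride the same gRPC server
raft.Register(s, r.Raft) // as in Server(): the raft service now takes the raw *raft.Raft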
pkg/raft/fsm.go (new file, 98 lines)
@@ -0,0 +1,98 @@
package raft

import (
	"encoding/json"
	"fmt"
	"io"

	"github.com/hashicorp/raft"
)

type Fsm Store

// Apply applies a Raft log entry to the key-value store.
func (f *Fsm) Apply(l *raft.Log) interface{} {
	var c command
	if err := json.Unmarshal(l.Data, &c); err != nil {
		panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
	}

	switch c.Op {
	case "set":
		return f.applySet(c.Key, c.Value)
	case "delete":
		return f.applyDelete(c.Key)
	default:
		panic(fmt.Sprintf("unrecognized command op: %s", c.Op))
	}
}

// Snapshot returns a snapshot of the key-value store.
func (f *Fsm) Snapshot() (raft.FSMSnapshot, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Clone the map.
	o := make(map[string]string)
	for k, v := range f.m {
		o[k] = v
	}
	return &fsmSnapshot{store: o}, nil
}

// Restore stores the key-value store to a previous state.
func (f *Fsm) Restore(rc io.ReadCloser) error {
	o := make(map[string]string)
	if err := json.NewDecoder(rc).Decode(&o); err != nil {
		return err
	}

	// Set the state from the snapshot, no lock required according to
	// Hashicorp docs.
	f.m = o
	return nil
}

func (f *Fsm) applySet(key, value string) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.m[key] = value
	return nil
}

func (f *Fsm) applyDelete(key string) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.m, key)
	return nil
}

type fsmSnapshot struct {
	store map[string]string
}

func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
	err := func() error {
		// Encode data.
		b, err := json.Marshal(f.store)
		if err != nil {
			return err
		}

		// Write data to sink.
		if _, err := sink.Write(b); err != nil {
			return err
		}

		// Close the sink.
		return sink.Close()
	}()

	if err != nil {
		sink.Cancel()
	}

	return err
}

func (f *fsmSnapshot) Release() {}
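A minimal in-package test sketch (a hypothetical fsm_test.go, not part of this commit) showing how a marshalled command flows through Apply exactly as hashicorp/raft would deliver it once committed:

package raft

import (
	"encoding/json"
	"testing"

	"github.com/hashicorp/raft"
)

func TestFsmApplySet(t *testing.T) {
	// An Fsm is just a Store viewed through the raft.FSM interface.
	f := &Fsm{m: make(map[string]string)}

	// Hypothetical key/value, serialized the same way Store.Set does.
	b, err := json.Marshal(&command{Op: "set", Key: "vm-01", Value: "running"})
	if err != nil {
		t.Fatal(err)
	}

	// Raft hands committed entries to Apply as raw log data.
	f.Apply(&raft.Log{Data: b})

	if got := f.m["vm-01"]; got != "running" {
		t.Fatalf("expected %q, got %q", "running", got)
	}
}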
@@ -1,13 +1,15 @@
package raft

import (
-	"context"
	"crypto/tls"
	"crypto/x509"
+	"encoding/json"
	"fmt"
+	"log"
	"os"
	"path/filepath"
+	"sync"
	"time"

	transport "deevirt.fr/compute/pkg/raft/transport"
	"github.com/hashicorp/raft"
@@ -56,27 +58,59 @@ func getTLSCredentials(conf *config.Config) credentials.TransportCredentials {
	return creds
}

-func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *transport.Manager, error) {
+const (
+	retainSnapshotCount = 2
+	raftTimeout         = 10 * time.Second
+)
+
+type command struct {
+	Op    string `json:"op,omitempty"`
+	Key   string `json:"key,omitempty"`
+	Value string `json:"value,omitempty"`
+}
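For illustration (hypothetical key and value, relying on this file's encoding/json import): given the omitempty tags, a set command marshals to the compact payload that Store.Set below appends to the Raft log.

b, _ := json.Marshal(&command{Op: "set", Key: "vm-01", Value: "running"})
// string(b) == `{"op":"set","key":"vm-01","value":"running"}`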

+type Store struct {
+	mu   sync.Mutex
+	conf *config.Config // General configuration
+	port int            // Communication port (same as the gRPC server)
+
+	m map[string]string // The key-value store for the system.
+
+	Raft *raft.Raft // The consensus mechanism
+
+	logger *log.Logger
+}
+
+func New(conf *config.Config, port int) *Store {
+	return &Store{
+		conf:   conf,
+		port:   port,
+		m:      make(map[string]string),
+		logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
+	}
+}
+
+func (s *Store) Open() (*transport.Manager, error) {
	// Create the directory
-	baseDir := filepath.Join("/var/lib/deevirt/mgr/", conf.NodeID)
+	baseDir := filepath.Join("/var/lib/deevirt/mgr/", s.conf.NodeID)
	err := os.MkdirAll(baseDir, 0740)
	if err != nil {
-		return nil, nil, err
+		return nil, err
	}

	// Retrieve the node IDs
-	etcd, err := etcd_client.New(conf.EtcdURI)
+	etcd, err := etcd_client.New(s.conf.EtcdURI)
	if err != nil {
-		return nil, nil, err
+		return nil, err
	}
	defer etcd.Close()

	peers := []raft.Server{}

-	for key, value := range etcd_client.GetNodes(etcd, conf.ClusterID) {
+	for key, value := range etcd_client.GetNodes(etcd, s.conf.ClusterID) {
		var p string

-		for _, peer := range conf.Manager.Peers {
+		for _, peer := range s.conf.Manager.Peers {
			if peer == value.IpManagement {
				p = peer
			}
@@ -85,45 +119,41 @@ func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *trans
		if p != "" {
			peers = append(peers, raft.Server{
				ID:      raft.ServerID(key),
-				Address: raft.ServerAddress(fmt.Sprintf("%s:%d", p, port)),
+				Address: raft.ServerAddress(fmt.Sprintf("%s:%d", p, s.port)),
			})
		}
	}

	c := raft.DefaultConfig()
-	c.LocalID = raft.ServerID(conf.NodeID)
+	c.LocalID = raft.ServerID(s.conf.NodeID)

	ldb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat"))
	if err != nil {
-		return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
+		return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
	}

	/*sdb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat"))
	if err != nil {
		return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
	}*/

	fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
	if err != nil {
-		return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
+		return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
	}

	dialOption := []grpc.DialOption{}

-	if conf.Manager.TlsKey != "" {
-		dialOption = append(dialOption, grpc.WithTransportCredentials(getTLSCredentials(conf)))
+	if s.conf.Manager.TlsKey != "" {
+		dialOption = append(dialOption, grpc.WithTransportCredentials(getTLSCredentials(s.conf)))
	}

-	tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), dialOption)
+	tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", s.conf.AddressPrivate, s.port)), dialOption)

-	r, err := raft.NewRaft(c, nil, ldb, ldb, fss, tm.Transport())
+	r, err := raft.NewRaft(c, (*Fsm)(s), ldb, ldb, fss, tm.Transport())
	if err != nil {
-		return nil, nil, fmt.Errorf("raft.NewRaft: %v", err)
+		return nil, fmt.Errorf("raft.NewRaft: %v", err)
	}
+	s.Raft = r

-	s, err := scheduler.New()
+	sched, err := scheduler.New()
	if err != nil {
-		return nil, nil, fmt.Errorf("scheduler: %v", err)
+		return nil, fmt.Errorf("scheduler: %v", err)
	}

	// Observer to watch state changes
@@ -132,16 +162,16 @@ func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *trans

	node := &RaftNode{
		Raft:    r,
-		NodeID:  conf.NodeID,
+		NodeID:  s.conf.NodeID,
		StateCh: stateCh,
-		scheduler: s,
+		scheduler: sched,
	}

	go node.watchStateChanges()

	hasState, _ := checkIfStateExists(ldb)

-	if conf.Manager.Peers[0] == conf.AddressPrivate && !hasState {
+	if s.conf.Manager.Peers[0] == s.conf.AddressPrivate && !hasState {
		println("Starting bootstrap!")

		cfg := raft.Configuration{
@@ -149,11 +179,57 @@ func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *trans
		}
		f := r.BootstrapCluster(cfg)
		if err := f.Error(); err != nil {
-			return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
+			return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
		}
	}

-	return r, tm, nil
+	return tm, nil
}

+// Get returns the value for the given key.
+func (s *Store) Get(key string) (string, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.m[key], nil
+}
+
+// Set sets the value for the given key.
+func (s *Store) Set(key, value string) error {
+	if s.Raft.State() != raft.Leader {
+		return fmt.Errorf("not leader")
+	}
+
+	c := &command{
+		Op:    "set",
+		Key:   key,
+		Value: value,
+	}
+	b, err := json.Marshal(c)
+	if err != nil {
+		return err
+	}
+
+	f := s.Raft.Apply(b, raftTimeout)
+	return f.Error()
+}
+
+// Delete deletes the given key.
+func (s *Store) Delete(key string) error {
+	if s.Raft.State() != raft.Leader {
+		return fmt.Errorf("not leader")
+	}
+
+	c := &command{
+		Op:  "delete",
+		Key: key,
+	}
+	b, err := json.Marshal(c)
+	if err != nil {
+		return err
+	}
+
+	f := s.Raft.Apply(b, raftTimeout)
+	return f.Error()
+}

// Check whether the Raft state already exists
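A hypothetical caller sketch for the new Store API (the key name and forwarding strategy are assumptions): writes are funneled through the Raft log and refused on followers, while Get reads the local FSM map and may therefore lag slightly behind the leader.

package main

import (
	"log"

	"deevirt.fr/compute/pkg/raft"
)

func demo(s *raft.Store) {
	// Set serializes a "set" command and applies it through the Raft log;
	// on a follower it fails fast with "not leader".
	if err := s.Set("cluster/version", "1"); err != nil {
		log.Printf("set failed (forward to the leader instead): %v", err)
		return
	}

	// Get is a purely local read of the FSM map: cheap, but not linearizable.
	v, _ := s.Get("cluster/version")
	log.Printf("cluster/version = %s", v)

	// Delete goes through the same replicated path as Set.
	if err := s.Delete("cluster/version"); err != nil {
		log.Printf("delete failed: %v", err)
	}
}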