143 lines
2.7 KiB
Go
143 lines
2.7 KiB
Go
package raft
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/hashicorp/raft"
|
|
)
|
|
|
|
// FSM is the state machine driven by Raft for the key-value store; the
// methods in this file (Apply, Snapshot, Restore) are the ones Raft invokes.
// It is a defined type whose underlying type is Store (declared elsewhere),
// so these methods work directly on Store's fields: mu, m, lastIndex, Raft,
// etcd and conf.
type FSM Store
|
|
|
|
/*type FSM struct {
|
|
store *Store
|
|
client *clientv3.Client
|
|
}
|
|
|
|
func NewFSM(endpoints []string, store *Store) (*FSM, error) {
|
|
// Se connecter au cluster etcd
|
|
client, err := clientv3.New(clientv3.Config{
|
|
Endpoints: endpoints,
|
|
DialTimeout: 5 * time.Second,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &FSM{
|
|
store: store,
|
|
client: client,
|
|
}, nil
|
|
}*/
|
|
|
|
// Apply applies a Raft log entry to the key-value store.
|
|
func (f *FSM) Apply(l *raft.Log) interface{} {
|
|
switch l.Type {
|
|
case raft.LogCommand:
|
|
var c command
|
|
if err := json.Unmarshal(l.Data, &c); err != nil {
|
|
panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
|
|
}
|
|
|
|
switch c.Op {
|
|
case "set":
|
|
f.applySet(c.Key, c.Value)
|
|
case "delete":
|
|
f.applyDelete(c.Key)
|
|
default:
|
|
panic(fmt.Sprintf("unrecognized command op: %s", c.Op))
|
|
}
|
|
|
|
// On réplique sur etcd si ce n'est pas une reprise des logs et si le noeud est leader
|
|
if l.Index > f.lastIndex && f.Raft.State() == raft.Leader {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
switch c.Op {
|
|
case "set":
|
|
f.etcd.Put(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.conf.ClusterID, c.Key), string(c.Value))
|
|
case "delete":
|
|
f.etcd.Delete(ctx, fmt.Sprintf("/deevirt/cluster/%s%s", f.conf.ClusterID, c.Key))
|
|
}
|
|
defer cancel()
|
|
}
|
|
default:
|
|
println(l.Type.String())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Snapshot returns a snapshot of the key-value store.
|
|
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
|
|
// Clone the map.
|
|
o := make(map[string][]byte)
|
|
for k, v := range f.m {
|
|
o[k] = v
|
|
}
|
|
|
|
return &fsmSnapshot{store: o}, nil
|
|
}
|
|
|
|
// Restore stores the key-value store to a previous state.
|
|
func (f *FSM) Restore(rc io.ReadCloser) error {
|
|
o := make(map[string][]byte)
|
|
|
|
if err := json.NewDecoder(rc).Decode(&o); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Set the state from the snapshot, no lock required according to
|
|
// Hashicorp docs.
|
|
f.m = o
|
|
return nil
|
|
}
|
|
|
|
func (f *FSM) applySet(key string, value []byte) interface{} {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
f.m[key] = value
|
|
return nil
|
|
}
|
|
|
|
func (f *FSM) applyDelete(key string) interface{} {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
delete(f.m, key)
|
|
return nil
|
|
}
|
|
|
|
// fsmSnapshot holds a point-in-time copy of the key-value map, as produced by
// FSM.Snapshot and written out by Persist.
type fsmSnapshot struct {
	// store is the captured key/value data.
	store map[string][]byte
}
|
|
|
|
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
|
|
err := func() error {
|
|
// Encode data.
|
|
b, err := json.Marshal(f.store)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Write data to sink.
|
|
if _, err := sink.Write(b); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Close the sink.
|
|
return sink.Close()
|
|
}()
|
|
|
|
if err != nil {
|
|
sink.Cancel()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (f *fsmSnapshot) Release() {
|
|
}
|