Compare commits
2 Commits
4edb3ff0eb
...
6cb2813f94
Author | SHA1 | Date | |
---|---|---|---|
6cb2813f94 | |||
7843d0ba75 |
@ -1,8 +1,8 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"deevirt.fr/compute/cmd/compute_qemu/events"
|
"deevirt.fr/compute/cmd/monitor/events"
|
||||||
"deevirt.fr/compute/cmd/compute_qemu/metrics"
|
"deevirt.fr/compute/cmd/monitor/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -1,15 +1,19 @@
|
|||||||
syntax = "proto3";
|
syntax = "proto3";
|
||||||
|
|
||||||
option go_package = "github.com/Jille/raft-grpc-transport/proto";
|
option go_package = "./proto";
|
||||||
|
|
||||||
import "google/protobuf/timestamp.proto";
|
import "google/protobuf/timestamp.proto";
|
||||||
|
package raft;
|
||||||
|
|
||||||
service RaftTransport {
|
service RaftTransport {
|
||||||
// AppendEntriesPipeline opens an AppendEntries message stream.
|
// AppendEntriesPipeline opens an AppendEntries message stream.
|
||||||
rpc AppendEntriesPipeline(stream AppendEntriesRequest) returns (stream AppendEntriesResponse) {}
|
rpc AppendEntriesPipeline(stream AppendEntriesRequest) returns (stream AppendEntriesResponse) {}
|
||||||
|
rpc AppendEntriesChunkedPipeline(stream AppendEntriesChunkedRequest) returns (stream AppendEntriesResponse) {}
|
||||||
|
|
||||||
// AppendEntries performs a single append entries request / response.
|
// AppendEntries performs a single append entries request / response.
|
||||||
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {}
|
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {}
|
||||||
|
// AppendEntries performs a single append entries request / response for request larger than the max grpc message size.
|
||||||
|
rpc AppendEntriesChunked(stream AppendEntriesChunkedRequest) returns (AppendEntriesResponse) {}
|
||||||
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
||||||
rpc RequestVote(RequestVoteRequest) returns (RequestVoteResponse) {}
|
rpc RequestVote(RequestVoteRequest) returns (RequestVoteResponse) {}
|
||||||
// TimeoutNow is used to start a leadership transfer to the target node.
|
// TimeoutNow is used to start a leadership transfer to the target node.
|
||||||
@ -53,6 +57,11 @@ message AppendEntriesRequest {
|
|||||||
uint64 leader_commit_index = 7;
|
uint64 leader_commit_index = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message AppendEntriesChunkedRequest {
|
||||||
|
int64 remaining_bytes = 1; // number of bytes of the same request AFTER this chunk
|
||||||
|
bytes chunk = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message AppendEntriesResponse {
|
message AppendEntriesResponse {
|
||||||
RPCHeader rpc_header = 1;
|
RPCHeader rpc_header = 1;
|
||||||
uint64 term = 2;
|
uint64 term = 2;
|
||||||
@ -119,4 +128,4 @@ message RequestPreVoteResponse {
|
|||||||
RPCHeader rpc_header = 1;
|
RPCHeader rpc_header = 1;
|
||||||
uint64 term = 2;
|
uint64 term = 2;
|
||||||
bool granted = 3;
|
bool granted = 3;
|
||||||
}
|
}
|
@ -1,8 +1,8 @@
|
|||||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||||
// versions:
|
// versions:
|
||||||
// - protoc-gen-go-grpc v1.2.0
|
// - protoc-gen-go-grpc v1.5.1
|
||||||
// - protoc v3.21.3
|
// - protoc v3.14.0
|
||||||
// source: transport.proto
|
// source: proto/raft_transport.proto
|
||||||
|
|
||||||
package proto
|
package proto
|
||||||
|
|
||||||
@ -15,23 +15,37 @@ import (
|
|||||||
|
|
||||||
// This is a compile-time assertion to ensure that this generated file
|
// This is a compile-time assertion to ensure that this generated file
|
||||||
// is compatible with the grpc package it is being compiled against.
|
// is compatible with the grpc package it is being compiled against.
|
||||||
// Requires gRPC-Go v1.32.0 or later.
|
// Requires gRPC-Go v1.64.0 or later.
|
||||||
const _ = grpc.SupportPackageIsVersion7
|
const _ = grpc.SupportPackageIsVersion9
|
||||||
|
|
||||||
|
const (
|
||||||
|
RaftTransport_AppendEntriesPipeline_FullMethodName = "/raft.RaftTransport/AppendEntriesPipeline"
|
||||||
|
RaftTransport_AppendEntriesChunkedPipeline_FullMethodName = "/raft.RaftTransport/AppendEntriesChunkedPipeline"
|
||||||
|
RaftTransport_AppendEntries_FullMethodName = "/raft.RaftTransport/AppendEntries"
|
||||||
|
RaftTransport_AppendEntriesChunked_FullMethodName = "/raft.RaftTransport/AppendEntriesChunked"
|
||||||
|
RaftTransport_RequestVote_FullMethodName = "/raft.RaftTransport/RequestVote"
|
||||||
|
RaftTransport_TimeoutNow_FullMethodName = "/raft.RaftTransport/TimeoutNow"
|
||||||
|
RaftTransport_InstallSnapshot_FullMethodName = "/raft.RaftTransport/InstallSnapshot"
|
||||||
|
RaftTransport_RequestPreVote_FullMethodName = "/raft.RaftTransport/RequestPreVote"
|
||||||
|
)
|
||||||
|
|
||||||
// RaftTransportClient is the client API for RaftTransport service.
|
// RaftTransportClient is the client API for RaftTransport service.
|
||||||
//
|
//
|
||||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||||
type RaftTransportClient interface {
|
type RaftTransportClient interface {
|
||||||
// AppendEntriesPipeline opens an AppendEntries message stream.
|
// AppendEntriesPipeline opens an AppendEntries message stream.
|
||||||
AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_AppendEntriesPipelineClient, error)
|
AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesRequest, AppendEntriesResponse], error)
|
||||||
|
AppendEntriesChunkedPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error)
|
||||||
// AppendEntries performs a single append entries request / response.
|
// AppendEntries performs a single append entries request / response.
|
||||||
AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error)
|
AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error)
|
||||||
|
// AppendEntries performs a single append entries request / response for request larger than the max grpc message size.
|
||||||
|
AppendEntriesChunked(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error)
|
||||||
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
||||||
RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error)
|
RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error)
|
||||||
// TimeoutNow is used to start a leadership transfer to the target node.
|
// TimeoutNow is used to start a leadership transfer to the target node.
|
||||||
TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error)
|
TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error)
|
||||||
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
|
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
|
||||||
InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_InstallSnapshotClient, error)
|
InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[InstallSnapshotRequest, InstallSnapshotResponse], error)
|
||||||
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
||||||
RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error)
|
RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error)
|
||||||
}
|
}
|
||||||
@ -44,49 +58,59 @@ func NewRaftTransportClient(cc grpc.ClientConnInterface) RaftTransportClient {
|
|||||||
return &raftTransportClient{cc}
|
return &raftTransportClient{cc}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *raftTransportClient) AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_AppendEntriesPipelineClient, error) {
|
func (c *raftTransportClient) AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesRequest, AppendEntriesResponse], error) {
|
||||||
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[0], "/RaftTransport/AppendEntriesPipeline", opts...)
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[0], RaftTransport_AppendEntriesPipeline_FullMethodName, cOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
x := &raftTransportAppendEntriesPipelineClient{stream}
|
x := &grpc.GenericClientStream[AppendEntriesRequest, AppendEntriesResponse]{ClientStream: stream}
|
||||||
return x, nil
|
return x, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type RaftTransport_AppendEntriesPipelineClient interface {
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
Send(*AppendEntriesRequest) error
|
type RaftTransport_AppendEntriesPipelineClient = grpc.BidiStreamingClient[AppendEntriesRequest, AppendEntriesResponse]
|
||||||
Recv() (*AppendEntriesResponse, error)
|
|
||||||
grpc.ClientStream
|
|
||||||
}
|
|
||||||
|
|
||||||
type raftTransportAppendEntriesPipelineClient struct {
|
func (c *raftTransportClient) AppendEntriesChunkedPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error) {
|
||||||
grpc.ClientStream
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
}
|
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[1], RaftTransport_AppendEntriesChunkedPipeline_FullMethodName, cOpts...)
|
||||||
|
if err != nil {
|
||||||
func (x *raftTransportAppendEntriesPipelineClient) Send(m *AppendEntriesRequest) error {
|
|
||||||
return x.ClientStream.SendMsg(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *raftTransportAppendEntriesPipelineClient) Recv() (*AppendEntriesResponse, error) {
|
|
||||||
m := new(AppendEntriesResponse)
|
|
||||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return m, nil
|
x := &grpc.GenericClientStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ClientStream: stream}
|
||||||
|
return x, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
|
type RaftTransport_AppendEntriesChunkedPipelineClient = grpc.BidiStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse]
|
||||||
|
|
||||||
func (c *raftTransportClient) AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) {
|
func (c *raftTransportClient) AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
out := new(AppendEntriesResponse)
|
out := new(AppendEntriesResponse)
|
||||||
err := c.cc.Invoke(ctx, "/RaftTransport/AppendEntries", in, out, opts...)
|
err := c.cc.Invoke(ctx, RaftTransport_AppendEntries_FullMethodName, in, out, cOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *raftTransportClient) AppendEntriesChunked(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[2], RaftTransport_AppendEntriesChunked_FullMethodName, cOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
x := &grpc.GenericClientStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ClientStream: stream}
|
||||||
|
return x, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
|
type RaftTransport_AppendEntriesChunkedClient = grpc.ClientStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse]
|
||||||
|
|
||||||
func (c *raftTransportClient) RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) {
|
func (c *raftTransportClient) RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
out := new(RequestVoteResponse)
|
out := new(RequestVoteResponse)
|
||||||
err := c.cc.Invoke(ctx, "/RaftTransport/RequestVote", in, out, opts...)
|
err := c.cc.Invoke(ctx, RaftTransport_RequestVote_FullMethodName, in, out, cOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -94,51 +118,32 @@ func (c *raftTransportClient) RequestVote(ctx context.Context, in *RequestVoteRe
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *raftTransportClient) TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error) {
|
func (c *raftTransportClient) TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
out := new(TimeoutNowResponse)
|
out := new(TimeoutNowResponse)
|
||||||
err := c.cc.Invoke(ctx, "/RaftTransport/TimeoutNow", in, out, opts...)
|
err := c.cc.Invoke(ctx, RaftTransport_TimeoutNow_FullMethodName, in, out, cOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *raftTransportClient) InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_InstallSnapshotClient, error) {
|
func (c *raftTransportClient) InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[InstallSnapshotRequest, InstallSnapshotResponse], error) {
|
||||||
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[1], "/RaftTransport/InstallSnapshot", opts...)
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[3], RaftTransport_InstallSnapshot_FullMethodName, cOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
x := &raftTransportInstallSnapshotClient{stream}
|
x := &grpc.GenericClientStream[InstallSnapshotRequest, InstallSnapshotResponse]{ClientStream: stream}
|
||||||
return x, nil
|
return x, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type RaftTransport_InstallSnapshotClient interface {
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
Send(*InstallSnapshotRequest) error
|
type RaftTransport_InstallSnapshotClient = grpc.ClientStreamingClient[InstallSnapshotRequest, InstallSnapshotResponse]
|
||||||
CloseAndRecv() (*InstallSnapshotResponse, error)
|
|
||||||
grpc.ClientStream
|
|
||||||
}
|
|
||||||
|
|
||||||
type raftTransportInstallSnapshotClient struct {
|
|
||||||
grpc.ClientStream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *raftTransportInstallSnapshotClient) Send(m *InstallSnapshotRequest) error {
|
|
||||||
return x.ClientStream.SendMsg(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *raftTransportInstallSnapshotClient) CloseAndRecv() (*InstallSnapshotResponse, error) {
|
|
||||||
if err := x.ClientStream.CloseSend(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
m := new(InstallSnapshotResponse)
|
|
||||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *raftTransportClient) RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error) {
|
func (c *raftTransportClient) RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
out := new(RequestPreVoteResponse)
|
out := new(RequestPreVoteResponse)
|
||||||
err := c.cc.Invoke(ctx, "/RaftTransport/RequestPreVote", in, out, opts...)
|
err := c.cc.Invoke(ctx, RaftTransport_RequestPreVote_FullMethodName, in, out, cOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -147,46 +152,59 @@ func (c *raftTransportClient) RequestPreVote(ctx context.Context, in *RequestPre
|
|||||||
|
|
||||||
// RaftTransportServer is the server API for RaftTransport service.
|
// RaftTransportServer is the server API for RaftTransport service.
|
||||||
// All implementations must embed UnimplementedRaftTransportServer
|
// All implementations must embed UnimplementedRaftTransportServer
|
||||||
// for forward compatibility
|
// for forward compatibility.
|
||||||
type RaftTransportServer interface {
|
type RaftTransportServer interface {
|
||||||
// AppendEntriesPipeline opens an AppendEntries message stream.
|
// AppendEntriesPipeline opens an AppendEntries message stream.
|
||||||
AppendEntriesPipeline(RaftTransport_AppendEntriesPipelineServer) error
|
AppendEntriesPipeline(grpc.BidiStreamingServer[AppendEntriesRequest, AppendEntriesResponse]) error
|
||||||
|
AppendEntriesChunkedPipeline(grpc.BidiStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error
|
||||||
// AppendEntries performs a single append entries request / response.
|
// AppendEntries performs a single append entries request / response.
|
||||||
AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error)
|
AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error)
|
||||||
|
// AppendEntries performs a single append entries request / response for request larger than the max grpc message size.
|
||||||
|
AppendEntriesChunked(grpc.ClientStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error
|
||||||
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
||||||
RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error)
|
RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error)
|
||||||
// TimeoutNow is used to start a leadership transfer to the target node.
|
// TimeoutNow is used to start a leadership transfer to the target node.
|
||||||
TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error)
|
TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error)
|
||||||
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
|
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
|
||||||
InstallSnapshot(RaftTransport_InstallSnapshotServer) error
|
InstallSnapshot(grpc.ClientStreamingServer[InstallSnapshotRequest, InstallSnapshotResponse]) error
|
||||||
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
|
||||||
RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error)
|
RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error)
|
||||||
mustEmbedUnimplementedRaftTransportServer()
|
mustEmbedUnimplementedRaftTransportServer()
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnimplementedRaftTransportServer must be embedded to have forward compatible implementations.
|
// UnimplementedRaftTransportServer must be embedded to have
|
||||||
type UnimplementedRaftTransportServer struct {
|
// forward compatible implementations.
|
||||||
}
|
//
|
||||||
|
// NOTE: this should be embedded by value instead of pointer to avoid a nil
|
||||||
|
// pointer dereference when methods are called.
|
||||||
|
type UnimplementedRaftTransportServer struct{}
|
||||||
|
|
||||||
func (UnimplementedRaftTransportServer) AppendEntriesPipeline(RaftTransport_AppendEntriesPipelineServer) error {
|
func (UnimplementedRaftTransportServer) AppendEntriesPipeline(grpc.BidiStreamingServer[AppendEntriesRequest, AppendEntriesResponse]) error {
|
||||||
return status.Errorf(codes.Unimplemented, "method AppendEntriesPipeline not implemented")
|
return status.Errorf(codes.Unimplemented, "method AppendEntriesPipeline not implemented")
|
||||||
}
|
}
|
||||||
|
func (UnimplementedRaftTransportServer) AppendEntriesChunkedPipeline(grpc.BidiStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error {
|
||||||
|
return status.Errorf(codes.Unimplemented, "method AppendEntriesChunkedPipeline not implemented")
|
||||||
|
}
|
||||||
func (UnimplementedRaftTransportServer) AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) {
|
func (UnimplementedRaftTransportServer) AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) {
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method AppendEntries not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method AppendEntries not implemented")
|
||||||
}
|
}
|
||||||
|
func (UnimplementedRaftTransportServer) AppendEntriesChunked(grpc.ClientStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error {
|
||||||
|
return status.Errorf(codes.Unimplemented, "method AppendEntriesChunked not implemented")
|
||||||
|
}
|
||||||
func (UnimplementedRaftTransportServer) RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) {
|
func (UnimplementedRaftTransportServer) RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) {
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method RequestVote not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method RequestVote not implemented")
|
||||||
}
|
}
|
||||||
func (UnimplementedRaftTransportServer) TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error) {
|
func (UnimplementedRaftTransportServer) TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error) {
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method TimeoutNow not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method TimeoutNow not implemented")
|
||||||
}
|
}
|
||||||
func (UnimplementedRaftTransportServer) InstallSnapshot(RaftTransport_InstallSnapshotServer) error {
|
func (UnimplementedRaftTransportServer) InstallSnapshot(grpc.ClientStreamingServer[InstallSnapshotRequest, InstallSnapshotResponse]) error {
|
||||||
return status.Errorf(codes.Unimplemented, "method InstallSnapshot not implemented")
|
return status.Errorf(codes.Unimplemented, "method InstallSnapshot not implemented")
|
||||||
}
|
}
|
||||||
func (UnimplementedRaftTransportServer) RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error) {
|
func (UnimplementedRaftTransportServer) RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error) {
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method RequestPreVote not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method RequestPreVote not implemented")
|
||||||
}
|
}
|
||||||
func (UnimplementedRaftTransportServer) mustEmbedUnimplementedRaftTransportServer() {}
|
func (UnimplementedRaftTransportServer) mustEmbedUnimplementedRaftTransportServer() {}
|
||||||
|
func (UnimplementedRaftTransportServer) testEmbeddedByValue() {}
|
||||||
|
|
||||||
// UnsafeRaftTransportServer may be embedded to opt out of forward compatibility for this service.
|
// UnsafeRaftTransportServer may be embedded to opt out of forward compatibility for this service.
|
||||||
// Use of this interface is not recommended, as added methods to RaftTransportServer will
|
// Use of this interface is not recommended, as added methods to RaftTransportServer will
|
||||||
@ -196,34 +214,29 @@ type UnsafeRaftTransportServer interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RegisterRaftTransportServer(s grpc.ServiceRegistrar, srv RaftTransportServer) {
|
func RegisterRaftTransportServer(s grpc.ServiceRegistrar, srv RaftTransportServer) {
|
||||||
|
// If the following call pancis, it indicates UnimplementedRaftTransportServer was
|
||||||
|
// embedded by pointer and is nil. This will cause panics if an
|
||||||
|
// unimplemented method is ever invoked, so we test this at initialization
|
||||||
|
// time to prevent it from happening at runtime later due to I/O.
|
||||||
|
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
|
||||||
|
t.testEmbeddedByValue()
|
||||||
|
}
|
||||||
s.RegisterService(&RaftTransport_ServiceDesc, srv)
|
s.RegisterService(&RaftTransport_ServiceDesc, srv)
|
||||||
}
|
}
|
||||||
|
|
||||||
func _RaftTransport_AppendEntriesPipeline_Handler(srv interface{}, stream grpc.ServerStream) error {
|
func _RaftTransport_AppendEntriesPipeline_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||||
return srv.(RaftTransportServer).AppendEntriesPipeline(&raftTransportAppendEntriesPipelineServer{stream})
|
return srv.(RaftTransportServer).AppendEntriesPipeline(&grpc.GenericServerStream[AppendEntriesRequest, AppendEntriesResponse]{ServerStream: stream})
|
||||||
}
|
}
|
||||||
|
|
||||||
type RaftTransport_AppendEntriesPipelineServer interface {
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
Send(*AppendEntriesResponse) error
|
type RaftTransport_AppendEntriesPipelineServer = grpc.BidiStreamingServer[AppendEntriesRequest, AppendEntriesResponse]
|
||||||
Recv() (*AppendEntriesRequest, error)
|
|
||||||
grpc.ServerStream
|
func _RaftTransport_AppendEntriesChunkedPipeline_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||||
|
return srv.(RaftTransportServer).AppendEntriesChunkedPipeline(&grpc.GenericServerStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ServerStream: stream})
|
||||||
}
|
}
|
||||||
|
|
||||||
type raftTransportAppendEntriesPipelineServer struct {
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
grpc.ServerStream
|
type RaftTransport_AppendEntriesChunkedPipelineServer = grpc.BidiStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]
|
||||||
}
|
|
||||||
|
|
||||||
func (x *raftTransportAppendEntriesPipelineServer) Send(m *AppendEntriesResponse) error {
|
|
||||||
return x.ServerStream.SendMsg(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *raftTransportAppendEntriesPipelineServer) Recv() (*AppendEntriesRequest, error) {
|
|
||||||
m := new(AppendEntriesRequest)
|
|
||||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
in := new(AppendEntriesRequest)
|
in := new(AppendEntriesRequest)
|
||||||
@ -235,7 +248,7 @@ func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context,
|
|||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/RaftTransport/AppendEntries",
|
FullMethod: RaftTransport_AppendEntries_FullMethodName,
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(RaftTransportServer).AppendEntries(ctx, req.(*AppendEntriesRequest))
|
return srv.(RaftTransportServer).AppendEntries(ctx, req.(*AppendEntriesRequest))
|
||||||
@ -243,6 +256,13 @@ func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context,
|
|||||||
return interceptor(ctx, in, info, handler)
|
return interceptor(ctx, in, info, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func _RaftTransport_AppendEntriesChunked_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||||
|
return srv.(RaftTransportServer).AppendEntriesChunked(&grpc.GenericServerStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ServerStream: stream})
|
||||||
|
}
|
||||||
|
|
||||||
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
|
type RaftTransport_AppendEntriesChunkedServer = grpc.ClientStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]
|
||||||
|
|
||||||
func _RaftTransport_RequestVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
func _RaftTransport_RequestVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
in := new(RequestVoteRequest)
|
in := new(RequestVoteRequest)
|
||||||
if err := dec(in); err != nil {
|
if err := dec(in); err != nil {
|
||||||
@ -253,7 +273,7 @@ func _RaftTransport_RequestVote_Handler(srv interface{}, ctx context.Context, de
|
|||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/RaftTransport/RequestVote",
|
FullMethod: RaftTransport_RequestVote_FullMethodName,
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(RaftTransportServer).RequestVote(ctx, req.(*RequestVoteRequest))
|
return srv.(RaftTransportServer).RequestVote(ctx, req.(*RequestVoteRequest))
|
||||||
@ -271,7 +291,7 @@ func _RaftTransport_TimeoutNow_Handler(srv interface{}, ctx context.Context, dec
|
|||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/RaftTransport/TimeoutNow",
|
FullMethod: RaftTransport_TimeoutNow_FullMethodName,
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(RaftTransportServer).TimeoutNow(ctx, req.(*TimeoutNowRequest))
|
return srv.(RaftTransportServer).TimeoutNow(ctx, req.(*TimeoutNowRequest))
|
||||||
@ -280,30 +300,11 @@ func _RaftTransport_TimeoutNow_Handler(srv interface{}, ctx context.Context, dec
|
|||||||
}
|
}
|
||||||
|
|
||||||
func _RaftTransport_InstallSnapshot_Handler(srv interface{}, stream grpc.ServerStream) error {
|
func _RaftTransport_InstallSnapshot_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||||
return srv.(RaftTransportServer).InstallSnapshot(&raftTransportInstallSnapshotServer{stream})
|
return srv.(RaftTransportServer).InstallSnapshot(&grpc.GenericServerStream[InstallSnapshotRequest, InstallSnapshotResponse]{ServerStream: stream})
|
||||||
}
|
}
|
||||||
|
|
||||||
type RaftTransport_InstallSnapshotServer interface {
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
SendAndClose(*InstallSnapshotResponse) error
|
type RaftTransport_InstallSnapshotServer = grpc.ClientStreamingServer[InstallSnapshotRequest, InstallSnapshotResponse]
|
||||||
Recv() (*InstallSnapshotRequest, error)
|
|
||||||
grpc.ServerStream
|
|
||||||
}
|
|
||||||
|
|
||||||
type raftTransportInstallSnapshotServer struct {
|
|
||||||
grpc.ServerStream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *raftTransportInstallSnapshotServer) SendAndClose(m *InstallSnapshotResponse) error {
|
|
||||||
return x.ServerStream.SendMsg(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *raftTransportInstallSnapshotServer) Recv() (*InstallSnapshotRequest, error) {
|
|
||||||
m := new(InstallSnapshotRequest)
|
|
||||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
in := new(RequestPreVoteRequest)
|
in := new(RequestPreVoteRequest)
|
||||||
@ -315,7 +316,7 @@ func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context,
|
|||||||
}
|
}
|
||||||
info := &grpc.UnaryServerInfo{
|
info := &grpc.UnaryServerInfo{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
FullMethod: "/RaftTransport/RequestPreVote",
|
FullMethod: RaftTransport_RequestPreVote_FullMethodName,
|
||||||
}
|
}
|
||||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
return srv.(RaftTransportServer).RequestPreVote(ctx, req.(*RequestPreVoteRequest))
|
return srv.(RaftTransportServer).RequestPreVote(ctx, req.(*RequestPreVoteRequest))
|
||||||
@ -327,7 +328,7 @@ func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context,
|
|||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
var RaftTransport_ServiceDesc = grpc.ServiceDesc{
|
var RaftTransport_ServiceDesc = grpc.ServiceDesc{
|
||||||
ServiceName: "RaftTransport",
|
ServiceName: "raft.RaftTransport",
|
||||||
HandlerType: (*RaftTransportServer)(nil),
|
HandlerType: (*RaftTransportServer)(nil),
|
||||||
Methods: []grpc.MethodDesc{
|
Methods: []grpc.MethodDesc{
|
||||||
{
|
{
|
||||||
@ -354,11 +355,22 @@ var RaftTransport_ServiceDesc = grpc.ServiceDesc{
|
|||||||
ServerStreams: true,
|
ServerStreams: true,
|
||||||
ClientStreams: true,
|
ClientStreams: true,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
StreamName: "AppendEntriesChunkedPipeline",
|
||||||
|
Handler: _RaftTransport_AppendEntriesChunkedPipeline_Handler,
|
||||||
|
ServerStreams: true,
|
||||||
|
ClientStreams: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StreamName: "AppendEntriesChunked",
|
||||||
|
Handler: _RaftTransport_AppendEntriesChunked_Handler,
|
||||||
|
ClientStreams: true,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
StreamName: "InstallSnapshot",
|
StreamName: "InstallSnapshot",
|
||||||
Handler: _RaftTransport_InstallSnapshot_Handler,
|
Handler: _RaftTransport_InstallSnapshot_Handler,
|
||||||
ClientStreams: true,
|
ClientStreams: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Metadata: "transport.proto",
|
Metadata: "proto/raft_transport.proto",
|
||||||
}
|
}
|
@ -2,31 +2,70 @@ package api
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials"
|
||||||
"google.golang.org/grpc/reflection"
|
"google.golang.org/grpc/reflection"
|
||||||
|
|
||||||
pb "deevirt.fr/compute/pkg/api/proto"
|
pb "deevirt.fr/compute/pkg/api/proto"
|
||||||
|
"deevirt.fr/compute/pkg/config"
|
||||||
"deevirt.fr/compute/pkg/raft"
|
"deevirt.fr/compute/pkg/raft"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func createGRPCServer(conf *config.Config) *grpc.Server {
|
||||||
|
if conf.Manager.TlsKey != "" {
|
||||||
|
cert, err := tls.LoadX509KeyPair(conf.Manager.TlsCert, conf.Manager.TlsKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Erreur chargement du certificat: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Charger la CA (facultatif, pour la vérification des clients)
|
||||||
|
caCert, err := os.ReadFile(conf.Manager.TlsCert)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Erreur chargement CA: %v", err)
|
||||||
|
}
|
||||||
|
certPool := x509.NewCertPool()
|
||||||
|
certPool.AppendCertsFromPEM(caCert)
|
||||||
|
|
||||||
|
// Créer les credentials TLS
|
||||||
|
creds := credentials.NewTLS(&tls.Config{
|
||||||
|
Certificates: []tls.Certificate{cert},
|
||||||
|
ClientCAs: certPool,
|
||||||
|
ClientAuth: tls.RequireAndVerifyClientCert, // Authentification mutuelle (mTLS),
|
||||||
|
})
|
||||||
|
|
||||||
|
return grpc.NewServer(grpc.Creds(creds))
|
||||||
|
}
|
||||||
|
|
||||||
|
return grpc.NewServer()
|
||||||
|
}
|
||||||
|
|
||||||
func Server() {
|
func Server() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Récupération de la configuration deevirt
|
||||||
|
conf, err := config.New()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed load configuration: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
sock, err := net.Listen("tcp", fmt.Sprintf(":%d", 4480))
|
sock, err := net.Listen("tcp", fmt.Sprintf(":%d", 4480))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to listen: %v", err)
|
log.Fatalf("failed to listen: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, tm, err := raft.New(ctx, 4480)
|
r, tm, err := raft.New(ctx, conf, 4480)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to start raft: %v", err)
|
log.Fatalf("failed to start raft: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s := grpc.NewServer()
|
s := createGRPCServer(conf)
|
||||||
pb.RegisterDomainServer(s, nil)
|
pb.RegisterDomainServer(s, nil)
|
||||||
tm.Register(s)
|
tm.Register(s)
|
||||||
//leaderhealth.Setup(r, s, []string{"Example"})
|
//leaderhealth.Setup(r, s, []string{"Example"})
|
||||||
|
122
pkg/raft/node.go
122
pkg/raft/node.go
@ -2,15 +2,18 @@ package raft
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
transport "github.com/Jille/raft-grpc-transport"
|
transport "deevirt.fr/compute/pkg/raft/transport"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
raftboltdb "github.com/hashicorp/raft-boltdb"
|
raftboltdb "github.com/hashicorp/raft-boltdb"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials"
|
||||||
|
|
||||||
"deevirt.fr/compute/pkg/config"
|
"deevirt.fr/compute/pkg/config"
|
||||||
etcd_client "deevirt.fr/compute/pkg/etcd"
|
etcd_client "deevirt.fr/compute/pkg/etcd"
|
||||||
@ -29,16 +32,34 @@ type Peers struct {
|
|||||||
Address string
|
Address string
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context, port int) (*raft.Raft, *transport.Manager, error) {
|
func getTLSCredentials(conf *config.Config) credentials.TransportCredentials {
|
||||||
// Récupération de la configuration deevirt
|
cert, err := tls.LoadX509KeyPair(conf.Manager.TlsCert, conf.Manager.TlsKey)
|
||||||
conf, err := config.New()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
log.Fatalf("Erreur chargement du certificat: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Charger la CA (facultatif, pour la vérification des clients)
|
||||||
|
caCert, err := os.ReadFile(conf.Manager.TlsCert)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Erreur chargement CA: %v", err)
|
||||||
|
}
|
||||||
|
certPool := x509.NewCertPool()
|
||||||
|
certPool.AppendCertsFromPEM(caCert)
|
||||||
|
|
||||||
|
// Créer les credentials TLS
|
||||||
|
creds := credentials.NewTLS(&tls.Config{
|
||||||
|
Certificates: []tls.Certificate{cert},
|
||||||
|
ClientCAs: certPool,
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
return creds
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *transport.Manager, error) {
|
||||||
// Création du répertoire
|
// Création du répertoire
|
||||||
baseDir := filepath.Join("/var/lib/deevirt/mgr/", conf.NodeID)
|
baseDir := filepath.Join("/var/lib/deevirt/mgr/", conf.NodeID)
|
||||||
err = os.MkdirAll(baseDir, 0740)
|
err := os.MkdirAll(baseDir, 0740)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -87,7 +108,13 @@ func New(ctx context.Context, port int) (*raft.Raft, *transport.Manager, error)
|
|||||||
return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
|
return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), []grpc.DialOption{grpc.WithInsecure()})
|
dialOption := []grpc.DialOption{}
|
||||||
|
|
||||||
|
if conf.Manager.TlsKey != "" {
|
||||||
|
dialOption = append(dialOption, grpc.WithTransportCredentials(getTLSCredentials(conf)))
|
||||||
|
}
|
||||||
|
|
||||||
|
tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), dialOption)
|
||||||
|
|
||||||
r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport())
|
r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -183,3 +210,84 @@ func (n *RaftNode) watchStateChanges() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*func New(ctx context.Context, myID, myAddress string) (*raft.Raft, *transport.Manager, error) {
|
||||||
|
// Création du répertoire
|
||||||
|
baseDir := filepath.Join("/var/lib/deevirt/mgr/", myID)
|
||||||
|
err := os.MkdirAll(baseDir, 0740)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
println(myAddress)
|
||||||
|
|
||||||
|
peers := []raft.Server{
|
||||||
|
{
|
||||||
|
ID: raft.ServerID("nodeA"),
|
||||||
|
Address: raft.ServerAddress("172.16.9.161:4410"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: raft.ServerID("nodeB"),
|
||||||
|
Address: raft.ServerAddress("172.16.9.161:4411"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
c := raft.DefaultConfig()
|
||||||
|
c.LocalID = raft.ServerID(myID)
|
||||||
|
|
||||||
|
ldb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sdb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{grpc.WithTransportCredentials(getTLSCredentials())})
|
||||||
|
|
||||||
|
r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport())
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("raft.NewRaft: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := scheduler.New()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("scheduler: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Observer pour surveiller les changements d'état
|
||||||
|
stateCh := make(chan raft.Observation, 1) // Canal de type raft.Observation
|
||||||
|
r.RegisterObserver(raft.NewObserver(stateCh, true, nil))
|
||||||
|
|
||||||
|
node := &RaftNode{
|
||||||
|
Raft: r,
|
||||||
|
NodeID: myID,
|
||||||
|
StateCh: stateCh,
|
||||||
|
scheduler: s,
|
||||||
|
}
|
||||||
|
|
||||||
|
go node.watchStateChanges()
|
||||||
|
|
||||||
|
hasState, _ := checkIfStateExists(ldb)
|
||||||
|
|
||||||
|
if myAddress == "172.16.9.161:4410" && !hasState {
|
||||||
|
println("Démarrage du bootstrap ! ")
|
||||||
|
|
||||||
|
cfg := raft.Configuration{
|
||||||
|
Servers: peers,
|
||||||
|
}
|
||||||
|
f := r.BootstrapCluster(cfg)
|
||||||
|
if err := f.Error(); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return r, tm, nil
|
||||||
|
}*/
|
||||||
|
@ -6,9 +6,11 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
pb "github.com/Jille/raft-grpc-transport/proto"
|
pb "deevirt.fr/compute/pkg/api/proto"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
// These are calls from the Raft engine that we need to send out over gRPC.
|
// These are calls from the Raft engine that we need to send out over gRPC.
|
||||||
@ -52,7 +54,7 @@ func (r raftAPI) getPeer(target raft.ServerAddress) (pb.RaftTransportClient, err
|
|||||||
}
|
}
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
if c.clientConn == nil {
|
if c.clientConn == nil {
|
||||||
conn, err := grpc.Dial(string(target), r.manager.dialOptions...)
|
conn, err := grpc.NewClient(string(target), r.manager.dialOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -74,7 +76,14 @@ func (r raftAPI) AppendEntries(id raft.ServerID, target raft.ServerAddress, args
|
|||||||
ctx, cancel = context.WithTimeout(ctx, r.manager.heartbeatTimeout)
|
ctx, cancel = context.WithTimeout(ctx, r.manager.heartbeatTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
ret, err := c.AppendEntries(ctx, encodeAppendEntriesRequest(args))
|
appendEntriesRequest := encodeAppendEntriesRequest(args)
|
||||||
|
ret, err := c.AppendEntries(ctx, appendEntriesRequest)
|
||||||
|
if statusErr, ok := status.FromError(err); ok && statusErr.Code() == codes.ResourceExhausted {
|
||||||
|
chunkedRet, chunkedErr := r.appendEntriesChunked(ctx, r.manager.appendEntriesChunkSize, c, appendEntriesRequest)
|
||||||
|
if statusErr, ok := status.FromError(chunkedErr); ok && statusErr.Code() != codes.Unimplemented {
|
||||||
|
ret, err = chunkedRet, chunkedErr
|
||||||
|
}
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -82,6 +91,21 @@ func (r raftAPI) AppendEntries(id raft.ServerID, target raft.ServerAddress, args
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AppendEntries sends the appropriate RPC to the target node.
|
||||||
|
func (r raftAPI) appendEntriesChunked(ctx context.Context, chunkSize int, c pb.RaftTransportClient, appendEntriesRequest *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error) {
|
||||||
|
stream, err := c.AppendEntriesChunked(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return &pb.AppendEntriesResponse{}, err
|
||||||
|
}
|
||||||
|
defer stream.CloseSend()
|
||||||
|
|
||||||
|
if err := sendAppendEntriesChunkedRequest(chunkSize, stream, appendEntriesRequest); err != nil {
|
||||||
|
return &pb.AppendEntriesResponse{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream.CloseAndRecv()
|
||||||
|
}
|
||||||
|
|
||||||
// RequestVote sends the appropriate RPC to the target node.
|
// RequestVote sends the appropriate RPC to the target node.
|
||||||
func (r raftAPI) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error {
|
func (r raftAPI) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error {
|
||||||
c, err := r.getPeer(target)
|
c, err := r.getPeer(target)
|
||||||
@ -161,6 +185,11 @@ func (r raftAPI) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, re
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type AppendEntriesPipelineInterface interface {
|
||||||
|
grpc.ClientStream
|
||||||
|
Recv() (*pb.AppendEntriesResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
// AppendEntriesPipeline returns an interface that can be used to pipeline
|
// AppendEntriesPipeline returns an interface that can be used to pipeline
|
||||||
// AppendEntries requests.
|
// AppendEntries requests.
|
||||||
func (r raftAPI) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) {
|
func (r raftAPI) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) {
|
||||||
@ -170,38 +199,52 @@ func (r raftAPI) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddre
|
|||||||
}
|
}
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
stream, err := c.AppendEntriesPipeline(ctx)
|
var stream AppendEntriesPipelineInterface
|
||||||
|
stream, err = c.AppendEntriesChunkedPipeline(ctx)
|
||||||
|
if statusErr, ok := status.FromError(err); ok && statusErr.Code() == codes.Unimplemented {
|
||||||
|
stream, err = c.AppendEntriesPipeline(ctx)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
rpa := raftPipelineAPI{
|
rpa := &raftPipelineAPI{
|
||||||
stream: stream,
|
stream: stream,
|
||||||
cancel: cancel,
|
appendEntriesChunkSize: r.manager.appendEntriesChunkSize,
|
||||||
inflightCh: make(chan *appendFuture, 20),
|
cancel: cancel,
|
||||||
doneCh: make(chan raft.AppendFuture, 20),
|
inflightCh: make(chan *appendFuture, 20),
|
||||||
|
doneCh: make(chan raft.AppendFuture, 20),
|
||||||
}
|
}
|
||||||
go rpa.receiver()
|
go rpa.receiver()
|
||||||
return rpa, nil
|
return rpa, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type raftPipelineAPI struct {
|
type raftPipelineAPI struct {
|
||||||
stream pb.RaftTransport_AppendEntriesPipelineClient
|
stream AppendEntriesPipelineInterface
|
||||||
cancel func()
|
appendEntriesChunkSize int
|
||||||
inflightChMtx sync.Mutex
|
cancel func()
|
||||||
inflightCh chan *appendFuture
|
inflightChMtx sync.Mutex
|
||||||
doneCh chan raft.AppendFuture
|
inflightCh chan *appendFuture
|
||||||
|
doneCh chan raft.AppendFuture
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendEntries is used to add another request to the pipeline.
|
// AppendEntries is used to add another request to the pipeline.
|
||||||
// The send may block which is an effective form of back-pressure.
|
// The send may block which is an effective form of back-pressure.
|
||||||
func (r raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) (raft.AppendFuture, error) {
|
func (r *raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) (raft.AppendFuture, error) {
|
||||||
af := &appendFuture{
|
af := &appendFuture{
|
||||||
start: time.Now(),
|
start: time.Now(),
|
||||||
request: req,
|
request: req,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
if err := r.stream.Send(encodeAppendEntriesRequest(req)); err != nil {
|
var err error
|
||||||
|
appendEntriesRequest := encodeAppendEntriesRequest(req)
|
||||||
|
switch stream := r.stream.(type) {
|
||||||
|
case pb.RaftTransport_AppendEntriesPipelineClient:
|
||||||
|
err = stream.Send(appendEntriesRequest)
|
||||||
|
case pb.RaftTransport_AppendEntriesChunkedPipelineClient:
|
||||||
|
err = sendAppendEntriesChunkedRequest(r.appendEntriesChunkSize, stream, appendEntriesRequest)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
r.inflightChMtx.Lock()
|
r.inflightChMtx.Lock()
|
||||||
@ -216,12 +259,12 @@ func (r raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raf
|
|||||||
|
|
||||||
// Consumer returns a channel that can be used to consume
|
// Consumer returns a channel that can be used to consume
|
||||||
// response futures when they are ready.
|
// response futures when they are ready.
|
||||||
func (r raftPipelineAPI) Consumer() <-chan raft.AppendFuture {
|
func (r *raftPipelineAPI) Consumer() <-chan raft.AppendFuture {
|
||||||
return r.doneCh
|
return r.doneCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the pipeline and cancels all inflight RPCs
|
// Close closes the pipeline and cancels all inflight RPCs
|
||||||
func (r raftPipelineAPI) Close() error {
|
func (r *raftPipelineAPI) Close() error {
|
||||||
r.cancel()
|
r.cancel()
|
||||||
r.inflightChMtx.Lock()
|
r.inflightChMtx.Lock()
|
||||||
close(r.inflightCh)
|
close(r.inflightCh)
|
||||||
@ -229,7 +272,7 @@ func (r raftPipelineAPI) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r raftPipelineAPI) receiver() {
|
func (r *raftPipelineAPI) receiver() {
|
||||||
for af := range r.inflightCh {
|
for af := range r.inflightCh {
|
||||||
msg, err := r.stream.Recv()
|
msg, err := r.stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -319,8 +362,8 @@ func (r raftAPI) Disconnect(target raft.ServerAddress) {
|
|||||||
r.manager.connectionsMtx.Unlock()
|
r.manager.connectionsMtx.Unlock()
|
||||||
if ok {
|
if ok {
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
c.mtx.Unlock()
|
|
||||||
_ = c.clientConn.Close()
|
_ = c.clientConn.Close()
|
||||||
|
c.mtx.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
79
pkg/raft/transport/chunking.go
Normal file
79
pkg/raft/transport/chunking.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
pb "deevirt.fr/compute/pkg/api/proto"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AppendEntriesChunkedRequestStreamSender interface {
|
||||||
|
Send(*pb.AppendEntriesChunkedRequest) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendAppendEntriesChunkedRequest(chunkSize int, stream AppendEntriesChunkedRequestStreamSender, appendEntriesRequest *pb.AppendEntriesRequest) error {
|
||||||
|
reqBuf, err := proto.Marshal(appendEntriesRequest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reqSize := len(reqBuf)
|
||||||
|
numChunks := reqSize / chunkSize
|
||||||
|
if reqSize%chunkSize != 0 {
|
||||||
|
numChunks++
|
||||||
|
}
|
||||||
|
|
||||||
|
remainingBytes := reqSize
|
||||||
|
for chunkIdx := 0; chunkIdx < numChunks; chunkIdx++ {
|
||||||
|
lowerBound := chunkIdx * chunkSize
|
||||||
|
upperBound := (chunkIdx + 1) * chunkSize
|
||||||
|
if reqSize < upperBound {
|
||||||
|
upperBound = reqSize
|
||||||
|
}
|
||||||
|
|
||||||
|
remainingBytes -= upperBound - lowerBound
|
||||||
|
chunk := &pb.AppendEntriesChunkedRequest{
|
||||||
|
RemainingBytes: int64(remainingBytes),
|
||||||
|
Chunk: reqBuf[lowerBound:upperBound],
|
||||||
|
}
|
||||||
|
if err := stream.Send(chunk); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type AppendEntriesChunkedRequestStreamReceiver interface {
|
||||||
|
Recv() (*pb.AppendEntriesChunkedRequest, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func receiveAppendEntriesChunkedRequest(stream AppendEntriesChunkedRequestStreamReceiver) (*pb.AppendEntriesRequest, error) {
|
||||||
|
var reqBuf []byte
|
||||||
|
|
||||||
|
chunk, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return &pb.AppendEntriesRequest{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if chunk.RemainingBytes == 0 {
|
||||||
|
reqBuf = chunk.Chunk
|
||||||
|
} else {
|
||||||
|
reqBuf = make([]byte, len(chunk.Chunk)+int(chunk.RemainingBytes))
|
||||||
|
lowerBound := copy(reqBuf, chunk.Chunk)
|
||||||
|
|
||||||
|
for chunk.RemainingBytes > 0 {
|
||||||
|
chunk, err = stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return &pb.AppendEntriesRequest{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lowerBound += copy(reqBuf[lowerBound:], chunk.Chunk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
appendEntriesRequest := new(pb.AppendEntriesRequest)
|
||||||
|
if err := proto.Unmarshal(reqBuf, appendEntriesRequest); err != nil {
|
||||||
|
return &pb.AppendEntriesRequest{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return appendEntriesRequest, nil
|
||||||
|
}
|
@ -1,7 +1,7 @@
|
|||||||
package transport
|
package transport
|
||||||
|
|
||||||
import (
|
import (
|
||||||
pb "github.com/Jille/raft-grpc-transport/proto"
|
pb "deevirt.fr/compute/pkg/api/proto"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
)
|
)
|
||||||
|
|
@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
pb "github.com/Jille/raft-grpc-transport/proto"
|
pb "deevirt.fr/compute/pkg/api/proto"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -60,6 +60,20 @@ func (g gRPCAPI) AppendEntries(ctx context.Context, req *pb.AppendEntriesRequest
|
|||||||
return encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)), nil
|
return encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g gRPCAPI) AppendEntriesChunked(stream pb.RaftTransport_AppendEntriesChunkedServer) error {
|
||||||
|
appendEntriesRequest, err := receiveAppendEntriesChunkedRequest(stream)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := g.handleRPC(decodeAppendEntriesRequest(appendEntriesRequest), nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream.SendAndClose(encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)))
|
||||||
|
}
|
||||||
|
|
||||||
func (g gRPCAPI) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) {
|
func (g gRPCAPI) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) {
|
||||||
resp, err := g.handleRPC(decodeRequestVoteRequest(req), nil)
|
resp, err := g.handleRPC(decodeRequestVoteRequest(req), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -137,10 +151,26 @@ func (g gRPCAPI) AppendEntriesPipeline(s pb.RaftTransport_AppendEntriesPipelineS
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g gRPCAPI) AppendEntriesChunkedPipeline(s pb.RaftTransport_AppendEntriesChunkedPipelineServer) error {
|
||||||
|
for {
|
||||||
|
msg, err := receiveAppendEntriesChunkedRequest(s)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp, err := g.handleRPC(decodeAppendEntriesRequest(msg), nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := s.Send(encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func isHeartbeat(command interface{}) bool {
|
func isHeartbeat(command interface{}) bool {
|
||||||
req, ok := command.(*raft.AppendEntriesRequest)
|
req, ok := command.(*raft.AppendEntriesRequest)
|
||||||
if !ok {
|
if !ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return req.Term != 0 && len(req.Leader) != 0 && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0
|
return req.Term != 0 && len(req.Addr) != 0 && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0
|
||||||
}
|
}
|
23
pkg/raft/transport/options.go
Normal file
23
pkg/raft/transport/options.go
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
package transport
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type Option func(m *Manager)
|
||||||
|
|
||||||
|
// WithHeartbeatTimeout configures the transport to not wait for more than d
|
||||||
|
// for a heartbeat to be executed by a remote peer.
|
||||||
|
func WithHeartbeatTimeout(d time.Duration) Option {
|
||||||
|
return func(m *Manager) {
|
||||||
|
m.heartbeatTimeout = d
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithAppendEntriesChunkSize configures the chunk size to use when switching
|
||||||
|
// to chunked AppendEntries. The default value is 4MB (the gRPC default), but
|
||||||
|
// as there is no way to auto-discover that value, it's up to the developer
|
||||||
|
// to configure this, if the default value is not appropriate.
|
||||||
|
func WithAppendEntriesChunkSize(v int) Option {
|
||||||
|
return func(m *Manager) {
|
||||||
|
m.appendEntriesChunkSize = v
|
||||||
|
}
|
||||||
|
}
|
@ -1,16 +1,16 @@
|
|||||||
package transport
|
package transport
|
||||||
|
|
||||||
import (
|
import (
|
||||||
pb "github.com/Jille/raft-grpc-transport/proto"
|
pb "deevirt.fr/compute/pkg/api/proto"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func encodeAppendEntriesRequest(s *raft.AppendEntriesRequest) *pb.AppendEntriesRequest {
|
func encodeAppendEntriesRequest(s *raft.AppendEntriesRequest) *pb.AppendEntriesRequest {
|
||||||
return &pb.AppendEntriesRequest{
|
return &pb.AppendEntriesRequest{
|
||||||
RpcHeader: encodeRPCHeader(s.RPCHeader),
|
RpcHeader: encodeRPCHeader(s.RPCHeader),
|
||||||
Term: s.Term,
|
Term: s.Term,
|
||||||
Leader: s.Leader,
|
//Leader: s.Leader,
|
||||||
PrevLogEntry: s.PrevLogEntry,
|
PrevLogEntry: s.PrevLogEntry,
|
||||||
PrevLogTerm: s.PrevLogTerm,
|
PrevLogTerm: s.PrevLogTerm,
|
||||||
Entries: encodeLogs(s.Entries),
|
Entries: encodeLogs(s.Entries),
|
||||||
@ -76,9 +76,9 @@ func encodeAppendEntriesResponse(s *raft.AppendEntriesResponse) *pb.AppendEntrie
|
|||||||
|
|
||||||
func encodeRequestVoteRequest(s *raft.RequestVoteRequest) *pb.RequestVoteRequest {
|
func encodeRequestVoteRequest(s *raft.RequestVoteRequest) *pb.RequestVoteRequest {
|
||||||
return &pb.RequestVoteRequest{
|
return &pb.RequestVoteRequest{
|
||||||
RpcHeader: encodeRPCHeader(s.RPCHeader),
|
RpcHeader: encodeRPCHeader(s.RPCHeader),
|
||||||
Term: s.Term,
|
Term: s.Term,
|
||||||
Candidate: s.Candidate,
|
//Candidate: s.Candidate,
|
||||||
LastLogIndex: s.LastLogIndex,
|
LastLogIndex: s.LastLogIndex,
|
||||||
LastLogTerm: s.LastLogTerm,
|
LastLogTerm: s.LastLogTerm,
|
||||||
LeadershipTransfer: s.LeadershipTransfer,
|
LeadershipTransfer: s.LeadershipTransfer,
|
@ -5,7 +5,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
pb "github.com/Jille/raft-grpc-transport/proto"
|
pb "deevirt.fr/compute/pkg/api/proto"
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -25,8 +25,9 @@ type Manager struct {
|
|||||||
heartbeatFuncMtx sync.Mutex
|
heartbeatFuncMtx sync.Mutex
|
||||||
heartbeatTimeout time.Duration
|
heartbeatTimeout time.Duration
|
||||||
|
|
||||||
connectionsMtx sync.Mutex
|
connectionsMtx sync.Mutex
|
||||||
connections map[raft.ServerAddress]*conn
|
connections map[raft.ServerAddress]*conn
|
||||||
|
appendEntriesChunkSize int
|
||||||
|
|
||||||
shutdown bool
|
shutdown bool
|
||||||
shutdownCh chan struct{}
|
shutdownCh chan struct{}
|
||||||
@ -39,8 +40,9 @@ func New(localAddress raft.ServerAddress, dialOptions []grpc.DialOption, options
|
|||||||
localAddress: localAddress,
|
localAddress: localAddress,
|
||||||
dialOptions: dialOptions,
|
dialOptions: dialOptions,
|
||||||
|
|
||||||
rpcChan: make(chan raft.RPC),
|
rpcChan: make(chan raft.RPC),
|
||||||
connections: map[raft.ServerAddress]*conn{},
|
connections: map[raft.ServerAddress]*conn{},
|
||||||
|
appendEntriesChunkSize: 4*1024*1024 - 10, // same as gRPC default value (minus some overhead)
|
||||||
|
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
@ -81,8 +83,8 @@ func (m *Manager) disconnectAll() error {
|
|||||||
for k, conn := range m.connections {
|
for k, conn := range m.connections {
|
||||||
// Lock conn.mtx to ensure Dial() is complete
|
// Lock conn.mtx to ensure Dial() is complete
|
||||||
conn.mtx.Lock()
|
conn.mtx.Lock()
|
||||||
conn.mtx.Unlock()
|
|
||||||
closeErr := conn.clientConn.Close()
|
closeErr := conn.clientConn.Close()
|
||||||
|
conn.mtx.Unlock()
|
||||||
if closeErr != nil {
|
if closeErr != nil {
|
||||||
err = multierror.Append(err, closeErr)
|
err = multierror.Append(err, closeErr)
|
||||||
}
|
}
|
25
vendor/github.com/Jille/raft-grpc-transport/LICENSE
generated
vendored
25
vendor/github.com/Jille/raft-grpc-transport/LICENSE
generated
vendored
@ -1,25 +0,0 @@
|
|||||||
BSD 2-Clause License
|
|
||||||
|
|
||||||
Copyright (c) 2020, Jille Timmermans
|
|
||||||
All rights reserved.
|
|
||||||
|
|
||||||
Redistribution and use in source and binary forms, with or without
|
|
||||||
modification, are permitted provided that the following conditions are met:
|
|
||||||
|
|
||||||
1. Redistributions of source code must retain the above copyright notice, this
|
|
||||||
list of conditions and the following disclaimer.
|
|
||||||
|
|
||||||
2. Redistributions in binary form must reproduce the above copyright notice,
|
|
||||||
this list of conditions and the following disclaimer in the documentation
|
|
||||||
and/or other materials provided with the distribution.
|
|
||||||
|
|
||||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
||||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
||||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
||||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
|
||||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
||||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
|
||||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
||||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
|
||||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
||||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
20
vendor/github.com/Jille/raft-grpc-transport/README.md
generated
vendored
20
vendor/github.com/Jille/raft-grpc-transport/README.md
generated
vendored
@ -1,20 +0,0 @@
|
|||||||
# raft-grpc-transport
|
|
||||||
|
|
||||||
[](https://godoc.org/github.com/Jille/raft-grpc-transport)
|
|
||||||
|
|
||||||
This library provides a [Transport](https://godoc.org/github.com/hashicorp/raft#Transport) for https://github.com/hashicorp/raft over gRPC.
|
|
||||||
|
|
||||||
One benefit of this is that gRPC is easy to multiplex over a single port.
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
```go
|
|
||||||
// ...
|
|
||||||
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{grpc.WithInsecure()})
|
|
||||||
s := grpc.NewServer()
|
|
||||||
tm.Register(s)
|
|
||||||
r, err := raft.NewRaft(..., tm.Transport())
|
|
||||||
// ...
|
|
||||||
```
|
|
||||||
|
|
||||||
Want more example code? Check out main.go at https://github.com/Jille/raft-grpc-example
|
|
13
vendor/github.com/Jille/raft-grpc-transport/options.go
generated
vendored
13
vendor/github.com/Jille/raft-grpc-transport/options.go
generated
vendored
@ -1,13 +0,0 @@
|
|||||||
package transport
|
|
||||||
|
|
||||||
import "time"
|
|
||||||
|
|
||||||
type Option func(m *Manager)
|
|
||||||
|
|
||||||
// WithHeartbeatTimeout configures the transport to not wait for more than d
|
|
||||||
// for a heartbeat to be executed by a remote peer.
|
|
||||||
func WithHeartbeatTimeout(d time.Duration) Option {
|
|
||||||
return func(m *Manager) {
|
|
||||||
m.heartbeatTimeout = d
|
|
||||||
}
|
|
||||||
}
|
|
6
vendor/github.com/Jille/raft-grpc-transport/proto/Makefile
generated
vendored
6
vendor/github.com/Jille/raft-grpc-transport/proto/Makefile
generated
vendored
@ -1,6 +0,0 @@
|
|||||||
transport.pb.go: transport.proto
|
|
||||||
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative transport.proto
|
|
||||||
|
|
||||||
force:
|
|
||||||
rm -f transport.pb.go
|
|
||||||
make transport.pb.go
|
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@ -1,7 +1,5 @@
|
|||||||
# github.com/Jille/raft-grpc-transport v1.6.1
|
# github.com/Jille/raft-grpc-transport v1.6.1
|
||||||
## explicit; go 1.13
|
## explicit; go 1.13
|
||||||
github.com/Jille/raft-grpc-transport
|
|
||||||
github.com/Jille/raft-grpc-transport/proto
|
|
||||||
# github.com/armon/go-metrics v0.4.1
|
# github.com/armon/go-metrics v0.4.1
|
||||||
## explicit; go 1.12
|
## explicit; go 1.12
|
||||||
github.com/armon/go-metrics
|
github.com/armon/go-metrics
|
||||||
|
Loading…
x
Reference in New Issue
Block a user