Compare commits

...

2 Commits

18 changed files with 887 additions and 511 deletions

View File

@ -1,8 +1,8 @@
package main package main
import ( import (
"deevirt.fr/compute/cmd/compute_qemu/events" "deevirt.fr/compute/cmd/monitor/events"
"deevirt.fr/compute/cmd/compute_qemu/metrics" "deevirt.fr/compute/cmd/monitor/metrics"
) )
func main() { func main() {

View File

@ -1,15 +1,19 @@
syntax = "proto3"; syntax = "proto3";
option go_package = "github.com/Jille/raft-grpc-transport/proto"; option go_package = "./proto";
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
package raft;
service RaftTransport { service RaftTransport {
// AppendEntriesPipeline opens an AppendEntries message stream. // AppendEntriesPipeline opens an AppendEntries message stream.
rpc AppendEntriesPipeline(stream AppendEntriesRequest) returns (stream AppendEntriesResponse) {} rpc AppendEntriesPipeline(stream AppendEntriesRequest) returns (stream AppendEntriesResponse) {}
rpc AppendEntriesChunkedPipeline(stream AppendEntriesChunkedRequest) returns (stream AppendEntriesResponse) {}
// AppendEntries performs a single append entries request / response. // AppendEntries performs a single append entries request / response.
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {} rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {}
// AppendEntries performs a single append entries request / response for request larger than the max grpc message size.
rpc AppendEntriesChunked(stream AppendEntriesChunkedRequest) returns (AppendEntriesResponse) {}
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election. // RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
rpc RequestVote(RequestVoteRequest) returns (RequestVoteResponse) {} rpc RequestVote(RequestVoteRequest) returns (RequestVoteResponse) {}
// TimeoutNow is used to start a leadership transfer to the target node. // TimeoutNow is used to start a leadership transfer to the target node.
@ -53,6 +57,11 @@ message AppendEntriesRequest {
uint64 leader_commit_index = 7; uint64 leader_commit_index = 7;
} }
message AppendEntriesChunkedRequest {
int64 remaining_bytes = 1; // number of bytes of the same request AFTER this chunk
bytes chunk = 2;
}
message AppendEntriesResponse { message AppendEntriesResponse {
RPCHeader rpc_header = 1; RPCHeader rpc_header = 1;
uint64 term = 2; uint64 term = 2;
@ -119,4 +128,4 @@ message RequestPreVoteResponse {
RPCHeader rpc_header = 1; RPCHeader rpc_header = 1;
uint64 term = 2; uint64 term = 2;
bool granted = 3; bool granted = 3;
} }

View File

@ -1,8 +1,8 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions: // versions:
// - protoc-gen-go-grpc v1.2.0 // - protoc-gen-go-grpc v1.5.1
// - protoc v3.21.3 // - protoc v3.14.0
// source: transport.proto // source: proto/raft_transport.proto
package proto package proto
@ -15,23 +15,37 @@ import (
// This is a compile-time assertion to ensure that this generated file // This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against. // is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later. // Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion7 const _ = grpc.SupportPackageIsVersion9
const (
RaftTransport_AppendEntriesPipeline_FullMethodName = "/raft.RaftTransport/AppendEntriesPipeline"
RaftTransport_AppendEntriesChunkedPipeline_FullMethodName = "/raft.RaftTransport/AppendEntriesChunkedPipeline"
RaftTransport_AppendEntries_FullMethodName = "/raft.RaftTransport/AppendEntries"
RaftTransport_AppendEntriesChunked_FullMethodName = "/raft.RaftTransport/AppendEntriesChunked"
RaftTransport_RequestVote_FullMethodName = "/raft.RaftTransport/RequestVote"
RaftTransport_TimeoutNow_FullMethodName = "/raft.RaftTransport/TimeoutNow"
RaftTransport_InstallSnapshot_FullMethodName = "/raft.RaftTransport/InstallSnapshot"
RaftTransport_RequestPreVote_FullMethodName = "/raft.RaftTransport/RequestPreVote"
)
// RaftTransportClient is the client API for RaftTransport service. // RaftTransportClient is the client API for RaftTransport service.
// //
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type RaftTransportClient interface { type RaftTransportClient interface {
// AppendEntriesPipeline opens an AppendEntries message stream. // AppendEntriesPipeline opens an AppendEntries message stream.
AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_AppendEntriesPipelineClient, error) AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesRequest, AppendEntriesResponse], error)
AppendEntriesChunkedPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error)
// AppendEntries performs a single append entries request / response. // AppendEntries performs a single append entries request / response.
AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error)
// AppendEntries performs a single append entries request / response for request larger than the max grpc message size.
AppendEntriesChunked(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error)
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election. // RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error)
// TimeoutNow is used to start a leadership transfer to the target node. // TimeoutNow is used to start a leadership transfer to the target node.
TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error) TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error)
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer. // InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_InstallSnapshotClient, error) InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[InstallSnapshotRequest, InstallSnapshotResponse], error)
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election. // RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error) RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error)
} }
@ -44,49 +58,59 @@ func NewRaftTransportClient(cc grpc.ClientConnInterface) RaftTransportClient {
return &raftTransportClient{cc} return &raftTransportClient{cc}
} }
func (c *raftTransportClient) AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_AppendEntriesPipelineClient, error) { func (c *raftTransportClient) AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesRequest, AppendEntriesResponse], error) {
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[0], "/RaftTransport/AppendEntriesPipeline", opts...) cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[0], RaftTransport_AppendEntriesPipeline_FullMethodName, cOpts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
x := &raftTransportAppendEntriesPipelineClient{stream} x := &grpc.GenericClientStream[AppendEntriesRequest, AppendEntriesResponse]{ClientStream: stream}
return x, nil return x, nil
} }
type RaftTransport_AppendEntriesPipelineClient interface { // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
Send(*AppendEntriesRequest) error type RaftTransport_AppendEntriesPipelineClient = grpc.BidiStreamingClient[AppendEntriesRequest, AppendEntriesResponse]
Recv() (*AppendEntriesResponse, error)
grpc.ClientStream
}
type raftTransportAppendEntriesPipelineClient struct { func (c *raftTransportClient) AppendEntriesChunkedPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error) {
grpc.ClientStream cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
} stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[1], RaftTransport_AppendEntriesChunkedPipeline_FullMethodName, cOpts...)
if err != nil {
func (x *raftTransportAppendEntriesPipelineClient) Send(m *AppendEntriesRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *raftTransportAppendEntriesPipelineClient) Recv() (*AppendEntriesResponse, error) {
m := new(AppendEntriesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err return nil, err
} }
return m, nil x := &grpc.GenericClientStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ClientStream: stream}
return x, nil
} }
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_AppendEntriesChunkedPipelineClient = grpc.BidiStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse]
func (c *raftTransportClient) AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) { func (c *raftTransportClient) AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(AppendEntriesResponse) out := new(AppendEntriesResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/AppendEntries", in, out, opts...) err := c.cc.Invoke(ctx, RaftTransport_AppendEntries_FullMethodName, in, out, cOpts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return out, nil return out, nil
} }
func (c *raftTransportClient) AppendEntriesChunked(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[2], RaftTransport_AppendEntriesChunked_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ClientStream: stream}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_AppendEntriesChunkedClient = grpc.ClientStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse]
func (c *raftTransportClient) RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) { func (c *raftTransportClient) RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RequestVoteResponse) out := new(RequestVoteResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/RequestVote", in, out, opts...) err := c.cc.Invoke(ctx, RaftTransport_RequestVote_FullMethodName, in, out, cOpts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -94,51 +118,32 @@ func (c *raftTransportClient) RequestVote(ctx context.Context, in *RequestVoteRe
} }
func (c *raftTransportClient) TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error) { func (c *raftTransportClient) TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(TimeoutNowResponse) out := new(TimeoutNowResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/TimeoutNow", in, out, opts...) err := c.cc.Invoke(ctx, RaftTransport_TimeoutNow_FullMethodName, in, out, cOpts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return out, nil return out, nil
} }
func (c *raftTransportClient) InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_InstallSnapshotClient, error) { func (c *raftTransportClient) InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[InstallSnapshotRequest, InstallSnapshotResponse], error) {
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[1], "/RaftTransport/InstallSnapshot", opts...) cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[3], RaftTransport_InstallSnapshot_FullMethodName, cOpts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
x := &raftTransportInstallSnapshotClient{stream} x := &grpc.GenericClientStream[InstallSnapshotRequest, InstallSnapshotResponse]{ClientStream: stream}
return x, nil return x, nil
} }
type RaftTransport_InstallSnapshotClient interface { // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
Send(*InstallSnapshotRequest) error type RaftTransport_InstallSnapshotClient = grpc.ClientStreamingClient[InstallSnapshotRequest, InstallSnapshotResponse]
CloseAndRecv() (*InstallSnapshotResponse, error)
grpc.ClientStream
}
type raftTransportInstallSnapshotClient struct {
grpc.ClientStream
}
func (x *raftTransportInstallSnapshotClient) Send(m *InstallSnapshotRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *raftTransportInstallSnapshotClient) CloseAndRecv() (*InstallSnapshotResponse, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(InstallSnapshotResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *raftTransportClient) RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error) { func (c *raftTransportClient) RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RequestPreVoteResponse) out := new(RequestPreVoteResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/RequestPreVote", in, out, opts...) err := c.cc.Invoke(ctx, RaftTransport_RequestPreVote_FullMethodName, in, out, cOpts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -147,46 +152,59 @@ func (c *raftTransportClient) RequestPreVote(ctx context.Context, in *RequestPre
// RaftTransportServer is the server API for RaftTransport service. // RaftTransportServer is the server API for RaftTransport service.
// All implementations must embed UnimplementedRaftTransportServer // All implementations must embed UnimplementedRaftTransportServer
// for forward compatibility // for forward compatibility.
type RaftTransportServer interface { type RaftTransportServer interface {
// AppendEntriesPipeline opens an AppendEntries message stream. // AppendEntriesPipeline opens an AppendEntries message stream.
AppendEntriesPipeline(RaftTransport_AppendEntriesPipelineServer) error AppendEntriesPipeline(grpc.BidiStreamingServer[AppendEntriesRequest, AppendEntriesResponse]) error
AppendEntriesChunkedPipeline(grpc.BidiStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error
// AppendEntries performs a single append entries request / response. // AppendEntries performs a single append entries request / response.
AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error)
// AppendEntries performs a single append entries request / response for request larger than the max grpc message size.
AppendEntriesChunked(grpc.ClientStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election. // RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error)
// TimeoutNow is used to start a leadership transfer to the target node. // TimeoutNow is used to start a leadership transfer to the target node.
TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error) TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error)
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer. // InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
InstallSnapshot(RaftTransport_InstallSnapshotServer) error InstallSnapshot(grpc.ClientStreamingServer[InstallSnapshotRequest, InstallSnapshotResponse]) error
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election. // RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error) RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error)
mustEmbedUnimplementedRaftTransportServer() mustEmbedUnimplementedRaftTransportServer()
} }
// UnimplementedRaftTransportServer must be embedded to have forward compatible implementations. // UnimplementedRaftTransportServer must be embedded to have
type UnimplementedRaftTransportServer struct { // forward compatible implementations.
} //
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedRaftTransportServer struct{}
func (UnimplementedRaftTransportServer) AppendEntriesPipeline(RaftTransport_AppendEntriesPipelineServer) error { func (UnimplementedRaftTransportServer) AppendEntriesPipeline(grpc.BidiStreamingServer[AppendEntriesRequest, AppendEntriesResponse]) error {
return status.Errorf(codes.Unimplemented, "method AppendEntriesPipeline not implemented") return status.Errorf(codes.Unimplemented, "method AppendEntriesPipeline not implemented")
} }
func (UnimplementedRaftTransportServer) AppendEntriesChunkedPipeline(grpc.BidiStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error {
return status.Errorf(codes.Unimplemented, "method AppendEntriesChunkedPipeline not implemented")
}
func (UnimplementedRaftTransportServer) AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) { func (UnimplementedRaftTransportServer) AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AppendEntries not implemented") return nil, status.Errorf(codes.Unimplemented, "method AppendEntries not implemented")
} }
func (UnimplementedRaftTransportServer) AppendEntriesChunked(grpc.ClientStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error {
return status.Errorf(codes.Unimplemented, "method AppendEntriesChunked not implemented")
}
func (UnimplementedRaftTransportServer) RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) { func (UnimplementedRaftTransportServer) RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RequestVote not implemented") return nil, status.Errorf(codes.Unimplemented, "method RequestVote not implemented")
} }
func (UnimplementedRaftTransportServer) TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error) { func (UnimplementedRaftTransportServer) TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method TimeoutNow not implemented") return nil, status.Errorf(codes.Unimplemented, "method TimeoutNow not implemented")
} }
func (UnimplementedRaftTransportServer) InstallSnapshot(RaftTransport_InstallSnapshotServer) error { func (UnimplementedRaftTransportServer) InstallSnapshot(grpc.ClientStreamingServer[InstallSnapshotRequest, InstallSnapshotResponse]) error {
return status.Errorf(codes.Unimplemented, "method InstallSnapshot not implemented") return status.Errorf(codes.Unimplemented, "method InstallSnapshot not implemented")
} }
func (UnimplementedRaftTransportServer) RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error) { func (UnimplementedRaftTransportServer) RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RequestPreVote not implemented") return nil, status.Errorf(codes.Unimplemented, "method RequestPreVote not implemented")
} }
func (UnimplementedRaftTransportServer) mustEmbedUnimplementedRaftTransportServer() {} func (UnimplementedRaftTransportServer) mustEmbedUnimplementedRaftTransportServer() {}
func (UnimplementedRaftTransportServer) testEmbeddedByValue() {}
// UnsafeRaftTransportServer may be embedded to opt out of forward compatibility for this service. // UnsafeRaftTransportServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to RaftTransportServer will // Use of this interface is not recommended, as added methods to RaftTransportServer will
@ -196,34 +214,29 @@ type UnsafeRaftTransportServer interface {
} }
func RegisterRaftTransportServer(s grpc.ServiceRegistrar, srv RaftTransportServer) { func RegisterRaftTransportServer(s grpc.ServiceRegistrar, srv RaftTransportServer) {
// If the following call pancis, it indicates UnimplementedRaftTransportServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&RaftTransport_ServiceDesc, srv) s.RegisterService(&RaftTransport_ServiceDesc, srv)
} }
func _RaftTransport_AppendEntriesPipeline_Handler(srv interface{}, stream grpc.ServerStream) error { func _RaftTransport_AppendEntriesPipeline_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).AppendEntriesPipeline(&raftTransportAppendEntriesPipelineServer{stream}) return srv.(RaftTransportServer).AppendEntriesPipeline(&grpc.GenericServerStream[AppendEntriesRequest, AppendEntriesResponse]{ServerStream: stream})
} }
type RaftTransport_AppendEntriesPipelineServer interface { // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
Send(*AppendEntriesResponse) error type RaftTransport_AppendEntriesPipelineServer = grpc.BidiStreamingServer[AppendEntriesRequest, AppendEntriesResponse]
Recv() (*AppendEntriesRequest, error)
grpc.ServerStream func _RaftTransport_AppendEntriesChunkedPipeline_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).AppendEntriesChunkedPipeline(&grpc.GenericServerStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ServerStream: stream})
} }
type raftTransportAppendEntriesPipelineServer struct { // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
grpc.ServerStream type RaftTransport_AppendEntriesChunkedPipelineServer = grpc.BidiStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]
}
func (x *raftTransportAppendEntriesPipelineServer) Send(m *AppendEntriesResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *raftTransportAppendEntriesPipelineServer) Recv() (*AppendEntriesRequest, error) {
m := new(AppendEntriesRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AppendEntriesRequest) in := new(AppendEntriesRequest)
@ -235,7 +248,7 @@ func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context,
} }
info := &grpc.UnaryServerInfo{ info := &grpc.UnaryServerInfo{
Server: srv, Server: srv,
FullMethod: "/RaftTransport/AppendEntries", FullMethod: RaftTransport_AppendEntries_FullMethodName,
} }
handler := func(ctx context.Context, req interface{}) (interface{}, error) { handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).AppendEntries(ctx, req.(*AppendEntriesRequest)) return srv.(RaftTransportServer).AppendEntries(ctx, req.(*AppendEntriesRequest))
@ -243,6 +256,13 @@ func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _RaftTransport_AppendEntriesChunked_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).AppendEntriesChunked(&grpc.GenericServerStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_AppendEntriesChunkedServer = grpc.ClientStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]
func _RaftTransport_RequestVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _RaftTransport_RequestVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestVoteRequest) in := new(RequestVoteRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -253,7 +273,7 @@ func _RaftTransport_RequestVote_Handler(srv interface{}, ctx context.Context, de
} }
info := &grpc.UnaryServerInfo{ info := &grpc.UnaryServerInfo{
Server: srv, Server: srv,
FullMethod: "/RaftTransport/RequestVote", FullMethod: RaftTransport_RequestVote_FullMethodName,
} }
handler := func(ctx context.Context, req interface{}) (interface{}, error) { handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).RequestVote(ctx, req.(*RequestVoteRequest)) return srv.(RaftTransportServer).RequestVote(ctx, req.(*RequestVoteRequest))
@ -271,7 +291,7 @@ func _RaftTransport_TimeoutNow_Handler(srv interface{}, ctx context.Context, dec
} }
info := &grpc.UnaryServerInfo{ info := &grpc.UnaryServerInfo{
Server: srv, Server: srv,
FullMethod: "/RaftTransport/TimeoutNow", FullMethod: RaftTransport_TimeoutNow_FullMethodName,
} }
handler := func(ctx context.Context, req interface{}) (interface{}, error) { handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).TimeoutNow(ctx, req.(*TimeoutNowRequest)) return srv.(RaftTransportServer).TimeoutNow(ctx, req.(*TimeoutNowRequest))
@ -280,30 +300,11 @@ func _RaftTransport_TimeoutNow_Handler(srv interface{}, ctx context.Context, dec
} }
func _RaftTransport_InstallSnapshot_Handler(srv interface{}, stream grpc.ServerStream) error { func _RaftTransport_InstallSnapshot_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).InstallSnapshot(&raftTransportInstallSnapshotServer{stream}) return srv.(RaftTransportServer).InstallSnapshot(&grpc.GenericServerStream[InstallSnapshotRequest, InstallSnapshotResponse]{ServerStream: stream})
} }
type RaftTransport_InstallSnapshotServer interface { // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
SendAndClose(*InstallSnapshotResponse) error type RaftTransport_InstallSnapshotServer = grpc.ClientStreamingServer[InstallSnapshotRequest, InstallSnapshotResponse]
Recv() (*InstallSnapshotRequest, error)
grpc.ServerStream
}
type raftTransportInstallSnapshotServer struct {
grpc.ServerStream
}
func (x *raftTransportInstallSnapshotServer) SendAndClose(m *InstallSnapshotResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *raftTransportInstallSnapshotServer) Recv() (*InstallSnapshotRequest, error) {
m := new(InstallSnapshotRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestPreVoteRequest) in := new(RequestPreVoteRequest)
@ -315,7 +316,7 @@ func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context,
} }
info := &grpc.UnaryServerInfo{ info := &grpc.UnaryServerInfo{
Server: srv, Server: srv,
FullMethod: "/RaftTransport/RequestPreVote", FullMethod: RaftTransport_RequestPreVote_FullMethodName,
} }
handler := func(ctx context.Context, req interface{}) (interface{}, error) { handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).RequestPreVote(ctx, req.(*RequestPreVoteRequest)) return srv.(RaftTransportServer).RequestPreVote(ctx, req.(*RequestPreVoteRequest))
@ -327,7 +328,7 @@ func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context,
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
var RaftTransport_ServiceDesc = grpc.ServiceDesc{ var RaftTransport_ServiceDesc = grpc.ServiceDesc{
ServiceName: "RaftTransport", ServiceName: "raft.RaftTransport",
HandlerType: (*RaftTransportServer)(nil), HandlerType: (*RaftTransportServer)(nil),
Methods: []grpc.MethodDesc{ Methods: []grpc.MethodDesc{
{ {
@ -354,11 +355,22 @@ var RaftTransport_ServiceDesc = grpc.ServiceDesc{
ServerStreams: true, ServerStreams: true,
ClientStreams: true, ClientStreams: true,
}, },
{
StreamName: "AppendEntriesChunkedPipeline",
Handler: _RaftTransport_AppendEntriesChunkedPipeline_Handler,
ServerStreams: true,
ClientStreams: true,
},
{
StreamName: "AppendEntriesChunked",
Handler: _RaftTransport_AppendEntriesChunked_Handler,
ClientStreams: true,
},
{ {
StreamName: "InstallSnapshot", StreamName: "InstallSnapshot",
Handler: _RaftTransport_InstallSnapshot_Handler, Handler: _RaftTransport_InstallSnapshot_Handler,
ClientStreams: true, ClientStreams: true,
}, },
}, },
Metadata: "transport.proto", Metadata: "proto/raft_transport.proto",
} }

View File

@ -2,31 +2,70 @@ package api
import ( import (
"context" "context"
"crypto/tls"
"crypto/x509"
"fmt" "fmt"
"log" "log"
"net" "net"
"os"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
pb "deevirt.fr/compute/pkg/api/proto" pb "deevirt.fr/compute/pkg/api/proto"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/raft" "deevirt.fr/compute/pkg/raft"
) )
func createGRPCServer(conf *config.Config) *grpc.Server {
if conf.Manager.TlsKey != "" {
cert, err := tls.LoadX509KeyPair(conf.Manager.TlsCert, conf.Manager.TlsKey)
if err != nil {
log.Fatalf("Erreur chargement du certificat: %v", err)
}
// Charger la CA (facultatif, pour la vérification des clients)
caCert, err := os.ReadFile(conf.Manager.TlsCert)
if err != nil {
log.Fatalf("Erreur chargement CA: %v", err)
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(caCert)
// Créer les credentials TLS
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: certPool,
ClientAuth: tls.RequireAndVerifyClientCert, // Authentification mutuelle (mTLS),
})
return grpc.NewServer(grpc.Creds(creds))
}
return grpc.NewServer()
}
func Server() { func Server() {
ctx := context.Background() ctx := context.Background()
// Récupération de la configuration deevirt
conf, err := config.New()
if err != nil {
log.Fatalf("failed load configuration: %v", err)
}
sock, err := net.Listen("tcp", fmt.Sprintf(":%d", 4480)) sock, err := net.Listen("tcp", fmt.Sprintf(":%d", 4480))
if err != nil { if err != nil {
log.Fatalf("failed to listen: %v", err) log.Fatalf("failed to listen: %v", err)
} }
r, tm, err := raft.New(ctx, 4480) r, tm, err := raft.New(ctx, conf, 4480)
if err != nil { if err != nil {
log.Fatalf("failed to start raft: %v", err) log.Fatalf("failed to start raft: %v", err)
} }
s := grpc.NewServer() s := createGRPCServer(conf)
pb.RegisterDomainServer(s, nil) pb.RegisterDomainServer(s, nil)
tm.Register(s) tm.Register(s)
//leaderhealth.Setup(r, s, []string{"Example"}) //leaderhealth.Setup(r, s, []string{"Example"})

View File

@ -2,15 +2,18 @@ package raft
import ( import (
"context" "context"
"crypto/tls"
"crypto/x509"
"fmt" "fmt"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
transport "github.com/Jille/raft-grpc-transport" transport "deevirt.fr/compute/pkg/raft/transport"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb" raftboltdb "github.com/hashicorp/raft-boltdb"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"deevirt.fr/compute/pkg/config" "deevirt.fr/compute/pkg/config"
etcd_client "deevirt.fr/compute/pkg/etcd" etcd_client "deevirt.fr/compute/pkg/etcd"
@ -29,16 +32,34 @@ type Peers struct {
Address string Address string
} }
func New(ctx context.Context, port int) (*raft.Raft, *transport.Manager, error) { func getTLSCredentials(conf *config.Config) credentials.TransportCredentials {
// Récupération de la configuration deevirt cert, err := tls.LoadX509KeyPair(conf.Manager.TlsCert, conf.Manager.TlsKey)
conf, err := config.New()
if err != nil { if err != nil {
return nil, nil, err log.Fatalf("Erreur chargement du certificat: %v", err)
} }
// Charger la CA (facultatif, pour la vérification des clients)
caCert, err := os.ReadFile(conf.Manager.TlsCert)
if err != nil {
log.Fatalf("Erreur chargement CA: %v", err)
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(caCert)
// Créer les credentials TLS
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: certPool,
InsecureSkipVerify: true,
})
return creds
}
func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *transport.Manager, error) {
// Création du répertoire // Création du répertoire
baseDir := filepath.Join("/var/lib/deevirt/mgr/", conf.NodeID) baseDir := filepath.Join("/var/lib/deevirt/mgr/", conf.NodeID)
err = os.MkdirAll(baseDir, 0740) err := os.MkdirAll(baseDir, 0740)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -87,7 +108,13 @@ func New(ctx context.Context, port int) (*raft.Raft, *transport.Manager, error)
return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
} }
tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), []grpc.DialOption{grpc.WithInsecure()}) dialOption := []grpc.DialOption{}
if conf.Manager.TlsKey != "" {
dialOption = append(dialOption, grpc.WithTransportCredentials(getTLSCredentials(conf)))
}
tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), dialOption)
r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport()) r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport())
if err != nil { if err != nil {
@ -183,3 +210,84 @@ func (n *RaftNode) watchStateChanges() {
} }
} }
} }
/*func New(ctx context.Context, myID, myAddress string) (*raft.Raft, *transport.Manager, error) {
// Création du répertoire
baseDir := filepath.Join("/var/lib/deevirt/mgr/", myID)
err := os.MkdirAll(baseDir, 0740)
if err != nil {
return nil, nil, err
}
println(myAddress)
peers := []raft.Server{
{
ID: raft.ServerID("nodeA"),
Address: raft.ServerAddress("172.16.9.161:4410"),
},
{
ID: raft.ServerID("nodeB"),
Address: raft.ServerAddress("172.16.9.161:4411"),
},
}
c := raft.DefaultConfig()
c.LocalID = raft.ServerID(myID)
ldb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat"))
if err != nil {
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
}
sdb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat"))
if err != nil {
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
}
fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
if err != nil {
return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
}
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{grpc.WithTransportCredentials(getTLSCredentials())})
r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport())
if err != nil {
return nil, nil, fmt.Errorf("raft.NewRaft: %v", err)
}
s, err := scheduler.New()
if err != nil {
return nil, nil, fmt.Errorf("scheduler: %v", err)
}
// Observer pour surveiller les changements d'état
stateCh := make(chan raft.Observation, 1) // Canal de type raft.Observation
r.RegisterObserver(raft.NewObserver(stateCh, true, nil))
node := &RaftNode{
Raft: r,
NodeID: myID,
StateCh: stateCh,
scheduler: s,
}
go node.watchStateChanges()
hasState, _ := checkIfStateExists(ldb)
if myAddress == "172.16.9.161:4410" && !hasState {
println("Démarrage du bootstrap ! ")
cfg := raft.Configuration{
Servers: peers,
}
f := r.BootstrapCluster(cfg)
if err := f.Error(); err != nil {
return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
}
}
return r, tm, nil
}*/

View File

@ -6,9 +6,11 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/Jille/raft-grpc-transport/proto" pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
// These are calls from the Raft engine that we need to send out over gRPC. // These are calls from the Raft engine that we need to send out over gRPC.
@ -52,7 +54,7 @@ func (r raftAPI) getPeer(target raft.ServerAddress) (pb.RaftTransportClient, err
} }
defer c.mtx.Unlock() defer c.mtx.Unlock()
if c.clientConn == nil { if c.clientConn == nil {
conn, err := grpc.Dial(string(target), r.manager.dialOptions...) conn, err := grpc.NewClient(string(target), r.manager.dialOptions...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -74,7 +76,14 @@ func (r raftAPI) AppendEntries(id raft.ServerID, target raft.ServerAddress, args
ctx, cancel = context.WithTimeout(ctx, r.manager.heartbeatTimeout) ctx, cancel = context.WithTimeout(ctx, r.manager.heartbeatTimeout)
defer cancel() defer cancel()
} }
ret, err := c.AppendEntries(ctx, encodeAppendEntriesRequest(args)) appendEntriesRequest := encodeAppendEntriesRequest(args)
ret, err := c.AppendEntries(ctx, appendEntriesRequest)
if statusErr, ok := status.FromError(err); ok && statusErr.Code() == codes.ResourceExhausted {
chunkedRet, chunkedErr := r.appendEntriesChunked(ctx, r.manager.appendEntriesChunkSize, c, appendEntriesRequest)
if statusErr, ok := status.FromError(chunkedErr); ok && statusErr.Code() != codes.Unimplemented {
ret, err = chunkedRet, chunkedErr
}
}
if err != nil { if err != nil {
return err return err
} }
@ -82,6 +91,21 @@ func (r raftAPI) AppendEntries(id raft.ServerID, target raft.ServerAddress, args
return nil return nil
} }
// AppendEntries sends the appropriate RPC to the target node.
func (r raftAPI) appendEntriesChunked(ctx context.Context, chunkSize int, c pb.RaftTransportClient, appendEntriesRequest *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error) {
stream, err := c.AppendEntriesChunked(ctx)
if err != nil {
return &pb.AppendEntriesResponse{}, err
}
defer stream.CloseSend()
if err := sendAppendEntriesChunkedRequest(chunkSize, stream, appendEntriesRequest); err != nil {
return &pb.AppendEntriesResponse{}, err
}
return stream.CloseAndRecv()
}
// RequestVote sends the appropriate RPC to the target node. // RequestVote sends the appropriate RPC to the target node.
func (r raftAPI) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error { func (r raftAPI) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error {
c, err := r.getPeer(target) c, err := r.getPeer(target)
@ -161,6 +185,11 @@ func (r raftAPI) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, re
return nil return nil
} }
type AppendEntriesPipelineInterface interface {
grpc.ClientStream
Recv() (*pb.AppendEntriesResponse, error)
}
// AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests. // AppendEntries requests.
func (r raftAPI) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) { func (r raftAPI) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) {
@ -170,38 +199,52 @@ func (r raftAPI) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddre
} }
ctx := context.TODO() ctx := context.TODO()
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
stream, err := c.AppendEntriesPipeline(ctx) var stream AppendEntriesPipelineInterface
stream, err = c.AppendEntriesChunkedPipeline(ctx)
if statusErr, ok := status.FromError(err); ok && statusErr.Code() == codes.Unimplemented {
stream, err = c.AppendEntriesPipeline(ctx)
}
if err != nil { if err != nil {
cancel() cancel()
return nil, err return nil, err
} }
rpa := raftPipelineAPI{ rpa := &raftPipelineAPI{
stream: stream, stream: stream,
cancel: cancel, appendEntriesChunkSize: r.manager.appendEntriesChunkSize,
inflightCh: make(chan *appendFuture, 20), cancel: cancel,
doneCh: make(chan raft.AppendFuture, 20), inflightCh: make(chan *appendFuture, 20),
doneCh: make(chan raft.AppendFuture, 20),
} }
go rpa.receiver() go rpa.receiver()
return rpa, nil return rpa, nil
} }
type raftPipelineAPI struct { type raftPipelineAPI struct {
stream pb.RaftTransport_AppendEntriesPipelineClient stream AppendEntriesPipelineInterface
cancel func() appendEntriesChunkSize int
inflightChMtx sync.Mutex cancel func()
inflightCh chan *appendFuture inflightChMtx sync.Mutex
doneCh chan raft.AppendFuture inflightCh chan *appendFuture
doneCh chan raft.AppendFuture
} }
// AppendEntries is used to add another request to the pipeline. // AppendEntries is used to add another request to the pipeline.
// The send may block which is an effective form of back-pressure. // The send may block which is an effective form of back-pressure.
func (r raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) (raft.AppendFuture, error) { func (r *raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) (raft.AppendFuture, error) {
af := &appendFuture{ af := &appendFuture{
start: time.Now(), start: time.Now(),
request: req, request: req,
done: make(chan struct{}), done: make(chan struct{}),
} }
if err := r.stream.Send(encodeAppendEntriesRequest(req)); err != nil { var err error
appendEntriesRequest := encodeAppendEntriesRequest(req)
switch stream := r.stream.(type) {
case pb.RaftTransport_AppendEntriesPipelineClient:
err = stream.Send(appendEntriesRequest)
case pb.RaftTransport_AppendEntriesChunkedPipelineClient:
err = sendAppendEntriesChunkedRequest(r.appendEntriesChunkSize, stream, appendEntriesRequest)
}
if err != nil {
return nil, err return nil, err
} }
r.inflightChMtx.Lock() r.inflightChMtx.Lock()
@ -216,12 +259,12 @@ func (r raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raf
// Consumer returns a channel that can be used to consume // Consumer returns a channel that can be used to consume
// response futures when they are ready. // response futures when they are ready.
func (r raftPipelineAPI) Consumer() <-chan raft.AppendFuture { func (r *raftPipelineAPI) Consumer() <-chan raft.AppendFuture {
return r.doneCh return r.doneCh
} }
// Close closes the pipeline and cancels all inflight RPCs // Close closes the pipeline and cancels all inflight RPCs
func (r raftPipelineAPI) Close() error { func (r *raftPipelineAPI) Close() error {
r.cancel() r.cancel()
r.inflightChMtx.Lock() r.inflightChMtx.Lock()
close(r.inflightCh) close(r.inflightCh)
@ -229,7 +272,7 @@ func (r raftPipelineAPI) Close() error {
return nil return nil
} }
func (r raftPipelineAPI) receiver() { func (r *raftPipelineAPI) receiver() {
for af := range r.inflightCh { for af := range r.inflightCh {
msg, err := r.stream.Recv() msg, err := r.stream.Recv()
if err != nil { if err != nil {
@ -319,8 +362,8 @@ func (r raftAPI) Disconnect(target raft.ServerAddress) {
r.manager.connectionsMtx.Unlock() r.manager.connectionsMtx.Unlock()
if ok { if ok {
c.mtx.Lock() c.mtx.Lock()
c.mtx.Unlock()
_ = c.clientConn.Close() _ = c.clientConn.Close()
c.mtx.Unlock()
} }
} }

View File

@ -0,0 +1,79 @@
package transport
import (
pb "deevirt.fr/compute/pkg/api/proto"
"google.golang.org/protobuf/proto"
)
type AppendEntriesChunkedRequestStreamSender interface {
Send(*pb.AppendEntriesChunkedRequest) error
}
func sendAppendEntriesChunkedRequest(chunkSize int, stream AppendEntriesChunkedRequestStreamSender, appendEntriesRequest *pb.AppendEntriesRequest) error {
reqBuf, err := proto.Marshal(appendEntriesRequest)
if err != nil {
return err
}
reqSize := len(reqBuf)
numChunks := reqSize / chunkSize
if reqSize%chunkSize != 0 {
numChunks++
}
remainingBytes := reqSize
for chunkIdx := 0; chunkIdx < numChunks; chunkIdx++ {
lowerBound := chunkIdx * chunkSize
upperBound := (chunkIdx + 1) * chunkSize
if reqSize < upperBound {
upperBound = reqSize
}
remainingBytes -= upperBound - lowerBound
chunk := &pb.AppendEntriesChunkedRequest{
RemainingBytes: int64(remainingBytes),
Chunk: reqBuf[lowerBound:upperBound],
}
if err := stream.Send(chunk); err != nil {
return err
}
}
return nil
}
type AppendEntriesChunkedRequestStreamReceiver interface {
Recv() (*pb.AppendEntriesChunkedRequest, error)
}
func receiveAppendEntriesChunkedRequest(stream AppendEntriesChunkedRequestStreamReceiver) (*pb.AppendEntriesRequest, error) {
var reqBuf []byte
chunk, err := stream.Recv()
if err != nil {
return &pb.AppendEntriesRequest{}, err
}
if chunk.RemainingBytes == 0 {
reqBuf = chunk.Chunk
} else {
reqBuf = make([]byte, len(chunk.Chunk)+int(chunk.RemainingBytes))
lowerBound := copy(reqBuf, chunk.Chunk)
for chunk.RemainingBytes > 0 {
chunk, err = stream.Recv()
if err != nil {
return &pb.AppendEntriesRequest{}, err
}
lowerBound += copy(reqBuf[lowerBound:], chunk.Chunk)
}
}
appendEntriesRequest := new(pb.AppendEntriesRequest)
if err := proto.Unmarshal(reqBuf, appendEntriesRequest); err != nil {
return &pb.AppendEntriesRequest{}, err
}
return appendEntriesRequest, nil
}

View File

@ -1,7 +1,7 @@
package transport package transport
import ( import (
pb "github.com/Jille/raft-grpc-transport/proto" pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
) )

View File

@ -4,7 +4,7 @@ import (
"context" "context"
"io" "io"
pb "github.com/Jille/raft-grpc-transport/proto" pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
) )
@ -60,6 +60,20 @@ func (g gRPCAPI) AppendEntries(ctx context.Context, req *pb.AppendEntriesRequest
return encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)), nil return encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)), nil
} }
func (g gRPCAPI) AppendEntriesChunked(stream pb.RaftTransport_AppendEntriesChunkedServer) error {
appendEntriesRequest, err := receiveAppendEntriesChunkedRequest(stream)
if err != nil {
return err
}
resp, err := g.handleRPC(decodeAppendEntriesRequest(appendEntriesRequest), nil)
if err != nil {
return err
}
return stream.SendAndClose(encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)))
}
func (g gRPCAPI) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) { func (g gRPCAPI) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) {
resp, err := g.handleRPC(decodeRequestVoteRequest(req), nil) resp, err := g.handleRPC(decodeRequestVoteRequest(req), nil)
if err != nil { if err != nil {
@ -137,10 +151,26 @@ func (g gRPCAPI) AppendEntriesPipeline(s pb.RaftTransport_AppendEntriesPipelineS
} }
} }
func (g gRPCAPI) AppendEntriesChunkedPipeline(s pb.RaftTransport_AppendEntriesChunkedPipelineServer) error {
for {
msg, err := receiveAppendEntriesChunkedRequest(s)
if err != nil {
return err
}
resp, err := g.handleRPC(decodeAppendEntriesRequest(msg), nil)
if err != nil {
return err
}
if err := s.Send(encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse))); err != nil {
return err
}
}
}
func isHeartbeat(command interface{}) bool { func isHeartbeat(command interface{}) bool {
req, ok := command.(*raft.AppendEntriesRequest) req, ok := command.(*raft.AppendEntriesRequest)
if !ok { if !ok {
return false return false
} }
return req.Term != 0 && len(req.Leader) != 0 && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0 return req.Term != 0 && len(req.Addr) != 0 && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0
} }

View File

@ -0,0 +1,23 @@
package transport
import "time"
type Option func(m *Manager)
// WithHeartbeatTimeout configures the transport to not wait for more than d
// for a heartbeat to be executed by a remote peer.
func WithHeartbeatTimeout(d time.Duration) Option {
return func(m *Manager) {
m.heartbeatTimeout = d
}
}
// WithAppendEntriesChunkSize configures the chunk size to use when switching
// to chunked AppendEntries. The default value is 4MB (the gRPC default), but
// as there is no way to auto-discover that value, it's up to the developer
// to configure this, if the default value is not appropriate.
func WithAppendEntriesChunkSize(v int) Option {
return func(m *Manager) {
m.appendEntriesChunkSize = v
}
}

View File

@ -1,16 +1,16 @@
package transport package transport
import ( import (
pb "github.com/Jille/raft-grpc-transport/proto" pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
) )
func encodeAppendEntriesRequest(s *raft.AppendEntriesRequest) *pb.AppendEntriesRequest { func encodeAppendEntriesRequest(s *raft.AppendEntriesRequest) *pb.AppendEntriesRequest {
return &pb.AppendEntriesRequest{ return &pb.AppendEntriesRequest{
RpcHeader: encodeRPCHeader(s.RPCHeader), RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term, Term: s.Term,
Leader: s.Leader, //Leader: s.Leader,
PrevLogEntry: s.PrevLogEntry, PrevLogEntry: s.PrevLogEntry,
PrevLogTerm: s.PrevLogTerm, PrevLogTerm: s.PrevLogTerm,
Entries: encodeLogs(s.Entries), Entries: encodeLogs(s.Entries),
@ -76,9 +76,9 @@ func encodeAppendEntriesResponse(s *raft.AppendEntriesResponse) *pb.AppendEntrie
func encodeRequestVoteRequest(s *raft.RequestVoteRequest) *pb.RequestVoteRequest { func encodeRequestVoteRequest(s *raft.RequestVoteRequest) *pb.RequestVoteRequest {
return &pb.RequestVoteRequest{ return &pb.RequestVoteRequest{
RpcHeader: encodeRPCHeader(s.RPCHeader), RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term, Term: s.Term,
Candidate: s.Candidate, //Candidate: s.Candidate,
LastLogIndex: s.LastLogIndex, LastLogIndex: s.LastLogIndex,
LastLogTerm: s.LastLogTerm, LastLogTerm: s.LastLogTerm,
LeadershipTransfer: s.LeadershipTransfer, LeadershipTransfer: s.LeadershipTransfer,

View File

@ -5,7 +5,7 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/Jille/raft-grpc-transport/proto" pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -25,8 +25,9 @@ type Manager struct {
heartbeatFuncMtx sync.Mutex heartbeatFuncMtx sync.Mutex
heartbeatTimeout time.Duration heartbeatTimeout time.Duration
connectionsMtx sync.Mutex connectionsMtx sync.Mutex
connections map[raft.ServerAddress]*conn connections map[raft.ServerAddress]*conn
appendEntriesChunkSize int
shutdown bool shutdown bool
shutdownCh chan struct{} shutdownCh chan struct{}
@ -39,8 +40,9 @@ func New(localAddress raft.ServerAddress, dialOptions []grpc.DialOption, options
localAddress: localAddress, localAddress: localAddress,
dialOptions: dialOptions, dialOptions: dialOptions,
rpcChan: make(chan raft.RPC), rpcChan: make(chan raft.RPC),
connections: map[raft.ServerAddress]*conn{}, connections: map[raft.ServerAddress]*conn{},
appendEntriesChunkSize: 4*1024*1024 - 10, // same as gRPC default value (minus some overhead)
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }
@ -81,8 +83,8 @@ func (m *Manager) disconnectAll() error {
for k, conn := range m.connections { for k, conn := range m.connections {
// Lock conn.mtx to ensure Dial() is complete // Lock conn.mtx to ensure Dial() is complete
conn.mtx.Lock() conn.mtx.Lock()
conn.mtx.Unlock()
closeErr := conn.clientConn.Close() closeErr := conn.clientConn.Close()
conn.mtx.Unlock()
if closeErr != nil { if closeErr != nil {
err = multierror.Append(err, closeErr) err = multierror.Append(err, closeErr)
} }

View File

@ -1,25 +0,0 @@
BSD 2-Clause License
Copyright (c) 2020, Jille Timmermans
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,20 +0,0 @@
# raft-grpc-transport
[![Godoc](https://godoc.org/github.com/Jille/raft-grpc-transport?status.svg)](https://godoc.org/github.com/Jille/raft-grpc-transport)
This library provides a [Transport](https://godoc.org/github.com/hashicorp/raft#Transport) for https://github.com/hashicorp/raft over gRPC.
One benefit of this is that gRPC is easy to multiplex over a single port.
## Usage
```go
// ...
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{grpc.WithInsecure()})
s := grpc.NewServer()
tm.Register(s)
r, err := raft.NewRaft(..., tm.Transport())
// ...
```
Want more example code? Check out main.go at https://github.com/Jille/raft-grpc-example

View File

@ -1,13 +0,0 @@
package transport
import "time"
type Option func(m *Manager)
// WithHeartbeatTimeout configures the transport to not wait for more than d
// for a heartbeat to be executed by a remote peer.
func WithHeartbeatTimeout(d time.Duration) Option {
return func(m *Manager) {
m.heartbeatTimeout = d
}
}

View File

@ -1,6 +0,0 @@
transport.pb.go: transport.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative transport.proto
force:
rm -f transport.pb.go
make transport.pb.go

2
vendor/modules.txt vendored
View File

@ -1,7 +1,5 @@
# github.com/Jille/raft-grpc-transport v1.6.1 # github.com/Jille/raft-grpc-transport v1.6.1
## explicit; go 1.13 ## explicit; go 1.13
github.com/Jille/raft-grpc-transport
github.com/Jille/raft-grpc-transport/proto
# github.com/armon/go-metrics v0.4.1 # github.com/armon/go-metrics v0.4.1
## explicit; go 1.12 ## explicit; go 1.12
github.com/armon/go-metrics github.com/armon/go-metrics