Compare commits

..

2 Commits

18 changed files with 887 additions and 511 deletions

View File

@ -1,8 +1,8 @@
package main
import (
"deevirt.fr/compute/cmd/compute_qemu/events"
"deevirt.fr/compute/cmd/compute_qemu/metrics"
"deevirt.fr/compute/cmd/monitor/events"
"deevirt.fr/compute/cmd/monitor/metrics"
)
func main() {

View File

@ -1,15 +1,19 @@
syntax = "proto3";
option go_package = "github.com/Jille/raft-grpc-transport/proto";
option go_package = "./proto";
import "google/protobuf/timestamp.proto";
package raft;
service RaftTransport {
// AppendEntriesPipeline opens an AppendEntries message stream.
rpc AppendEntriesPipeline(stream AppendEntriesRequest) returns (stream AppendEntriesResponse) {}
rpc AppendEntriesChunkedPipeline(stream AppendEntriesChunkedRequest) returns (stream AppendEntriesResponse) {}
// AppendEntries performs a single append entries request / response.
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {}
// AppendEntries performs a single append entries request / response for request larger than the max grpc message size.
rpc AppendEntriesChunked(stream AppendEntriesChunkedRequest) returns (AppendEntriesResponse) {}
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
rpc RequestVote(RequestVoteRequest) returns (RequestVoteResponse) {}
// TimeoutNow is used to start a leadership transfer to the target node.
@ -53,6 +57,11 @@ message AppendEntriesRequest {
uint64 leader_commit_index = 7;
}
message AppendEntriesChunkedRequest {
int64 remaining_bytes = 1; // number of bytes of the same request AFTER this chunk
bytes chunk = 2;
}
message AppendEntriesResponse {
RPCHeader rpc_header = 1;
uint64 term = 2;
@ -119,4 +128,4 @@ message RequestPreVoteResponse {
RPCHeader rpc_header = 1;
uint64 term = 2;
bool granted = 3;
}
}

View File

@ -1,8 +1,8 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.3
// source: transport.proto
// - protoc-gen-go-grpc v1.5.1
// - protoc v3.14.0
// source: proto/raft_transport.proto
package proto
@ -15,23 +15,37 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
RaftTransport_AppendEntriesPipeline_FullMethodName = "/raft.RaftTransport/AppendEntriesPipeline"
RaftTransport_AppendEntriesChunkedPipeline_FullMethodName = "/raft.RaftTransport/AppendEntriesChunkedPipeline"
RaftTransport_AppendEntries_FullMethodName = "/raft.RaftTransport/AppendEntries"
RaftTransport_AppendEntriesChunked_FullMethodName = "/raft.RaftTransport/AppendEntriesChunked"
RaftTransport_RequestVote_FullMethodName = "/raft.RaftTransport/RequestVote"
RaftTransport_TimeoutNow_FullMethodName = "/raft.RaftTransport/TimeoutNow"
RaftTransport_InstallSnapshot_FullMethodName = "/raft.RaftTransport/InstallSnapshot"
RaftTransport_RequestPreVote_FullMethodName = "/raft.RaftTransport/RequestPreVote"
)
// RaftTransportClient is the client API for RaftTransport service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type RaftTransportClient interface {
// AppendEntriesPipeline opens an AppendEntries message stream.
AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_AppendEntriesPipelineClient, error)
AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesRequest, AppendEntriesResponse], error)
AppendEntriesChunkedPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error)
// AppendEntries performs a single append entries request / response.
AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error)
// AppendEntries performs a single append entries request / response for request larger than the max grpc message size.
AppendEntriesChunked(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error)
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error)
// TimeoutNow is used to start a leadership transfer to the target node.
TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error)
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_InstallSnapshotClient, error)
InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[InstallSnapshotRequest, InstallSnapshotResponse], error)
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error)
}
@ -44,49 +58,59 @@ func NewRaftTransportClient(cc grpc.ClientConnInterface) RaftTransportClient {
return &raftTransportClient{cc}
}
func (c *raftTransportClient) AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_AppendEntriesPipelineClient, error) {
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[0], "/RaftTransport/AppendEntriesPipeline", opts...)
func (c *raftTransportClient) AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesRequest, AppendEntriesResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[0], RaftTransport_AppendEntriesPipeline_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &raftTransportAppendEntriesPipelineClient{stream}
x := &grpc.GenericClientStream[AppendEntriesRequest, AppendEntriesResponse]{ClientStream: stream}
return x, nil
}
type RaftTransport_AppendEntriesPipelineClient interface {
Send(*AppendEntriesRequest) error
Recv() (*AppendEntriesResponse, error)
grpc.ClientStream
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_AppendEntriesPipelineClient = grpc.BidiStreamingClient[AppendEntriesRequest, AppendEntriesResponse]
type raftTransportAppendEntriesPipelineClient struct {
grpc.ClientStream
}
func (x *raftTransportAppendEntriesPipelineClient) Send(m *AppendEntriesRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *raftTransportAppendEntriesPipelineClient) Recv() (*AppendEntriesResponse, error) {
m := new(AppendEntriesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
func (c *raftTransportClient) AppendEntriesChunkedPipeline(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[1], RaftTransport_AppendEntriesChunkedPipeline_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
return m, nil
x := &grpc.GenericClientStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ClientStream: stream}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_AppendEntriesChunkedPipelineClient = grpc.BidiStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse]
func (c *raftTransportClient) AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(AppendEntriesResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/AppendEntries", in, out, opts...)
err := c.cc.Invoke(ctx, RaftTransport_AppendEntries_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *raftTransportClient) AppendEntriesChunked(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[2], RaftTransport_AppendEntriesChunked_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ClientStream: stream}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_AppendEntriesChunkedClient = grpc.ClientStreamingClient[AppendEntriesChunkedRequest, AppendEntriesResponse]
func (c *raftTransportClient) RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RequestVoteResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/RequestVote", in, out, opts...)
err := c.cc.Invoke(ctx, RaftTransport_RequestVote_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -94,51 +118,32 @@ func (c *raftTransportClient) RequestVote(ctx context.Context, in *RequestVoteRe
}
func (c *raftTransportClient) TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(TimeoutNowResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/TimeoutNow", in, out, opts...)
err := c.cc.Invoke(ctx, RaftTransport_TimeoutNow_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *raftTransportClient) InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_InstallSnapshotClient, error) {
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[1], "/RaftTransport/InstallSnapshot", opts...)
func (c *raftTransportClient) InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[InstallSnapshotRequest, InstallSnapshotResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[3], RaftTransport_InstallSnapshot_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &raftTransportInstallSnapshotClient{stream}
x := &grpc.GenericClientStream[InstallSnapshotRequest, InstallSnapshotResponse]{ClientStream: stream}
return x, nil
}
type RaftTransport_InstallSnapshotClient interface {
Send(*InstallSnapshotRequest) error
CloseAndRecv() (*InstallSnapshotResponse, error)
grpc.ClientStream
}
type raftTransportInstallSnapshotClient struct {
grpc.ClientStream
}
func (x *raftTransportInstallSnapshotClient) Send(m *InstallSnapshotRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *raftTransportInstallSnapshotClient) CloseAndRecv() (*InstallSnapshotResponse, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(InstallSnapshotResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_InstallSnapshotClient = grpc.ClientStreamingClient[InstallSnapshotRequest, InstallSnapshotResponse]
func (c *raftTransportClient) RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RequestPreVoteResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/RequestPreVote", in, out, opts...)
err := c.cc.Invoke(ctx, RaftTransport_RequestPreVote_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -147,46 +152,59 @@ func (c *raftTransportClient) RequestPreVote(ctx context.Context, in *RequestPre
// RaftTransportServer is the server API for RaftTransport service.
// All implementations must embed UnimplementedRaftTransportServer
// for forward compatibility
// for forward compatibility.
type RaftTransportServer interface {
// AppendEntriesPipeline opens an AppendEntries message stream.
AppendEntriesPipeline(RaftTransport_AppendEntriesPipelineServer) error
AppendEntriesPipeline(grpc.BidiStreamingServer[AppendEntriesRequest, AppendEntriesResponse]) error
AppendEntriesChunkedPipeline(grpc.BidiStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error
// AppendEntries performs a single append entries request / response.
AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error)
// AppendEntries performs a single append entries request / response for request larger than the max grpc message size.
AppendEntriesChunked(grpc.ClientStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error)
// TimeoutNow is used to start a leadership transfer to the target node.
TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error)
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
InstallSnapshot(RaftTransport_InstallSnapshotServer) error
InstallSnapshot(grpc.ClientStreamingServer[InstallSnapshotRequest, InstallSnapshotResponse]) error
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error)
mustEmbedUnimplementedRaftTransportServer()
}
// UnimplementedRaftTransportServer must be embedded to have forward compatible implementations.
type UnimplementedRaftTransportServer struct {
}
// UnimplementedRaftTransportServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedRaftTransportServer struct{}
func (UnimplementedRaftTransportServer) AppendEntriesPipeline(RaftTransport_AppendEntriesPipelineServer) error {
func (UnimplementedRaftTransportServer) AppendEntriesPipeline(grpc.BidiStreamingServer[AppendEntriesRequest, AppendEntriesResponse]) error {
return status.Errorf(codes.Unimplemented, "method AppendEntriesPipeline not implemented")
}
func (UnimplementedRaftTransportServer) AppendEntriesChunkedPipeline(grpc.BidiStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error {
return status.Errorf(codes.Unimplemented, "method AppendEntriesChunkedPipeline not implemented")
}
func (UnimplementedRaftTransportServer) AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AppendEntries not implemented")
}
func (UnimplementedRaftTransportServer) AppendEntriesChunked(grpc.ClientStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]) error {
return status.Errorf(codes.Unimplemented, "method AppendEntriesChunked not implemented")
}
func (UnimplementedRaftTransportServer) RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RequestVote not implemented")
}
func (UnimplementedRaftTransportServer) TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method TimeoutNow not implemented")
}
func (UnimplementedRaftTransportServer) InstallSnapshot(RaftTransport_InstallSnapshotServer) error {
func (UnimplementedRaftTransportServer) InstallSnapshot(grpc.ClientStreamingServer[InstallSnapshotRequest, InstallSnapshotResponse]) error {
return status.Errorf(codes.Unimplemented, "method InstallSnapshot not implemented")
}
func (UnimplementedRaftTransportServer) RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RequestPreVote not implemented")
}
func (UnimplementedRaftTransportServer) mustEmbedUnimplementedRaftTransportServer() {}
func (UnimplementedRaftTransportServer) testEmbeddedByValue() {}
// UnsafeRaftTransportServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to RaftTransportServer will
@ -196,34 +214,29 @@ type UnsafeRaftTransportServer interface {
}
func RegisterRaftTransportServer(s grpc.ServiceRegistrar, srv RaftTransportServer) {
// If the following call pancis, it indicates UnimplementedRaftTransportServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&RaftTransport_ServiceDesc, srv)
}
func _RaftTransport_AppendEntriesPipeline_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).AppendEntriesPipeline(&raftTransportAppendEntriesPipelineServer{stream})
return srv.(RaftTransportServer).AppendEntriesPipeline(&grpc.GenericServerStream[AppendEntriesRequest, AppendEntriesResponse]{ServerStream: stream})
}
type RaftTransport_AppendEntriesPipelineServer interface {
Send(*AppendEntriesResponse) error
Recv() (*AppendEntriesRequest, error)
grpc.ServerStream
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_AppendEntriesPipelineServer = grpc.BidiStreamingServer[AppendEntriesRequest, AppendEntriesResponse]
func _RaftTransport_AppendEntriesChunkedPipeline_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).AppendEntriesChunkedPipeline(&grpc.GenericServerStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ServerStream: stream})
}
type raftTransportAppendEntriesPipelineServer struct {
grpc.ServerStream
}
func (x *raftTransportAppendEntriesPipelineServer) Send(m *AppendEntriesResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *raftTransportAppendEntriesPipelineServer) Recv() (*AppendEntriesRequest, error) {
m := new(AppendEntriesRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_AppendEntriesChunkedPipelineServer = grpc.BidiStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]
func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AppendEntriesRequest)
@ -235,7 +248,7 @@ func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context,
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/RaftTransport/AppendEntries",
FullMethod: RaftTransport_AppendEntries_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).AppendEntries(ctx, req.(*AppendEntriesRequest))
@ -243,6 +256,13 @@ func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler)
}
func _RaftTransport_AppendEntriesChunked_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).AppendEntriesChunked(&grpc.GenericServerStream[AppendEntriesChunkedRequest, AppendEntriesResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_AppendEntriesChunkedServer = grpc.ClientStreamingServer[AppendEntriesChunkedRequest, AppendEntriesResponse]
func _RaftTransport_RequestVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestVoteRequest)
if err := dec(in); err != nil {
@ -253,7 +273,7 @@ func _RaftTransport_RequestVote_Handler(srv interface{}, ctx context.Context, de
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/RaftTransport/RequestVote",
FullMethod: RaftTransport_RequestVote_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).RequestVote(ctx, req.(*RequestVoteRequest))
@ -271,7 +291,7 @@ func _RaftTransport_TimeoutNow_Handler(srv interface{}, ctx context.Context, dec
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/RaftTransport/TimeoutNow",
FullMethod: RaftTransport_TimeoutNow_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).TimeoutNow(ctx, req.(*TimeoutNowRequest))
@ -280,30 +300,11 @@ func _RaftTransport_TimeoutNow_Handler(srv interface{}, ctx context.Context, dec
}
func _RaftTransport_InstallSnapshot_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).InstallSnapshot(&raftTransportInstallSnapshotServer{stream})
return srv.(RaftTransportServer).InstallSnapshot(&grpc.GenericServerStream[InstallSnapshotRequest, InstallSnapshotResponse]{ServerStream: stream})
}
type RaftTransport_InstallSnapshotServer interface {
SendAndClose(*InstallSnapshotResponse) error
Recv() (*InstallSnapshotRequest, error)
grpc.ServerStream
}
type raftTransportInstallSnapshotServer struct {
grpc.ServerStream
}
func (x *raftTransportInstallSnapshotServer) SendAndClose(m *InstallSnapshotResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *raftTransportInstallSnapshotServer) Recv() (*InstallSnapshotRequest, error) {
m := new(InstallSnapshotRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RaftTransport_InstallSnapshotServer = grpc.ClientStreamingServer[InstallSnapshotRequest, InstallSnapshotResponse]
func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestPreVoteRequest)
@ -315,7 +316,7 @@ func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context,
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/RaftTransport/RequestPreVote",
FullMethod: RaftTransport_RequestPreVote_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).RequestPreVote(ctx, req.(*RequestPreVoteRequest))
@ -327,7 +328,7 @@ func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context,
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var RaftTransport_ServiceDesc = grpc.ServiceDesc{
ServiceName: "RaftTransport",
ServiceName: "raft.RaftTransport",
HandlerType: (*RaftTransportServer)(nil),
Methods: []grpc.MethodDesc{
{
@ -354,11 +355,22 @@ var RaftTransport_ServiceDesc = grpc.ServiceDesc{
ServerStreams: true,
ClientStreams: true,
},
{
StreamName: "AppendEntriesChunkedPipeline",
Handler: _RaftTransport_AppendEntriesChunkedPipeline_Handler,
ServerStreams: true,
ClientStreams: true,
},
{
StreamName: "AppendEntriesChunked",
Handler: _RaftTransport_AppendEntriesChunked_Handler,
ClientStreams: true,
},
{
StreamName: "InstallSnapshot",
Handler: _RaftTransport_InstallSnapshot_Handler,
ClientStreams: true,
},
},
Metadata: "transport.proto",
Metadata: "proto/raft_transport.proto",
}

View File

@ -2,31 +2,70 @@ package api
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"net"
"os"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
pb "deevirt.fr/compute/pkg/api/proto"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/raft"
)
func createGRPCServer(conf *config.Config) *grpc.Server {
if conf.Manager.TlsKey != "" {
cert, err := tls.LoadX509KeyPair(conf.Manager.TlsCert, conf.Manager.TlsKey)
if err != nil {
log.Fatalf("Erreur chargement du certificat: %v", err)
}
// Charger la CA (facultatif, pour la vérification des clients)
caCert, err := os.ReadFile(conf.Manager.TlsCert)
if err != nil {
log.Fatalf("Erreur chargement CA: %v", err)
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(caCert)
// Créer les credentials TLS
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: certPool,
ClientAuth: tls.RequireAndVerifyClientCert, // Authentification mutuelle (mTLS),
})
return grpc.NewServer(grpc.Creds(creds))
}
return grpc.NewServer()
}
func Server() {
ctx := context.Background()
// Récupération de la configuration deevirt
conf, err := config.New()
if err != nil {
log.Fatalf("failed load configuration: %v", err)
}
sock, err := net.Listen("tcp", fmt.Sprintf(":%d", 4480))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
r, tm, err := raft.New(ctx, 4480)
r, tm, err := raft.New(ctx, conf, 4480)
if err != nil {
log.Fatalf("failed to start raft: %v", err)
}
s := grpc.NewServer()
s := createGRPCServer(conf)
pb.RegisterDomainServer(s, nil)
tm.Register(s)
//leaderhealth.Setup(r, s, []string{"Example"})

View File

@ -2,15 +2,18 @@ package raft
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"os"
"path/filepath"
transport "github.com/Jille/raft-grpc-transport"
transport "deevirt.fr/compute/pkg/raft/transport"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"deevirt.fr/compute/pkg/config"
etcd_client "deevirt.fr/compute/pkg/etcd"
@ -29,16 +32,34 @@ type Peers struct {
Address string
}
func New(ctx context.Context, port int) (*raft.Raft, *transport.Manager, error) {
// Récupération de la configuration deevirt
conf, err := config.New()
func getTLSCredentials(conf *config.Config) credentials.TransportCredentials {
cert, err := tls.LoadX509KeyPair(conf.Manager.TlsCert, conf.Manager.TlsKey)
if err != nil {
return nil, nil, err
log.Fatalf("Erreur chargement du certificat: %v", err)
}
// Charger la CA (facultatif, pour la vérification des clients)
caCert, err := os.ReadFile(conf.Manager.TlsCert)
if err != nil {
log.Fatalf("Erreur chargement CA: %v", err)
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(caCert)
// Créer les credentials TLS
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: certPool,
InsecureSkipVerify: true,
})
return creds
}
func New(ctx context.Context, conf *config.Config, port int) (*raft.Raft, *transport.Manager, error) {
// Création du répertoire
baseDir := filepath.Join("/var/lib/deevirt/mgr/", conf.NodeID)
err = os.MkdirAll(baseDir, 0740)
err := os.MkdirAll(baseDir, 0740)
if err != nil {
return nil, nil, err
}
@ -87,7 +108,13 @@ func New(ctx context.Context, port int) (*raft.Raft, *transport.Manager, error)
return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
}
tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), []grpc.DialOption{grpc.WithInsecure()})
dialOption := []grpc.DialOption{}
if conf.Manager.TlsKey != "" {
dialOption = append(dialOption, grpc.WithTransportCredentials(getTLSCredentials(conf)))
}
tm := transport.New(raft.ServerAddress(fmt.Sprintf("%s:%d", conf.AddressPrivate, port)), dialOption)
r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport())
if err != nil {
@ -183,3 +210,84 @@ func (n *RaftNode) watchStateChanges() {
}
}
}
/*func New(ctx context.Context, myID, myAddress string) (*raft.Raft, *transport.Manager, error) {
// Création du répertoire
baseDir := filepath.Join("/var/lib/deevirt/mgr/", myID)
err := os.MkdirAll(baseDir, 0740)
if err != nil {
return nil, nil, err
}
println(myAddress)
peers := []raft.Server{
{
ID: raft.ServerID("nodeA"),
Address: raft.ServerAddress("172.16.9.161:4410"),
},
{
ID: raft.ServerID("nodeB"),
Address: raft.ServerAddress("172.16.9.161:4411"),
},
}
c := raft.DefaultConfig()
c.LocalID = raft.ServerID(myID)
ldb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat"))
if err != nil {
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
}
sdb, err := raftboltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat"))
if err != nil {
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
}
fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
if err != nil {
return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
}
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{grpc.WithTransportCredentials(getTLSCredentials())})
r, err := raft.NewRaft(c, nil, ldb, sdb, fss, tm.Transport())
if err != nil {
return nil, nil, fmt.Errorf("raft.NewRaft: %v", err)
}
s, err := scheduler.New()
if err != nil {
return nil, nil, fmt.Errorf("scheduler: %v", err)
}
// Observer pour surveiller les changements d'état
stateCh := make(chan raft.Observation, 1) // Canal de type raft.Observation
r.RegisterObserver(raft.NewObserver(stateCh, true, nil))
node := &RaftNode{
Raft: r,
NodeID: myID,
StateCh: stateCh,
scheduler: s,
}
go node.watchStateChanges()
hasState, _ := checkIfStateExists(ldb)
if myAddress == "172.16.9.161:4410" && !hasState {
println("Démarrage du bootstrap ! ")
cfg := raft.Configuration{
Servers: peers,
}
f := r.BootstrapCluster(cfg)
if err := f.Error(); err != nil {
return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
}
}
return r, tm, nil
}*/

View File

@ -6,9 +6,11 @@ import (
"sync"
"time"
pb "github.com/Jille/raft-grpc-transport/proto"
pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/raft"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// These are calls from the Raft engine that we need to send out over gRPC.
@ -52,7 +54,7 @@ func (r raftAPI) getPeer(target raft.ServerAddress) (pb.RaftTransportClient, err
}
defer c.mtx.Unlock()
if c.clientConn == nil {
conn, err := grpc.Dial(string(target), r.manager.dialOptions...)
conn, err := grpc.NewClient(string(target), r.manager.dialOptions...)
if err != nil {
return nil, err
}
@ -74,7 +76,14 @@ func (r raftAPI) AppendEntries(id raft.ServerID, target raft.ServerAddress, args
ctx, cancel = context.WithTimeout(ctx, r.manager.heartbeatTimeout)
defer cancel()
}
ret, err := c.AppendEntries(ctx, encodeAppendEntriesRequest(args))
appendEntriesRequest := encodeAppendEntriesRequest(args)
ret, err := c.AppendEntries(ctx, appendEntriesRequest)
if statusErr, ok := status.FromError(err); ok && statusErr.Code() == codes.ResourceExhausted {
chunkedRet, chunkedErr := r.appendEntriesChunked(ctx, r.manager.appendEntriesChunkSize, c, appendEntriesRequest)
if statusErr, ok := status.FromError(chunkedErr); ok && statusErr.Code() != codes.Unimplemented {
ret, err = chunkedRet, chunkedErr
}
}
if err != nil {
return err
}
@ -82,6 +91,21 @@ func (r raftAPI) AppendEntries(id raft.ServerID, target raft.ServerAddress, args
return nil
}
// AppendEntries sends the appropriate RPC to the target node.
func (r raftAPI) appendEntriesChunked(ctx context.Context, chunkSize int, c pb.RaftTransportClient, appendEntriesRequest *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error) {
stream, err := c.AppendEntriesChunked(ctx)
if err != nil {
return &pb.AppendEntriesResponse{}, err
}
defer stream.CloseSend()
if err := sendAppendEntriesChunkedRequest(chunkSize, stream, appendEntriesRequest); err != nil {
return &pb.AppendEntriesResponse{}, err
}
return stream.CloseAndRecv()
}
// RequestVote sends the appropriate RPC to the target node.
func (r raftAPI) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error {
c, err := r.getPeer(target)
@ -161,6 +185,11 @@ func (r raftAPI) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, re
return nil
}
type AppendEntriesPipelineInterface interface {
grpc.ClientStream
Recv() (*pb.AppendEntriesResponse, error)
}
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (r raftAPI) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) {
@ -170,38 +199,52 @@ func (r raftAPI) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddre
}
ctx := context.TODO()
ctx, cancel := context.WithCancel(ctx)
stream, err := c.AppendEntriesPipeline(ctx)
var stream AppendEntriesPipelineInterface
stream, err = c.AppendEntriesChunkedPipeline(ctx)
if statusErr, ok := status.FromError(err); ok && statusErr.Code() == codes.Unimplemented {
stream, err = c.AppendEntriesPipeline(ctx)
}
if err != nil {
cancel()
return nil, err
}
rpa := raftPipelineAPI{
stream: stream,
cancel: cancel,
inflightCh: make(chan *appendFuture, 20),
doneCh: make(chan raft.AppendFuture, 20),
rpa := &raftPipelineAPI{
stream: stream,
appendEntriesChunkSize: r.manager.appendEntriesChunkSize,
cancel: cancel,
inflightCh: make(chan *appendFuture, 20),
doneCh: make(chan raft.AppendFuture, 20),
}
go rpa.receiver()
return rpa, nil
}
type raftPipelineAPI struct {
stream pb.RaftTransport_AppendEntriesPipelineClient
cancel func()
inflightChMtx sync.Mutex
inflightCh chan *appendFuture
doneCh chan raft.AppendFuture
stream AppendEntriesPipelineInterface
appendEntriesChunkSize int
cancel func()
inflightChMtx sync.Mutex
inflightCh chan *appendFuture
doneCh chan raft.AppendFuture
}
// AppendEntries is used to add another request to the pipeline.
// The send may block which is an effective form of back-pressure.
func (r raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) (raft.AppendFuture, error) {
func (r *raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) (raft.AppendFuture, error) {
af := &appendFuture{
start: time.Now(),
request: req,
done: make(chan struct{}),
}
if err := r.stream.Send(encodeAppendEntriesRequest(req)); err != nil {
var err error
appendEntriesRequest := encodeAppendEntriesRequest(req)
switch stream := r.stream.(type) {
case pb.RaftTransport_AppendEntriesPipelineClient:
err = stream.Send(appendEntriesRequest)
case pb.RaftTransport_AppendEntriesChunkedPipelineClient:
err = sendAppendEntriesChunkedRequest(r.appendEntriesChunkSize, stream, appendEntriesRequest)
}
if err != nil {
return nil, err
}
r.inflightChMtx.Lock()
@ -216,12 +259,12 @@ func (r raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raf
// Consumer returns a channel that can be used to consume
// response futures when they are ready.
func (r raftPipelineAPI) Consumer() <-chan raft.AppendFuture {
func (r *raftPipelineAPI) Consumer() <-chan raft.AppendFuture {
return r.doneCh
}
// Close closes the pipeline and cancels all inflight RPCs
func (r raftPipelineAPI) Close() error {
func (r *raftPipelineAPI) Close() error {
r.cancel()
r.inflightChMtx.Lock()
close(r.inflightCh)
@ -229,7 +272,7 @@ func (r raftPipelineAPI) Close() error {
return nil
}
func (r raftPipelineAPI) receiver() {
func (r *raftPipelineAPI) receiver() {
for af := range r.inflightCh {
msg, err := r.stream.Recv()
if err != nil {
@ -319,8 +362,8 @@ func (r raftAPI) Disconnect(target raft.ServerAddress) {
r.manager.connectionsMtx.Unlock()
if ok {
c.mtx.Lock()
c.mtx.Unlock()
_ = c.clientConn.Close()
c.mtx.Unlock()
}
}

View File

@ -0,0 +1,79 @@
package transport
import (
pb "deevirt.fr/compute/pkg/api/proto"
"google.golang.org/protobuf/proto"
)
type AppendEntriesChunkedRequestStreamSender interface {
Send(*pb.AppendEntriesChunkedRequest) error
}
func sendAppendEntriesChunkedRequest(chunkSize int, stream AppendEntriesChunkedRequestStreamSender, appendEntriesRequest *pb.AppendEntriesRequest) error {
reqBuf, err := proto.Marshal(appendEntriesRequest)
if err != nil {
return err
}
reqSize := len(reqBuf)
numChunks := reqSize / chunkSize
if reqSize%chunkSize != 0 {
numChunks++
}
remainingBytes := reqSize
for chunkIdx := 0; chunkIdx < numChunks; chunkIdx++ {
lowerBound := chunkIdx * chunkSize
upperBound := (chunkIdx + 1) * chunkSize
if reqSize < upperBound {
upperBound = reqSize
}
remainingBytes -= upperBound - lowerBound
chunk := &pb.AppendEntriesChunkedRequest{
RemainingBytes: int64(remainingBytes),
Chunk: reqBuf[lowerBound:upperBound],
}
if err := stream.Send(chunk); err != nil {
return err
}
}
return nil
}
type AppendEntriesChunkedRequestStreamReceiver interface {
Recv() (*pb.AppendEntriesChunkedRequest, error)
}
func receiveAppendEntriesChunkedRequest(stream AppendEntriesChunkedRequestStreamReceiver) (*pb.AppendEntriesRequest, error) {
var reqBuf []byte
chunk, err := stream.Recv()
if err != nil {
return &pb.AppendEntriesRequest{}, err
}
if chunk.RemainingBytes == 0 {
reqBuf = chunk.Chunk
} else {
reqBuf = make([]byte, len(chunk.Chunk)+int(chunk.RemainingBytes))
lowerBound := copy(reqBuf, chunk.Chunk)
for chunk.RemainingBytes > 0 {
chunk, err = stream.Recv()
if err != nil {
return &pb.AppendEntriesRequest{}, err
}
lowerBound += copy(reqBuf[lowerBound:], chunk.Chunk)
}
}
appendEntriesRequest := new(pb.AppendEntriesRequest)
if err := proto.Unmarshal(reqBuf, appendEntriesRequest); err != nil {
return &pb.AppendEntriesRequest{}, err
}
return appendEntriesRequest, nil
}

View File

@ -1,7 +1,7 @@
package transport
import (
pb "github.com/Jille/raft-grpc-transport/proto"
pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/raft"
)

View File

@ -4,7 +4,7 @@ import (
"context"
"io"
pb "github.com/Jille/raft-grpc-transport/proto"
pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/raft"
)
@ -60,6 +60,20 @@ func (g gRPCAPI) AppendEntries(ctx context.Context, req *pb.AppendEntriesRequest
return encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)), nil
}
func (g gRPCAPI) AppendEntriesChunked(stream pb.RaftTransport_AppendEntriesChunkedServer) error {
appendEntriesRequest, err := receiveAppendEntriesChunkedRequest(stream)
if err != nil {
return err
}
resp, err := g.handleRPC(decodeAppendEntriesRequest(appendEntriesRequest), nil)
if err != nil {
return err
}
return stream.SendAndClose(encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)))
}
func (g gRPCAPI) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) {
resp, err := g.handleRPC(decodeRequestVoteRequest(req), nil)
if err != nil {
@ -137,10 +151,26 @@ func (g gRPCAPI) AppendEntriesPipeline(s pb.RaftTransport_AppendEntriesPipelineS
}
}
func (g gRPCAPI) AppendEntriesChunkedPipeline(s pb.RaftTransport_AppendEntriesChunkedPipelineServer) error {
for {
msg, err := receiveAppendEntriesChunkedRequest(s)
if err != nil {
return err
}
resp, err := g.handleRPC(decodeAppendEntriesRequest(msg), nil)
if err != nil {
return err
}
if err := s.Send(encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse))); err != nil {
return err
}
}
}
func isHeartbeat(command interface{}) bool {
req, ok := command.(*raft.AppendEntriesRequest)
if !ok {
return false
}
return req.Term != 0 && len(req.Leader) != 0 && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0
return req.Term != 0 && len(req.Addr) != 0 && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0
}

View File

@ -0,0 +1,23 @@
package transport
import "time"
type Option func(m *Manager)
// WithHeartbeatTimeout configures the transport to not wait for more than d
// for a heartbeat to be executed by a remote peer.
func WithHeartbeatTimeout(d time.Duration) Option {
return func(m *Manager) {
m.heartbeatTimeout = d
}
}
// WithAppendEntriesChunkSize configures the chunk size to use when switching
// to chunked AppendEntries. The default value is 4MB (the gRPC default), but
// as there is no way to auto-discover that value, it's up to the developer
// to configure this, if the default value is not appropriate.
func WithAppendEntriesChunkSize(v int) Option {
return func(m *Manager) {
m.appendEntriesChunkSize = v
}
}

View File

@ -1,16 +1,16 @@
package transport
import (
pb "github.com/Jille/raft-grpc-transport/proto"
pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/raft"
"google.golang.org/protobuf/types/known/timestamppb"
)
func encodeAppendEntriesRequest(s *raft.AppendEntriesRequest) *pb.AppendEntriesRequest {
return &pb.AppendEntriesRequest{
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
Leader: s.Leader,
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
//Leader: s.Leader,
PrevLogEntry: s.PrevLogEntry,
PrevLogTerm: s.PrevLogTerm,
Entries: encodeLogs(s.Entries),
@ -76,9 +76,9 @@ func encodeAppendEntriesResponse(s *raft.AppendEntriesResponse) *pb.AppendEntrie
func encodeRequestVoteRequest(s *raft.RequestVoteRequest) *pb.RequestVoteRequest {
return &pb.RequestVoteRequest{
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
Candidate: s.Candidate,
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
//Candidate: s.Candidate,
LastLogIndex: s.LastLogIndex,
LastLogTerm: s.LastLogTerm,
LeadershipTransfer: s.LeadershipTransfer,

View File

@ -5,7 +5,7 @@ import (
"sync"
"time"
pb "github.com/Jille/raft-grpc-transport/proto"
pb "deevirt.fr/compute/pkg/api/proto"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft"
"github.com/pkg/errors"
@ -25,8 +25,9 @@ type Manager struct {
heartbeatFuncMtx sync.Mutex
heartbeatTimeout time.Duration
connectionsMtx sync.Mutex
connections map[raft.ServerAddress]*conn
connectionsMtx sync.Mutex
connections map[raft.ServerAddress]*conn
appendEntriesChunkSize int
shutdown bool
shutdownCh chan struct{}
@ -39,8 +40,9 @@ func New(localAddress raft.ServerAddress, dialOptions []grpc.DialOption, options
localAddress: localAddress,
dialOptions: dialOptions,
rpcChan: make(chan raft.RPC),
connections: map[raft.ServerAddress]*conn{},
rpcChan: make(chan raft.RPC),
connections: map[raft.ServerAddress]*conn{},
appendEntriesChunkSize: 4*1024*1024 - 10, // same as gRPC default value (minus some overhead)
shutdownCh: make(chan struct{}),
}
@ -81,8 +83,8 @@ func (m *Manager) disconnectAll() error {
for k, conn := range m.connections {
// Lock conn.mtx to ensure Dial() is complete
conn.mtx.Lock()
conn.mtx.Unlock()
closeErr := conn.clientConn.Close()
conn.mtx.Unlock()
if closeErr != nil {
err = multierror.Append(err, closeErr)
}

View File

@ -1,25 +0,0 @@
BSD 2-Clause License
Copyright (c) 2020, Jille Timmermans
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,20 +0,0 @@
# raft-grpc-transport
[![Godoc](https://godoc.org/github.com/Jille/raft-grpc-transport?status.svg)](https://godoc.org/github.com/Jille/raft-grpc-transport)
This library provides a [Transport](https://godoc.org/github.com/hashicorp/raft#Transport) for https://github.com/hashicorp/raft over gRPC.
One benefit of this is that gRPC is easy to multiplex over a single port.
## Usage
```go
// ...
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{grpc.WithInsecure()})
s := grpc.NewServer()
tm.Register(s)
r, err := raft.NewRaft(..., tm.Transport())
// ...
```
Want more example code? Check out main.go at https://github.com/Jille/raft-grpc-example

View File

@ -1,13 +0,0 @@
package transport
import "time"
type Option func(m *Manager)
// WithHeartbeatTimeout configures the transport to not wait for more than d
// for a heartbeat to be executed by a remote peer.
func WithHeartbeatTimeout(d time.Duration) Option {
return func(m *Manager) {
m.heartbeatTimeout = d
}
}

View File

@ -1,6 +0,0 @@
transport.pb.go: transport.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative transport.proto
force:
rm -f transport.pb.go
make transport.pb.go

2
vendor/modules.txt vendored
View File

@ -1,7 +1,5 @@
# github.com/Jille/raft-grpc-transport v1.6.1
## explicit; go 1.13
github.com/Jille/raft-grpc-transport
github.com/Jille/raft-grpc-transport/proto
# github.com/armon/go-metrics v0.4.1
## explicit; go 1.12
github.com/armon/go-metrics