Récupération du projet, l'auteur ne le suivant plus beaucoup

This commit is contained in:
Mickael BOURNEUF 2025-02-16 19:29:30 +01:00
parent 4edb3ff0eb
commit 7843d0ba75
13 changed files with 0 additions and 2807 deletions

View File

@ -1,25 +0,0 @@
BSD 2-Clause License
Copyright (c) 2020, Jille Timmermans
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,20 +0,0 @@
# raft-grpc-transport
[![Godoc](https://godoc.org/github.com/Jille/raft-grpc-transport?status.svg)](https://godoc.org/github.com/Jille/raft-grpc-transport)
This library provides a [Transport](https://godoc.org/github.com/hashicorp/raft#Transport) for https://github.com/hashicorp/raft over gRPC.
One benefit of this is that gRPC is easy to multiplex over a single port.
## Usage
```go
// ...
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{grpc.WithInsecure()})
s := grpc.NewServer()
tm.Register(s)
r, err := raft.NewRaft(..., tm.Transport())
// ...
```
Want more example code? Check out main.go at https://github.com/Jille/raft-grpc-example

View File

@ -1,146 +0,0 @@
package transport
import (
pb "github.com/Jille/raft-grpc-transport/proto"
"github.com/hashicorp/raft"
)
func decodeAppendEntriesRequest(m *pb.AppendEntriesRequest) *raft.AppendEntriesRequest {
return &raft.AppendEntriesRequest{
RPCHeader: decodeRPCHeader(m.RpcHeader),
Term: m.Term,
Leader: m.Leader,
PrevLogEntry: m.PrevLogEntry,
PrevLogTerm: m.PrevLogTerm,
Entries: decodeLogs(m.Entries),
LeaderCommitIndex: m.LeaderCommitIndex,
}
}
func decodeRPCHeader(m *pb.RPCHeader) raft.RPCHeader {
return raft.RPCHeader{
ProtocolVersion: raft.ProtocolVersion(m.ProtocolVersion),
ID: m.Id,
Addr: m.Addr,
}
}
func decodeLogs(m []*pb.Log) []*raft.Log {
ret := make([]*raft.Log, len(m))
for i, l := range m {
ret[i] = decodeLog(l)
}
return ret
}
func decodeLog(m *pb.Log) *raft.Log {
return &raft.Log{
Index: m.Index,
Term: m.Term,
Type: decodeLogType(m.Type),
Data: m.Data,
Extensions: m.Extensions,
AppendedAt: m.AppendedAt.AsTime(),
}
}
func decodeLogType(m pb.Log_LogType) raft.LogType {
switch m {
case pb.Log_LOG_COMMAND:
return raft.LogCommand
case pb.Log_LOG_NOOP:
return raft.LogNoop
case pb.Log_LOG_ADD_PEER_DEPRECATED:
return raft.LogAddPeerDeprecated
case pb.Log_LOG_REMOVE_PEER_DEPRECATED:
return raft.LogRemovePeerDeprecated
case pb.Log_LOG_BARRIER:
return raft.LogBarrier
case pb.Log_LOG_CONFIGURATION:
return raft.LogConfiguration
default:
panic("invalid LogType")
}
}
func decodeAppendEntriesResponse(m *pb.AppendEntriesResponse) *raft.AppendEntriesResponse {
return &raft.AppendEntriesResponse{
RPCHeader: decodeRPCHeader(m.RpcHeader),
Term: m.Term,
LastLog: m.LastLog,
Success: m.Success,
NoRetryBackoff: m.NoRetryBackoff,
}
}
func decodeRequestVoteRequest(m *pb.RequestVoteRequest) *raft.RequestVoteRequest {
return &raft.RequestVoteRequest{
RPCHeader: decodeRPCHeader(m.RpcHeader),
Term: m.Term,
Candidate: m.Candidate,
LastLogIndex: m.LastLogIndex,
LastLogTerm: m.LastLogTerm,
LeadershipTransfer: m.LeadershipTransfer,
}
}
func decodeRequestVoteResponse(m *pb.RequestVoteResponse) *raft.RequestVoteResponse {
return &raft.RequestVoteResponse{
RPCHeader: decodeRPCHeader(m.RpcHeader),
Term: m.Term,
Peers: m.Peers,
Granted: m.Granted,
}
}
func decodeInstallSnapshotRequest(m *pb.InstallSnapshotRequest) *raft.InstallSnapshotRequest {
return &raft.InstallSnapshotRequest{
RPCHeader: decodeRPCHeader(m.RpcHeader),
SnapshotVersion: raft.SnapshotVersion(m.SnapshotVersion),
Term: m.Term,
Leader: m.Leader,
LastLogIndex: m.LastLogIndex,
LastLogTerm: m.LastLogTerm,
Peers: m.Peers,
Configuration: m.Configuration,
ConfigurationIndex: m.ConfigurationIndex,
Size: m.Size,
}
}
func decodeInstallSnapshotResponse(m *pb.InstallSnapshotResponse) *raft.InstallSnapshotResponse {
return &raft.InstallSnapshotResponse{
RPCHeader: decodeRPCHeader(m.RpcHeader),
Term: m.Term,
Success: m.Success,
}
}
func decodeTimeoutNowRequest(m *pb.TimeoutNowRequest) *raft.TimeoutNowRequest {
return &raft.TimeoutNowRequest{
RPCHeader: decodeRPCHeader(m.RpcHeader),
}
}
func decodeTimeoutNowResponse(m *pb.TimeoutNowResponse) *raft.TimeoutNowResponse {
return &raft.TimeoutNowResponse{
RPCHeader: decodeRPCHeader(m.RpcHeader),
}
}
func decodeRequestPreVoteRequest(m *pb.RequestPreVoteRequest) *raft.RequestPreVoteRequest {
return &raft.RequestPreVoteRequest{
RPCHeader: decodeRPCHeader(m.RpcHeader),
Term: m.Term,
LastLogIndex: m.LastLogIndex,
LastLogTerm: m.LastLogTerm,
}
}
func decodeRequestPreVoteResponse(m *pb.RequestPreVoteResponse) *raft.RequestPreVoteResponse {
return &raft.RequestPreVoteResponse{
RPCHeader: decodeRPCHeader(m.RpcHeader),
Term: m.Term,
Granted: m.Granted,
}
}

View File

@ -1,146 +0,0 @@
package transport
import (
"context"
"io"
pb "github.com/Jille/raft-grpc-transport/proto"
"github.com/hashicorp/raft"
)
// These are requests incoming over gRPC that we need to relay to the Raft engine.
type gRPCAPI struct {
manager *Manager
// "Unsafe" to ensure compilation fails if new methods are added but not implemented
pb.UnsafeRaftTransportServer
}
func (g gRPCAPI) handleRPC(command interface{}, data io.Reader) (interface{}, error) {
ch := make(chan raft.RPCResponse, 1)
rpc := raft.RPC{
Command: command,
RespChan: ch,
Reader: data,
}
if isHeartbeat(command) {
// We can take the fast path and use the heartbeat callback and skip the queue in g.manager.rpcChan.
g.manager.heartbeatFuncMtx.Lock()
fn := g.manager.heartbeatFunc
g.manager.heartbeatFuncMtx.Unlock()
if fn != nil {
fn(rpc)
goto wait
}
}
select {
case g.manager.rpcChan <- rpc:
case <-g.manager.shutdownCh:
return nil, raft.ErrTransportShutdown
}
wait:
select {
case resp := <-ch:
if resp.Error != nil {
return nil, resp.Error
}
return resp.Response, nil
case <-g.manager.shutdownCh:
return nil, raft.ErrTransportShutdown
}
}
func (g gRPCAPI) AppendEntries(ctx context.Context, req *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error) {
resp, err := g.handleRPC(decodeAppendEntriesRequest(req), nil)
if err != nil {
return nil, err
}
return encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)), nil
}
func (g gRPCAPI) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) {
resp, err := g.handleRPC(decodeRequestVoteRequest(req), nil)
if err != nil {
return nil, err
}
return encodeRequestVoteResponse(resp.(*raft.RequestVoteResponse)), nil
}
func (g gRPCAPI) TimeoutNow(ctx context.Context, req *pb.TimeoutNowRequest) (*pb.TimeoutNowResponse, error) {
resp, err := g.handleRPC(decodeTimeoutNowRequest(req), nil)
if err != nil {
return nil, err
}
return encodeTimeoutNowResponse(resp.(*raft.TimeoutNowResponse)), nil
}
func (g gRPCAPI) RequestPreVote(ctx context.Context, req *pb.RequestPreVoteRequest) (*pb.RequestPreVoteResponse, error) {
resp, err := g.handleRPC(decodeRequestPreVoteRequest(req), nil)
if err != nil {
return nil, err
}
return encodeRequestPreVoteResponse(resp.(*raft.RequestPreVoteResponse)), nil
}
func (g gRPCAPI) InstallSnapshot(s pb.RaftTransport_InstallSnapshotServer) error {
isr, err := s.Recv()
if err != nil {
return err
}
resp, err := g.handleRPC(decodeInstallSnapshotRequest(isr), &snapshotStream{s, isr.GetData()})
if err != nil {
return err
}
return s.SendAndClose(encodeInstallSnapshotResponse(resp.(*raft.InstallSnapshotResponse)))
}
type snapshotStream struct {
s pb.RaftTransport_InstallSnapshotServer
buf []byte
}
func (s *snapshotStream) Read(b []byte) (int, error) {
if len(s.buf) > 0 {
n := copy(b, s.buf)
s.buf = s.buf[n:]
return n, nil
}
m, err := s.s.Recv()
if err != nil {
return 0, err
}
n := copy(b, m.GetData())
if n < len(m.GetData()) {
s.buf = m.GetData()[n:]
}
return n, nil
}
func (g gRPCAPI) AppendEntriesPipeline(s pb.RaftTransport_AppendEntriesPipelineServer) error {
for {
msg, err := s.Recv()
if err != nil {
return err
}
resp, err := g.handleRPC(decodeAppendEntriesRequest(msg), nil)
if err != nil {
// TODO(quis): One failure doesn't have to break the entire stream?
// Or does it all go wrong when it's out of order anyway?
return err
}
if err := s.Send(encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse))); err != nil {
return err
}
}
}
func isHeartbeat(command interface{}) bool {
req, ok := command.(*raft.AppendEntriesRequest)
if !ok {
return false
}
return req.Term != 0 && len(req.Leader) != 0 && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0
}

View File

@ -1,13 +0,0 @@
package transport
import "time"
type Option func(m *Manager)
// WithHeartbeatTimeout configures the transport to not wait for more than d
// for a heartbeat to be executed by a remote peer.
func WithHeartbeatTimeout(d time.Duration) Option {
return func(m *Manager) {
m.heartbeatTimeout = d
}
}

View File

@ -1,6 +0,0 @@
transport.pb.go: transport.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative transport.proto
force:
rm -f transport.pb.go
make transport.pb.go

File diff suppressed because it is too large Load Diff

View File

@ -1,122 +0,0 @@
syntax = "proto3";
option go_package = "github.com/Jille/raft-grpc-transport/proto";
import "google/protobuf/timestamp.proto";
service RaftTransport {
// AppendEntriesPipeline opens an AppendEntries message stream.
rpc AppendEntriesPipeline(stream AppendEntriesRequest) returns (stream AppendEntriesResponse) {}
// AppendEntries performs a single append entries request / response.
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {}
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
rpc RequestVote(RequestVoteRequest) returns (RequestVoteResponse) {}
// TimeoutNow is used to start a leadership transfer to the target node.
rpc TimeoutNow(TimeoutNowRequest) returns (TimeoutNowResponse) {}
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
rpc InstallSnapshot(stream InstallSnapshotRequest) returns (InstallSnapshotResponse) {}
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
rpc RequestPreVote(RequestPreVoteRequest) returns (RequestPreVoteResponse) {}
}
message RPCHeader {
int64 protocol_version = 1;
bytes id = 2;
bytes addr = 3;
}
message Log {
enum LogType {
LOG_COMMAND = 0;
LOG_NOOP = 1;
LOG_ADD_PEER_DEPRECATED = 2;
LOG_REMOVE_PEER_DEPRECATED = 3;
LOG_BARRIER = 4;
LOG_CONFIGURATION = 5;
}
uint64 index = 1;
uint64 term = 2;
LogType type = 3;
bytes data = 4;
bytes extensions = 5;
google.protobuf.Timestamp appended_at = 6;
}
message AppendEntriesRequest {
RPCHeader rpc_header = 1;
uint64 term = 2;
bytes leader = 3;
uint64 prev_log_entry = 4;
uint64 prev_log_term = 5;
repeated Log entries = 6;
uint64 leader_commit_index = 7;
}
message AppendEntriesResponse {
RPCHeader rpc_header = 1;
uint64 term = 2;
uint64 last_log = 3;
bool success = 4;
bool no_retry_backoff = 5;
}
message RequestVoteRequest {
RPCHeader rpc_header = 1;
uint64 term = 2;
bytes candidate = 3;
uint64 last_log_index = 4;
uint64 last_log_term = 5;
bool leadership_transfer = 6;
}
message RequestVoteResponse {
RPCHeader rpc_header = 1;
uint64 term = 2;
bytes peers = 3;
bool granted = 4;
}
message TimeoutNowRequest {
RPCHeader rpc_header = 1;
}
message TimeoutNowResponse {
RPCHeader rpc_header = 1;
}
// The first InstallSnapshotRequest on the stream contains all the metadata.
// All further messages contain only data.
message InstallSnapshotRequest {
RPCHeader rpc_header = 1;
int64 snapshot_version = 11;
uint64 term = 2;
bytes leader = 3;
uint64 last_log_index = 4;
uint64 last_log_term = 5;
bytes peers = 6;
bytes configuration = 7;
uint64 configuration_index = 8;
int64 size = 9;
bytes data = 10;
}
message InstallSnapshotResponse {
RPCHeader rpc_header = 1;
uint64 term = 2;
bool success = 3;
}
message RequestPreVoteRequest {
RPCHeader rpc_header = 1;
uint64 term = 2;
uint64 last_log_index = 3;
uint64 last_log_term = 4;
}
message RequestPreVoteResponse {
RPCHeader rpc_header = 1;
uint64 term = 2;
bool granted = 3;
}

View File

@ -1,364 +0,0 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.3
// source: transport.proto
package proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// RaftTransportClient is the client API for RaftTransport service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type RaftTransportClient interface {
// AppendEntriesPipeline opens an AppendEntries message stream.
AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_AppendEntriesPipelineClient, error)
// AppendEntries performs a single append entries request / response.
AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error)
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error)
// TimeoutNow is used to start a leadership transfer to the target node.
TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error)
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_InstallSnapshotClient, error)
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error)
}
type raftTransportClient struct {
cc grpc.ClientConnInterface
}
func NewRaftTransportClient(cc grpc.ClientConnInterface) RaftTransportClient {
return &raftTransportClient{cc}
}
func (c *raftTransportClient) AppendEntriesPipeline(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_AppendEntriesPipelineClient, error) {
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[0], "/RaftTransport/AppendEntriesPipeline", opts...)
if err != nil {
return nil, err
}
x := &raftTransportAppendEntriesPipelineClient{stream}
return x, nil
}
type RaftTransport_AppendEntriesPipelineClient interface {
Send(*AppendEntriesRequest) error
Recv() (*AppendEntriesResponse, error)
grpc.ClientStream
}
type raftTransportAppendEntriesPipelineClient struct {
grpc.ClientStream
}
func (x *raftTransportAppendEntriesPipelineClient) Send(m *AppendEntriesRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *raftTransportAppendEntriesPipelineClient) Recv() (*AppendEntriesResponse, error) {
m := new(AppendEntriesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *raftTransportClient) AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) {
out := new(AppendEntriesResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/AppendEntries", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *raftTransportClient) RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) {
out := new(RequestVoteResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/RequestVote", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *raftTransportClient) TimeoutNow(ctx context.Context, in *TimeoutNowRequest, opts ...grpc.CallOption) (*TimeoutNowResponse, error) {
out := new(TimeoutNowResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/TimeoutNow", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *raftTransportClient) InstallSnapshot(ctx context.Context, opts ...grpc.CallOption) (RaftTransport_InstallSnapshotClient, error) {
stream, err := c.cc.NewStream(ctx, &RaftTransport_ServiceDesc.Streams[1], "/RaftTransport/InstallSnapshot", opts...)
if err != nil {
return nil, err
}
x := &raftTransportInstallSnapshotClient{stream}
return x, nil
}
type RaftTransport_InstallSnapshotClient interface {
Send(*InstallSnapshotRequest) error
CloseAndRecv() (*InstallSnapshotResponse, error)
grpc.ClientStream
}
type raftTransportInstallSnapshotClient struct {
grpc.ClientStream
}
func (x *raftTransportInstallSnapshotClient) Send(m *InstallSnapshotRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *raftTransportInstallSnapshotClient) CloseAndRecv() (*InstallSnapshotResponse, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(InstallSnapshotResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *raftTransportClient) RequestPreVote(ctx context.Context, in *RequestPreVoteRequest, opts ...grpc.CallOption) (*RequestPreVoteResponse, error) {
out := new(RequestPreVoteResponse)
err := c.cc.Invoke(ctx, "/RaftTransport/RequestPreVote", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// RaftTransportServer is the server API for RaftTransport service.
// All implementations must embed UnimplementedRaftTransportServer
// for forward compatibility
type RaftTransportServer interface {
// AppendEntriesPipeline opens an AppendEntries message stream.
AppendEntriesPipeline(RaftTransport_AppendEntriesPipelineServer) error
// AppendEntries performs a single append entries request / response.
AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error)
// RequestVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error)
// TimeoutNow is used to start a leadership transfer to the target node.
TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error)
// InstallSnapshot is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
InstallSnapshot(RaftTransport_InstallSnapshotServer) error
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error)
mustEmbedUnimplementedRaftTransportServer()
}
// UnimplementedRaftTransportServer must be embedded to have forward compatible implementations.
type UnimplementedRaftTransportServer struct {
}
func (UnimplementedRaftTransportServer) AppendEntriesPipeline(RaftTransport_AppendEntriesPipelineServer) error {
return status.Errorf(codes.Unimplemented, "method AppendEntriesPipeline not implemented")
}
func (UnimplementedRaftTransportServer) AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AppendEntries not implemented")
}
func (UnimplementedRaftTransportServer) RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RequestVote not implemented")
}
func (UnimplementedRaftTransportServer) TimeoutNow(context.Context, *TimeoutNowRequest) (*TimeoutNowResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method TimeoutNow not implemented")
}
func (UnimplementedRaftTransportServer) InstallSnapshot(RaftTransport_InstallSnapshotServer) error {
return status.Errorf(codes.Unimplemented, "method InstallSnapshot not implemented")
}
func (UnimplementedRaftTransportServer) RequestPreVote(context.Context, *RequestPreVoteRequest) (*RequestPreVoteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RequestPreVote not implemented")
}
func (UnimplementedRaftTransportServer) mustEmbedUnimplementedRaftTransportServer() {}
// UnsafeRaftTransportServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to RaftTransportServer will
// result in compilation errors.
type UnsafeRaftTransportServer interface {
mustEmbedUnimplementedRaftTransportServer()
}
func RegisterRaftTransportServer(s grpc.ServiceRegistrar, srv RaftTransportServer) {
s.RegisterService(&RaftTransport_ServiceDesc, srv)
}
func _RaftTransport_AppendEntriesPipeline_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).AppendEntriesPipeline(&raftTransportAppendEntriesPipelineServer{stream})
}
type RaftTransport_AppendEntriesPipelineServer interface {
Send(*AppendEntriesResponse) error
Recv() (*AppendEntriesRequest, error)
grpc.ServerStream
}
type raftTransportAppendEntriesPipelineServer struct {
grpc.ServerStream
}
func (x *raftTransportAppendEntriesPipelineServer) Send(m *AppendEntriesResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *raftTransportAppendEntriesPipelineServer) Recv() (*AppendEntriesRequest, error) {
m := new(AppendEntriesRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _RaftTransport_AppendEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AppendEntriesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RaftTransportServer).AppendEntries(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/RaftTransport/AppendEntries",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).AppendEntries(ctx, req.(*AppendEntriesRequest))
}
return interceptor(ctx, in, info, handler)
}
func _RaftTransport_RequestVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestVoteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RaftTransportServer).RequestVote(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/RaftTransport/RequestVote",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).RequestVote(ctx, req.(*RequestVoteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _RaftTransport_TimeoutNow_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TimeoutNowRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RaftTransportServer).TimeoutNow(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/RaftTransport/TimeoutNow",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).TimeoutNow(ctx, req.(*TimeoutNowRequest))
}
return interceptor(ctx, in, info, handler)
}
func _RaftTransport_InstallSnapshot_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RaftTransportServer).InstallSnapshot(&raftTransportInstallSnapshotServer{stream})
}
type RaftTransport_InstallSnapshotServer interface {
SendAndClose(*InstallSnapshotResponse) error
Recv() (*InstallSnapshotRequest, error)
grpc.ServerStream
}
type raftTransportInstallSnapshotServer struct {
grpc.ServerStream
}
func (x *raftTransportInstallSnapshotServer) SendAndClose(m *InstallSnapshotResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *raftTransportInstallSnapshotServer) Recv() (*InstallSnapshotRequest, error) {
m := new(InstallSnapshotRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _RaftTransport_RequestPreVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestPreVoteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RaftTransportServer).RequestPreVote(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/RaftTransport/RequestPreVote",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RaftTransportServer).RequestPreVote(ctx, req.(*RequestPreVoteRequest))
}
return interceptor(ctx, in, info, handler)
}
// RaftTransport_ServiceDesc is the grpc.ServiceDesc for RaftTransport service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var RaftTransport_ServiceDesc = grpc.ServiceDesc{
ServiceName: "RaftTransport",
HandlerType: (*RaftTransportServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "AppendEntries",
Handler: _RaftTransport_AppendEntries_Handler,
},
{
MethodName: "RequestVote",
Handler: _RaftTransport_RequestVote_Handler,
},
{
MethodName: "TimeoutNow",
Handler: _RaftTransport_TimeoutNow_Handler,
},
{
MethodName: "RequestPreVote",
Handler: _RaftTransport_RequestPreVote_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "AppendEntriesPipeline",
Handler: _RaftTransport_AppendEntriesPipeline_Handler,
ServerStreams: true,
ClientStreams: true,
},
{
StreamName: "InstallSnapshot",
Handler: _RaftTransport_InstallSnapshot_Handler,
ClientStreams: true,
},
},
Metadata: "transport.proto",
}

View File

@ -1,329 +0,0 @@
package transport
import (
"context"
"io"
"sync"
"time"
pb "github.com/Jille/raft-grpc-transport/proto"
"github.com/hashicorp/raft"
"google.golang.org/grpc"
)
// These are calls from the Raft engine that we need to send out over gRPC.
type raftAPI struct {
manager *Manager
}
var _ raft.Transport = raftAPI{}
var _ raft.WithClose = raftAPI{}
var _ raft.WithPeers = raftAPI{}
var _ raft.WithPreVote = raftAPI{}
type conn struct {
clientConn *grpc.ClientConn
client pb.RaftTransportClient
mtx sync.Mutex
}
// Consumer returns a channel that can be used to consume and respond to RPC requests.
func (r raftAPI) Consumer() <-chan raft.RPC {
return r.manager.rpcChan
}
// LocalAddr is used to return our local address to distinguish from our peers.
func (r raftAPI) LocalAddr() raft.ServerAddress {
return r.manager.localAddress
}
func (r raftAPI) getPeer(target raft.ServerAddress) (pb.RaftTransportClient, error) {
r.manager.connectionsMtx.Lock()
c, ok := r.manager.connections[target]
if !ok {
c = &conn{}
c.mtx.Lock()
r.manager.connections[target] = c
}
r.manager.connectionsMtx.Unlock()
if ok {
c.mtx.Lock()
}
defer c.mtx.Unlock()
if c.clientConn == nil {
conn, err := grpc.Dial(string(target), r.manager.dialOptions...)
if err != nil {
return nil, err
}
c.clientConn = conn
c.client = pb.NewRaftTransportClient(conn)
}
return c.client, nil
}
// AppendEntries sends the appropriate RPC to the target node.
func (r raftAPI) AppendEntries(id raft.ServerID, target raft.ServerAddress, args *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error {
c, err := r.getPeer(target)
if err != nil {
return err
}
ctx := context.TODO()
if r.manager.heartbeatTimeout > 0 && isHeartbeat(args) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.manager.heartbeatTimeout)
defer cancel()
}
ret, err := c.AppendEntries(ctx, encodeAppendEntriesRequest(args))
if err != nil {
return err
}
*resp = *decodeAppendEntriesResponse(ret)
return nil
}
// RequestVote sends the appropriate RPC to the target node.
func (r raftAPI) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error {
c, err := r.getPeer(target)
if err != nil {
return err
}
ret, err := c.RequestVote(context.TODO(), encodeRequestVoteRequest(args))
if err != nil {
return err
}
*resp = *decodeRequestVoteResponse(ret)
return nil
}
// TimeoutNow is used to start a leadership transfer to the target node.
func (r raftAPI) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error {
c, err := r.getPeer(target)
if err != nil {
return err
}
ret, err := c.TimeoutNow(context.TODO(), encodeTimeoutNowRequest(args))
if err != nil {
return err
}
*resp = *decodeTimeoutNowResponse(ret)
return nil
}
// RequestPreVote is the command used by a candidate to ask a Raft peer for a vote in an election.
func (r raftAPI) RequestPreVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestPreVoteRequest, resp *raft.RequestPreVoteResponse) error {
c, err := r.getPeer(target)
if err != nil {
return err
}
ret, err := c.RequestPreVote(context.TODO(), encodeRequestPreVoteRequest(args))
if err != nil {
return err
}
*resp = *decodeRequestPreVoteResponse(ret)
return nil
}
// InstallSnapshot is used to push a snapshot down to a follower. The data is read from
// the ReadCloser and streamed to the client.
func (r raftAPI) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, req *raft.InstallSnapshotRequest, resp *raft.InstallSnapshotResponse, data io.Reader) error {
c, err := r.getPeer(target)
if err != nil {
return err
}
stream, err := c.InstallSnapshot(context.TODO())
if err != nil {
return err
}
if err := stream.Send(encodeInstallSnapshotRequest(req)); err != nil {
return err
}
var buf [16384]byte
for {
n, err := data.Read(buf[:])
if err == io.EOF || (err == nil && n == 0) {
break
}
if err != nil {
return err
}
if err := stream.Send(&pb.InstallSnapshotRequest{
Data: buf[:n],
}); err != nil {
return err
}
}
ret, err := stream.CloseAndRecv()
if err != nil {
return err
}
*resp = *decodeInstallSnapshotResponse(ret)
return nil
}
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (r raftAPI) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) {
c, err := r.getPeer(target)
if err != nil {
return nil, err
}
ctx := context.TODO()
ctx, cancel := context.WithCancel(ctx)
stream, err := c.AppendEntriesPipeline(ctx)
if err != nil {
cancel()
return nil, err
}
rpa := raftPipelineAPI{
stream: stream,
cancel: cancel,
inflightCh: make(chan *appendFuture, 20),
doneCh: make(chan raft.AppendFuture, 20),
}
go rpa.receiver()
return rpa, nil
}
type raftPipelineAPI struct {
stream pb.RaftTransport_AppendEntriesPipelineClient
cancel func()
inflightChMtx sync.Mutex
inflightCh chan *appendFuture
doneCh chan raft.AppendFuture
}
// AppendEntries is used to add another request to the pipeline.
// The send may block which is an effective form of back-pressure.
func (r raftPipelineAPI) AppendEntries(req *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) (raft.AppendFuture, error) {
af := &appendFuture{
start: time.Now(),
request: req,
done: make(chan struct{}),
}
if err := r.stream.Send(encodeAppendEntriesRequest(req)); err != nil {
return nil, err
}
r.inflightChMtx.Lock()
select {
case <-r.stream.Context().Done():
default:
r.inflightCh <- af
}
r.inflightChMtx.Unlock()
return af, nil
}
// Consumer returns a channel that can be used to consume
// response futures when they are ready.
func (r raftPipelineAPI) Consumer() <-chan raft.AppendFuture {
return r.doneCh
}
// Close closes the pipeline and cancels all inflight RPCs
func (r raftPipelineAPI) Close() error {
r.cancel()
r.inflightChMtx.Lock()
close(r.inflightCh)
r.inflightChMtx.Unlock()
return nil
}
func (r raftPipelineAPI) receiver() {
for af := range r.inflightCh {
msg, err := r.stream.Recv()
if err != nil {
af.err = err
} else {
af.response = *decodeAppendEntriesResponse(msg)
}
close(af.done)
r.doneCh <- af
}
}
type appendFuture struct {
raft.AppendFuture
start time.Time
request *raft.AppendEntriesRequest
response raft.AppendEntriesResponse
err error
done chan struct{}
}
// Error blocks until the future arrives and then
// returns the error status of the future.
// This may be called any number of times - all
// calls will return the same value.
// Note that it is not OK to call this method
// twice concurrently on the same Future instance.
func (f *appendFuture) Error() error {
<-f.done
return f.err
}
// Start returns the time that the append request was started.
// It is always OK to call this method.
func (f *appendFuture) Start() time.Time {
return f.start
}
// Request holds the parameters of the AppendEntries call.
// It is always OK to call this method.
func (f *appendFuture) Request() *raft.AppendEntriesRequest {
return f.request
}
// Response holds the results of the AppendEntries call.
// This method must only be called after the Error
// method returns, and will only be valid on success.
func (f *appendFuture) Response() *raft.AppendEntriesResponse {
return &f.response
}
// EncodePeer is used to serialize a peer's address.
func (r raftAPI) EncodePeer(id raft.ServerID, addr raft.ServerAddress) []byte {
return []byte(addr)
}
// DecodePeer is used to deserialize a peer's address.
func (r raftAPI) DecodePeer(p []byte) raft.ServerAddress {
return raft.ServerAddress(p)
}
// SetHeartbeatHandler is used to setup a heartbeat handler
// as a fast-pass. This is to avoid head-of-line blocking from
// disk IO. If a Transport does not support this, it can simply
// ignore the call, and push the heartbeat onto the Consumer channel.
func (r raftAPI) SetHeartbeatHandler(cb func(rpc raft.RPC)) {
r.manager.heartbeatFuncMtx.Lock()
r.manager.heartbeatFunc = cb
r.manager.heartbeatFuncMtx.Unlock()
}
func (r raftAPI) Close() error {
return r.manager.Close()
}
func (r raftAPI) Connect(target raft.ServerAddress, t raft.Transport) {
_, _ = r.getPeer(target)
}
func (r raftAPI) Disconnect(target raft.ServerAddress) {
r.manager.connectionsMtx.Lock()
c, ok := r.manager.connections[target]
if !ok {
delete(r.manager.connections, target)
}
r.manager.connectionsMtx.Unlock()
if ok {
c.mtx.Lock()
c.mtx.Unlock()
_ = c.clientConn.Close()
}
}
func (r raftAPI) DisconnectAll() {
_ = r.manager.disconnectAll()
}

View File

@ -1,147 +0,0 @@
package transport
import (
pb "github.com/Jille/raft-grpc-transport/proto"
"github.com/hashicorp/raft"
"google.golang.org/protobuf/types/known/timestamppb"
)
func encodeAppendEntriesRequest(s *raft.AppendEntriesRequest) *pb.AppendEntriesRequest {
return &pb.AppendEntriesRequest{
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
Leader: s.Leader,
PrevLogEntry: s.PrevLogEntry,
PrevLogTerm: s.PrevLogTerm,
Entries: encodeLogs(s.Entries),
LeaderCommitIndex: s.LeaderCommitIndex,
}
}
func encodeRPCHeader(s raft.RPCHeader) *pb.RPCHeader {
return &pb.RPCHeader{
ProtocolVersion: int64(s.ProtocolVersion),
Id: s.ID,
Addr: s.Addr,
}
}
func encodeLogs(s []*raft.Log) []*pb.Log {
ret := make([]*pb.Log, len(s))
for i, l := range s {
ret[i] = encodeLog(l)
}
return ret
}
func encodeLog(s *raft.Log) *pb.Log {
return &pb.Log{
Index: s.Index,
Term: s.Term,
Type: encodeLogType(s.Type),
Data: s.Data,
Extensions: s.Extensions,
AppendedAt: timestamppb.New(s.AppendedAt),
}
}
func encodeLogType(s raft.LogType) pb.Log_LogType {
switch s {
case raft.LogCommand:
return pb.Log_LOG_COMMAND
case raft.LogNoop:
return pb.Log_LOG_NOOP
case raft.LogAddPeerDeprecated:
return pb.Log_LOG_ADD_PEER_DEPRECATED
case raft.LogRemovePeerDeprecated:
return pb.Log_LOG_REMOVE_PEER_DEPRECATED
case raft.LogBarrier:
return pb.Log_LOG_BARRIER
case raft.LogConfiguration:
return pb.Log_LOG_CONFIGURATION
default:
panic("invalid LogType")
}
}
func encodeAppendEntriesResponse(s *raft.AppendEntriesResponse) *pb.AppendEntriesResponse {
return &pb.AppendEntriesResponse{
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
LastLog: s.LastLog,
Success: s.Success,
NoRetryBackoff: s.NoRetryBackoff,
}
}
func encodeRequestVoteRequest(s *raft.RequestVoteRequest) *pb.RequestVoteRequest {
return &pb.RequestVoteRequest{
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
Candidate: s.Candidate,
LastLogIndex: s.LastLogIndex,
LastLogTerm: s.LastLogTerm,
LeadershipTransfer: s.LeadershipTransfer,
}
}
func encodeRequestVoteResponse(s *raft.RequestVoteResponse) *pb.RequestVoteResponse {
return &pb.RequestVoteResponse{
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
Peers: s.Peers,
Granted: s.Granted,
}
}
func encodeInstallSnapshotRequest(s *raft.InstallSnapshotRequest) *pb.InstallSnapshotRequest {
return &pb.InstallSnapshotRequest{
RpcHeader: encodeRPCHeader(s.RPCHeader),
SnapshotVersion: int64(s.SnapshotVersion),
Term: s.Term,
Leader: s.Leader,
LastLogIndex: s.LastLogIndex,
LastLogTerm: s.LastLogTerm,
Peers: s.Peers,
Configuration: s.Configuration,
ConfigurationIndex: s.ConfigurationIndex,
Size: s.Size,
}
}
func encodeInstallSnapshotResponse(s *raft.InstallSnapshotResponse) *pb.InstallSnapshotResponse {
return &pb.InstallSnapshotResponse{
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
Success: s.Success,
}
}
func encodeTimeoutNowRequest(s *raft.TimeoutNowRequest) *pb.TimeoutNowRequest {
return &pb.TimeoutNowRequest{
RpcHeader: encodeRPCHeader(s.RPCHeader),
}
}
func encodeTimeoutNowResponse(s *raft.TimeoutNowResponse) *pb.TimeoutNowResponse {
return &pb.TimeoutNowResponse{
RpcHeader: encodeRPCHeader(s.RPCHeader),
}
}
func encodeRequestPreVoteRequest(s *raft.RequestPreVoteRequest) *pb.RequestPreVoteRequest {
return &pb.RequestPreVoteRequest{
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
LastLogIndex: s.LastLogIndex,
LastLogTerm: s.LastLogTerm,
}
}
func encodeRequestPreVoteResponse(s *raft.RequestPreVoteResponse) *pb.RequestPreVoteResponse {
return &pb.RequestPreVoteResponse{
RpcHeader: encodeRPCHeader(s.RPCHeader),
Term: s.Term,
Granted: s.Granted,
}
}

View File

@ -1,97 +0,0 @@
// Package transport provides a Transport for github.com/hashicorp/raft over gRPC.
package transport
import (
"sync"
"time"
pb "github.com/Jille/raft-grpc-transport/proto"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
var (
errCloseErr = errors.New("error closing connections")
)
type Manager struct {
localAddress raft.ServerAddress
dialOptions []grpc.DialOption
rpcChan chan raft.RPC
heartbeatFunc func(raft.RPC)
heartbeatFuncMtx sync.Mutex
heartbeatTimeout time.Duration
connectionsMtx sync.Mutex
connections map[raft.ServerAddress]*conn
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
// New creates both components of raft-grpc-transport: a gRPC service and a Raft Transport.
func New(localAddress raft.ServerAddress, dialOptions []grpc.DialOption, options ...Option) *Manager {
m := &Manager{
localAddress: localAddress,
dialOptions: dialOptions,
rpcChan: make(chan raft.RPC),
connections: map[raft.ServerAddress]*conn{},
shutdownCh: make(chan struct{}),
}
for _, opt := range options {
opt(m)
}
return m
}
// Register the RaftTransport gRPC service on a gRPC server.
func (m *Manager) Register(s grpc.ServiceRegistrar) {
pb.RegisterRaftTransportServer(s, gRPCAPI{manager: m})
}
// Transport returns a raft.Transport that communicates over gRPC.
func (m *Manager) Transport() raft.Transport {
return raftAPI{m}
}
func (m *Manager) Close() error {
m.shutdownLock.Lock()
defer m.shutdownLock.Unlock()
if m.shutdown {
return nil
}
close(m.shutdownCh)
m.shutdown = true
return m.disconnectAll()
}
func (m *Manager) disconnectAll() error {
m.connectionsMtx.Lock()
defer m.connectionsMtx.Unlock()
err := errCloseErr
for k, conn := range m.connections {
// Lock conn.mtx to ensure Dial() is complete
conn.mtx.Lock()
conn.mtx.Unlock()
closeErr := conn.clientConn.Close()
if closeErr != nil {
err = multierror.Append(err, closeErr)
}
delete(m.connections, k)
}
if err != errCloseErr {
return err
}
return nil
}

2
vendor/modules.txt vendored
View File

@ -1,7 +1,5 @@
# github.com/Jille/raft-grpc-transport v1.6.1
## explicit; go 1.13
github.com/Jille/raft-grpc-transport
github.com/Jille/raft-grpc-transport/proto
# github.com/armon/go-metrics v0.4.1
## explicit; go 1.12
github.com/armon/go-metrics