1283 lines
44 KiB
Go
1283 lines
44 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package raft
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
hclog "github.com/hashicorp/go-hclog"
|
|
metrics "github.com/hashicorp/go-metrics/compat"
|
|
)
|
|
|
|
const (
|
|
// SuggestedMaxDataSize of the data in a raft log entry, in bytes.
|
|
//
|
|
// The value is based on current architecture, default timing, etc. Clients can
|
|
// ignore this value if they want as there is no actual hard checking
|
|
// within the library. As the library is enhanced this value may change
|
|
// over time to reflect current suggested maximums.
|
|
//
|
|
// Applying log entries with data greater than this size risks RPC IO taking
|
|
// too long and preventing timely heartbeat signals. These signals are sent in serial
|
|
// in current transports, potentially causing leadership instability.
|
|
SuggestedMaxDataSize = 512 * 1024
|
|
)
|
|
|
|
var (
|
|
// ErrLeader is returned when an operation can't be completed on a
|
|
// leader node.
|
|
ErrLeader = errors.New("node is the leader")
|
|
|
|
// ErrNotLeader is returned when an operation can't be completed on a
|
|
// follower or candidate node.
|
|
ErrNotLeader = errors.New("node is not the leader")
|
|
|
|
// ErrNotVoter is returned when an operation can't be completed on a
|
|
// non-voter node.
|
|
ErrNotVoter = errors.New("node is not a voter")
|
|
|
|
// ErrLeadershipLost is returned when a leader fails to commit a log entry
|
|
// because it's been deposed in the process.
|
|
ErrLeadershipLost = errors.New("leadership lost while committing log")
|
|
|
|
// ErrAbortedByRestore is returned when a leader fails to commit a log
|
|
// entry because it's been superseded by a user snapshot restore.
|
|
ErrAbortedByRestore = errors.New("snapshot restored while committing log")
|
|
|
|
// ErrRaftShutdown is returned when operations are requested against an
|
|
// inactive Raft.
|
|
ErrRaftShutdown = errors.New("raft is already shutdown")
|
|
|
|
// ErrEnqueueTimeout is returned when a command fails due to a timeout.
|
|
ErrEnqueueTimeout = errors.New("timed out enqueuing operation")
|
|
|
|
// ErrNothingNewToSnapshot is returned when trying to create a snapshot
|
|
// but there's nothing new committed to the FSM since we started.
|
|
ErrNothingNewToSnapshot = errors.New("nothing new to snapshot")
|
|
|
|
// ErrUnsupportedProtocol is returned when an operation is attempted
|
|
// that's not supported by the current protocol version.
|
|
ErrUnsupportedProtocol = errors.New("operation not supported with current protocol version")
|
|
|
|
// ErrCantBootstrap is returned when attempt is made to bootstrap a
|
|
// cluster that already has state present.
|
|
ErrCantBootstrap = errors.New("bootstrap only works on new clusters")
|
|
|
|
// ErrLeadershipTransferInProgress is returned when the leader is rejecting
|
|
// client requests because it is attempting to transfer leadership.
|
|
ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress")
|
|
)
|
|
|
|
// Raft implements a Raft node.
|
|
type Raft struct {
|
|
raftState
|
|
|
|
// protocolVersion is used to inter-operate with Raft servers running
|
|
// different versions of the library. See comments in config.go for more
|
|
// details.
|
|
protocolVersion ProtocolVersion
|
|
|
|
// applyCh is used to async send logs to the main thread to
|
|
// be committed and applied to the FSM.
|
|
applyCh chan *logFuture
|
|
|
|
// conf stores the current configuration to use. This is the most recent one
|
|
// provided. All reads of config values should use the config() helper method
|
|
// to read this safely.
|
|
conf atomic.Value
|
|
|
|
// confReloadMu ensures that only one thread can reload config at once since
|
|
// we need to read-modify-write the atomic. It is NOT necessary to hold this
|
|
// for any other operation e.g. reading config using config().
|
|
confReloadMu sync.Mutex
|
|
|
|
// FSM is the client state machine to apply commands to
|
|
fsm FSM
|
|
|
|
// fsmMutateCh is used to send state-changing updates to the FSM. This
|
|
// receives pointers to commitTuple structures when applying logs or
|
|
// pointers to restoreFuture structures when restoring a snapshot. We
|
|
// need control over the order of these operations when doing user
|
|
// restores so that we finish applying any old log applies before we
|
|
// take a user snapshot on the leader, otherwise we might restore the
|
|
// snapshot and apply old logs to it that were in the pipe.
|
|
fsmMutateCh chan interface{}
|
|
|
|
// fsmSnapshotCh is used to trigger a new snapshot being taken
|
|
fsmSnapshotCh chan *reqSnapshotFuture
|
|
|
|
// lastContact is the last time we had contact from the
|
|
// leader node. This can be used to gauge staleness.
|
|
lastContact time.Time
|
|
lastContactLock sync.RWMutex
|
|
|
|
// leaderAddr is the current cluster leader Address
|
|
leaderAddr ServerAddress
|
|
// LeaderID is the current cluster leader ID
|
|
leaderID ServerID
|
|
leaderLock sync.RWMutex
|
|
|
|
// leaderCh is used to notify of leadership changes
|
|
leaderCh chan bool
|
|
|
|
// leaderState used only while state is leader
|
|
leaderState leaderState
|
|
|
|
// candidateFromLeadershipTransfer is used to indicate that this server became
|
|
// candidate because the leader tries to transfer leadership. This flag is
|
|
// used in RequestVoteRequest to express that a leadership transfer is going
|
|
// on.
|
|
candidateFromLeadershipTransfer atomic.Bool
|
|
|
|
// Stores our local server ID, used to avoid sending RPCs to ourself
|
|
localID ServerID
|
|
|
|
// Stores our local addr
|
|
localAddr ServerAddress
|
|
|
|
// Used for our logging
|
|
logger hclog.Logger
|
|
|
|
// LogStore provides durable storage for logs
|
|
logs LogStore
|
|
|
|
// Used to request the leader to make configuration changes.
|
|
configurationChangeCh chan *configurationChangeFuture
|
|
|
|
// Tracks the latest configuration and latest committed configuration from
|
|
// the log/snapshot.
|
|
configurations configurations
|
|
|
|
// Holds a copy of the latest configuration which can be read independently
|
|
// of the main loop.
|
|
latestConfiguration atomic.Value
|
|
|
|
// RPC chan comes from the transport layer
|
|
rpcCh <-chan RPC
|
|
|
|
// Shutdown channel to exit, protected to prevent concurrent exits
|
|
shutdown bool
|
|
shutdownCh chan struct{}
|
|
shutdownLock sync.Mutex
|
|
|
|
// snapshots is used to store and retrieve snapshots
|
|
snapshots SnapshotStore
|
|
|
|
// userSnapshotCh is used for user-triggered snapshots
|
|
userSnapshotCh chan *userSnapshotFuture
|
|
|
|
// userRestoreCh is used for user-triggered restores of external
|
|
// snapshots
|
|
userRestoreCh chan *userRestoreFuture
|
|
|
|
// stable is a StableStore implementation for durable state
|
|
// It provides stable storage for many fields in raftState
|
|
stable StableStore
|
|
|
|
// The transport layer we use
|
|
trans Transport
|
|
|
|
// verifyCh is used to async send verify futures to the main thread
|
|
// to verify we are still the leader
|
|
verifyCh chan *verifyFuture
|
|
|
|
// configurationsCh is used to get the configuration data safely from
|
|
// outside of the main thread.
|
|
configurationsCh chan *configurationsFuture
|
|
|
|
// bootstrapCh is used to attempt an initial bootstrap from outside of
|
|
// the main thread.
|
|
bootstrapCh chan *bootstrapFuture
|
|
|
|
// List of observers and the mutex that protects them. The observers list
|
|
// is indexed by an artificial ID which is used for deregistration.
|
|
observersLock sync.RWMutex
|
|
observers map[uint64]*Observer
|
|
|
|
// leadershipTransferCh is used to start a leadership transfer from outside of
|
|
// the main thread.
|
|
leadershipTransferCh chan *leadershipTransferFuture
|
|
|
|
// leaderNotifyCh is used to tell leader that config has changed
|
|
leaderNotifyCh chan struct{}
|
|
|
|
// followerNotifyCh is used to tell followers that config has changed
|
|
followerNotifyCh chan struct{}
|
|
|
|
// mainThreadSaturation measures the saturation of the main raft goroutine.
|
|
mainThreadSaturation *saturationMetric
|
|
|
|
// preVoteDisabled control if the pre-vote feature is activated,
|
|
// prevote feature is disabled if set to true.
|
|
preVoteDisabled bool
|
|
|
|
// noLegacyTelemetry allows to skip the legacy metrics to avoid duplicates.
|
|
// legacy metrics are those that have `_peer_name` as metric suffix instead as labels.
|
|
// e.g: raft_replication_heartbeat_peer0
|
|
noLegacyTelemetry bool
|
|
}
|
|
|
|
// BootstrapCluster initializes a server's storage with the given cluster
|
|
// configuration. This should only be called at the beginning of time for the
|
|
// cluster with an identical configuration listing all Voter servers. There is
|
|
// no need to bootstrap Nonvoter and Staging servers.
|
|
//
|
|
// A cluster can only be bootstrapped once from a single participating Voter
|
|
// server. Any further attempts to bootstrap will return an error that can be
|
|
// safely ignored.
|
|
//
|
|
// One approach is to bootstrap a single server with a configuration
|
|
// listing just itself as a Voter, then invoke AddVoter() on it to add other
|
|
// servers to the cluster.
|
|
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
|
|
snaps SnapshotStore, trans Transport, configuration Configuration,
|
|
) error {
|
|
// Validate the Raft server config.
|
|
if err := ValidateConfig(conf); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Sanity check the Raft peer configuration.
|
|
if err := checkConfiguration(configuration); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Make sure the cluster is in a clean state.
|
|
hasState, err := HasExistingState(logs, stable, snaps)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check for existing state: %v", err)
|
|
}
|
|
if hasState {
|
|
return ErrCantBootstrap
|
|
}
|
|
|
|
// Set current term to 1.
|
|
if err := stable.SetUint64(keyCurrentTerm, 1); err != nil {
|
|
return fmt.Errorf("failed to save current term: %v", err)
|
|
}
|
|
|
|
// Append configuration entry to log.
|
|
entry := &Log{
|
|
Index: 1,
|
|
Term: 1,
|
|
}
|
|
if conf.ProtocolVersion < 3 {
|
|
entry.Type = LogRemovePeerDeprecated
|
|
entry.Data = encodePeers(configuration, trans)
|
|
} else {
|
|
entry.Type = LogConfiguration
|
|
entry.Data = EncodeConfiguration(configuration)
|
|
}
|
|
if err := logs.StoreLog(entry); err != nil {
|
|
return fmt.Errorf("failed to append configuration entry to log: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RecoverCluster is used to manually force a new configuration in order to
|
|
// recover from a loss of quorum where the current configuration cannot be
|
|
// restored, such as when several servers die at the same time. This works by
|
|
// reading all the current state for this server, creating a snapshot with the
|
|
// supplied configuration, and then truncating the Raft log. This is the only
|
|
// safe way to force a given configuration without actually altering the log to
|
|
// insert any new entries, which could cause conflicts with other servers with
|
|
// different state.
|
|
//
|
|
// WARNING! This operation implicitly commits all entries in the Raft log, so
|
|
// in general this is an extremely unsafe operation. If you've lost your other
|
|
// servers and are performing a manual recovery, then you've also lost the
|
|
// commit information, so this is likely the best you can do, but you should be
|
|
// aware that calling this can cause Raft log entries that were in the process
|
|
// of being replicated but not yet be committed to be committed.
|
|
//
|
|
// Note the FSM passed here is used for the snapshot operations and will be
|
|
// left in a state that should not be used by the application. Be sure to
|
|
// discard this FSM and any associated state and provide a fresh one when
|
|
// calling NewRaft later.
|
|
//
|
|
// A typical way to recover the cluster is to shut down all servers and then
|
|
// run RecoverCluster on every server using an identical configuration. When
|
|
// the cluster is then restarted, and election should occur and then Raft will
|
|
// resume normal operation. If it's desired to make a particular server the
|
|
// leader, this can be used to inject a new configuration with that server as
|
|
// the sole voter, and then join up other new clean-state peer servers using
|
|
// the usual APIs in order to bring the cluster back into a known state.
|
|
func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
|
|
snaps SnapshotStore, trans Transport, configuration Configuration,
|
|
) error {
|
|
// Validate the Raft server config.
|
|
if err := ValidateConfig(conf); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Sanity check the Raft peer configuration.
|
|
if err := checkConfiguration(configuration); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Refuse to recover if there's no existing state. This would be safe to
|
|
// do, but it is likely an indication of an operator error where they
|
|
// expect data to be there and it's not. By refusing, we force them
|
|
// to show intent to start a cluster fresh by explicitly doing a
|
|
// bootstrap, rather than quietly fire up a fresh cluster here.
|
|
if hasState, err := HasExistingState(logs, stable, snaps); err != nil {
|
|
return fmt.Errorf("failed to check for existing state: %v", err)
|
|
} else if !hasState {
|
|
return fmt.Errorf("refused to recover cluster with no initial state, this is probably an operator error")
|
|
}
|
|
|
|
// Attempt to restore any snapshots we find, newest to oldest.
|
|
var (
|
|
snapshotIndex uint64
|
|
snapshotTerm uint64
|
|
snapshots, err = snaps.List()
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list snapshots: %v", err)
|
|
}
|
|
|
|
logger := conf.getOrCreateLogger()
|
|
|
|
for _, snapshot := range snapshots {
|
|
var source io.ReadCloser
|
|
_, source, err = snaps.Open(snapshot.ID)
|
|
if err != nil {
|
|
// Skip this one and try the next. We will detect if we
|
|
// couldn't open any snapshots.
|
|
continue
|
|
}
|
|
|
|
// Note this is the one place we call fsm.Restore without the
|
|
// fsmRestoreAndMeasure wrapper since this function should only be called to
|
|
// reset state on disk and the FSM passed will not be used for a running
|
|
// server instance. If the same process will eventually become a Raft peer
|
|
// then it will call NewRaft and restore again from disk then which will
|
|
// report metrics.
|
|
snapLogger := logger.With(
|
|
"id", snapshot.ID,
|
|
"last-index", snapshot.Index,
|
|
"last-term", snapshot.Term,
|
|
"size-in-bytes", snapshot.Size,
|
|
)
|
|
crc := newCountingReadCloser(source)
|
|
monitor := startSnapshotRestoreMonitor(snapLogger, crc, snapshot.Size, false)
|
|
err = fsm.Restore(crc)
|
|
// Close the source after the restore has completed
|
|
source.Close()
|
|
monitor.StopAndWait()
|
|
if err != nil {
|
|
// Same here, skip and try the next one.
|
|
continue
|
|
}
|
|
|
|
snapshotIndex = snapshot.Index
|
|
snapshotTerm = snapshot.Term
|
|
break
|
|
}
|
|
if len(snapshots) > 0 && (snapshotIndex == 0 || snapshotTerm == 0) {
|
|
return fmt.Errorf("failed to restore any of the available snapshots")
|
|
}
|
|
|
|
// The snapshot information is the best known end point for the data
|
|
// until we play back the Raft log entries.
|
|
lastIndex := snapshotIndex
|
|
lastTerm := snapshotTerm
|
|
|
|
// Apply any Raft log entries past the snapshot.
|
|
lastLogIndex, err := logs.LastIndex()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find last log: %v", err)
|
|
}
|
|
for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
|
|
var entry Log
|
|
if err = logs.GetLog(index, &entry); err != nil {
|
|
return fmt.Errorf("failed to get log at index %d: %v", index, err)
|
|
}
|
|
if entry.Type == LogCommand {
|
|
_ = fsm.Apply(&entry)
|
|
}
|
|
lastIndex = entry.Index
|
|
lastTerm = entry.Term
|
|
}
|
|
|
|
// Create a new snapshot, placing the configuration in as if it was
|
|
// committed at index 1.
|
|
snapshot, err := fsm.Snapshot()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to snapshot FSM: %v", err)
|
|
}
|
|
version := getSnapshotVersion(conf.ProtocolVersion)
|
|
sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create snapshot: %v", err)
|
|
}
|
|
if err = snapshot.Persist(sink); err != nil {
|
|
return fmt.Errorf("failed to persist snapshot: %v", err)
|
|
}
|
|
if err = sink.Close(); err != nil {
|
|
return fmt.Errorf("failed to finalize snapshot: %v", err)
|
|
}
|
|
|
|
// Compact the log so that we don't get bad interference from any
|
|
// configuration change log entries that might be there.
|
|
firstLogIndex, err := logs.FirstIndex()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get first log index: %v", err)
|
|
}
|
|
if err := logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
|
|
return fmt.Errorf("log compaction failed: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetConfiguration returns the persisted configuration of the Raft cluster
|
|
// without starting a Raft instance or connecting to the cluster. This function
|
|
// has identical behavior to Raft.GetConfiguration.
|
|
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
|
|
snaps SnapshotStore, trans Transport,
|
|
) (Configuration, error) {
|
|
conf.skipStartup = true
|
|
r, err := NewRaft(conf, fsm, logs, stable, snaps, trans)
|
|
if err != nil {
|
|
return Configuration{}, err
|
|
}
|
|
future := r.GetConfiguration()
|
|
if err = future.Error(); err != nil {
|
|
return Configuration{}, err
|
|
}
|
|
return future.Configuration(), nil
|
|
}
|
|
|
|
// HasExistingState returns true if the server has any existing state (logs,
|
|
// knowledge of a current term, or any snapshots).
|
|
func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error) {
|
|
// Make sure we don't have a current term.
|
|
currentTerm, err := stable.GetUint64(keyCurrentTerm)
|
|
if err == nil {
|
|
if currentTerm > 0 {
|
|
return true, nil
|
|
}
|
|
} else {
|
|
if err.Error() != "not found" {
|
|
return false, fmt.Errorf("failed to read current term: %v", err)
|
|
}
|
|
}
|
|
|
|
// Make sure we have an empty log.
|
|
lastIndex, err := logs.LastIndex()
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to get last log index: %v", err)
|
|
}
|
|
if lastIndex > 0 {
|
|
return true, nil
|
|
}
|
|
|
|
// Make sure we have no snapshots
|
|
snapshots, err := snaps.List()
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to list snapshots: %v", err)
|
|
}
|
|
if len(snapshots) > 0 {
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// NewRaft is used to construct a new Raft node. It takes a configuration, as well
|
|
// as implementations of various interfaces that are required. If we have any
|
|
// old state, such as snapshots, logs, peers, etc, all those will be restored
|
|
// when creating the Raft node.
|
|
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
|
|
// Validate the configuration.
|
|
if err := ValidateConfig(conf); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Ensure we have a LogOutput.
|
|
logger := conf.getOrCreateLogger()
|
|
|
|
// Try to restore the current term.
|
|
currentTerm, err := stable.GetUint64(keyCurrentTerm)
|
|
if err != nil && err.Error() != "not found" {
|
|
return nil, fmt.Errorf("failed to load current term: %v", err)
|
|
}
|
|
|
|
// Read the index of the last log entry.
|
|
lastIndex, err := logs.LastIndex()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to find last log: %v", err)
|
|
}
|
|
|
|
// Get the last log entry.
|
|
var lastLog Log
|
|
if lastIndex > 0 {
|
|
if err = logs.GetLog(lastIndex, &lastLog); err != nil {
|
|
return nil, fmt.Errorf("failed to get last log at index %d: %v", lastIndex, err)
|
|
}
|
|
}
|
|
|
|
// Make sure we have a valid server address and ID.
|
|
protocolVersion := conf.ProtocolVersion
|
|
localAddr := trans.LocalAddr()
|
|
localID := conf.LocalID
|
|
|
|
// TODO (slackpad) - When we deprecate protocol version 2, remove this
|
|
// along with the AddPeer() and RemovePeer() APIs.
|
|
if protocolVersion < 3 && string(localID) != string(localAddr) {
|
|
return nil, fmt.Errorf("when running with ProtocolVersion < 3, LocalID must be set to the network address")
|
|
}
|
|
|
|
// Buffer applyCh to MaxAppendEntries if the option is enabled
|
|
applyCh := make(chan *logFuture)
|
|
if conf.BatchApplyCh {
|
|
applyCh = make(chan *logFuture, conf.MaxAppendEntries)
|
|
}
|
|
|
|
_, transportSupportPreVote := trans.(WithPreVote)
|
|
// Create Raft struct.
|
|
r := &Raft{
|
|
protocolVersion: protocolVersion,
|
|
applyCh: applyCh,
|
|
fsm: fsm,
|
|
fsmMutateCh: make(chan interface{}, 128),
|
|
fsmSnapshotCh: make(chan *reqSnapshotFuture),
|
|
leaderCh: make(chan bool, 1),
|
|
localID: localID,
|
|
localAddr: localAddr,
|
|
logger: logger,
|
|
logs: logs,
|
|
configurationChangeCh: make(chan *configurationChangeFuture),
|
|
configurations: configurations{},
|
|
rpcCh: trans.Consumer(),
|
|
snapshots: snaps,
|
|
userSnapshotCh: make(chan *userSnapshotFuture),
|
|
userRestoreCh: make(chan *userRestoreFuture),
|
|
shutdownCh: make(chan struct{}),
|
|
stable: stable,
|
|
trans: trans,
|
|
verifyCh: make(chan *verifyFuture, 64),
|
|
configurationsCh: make(chan *configurationsFuture, 8),
|
|
bootstrapCh: make(chan *bootstrapFuture),
|
|
observers: make(map[uint64]*Observer),
|
|
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
|
|
leaderNotifyCh: make(chan struct{}, 1),
|
|
followerNotifyCh: make(chan struct{}, 1),
|
|
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
|
|
preVoteDisabled: conf.PreVoteDisabled || !transportSupportPreVote,
|
|
noLegacyTelemetry: conf.NoLegacyTelemetry,
|
|
}
|
|
if !transportSupportPreVote && !conf.PreVoteDisabled {
|
|
r.logger.Warn("pre-vote is disabled because it is not supported by the Transport")
|
|
}
|
|
|
|
r.conf.Store(*conf)
|
|
|
|
// Initialize as a follower.
|
|
r.setState(Follower)
|
|
|
|
// Restore the current term and the last log.
|
|
r.setCurrentTerm(currentTerm)
|
|
r.setLastLog(lastLog.Index, lastLog.Term)
|
|
|
|
// Attempt to restore a snapshot if there are any.
|
|
if err := r.restoreSnapshot(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Scan through the log for any configuration change entries.
|
|
snapshotIndex, _ := r.getLastSnapshot()
|
|
for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
|
|
var entry Log
|
|
if err := r.logs.GetLog(index, &entry); err != nil {
|
|
r.logger.Error("failed to get log", "index", index, "error", err)
|
|
panic(err)
|
|
}
|
|
if err := r.processConfigurationLogEntry(&entry); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
r.logger.Info("initial configuration",
|
|
"index", r.configurations.latestIndex,
|
|
"servers", hclog.Fmt("%+v", r.configurations.latest.Servers))
|
|
|
|
// Setup a heartbeat fast-path to avoid head-of-line
|
|
// blocking where possible. It MUST be safe for this
|
|
// to be called concurrently with a blocking RPC.
|
|
trans.SetHeartbeatHandler(r.processHeartbeat)
|
|
|
|
if conf.skipStartup {
|
|
return r, nil
|
|
}
|
|
// Start the background work.
|
|
r.goFunc(r.run)
|
|
r.goFunc(r.runFSM)
|
|
r.goFunc(r.runSnapshots)
|
|
return r, nil
|
|
}
|
|
|
|
// restoreSnapshot attempts to restore the latest snapshots, and fails if none
|
|
// of them can be restored. This is called at initialization time, and is
|
|
// completely unsafe to call at any other time.
|
|
func (r *Raft) restoreSnapshot() error {
|
|
snapshots, err := r.snapshots.List()
|
|
if err != nil {
|
|
r.logger.Error("failed to list snapshots", "error", err)
|
|
return err
|
|
}
|
|
|
|
// Try to load in order of newest to oldest
|
|
for _, snapshot := range snapshots {
|
|
if success := r.tryRestoreSingleSnapshot(snapshot); !success {
|
|
continue
|
|
}
|
|
|
|
// Update the lastApplied so we don't replay old logs
|
|
r.setLastApplied(snapshot.Index)
|
|
|
|
// Update the last stable snapshot info
|
|
r.setLastSnapshot(snapshot.Index, snapshot.Term)
|
|
|
|
// Update the configuration
|
|
var conf Configuration
|
|
var index uint64
|
|
if snapshot.Version > 0 {
|
|
conf = snapshot.Configuration
|
|
index = snapshot.ConfigurationIndex
|
|
} else {
|
|
var err error
|
|
if conf, err = decodePeers(snapshot.Peers, r.trans); err != nil {
|
|
return err
|
|
}
|
|
index = snapshot.Index
|
|
}
|
|
r.setCommittedConfiguration(conf, index)
|
|
r.setLatestConfiguration(conf, index)
|
|
|
|
// Success!
|
|
return nil
|
|
}
|
|
|
|
// If we had snapshots and failed to load them, its an error
|
|
if len(snapshots) > 0 {
|
|
return fmt.Errorf("failed to load any existing snapshots")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool {
|
|
if r.config().NoSnapshotRestoreOnStart {
|
|
return true
|
|
}
|
|
|
|
snapLogger := r.logger.With(
|
|
"id", snapshot.ID,
|
|
"last-index", snapshot.Index,
|
|
"last-term", snapshot.Term,
|
|
"size-in-bytes", snapshot.Size,
|
|
)
|
|
|
|
snapLogger.Info("starting restore from snapshot")
|
|
|
|
_, source, err := r.snapshots.Open(snapshot.ID)
|
|
if err != nil {
|
|
snapLogger.Error("failed to open snapshot", "error", err)
|
|
return false
|
|
}
|
|
|
|
if err := fsmRestoreAndMeasure(snapLogger, r.fsm, source, snapshot.Size); err != nil {
|
|
source.Close()
|
|
snapLogger.Error("failed to restore snapshot", "error", err)
|
|
return false
|
|
}
|
|
source.Close()
|
|
|
|
snapLogger.Info("restored from snapshot")
|
|
|
|
return true
|
|
}
|
|
|
|
func (r *Raft) config() Config {
|
|
return r.conf.Load().(Config)
|
|
}
|
|
|
|
// ReloadConfig updates the configuration of a running raft node. If the new
|
|
// configuration is invalid an error is returned and no changes made to the
|
|
// instance. All fields will be copied from rc into the new configuration, even
|
|
// if they are zero valued.
|
|
func (r *Raft) ReloadConfig(rc ReloadableConfig) error {
|
|
r.confReloadMu.Lock()
|
|
defer r.confReloadMu.Unlock()
|
|
|
|
// Load the current config (note we are under a lock so it can't be changed
|
|
// between this read and a later Store).
|
|
oldCfg := r.config()
|
|
|
|
// Set the reloadable fields
|
|
newCfg := rc.apply(oldCfg)
|
|
|
|
if err := ValidateConfig(&newCfg); err != nil {
|
|
return err
|
|
}
|
|
r.conf.Store(newCfg)
|
|
|
|
if rc.HeartbeatTimeout < oldCfg.HeartbeatTimeout {
|
|
// On leader, ensure replication loops running with a longer
|
|
// timeout than what we want now discover the change.
|
|
asyncNotifyCh(r.leaderNotifyCh)
|
|
// On follower, update current timer to use the shorter new value.
|
|
asyncNotifyCh(r.followerNotifyCh)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReloadableConfig returns the current state of the reloadable fields in Raft's
|
|
// configuration. This is useful for programs to discover the current state for
|
|
// reporting to users or tests. It is safe to call from any goroutine. It is
|
|
// intended for reporting and testing purposes primarily; external
|
|
// synchronization would be required to safely use this in a read-modify-write
|
|
// pattern for reloadable configuration options.
|
|
func (r *Raft) ReloadableConfig() ReloadableConfig {
|
|
cfg := r.config()
|
|
var rc ReloadableConfig
|
|
rc.fromConfig(cfg)
|
|
return rc
|
|
}
|
|
|
|
// BootstrapCluster is equivalent to non-member BootstrapCluster but can be
|
|
// called on an un-bootstrapped Raft instance after it has been created. This
|
|
// should only be called at the beginning of time for the cluster with an
|
|
// identical configuration listing all Voter servers. There is no need to
|
|
// bootstrap Nonvoter and Staging servers.
|
|
//
|
|
// A cluster can only be bootstrapped once from a single participating Voter
|
|
// server. Any further attempts to bootstrap will return an error that can be
|
|
// safely ignored.
|
|
//
|
|
// One sane approach is to bootstrap a single server with a configuration
|
|
// listing just itself as a Voter, then invoke AddVoter() on it to add other
|
|
// servers to the cluster.
|
|
func (r *Raft) BootstrapCluster(configuration Configuration) Future {
|
|
bootstrapReq := &bootstrapFuture{}
|
|
bootstrapReq.init()
|
|
bootstrapReq.configuration = configuration
|
|
select {
|
|
case <-r.shutdownCh:
|
|
return errorFuture{ErrRaftShutdown}
|
|
case r.bootstrapCh <- bootstrapReq:
|
|
return bootstrapReq
|
|
}
|
|
}
|
|
|
|
// Leader is used to return the current leader of the cluster.
|
|
// Deprecated: use LeaderWithID instead
|
|
// It may return empty string if there is no current leader
|
|
// or the leader is unknown.
|
|
// Deprecated: use LeaderWithID instead.
|
|
func (r *Raft) Leader() ServerAddress {
|
|
r.leaderLock.RLock()
|
|
leaderAddr := r.leaderAddr
|
|
r.leaderLock.RUnlock()
|
|
return leaderAddr
|
|
}
|
|
|
|
// LeaderWithID is used to return the current leader address and ID of the cluster.
|
|
// It may return empty strings if there is no current leader
|
|
// or the leader is unknown.
|
|
func (r *Raft) LeaderWithID() (ServerAddress, ServerID) {
|
|
r.leaderLock.RLock()
|
|
leaderAddr := r.leaderAddr
|
|
leaderID := r.leaderID
|
|
r.leaderLock.RUnlock()
|
|
return leaderAddr, leaderID
|
|
}
|
|
|
|
// Apply is used to apply a command to the FSM in a highly consistent
|
|
// manner. This returns a future that can be used to wait on the application.
|
|
// An optional timeout can be provided to limit the amount of time we wait
|
|
// for the command to be started. This must be run on the leader or it
|
|
// will fail.
|
|
//
|
|
// If the node discovers it is no longer the leader while applying the command,
|
|
// it will return ErrLeadershipLost. There is no way to guarantee whether the
|
|
// write succeeded or failed in this case. For example, if the leader is
|
|
// partitioned it can't know if a quorum of followers wrote the log to disk. If
|
|
// at least one did, it may survive into the next leader's term.
|
|
//
|
|
// If a user snapshot is restored while the command is in-flight, an
|
|
// ErrAbortedByRestore is returned. In this case the write effectively failed
|
|
// since its effects will not be present in the FSM after the restore.
|
|
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
|
|
return r.ApplyLog(Log{Data: cmd}, timeout)
|
|
}
|
|
|
|
// ApplyLog performs Apply but takes in a Log directly. The only values
|
|
// currently taken from the submitted Log are Data and Extensions. See
|
|
// Apply for details on error cases.
|
|
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
|
|
metrics.IncrCounter([]string{"raft", "apply"}, 1)
|
|
|
|
var timer <-chan time.Time
|
|
if timeout > 0 {
|
|
timer = time.After(timeout)
|
|
}
|
|
|
|
// Create a log future, no index or term yet
|
|
logFuture := &logFuture{
|
|
log: Log{
|
|
Type: LogCommand,
|
|
Data: log.Data,
|
|
Extensions: log.Extensions,
|
|
},
|
|
}
|
|
logFuture.init()
|
|
|
|
select {
|
|
case <-timer:
|
|
return errorFuture{ErrEnqueueTimeout}
|
|
case <-r.shutdownCh:
|
|
return errorFuture{ErrRaftShutdown}
|
|
case r.applyCh <- logFuture:
|
|
return logFuture
|
|
}
|
|
}
|
|
|
|
// Barrier is used to issue a command that blocks until all preceding
|
|
// operations have been applied to the FSM. It can be used to ensure the
|
|
// FSM reflects all queued writes. An optional timeout can be provided to
|
|
// limit the amount of time we wait for the command to be started. This
|
|
// must be run on the leader, or it will fail.
|
|
func (r *Raft) Barrier(timeout time.Duration) Future {
|
|
metrics.IncrCounter([]string{"raft", "barrier"}, 1)
|
|
var timer <-chan time.Time
|
|
if timeout > 0 {
|
|
timer = time.After(timeout)
|
|
}
|
|
|
|
// Create a log future, no index or term yet
|
|
logFuture := &logFuture{log: Log{Type: LogBarrier}}
|
|
logFuture.init()
|
|
|
|
select {
|
|
case <-timer:
|
|
return errorFuture{ErrEnqueueTimeout}
|
|
case <-r.shutdownCh:
|
|
return errorFuture{ErrRaftShutdown}
|
|
case r.applyCh <- logFuture:
|
|
return logFuture
|
|
}
|
|
}
|
|
|
|
// VerifyLeader is used to ensure this peer is still the leader. It may be used
|
|
// to prevent returning stale data from the FSM after the peer has lost
|
|
// leadership.
|
|
func (r *Raft) VerifyLeader() Future {
|
|
metrics.IncrCounter([]string{"raft", "verify_leader"}, 1)
|
|
verifyFuture := &verifyFuture{}
|
|
verifyFuture.init()
|
|
select {
|
|
case <-r.shutdownCh:
|
|
return errorFuture{ErrRaftShutdown}
|
|
case r.verifyCh <- verifyFuture:
|
|
return verifyFuture
|
|
}
|
|
}
|
|
|
|
// GetConfiguration returns the latest configuration. This may not yet be
|
|
// committed. The main loop can access this directly.
|
|
func (r *Raft) GetConfiguration() ConfigurationFuture {
|
|
configReq := &configurationsFuture{}
|
|
configReq.init()
|
|
configReq.configurations = configurations{latest: r.getLatestConfiguration()}
|
|
configReq.respond(nil)
|
|
return configReq
|
|
}
|
|
|
|
// AddPeer to the cluster configuration. Must be run on the leader, or it will fail.
|
|
//
|
|
// Deprecated: Use AddVoter/AddNonvoter instead.
|
|
func (r *Raft) AddPeer(peer ServerAddress) Future {
|
|
if r.protocolVersion > 2 {
|
|
return errorFuture{ErrUnsupportedProtocol}
|
|
}
|
|
|
|
return r.requestConfigChange(configurationChangeRequest{
|
|
command: AddVoter,
|
|
serverID: ServerID(peer),
|
|
serverAddress: peer,
|
|
prevIndex: 0,
|
|
}, 0)
|
|
}
|
|
|
|
// RemovePeer from the cluster configuration. If the current leader is being
|
|
// removed, it will cause a new election to occur. Must be run on the leader,
|
|
// or it will fail.
|
|
|
|
// Deprecated: Use RemoveServer instead.
|
|
func (r *Raft) RemovePeer(peer ServerAddress) Future {
|
|
if r.protocolVersion > 2 {
|
|
return errorFuture{ErrUnsupportedProtocol}
|
|
}
|
|
|
|
return r.requestConfigChange(configurationChangeRequest{
|
|
command: RemoveServer,
|
|
serverID: ServerID(peer),
|
|
prevIndex: 0,
|
|
}, 0)
|
|
}
|
|
|
|
// AddVoter will add the given server to the cluster as a staging server. If the
|
|
// server is already in the cluster as a voter, this updates the server's address.
|
|
// This must be run on the leader or it will fail. The leader will promote the
|
|
// staging server to a voter once that server is ready. If nonzero, prevIndex is
|
|
// the index of the only configuration upon which this change may be applied; if
|
|
// another configuration entry has been added in the meantime, this request will
|
|
// fail. If nonzero, timeout is how long this server should wait before the
|
|
// configuration change log entry is appended.
|
|
func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
|
|
if r.protocolVersion < 2 {
|
|
return errorFuture{ErrUnsupportedProtocol}
|
|
}
|
|
|
|
return r.requestConfigChange(configurationChangeRequest{
|
|
command: AddVoter,
|
|
serverID: id,
|
|
serverAddress: address,
|
|
prevIndex: prevIndex,
|
|
}, timeout)
|
|
}
|
|
|
|
// AddNonvoter will add the given server to the cluster but won't assign it a
|
|
// vote. The server will receive log entries, but it won't participate in
|
|
// elections or log entry commitment. If the server is already in the cluster,
|
|
// this updates the server's address. This must be run on the leader or it will
|
|
// fail. For prevIndex and timeout, see AddVoter.
|
|
func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
|
|
if r.protocolVersion < 3 {
|
|
return errorFuture{ErrUnsupportedProtocol}
|
|
}
|
|
|
|
return r.requestConfigChange(configurationChangeRequest{
|
|
command: AddNonvoter,
|
|
serverID: id,
|
|
serverAddress: address,
|
|
prevIndex: prevIndex,
|
|
}, timeout)
|
|
}
|
|
|
|
// RemoveServer will remove the given server from the cluster. If the current
|
|
// leader is being removed, it will cause a new election to occur. This must be
|
|
// run on the leader or it will fail. For prevIndex and timeout, see AddVoter.
|
|
func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture {
|
|
if r.protocolVersion < 2 {
|
|
return errorFuture{ErrUnsupportedProtocol}
|
|
}
|
|
|
|
return r.requestConfigChange(configurationChangeRequest{
|
|
command: RemoveServer,
|
|
serverID: id,
|
|
prevIndex: prevIndex,
|
|
}, timeout)
|
|
}
|
|
|
|
// DemoteVoter will take away a server's vote, if it has one. If present, the
|
|
// server will continue to receive log entries, but it won't participate in
|
|
// elections or log entry commitment. If the server is not in the cluster, this
|
|
// does nothing. This must be run on the leader or it will fail. For prevIndex
|
|
// and timeout, see AddVoter.
|
|
func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture {
|
|
if r.protocolVersion < 3 {
|
|
return errorFuture{ErrUnsupportedProtocol}
|
|
}
|
|
|
|
return r.requestConfigChange(configurationChangeRequest{
|
|
command: DemoteVoter,
|
|
serverID: id,
|
|
prevIndex: prevIndex,
|
|
}, timeout)
|
|
}
|
|
|
|
// Shutdown is used to stop the Raft background routines.
|
|
// This is not a graceful operation. Provides a future that
|
|
// can be used to block until all background routines have exited.
|
|
func (r *Raft) Shutdown() Future {
|
|
r.shutdownLock.Lock()
|
|
defer r.shutdownLock.Unlock()
|
|
|
|
if !r.shutdown {
|
|
close(r.shutdownCh)
|
|
r.shutdown = true
|
|
r.setState(Shutdown)
|
|
return &shutdownFuture{r}
|
|
}
|
|
|
|
// avoid closing transport twice
|
|
return &shutdownFuture{nil}
|
|
}
|
|
|
|
// Snapshot is used to manually force Raft to take a snapshot. Returns a future
|
|
// that can be used to block until complete, and that contains a function that
|
|
// can be used to open the snapshot.
|
|
func (r *Raft) Snapshot() SnapshotFuture {
|
|
future := &userSnapshotFuture{}
|
|
future.init()
|
|
select {
|
|
case r.userSnapshotCh <- future:
|
|
return future
|
|
case <-r.shutdownCh:
|
|
future.respond(ErrRaftShutdown)
|
|
return future
|
|
}
|
|
}
|
|
|
|
// Restore is used to manually force Raft to consume an external snapshot, such
|
|
// as if restoring from a backup. We will use the current Raft configuration,
|
|
// not the one from the snapshot, so that we can restore into a new cluster. We
|
|
// will also use the max of the index of the snapshot, or the current index,
|
|
// and then add 1 to that, so we force a new state with a hole in the Raft log,
|
|
// so that the snapshot will be sent to followers and used for any new joiners.
|
|
// This can only be run on the leader, and blocks until the restore is complete
|
|
// or an error occurs.
|
|
//
|
|
// WARNING! This operation has the leader take on the state of the snapshot and
|
|
// then sets itself up so that it replicates that to its followers though the
|
|
// install snapshot process. This involves a potentially dangerous period where
|
|
// the leader commits ahead of its followers, so should only be used for disaster
|
|
// recovery into a fresh cluster, and should not be used in normal operations.
|
|
func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error {
|
|
metrics.IncrCounter([]string{"raft", "restore"}, 1)
|
|
var timer <-chan time.Time
|
|
if timeout > 0 {
|
|
timer = time.After(timeout)
|
|
}
|
|
|
|
// Perform the restore.
|
|
restore := &userRestoreFuture{
|
|
meta: meta,
|
|
reader: reader,
|
|
}
|
|
restore.init()
|
|
select {
|
|
case <-timer:
|
|
return ErrEnqueueTimeout
|
|
case <-r.shutdownCh:
|
|
return ErrRaftShutdown
|
|
case r.userRestoreCh <- restore:
|
|
// If the restore is ingested then wait for it to complete.
|
|
if err := restore.Error(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Apply a no-op log entry. Waiting for this allows us to wait until the
|
|
// followers have gotten the restore and replicated at least this new
|
|
// entry, which shows that we've also faulted and installed the
|
|
// snapshot with the contents of the restore.
|
|
noop := &logFuture{
|
|
log: Log{
|
|
Type: LogNoop,
|
|
},
|
|
}
|
|
noop.init()
|
|
select {
|
|
case <-timer:
|
|
return ErrEnqueueTimeout
|
|
case <-r.shutdownCh:
|
|
return ErrRaftShutdown
|
|
case r.applyCh <- noop:
|
|
return noop.Error()
|
|
}
|
|
}
|
|
|
|
// State returns the state of this raft peer.
|
|
func (r *Raft) State() RaftState {
|
|
return r.getState()
|
|
}
|
|
|
|
// LeaderCh is used to get a channel which delivers signals on acquiring or
|
|
// losing leadership. It sends true if we become the leader, and false if we
|
|
// lose it.
|
|
//
|
|
// Receivers can expect to receive a notification only if leadership
|
|
// transition has occurred.
|
|
//
|
|
// If receivers aren't ready for the signal, signals may drop and only the
|
|
// latest leadership transition. For example, if a receiver receives subsequent
|
|
// `true` values, they may deduce that leadership was lost and regained while
|
|
// the receiver was processing first leadership transition.
|
|
func (r *Raft) LeaderCh() <-chan bool {
|
|
return r.leaderCh
|
|
}
|
|
|
|
// String returns a string representation of this Raft node.
|
|
func (r *Raft) String() string {
|
|
return fmt.Sprintf("Node at %s [%v]", r.localAddr, r.getState())
|
|
}
|
|
|
|
// LastContact returns the time of last contact by a leader.
|
|
// This only makes sense if we are currently a follower.
|
|
func (r *Raft) LastContact() time.Time {
|
|
r.lastContactLock.RLock()
|
|
last := r.lastContact
|
|
r.lastContactLock.RUnlock()
|
|
return last
|
|
}
|
|
|
|
// Stats is used to return a map of various internal stats. This
|
|
// should only be used for informative purposes or debugging.
|
|
//
|
|
// Keys are: "state", "term", "last_log_index", "last_log_term",
|
|
// "commit_index", "applied_index", "fsm_pending",
|
|
// "last_snapshot_index", "last_snapshot_term",
|
|
// "latest_configuration", "last_contact", and "num_peers".
|
|
//
|
|
// The value of "state" is a numeric constant representing one of
|
|
// the possible leadership states the node is in at any given time.
|
|
// the possible states are: "Follower", "Candidate", "Leader", "Shutdown".
|
|
//
|
|
// The value of "latest_configuration" is a string which contains
|
|
// the id of each server, its suffrage status, and its address.
|
|
//
|
|
// The value of "last_contact" is either "never" if there
|
|
// has been no contact with a leader, "0" if the node is in the
|
|
// leader state, or the time since last contact with a leader
|
|
// formatted as a string.
|
|
//
|
|
// The value of "num_peers" is the number of other voting servers in the
|
|
// cluster, not including this node. If this node isn't part of the
|
|
// configuration then this will be "0".
|
|
//
|
|
// All other values are uint64s, formatted as strings.
|
|
func (r *Raft) Stats() map[string]string {
|
|
toString := func(v uint64) string {
|
|
return strconv.FormatUint(v, 10)
|
|
}
|
|
lastLogIndex, lastLogTerm := r.getLastLog()
|
|
lastSnapIndex, lastSnapTerm := r.getLastSnapshot()
|
|
s := map[string]string{
|
|
"state": r.getState().String(),
|
|
"term": toString(r.getCurrentTerm()),
|
|
"last_log_index": toString(lastLogIndex),
|
|
"last_log_term": toString(lastLogTerm),
|
|
"commit_index": toString(r.getCommitIndex()),
|
|
"applied_index": toString(r.getLastApplied()),
|
|
"fsm_pending": toString(uint64(len(r.fsmMutateCh))),
|
|
"last_snapshot_index": toString(lastSnapIndex),
|
|
"last_snapshot_term": toString(lastSnapTerm),
|
|
"protocol_version": toString(uint64(r.protocolVersion)),
|
|
"protocol_version_min": toString(uint64(ProtocolVersionMin)),
|
|
"protocol_version_max": toString(uint64(ProtocolVersionMax)),
|
|
"snapshot_version_min": toString(uint64(SnapshotVersionMin)),
|
|
"snapshot_version_max": toString(uint64(SnapshotVersionMax)),
|
|
}
|
|
|
|
future := r.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
r.logger.Warn("could not get configuration for stats", "error", err)
|
|
} else {
|
|
configuration := future.Configuration()
|
|
s["latest_configuration_index"] = toString(future.Index())
|
|
s["latest_configuration"] = fmt.Sprintf("%+v", configuration.Servers)
|
|
|
|
// This is a legacy metric that we've seen people use in the wild.
|
|
hasUs := false
|
|
numPeers := 0
|
|
for _, server := range configuration.Servers {
|
|
if server.Suffrage == Voter {
|
|
if server.ID == r.localID {
|
|
hasUs = true
|
|
} else {
|
|
numPeers++
|
|
}
|
|
}
|
|
}
|
|
if !hasUs {
|
|
numPeers = 0
|
|
}
|
|
s["num_peers"] = toString(uint64(numPeers))
|
|
}
|
|
|
|
last := r.LastContact()
|
|
if r.getState() == Leader {
|
|
s["last_contact"] = "0"
|
|
} else if last.IsZero() {
|
|
s["last_contact"] = "never"
|
|
} else {
|
|
s["last_contact"] = fmt.Sprintf("%v", time.Now().Sub(last))
|
|
}
|
|
return s
|
|
}
|
|
|
|
// CurrentTerm returns the current term.
|
|
func (r *Raft) CurrentTerm() uint64 {
|
|
return r.getCurrentTerm()
|
|
}
|
|
|
|
// LastIndex returns the last index in stable storage,
|
|
// either from the last log or from the last snapshot.
|
|
func (r *Raft) LastIndex() uint64 {
|
|
return r.getLastIndex()
|
|
}
|
|
|
|
// CommitIndex returns the committed index.
|
|
// This API maybe helpful for server to implement the read index optimization
|
|
// as described in the Raft paper.
|
|
func (r *Raft) CommitIndex() uint64 {
|
|
return r.getCommitIndex()
|
|
}
|
|
|
|
// AppliedIndex returns the last index applied to the FSM. This is generally
|
|
// lagging behind the last index, especially for indexes that are persisted but
|
|
// have not yet been considered committed by the leader. NOTE - this reflects
|
|
// the last index that was sent to the application's FSM over the apply channel
|
|
// but DOES NOT mean that the application's FSM has yet consumed it and applied
|
|
// it to its internal state. Thus, the application's state may lag behind this
|
|
// index.
|
|
func (r *Raft) AppliedIndex() uint64 {
|
|
return r.getLastApplied()
|
|
}
|
|
|
|
// LeadershipTransfer will transfer leadership to a server in the cluster.
|
|
// This can only be called from the leader, or it will fail. The leader will
|
|
// stop accepting client requests, make sure the target server is up to date
|
|
// and starts the transfer with a TimeoutNow message. This message has the same
|
|
// effect as if the election timeout on the target server fires. Since
|
|
// it is unlikely that another server is starting an election, it is very
|
|
// likely that the target server is able to win the election. Note that raft
|
|
// protocol version 3 is not sufficient to use LeadershipTransfer. A recent
|
|
// version of that library has to be used that includes this feature. Using
|
|
// transfer leadership is safe however in a cluster where not every node has
|
|
// the latest version. If a follower cannot be promoted, it will fail
|
|
// gracefully.
|
|
func (r *Raft) LeadershipTransfer() Future {
|
|
if r.protocolVersion < 3 {
|
|
return errorFuture{ErrUnsupportedProtocol}
|
|
}
|
|
|
|
return r.initiateLeadershipTransfer(nil, nil)
|
|
}
|
|
|
|
// LeadershipTransferToServer does the same as LeadershipTransfer but takes a
|
|
// server in the arguments in case a leadership should be transitioned to a
|
|
// specific server in the cluster. Note that raft protocol version 3 is not
|
|
// sufficient to use LeadershipTransfer. A recent version of that library has
|
|
// to be used that includes this feature. Using transfer leadership is safe
|
|
// however in a cluster where not every node has the latest version. If a
|
|
// follower cannot be promoted, it will fail gracefully.
|
|
func (r *Raft) LeadershipTransferToServer(id ServerID, address ServerAddress) Future {
|
|
if r.protocolVersion < 3 {
|
|
return errorFuture{ErrUnsupportedProtocol}
|
|
}
|
|
|
|
return r.initiateLeadershipTransfer(&id, &address)
|
|
}
|