193 lines
6.9 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package raft
import (
"fmt"
"time"
metrics "github.com/hashicorp/go-metrics/compat"
)
// LogType identifies the kind of entry held in the Raft log.
type LogType uint8

const (
	// LogCommand marks an entry that is applied to a user FSM.
	LogCommand LogType = iota

	// LogNoop marks an entry used to assert leadership.
	LogNoop

	// LogAddPeerDeprecated marks an entry that adds a new peer. It should
	// only be used with older protocol versions designed to be compatible
	// with unversioned Raft servers. See comments in config.go for details.
	LogAddPeerDeprecated

	// LogRemovePeerDeprecated marks an entry that removes an existing peer.
	// It should only be used with older protocol versions designed to be
	// compatible with unversioned Raft servers. See comments in config.go
	// for details.
	LogRemovePeerDeprecated

	// LogBarrier marks an entry used to ensure all preceding operations have
	// been applied to the FSM. It is similar to LogNoop, but instead of
	// returning once committed, it only returns once the FSM manager acks
	// it. Otherwise, it is possible there are operations committed but not
	// yet applied to the FSM.
	LogBarrier

	// LogConfiguration marks an entry that establishes a membership change
	// configuration. It is created when a server is added, removed,
	// promoted, etc. Only used when protocol version 1 or greater is in use.
	LogConfiguration
)

// String returns the name of the LogType constant. A value that matches no
// declared constant is rendered as its decimal number.
func (lt LogType) String() string {
	// Name table indexed by the constant's numeric value.
	names := [...]string{
		LogCommand:              "LogCommand",
		LogNoop:                 "LogNoop",
		LogAddPeerDeprecated:    "LogAddPeerDeprecated",
		LogRemovePeerDeprecated: "LogRemovePeerDeprecated",
		LogBarrier:              "LogBarrier",
		LogConfiguration:        "LogConfiguration",
	}
	if int(lt) < len(names) {
		return names[lt]
	}
	return fmt.Sprintf("%d", lt)
}
// Log entries are replicated to all members of the Raft cluster
// and form the heart of the replicated state machine.
type Log struct {
// Index holds the index of the log entry.
Index uint64
// Term holds the election term of the log entry.
Term uint64
// Type holds the type of the log entry (one of the LogType constants).
Type LogType
// Data holds the log entry's type-specific data.
Data []byte
// Extensions holds an opaque byte slice of information for middleware. It
// is up to the client of the library to properly modify this as it adds
// layers and to remove those layers when appropriate. This value is a part
// of the log, so very large values could cause timing issues.
//
// N.B. It is _up to the client_ to handle upgrade paths. For instance, if
// using this with go-raftchunking, the client should ensure that all Raft
// peers are using a version that can handle that extension before ever
// actually triggering chunking behavior. It is sometimes sufficient to
// ensure that non-leaders are upgraded first, then the current leader is
// upgraded, but a leader changeover during this process could lead to
// trouble, so gating extension behavior via some flag in the client
// program is also a good idea.
Extensions []byte
// AppendedAt stores the time the leader first appended this log to its
// LogStore. Followers will observe the leader's time. It is not used for
// coordination or as part of the replication protocol at all. It exists
// only to provide operational information, for example how many seconds'
// worth of logs are present on the leader, which might impact a follower's
// ability to catch up after restoring a large snapshot.
//
// Never rely on this being in the past when appending on a follower or
// reading a log back: clock skew means a follower could see a log with a
// future timestamp. In general the leader is also not required to persist
// the log before delivering it to followers, although the current
// implementation happens to do so.
AppendedAt time.Time
}
// LogStore is used to provide an interface for storing
// and retrieving logs in a durable fashion.
type LogStore interface {
// FirstIndex returns the first index written. 0 for no entries.
FirstIndex() (uint64, error)
// LastIndex returns the last index written. 0 for no entries.
LastIndex() (uint64, error)
// GetLog gets the log entry at the given index, writing it into the
// caller-supplied log.
GetLog(index uint64, log *Log) error
// StoreLog stores a single log entry.
StoreLog(log *Log) error
// StoreLogs stores multiple log entries.
//
// By default the logs stored may not be contiguous with previous logs
// (i.e. there may be a gap in Index since the last log written). If an
// implementation can't tolerate this it may optionally implement
// `MonotonicLogStore` to indicate that this is not allowed. This changes
// Raft's behaviour after restoring a user snapshot to remove all previous
// logs instead of relying on a "gap" to signal the discontinuity between
// logs before the snapshot and logs after.
StoreLogs(logs []*Log) error
// DeleteRange deletes a range of log entries. The range is inclusive,
// i.e. both min and max are deleted.
DeleteRange(min, max uint64) error
}
// MonotonicLogStore is an optional interface for LogStore implementations
// that cannot tolerate gaps in between the Index values of consecutive log
// entries. For example, this may allow more efficient indexing because the
// Index values are densely populated. If true is returned, Raft will avoid
// relying on gaps to trigger re-synching logs on followers after a snapshot
// is restored.
//
// The LogStore must have an efficient implementation of DeleteRange for the
// case where all logs are removed, as this must be called after snapshot
// restore when gaps are not allowed. We avoid deleting all records for
// LogStores that do not implement MonotonicLogStore because, although it's
// always correct to do so, it has a major negative performance impact on the
// BoltDB store that is currently the most widely used.
type MonotonicLogStore interface {
// IsMonotonic reports whether the store requires log indexes to be
// contiguous (true means gaps are not allowed).
IsMonotonic() bool
}
// oldestLog returns the entry stored at the first index of s.
//
// The store can be truncated between reading the first index and fetching
// the entry at that index, so a failed fetch is retried for as long as the
// store keeps reporting a different first index.
//
// Errors:
//   - the error from FirstIndex, if that call fails;
//   - ErrLogNotFound, if the store reports a first index of 0 (no entries);
//   - the error from the last failed GetLog, once the first index is
//     unchanged since that failure.
//
// On any error the returned Log is the zero value.
func oldestLog(s LogStore) (Log, error) {
	var (
		failedIdx uint64 // first index whose fetch most recently failed
		failedErr error  // error returned by that fetch
	)
	for {
		firstIdx, err := s.FirstIndex()
		if err != nil {
			return Log{}, err
		}
		if firstIdx == 0 {
			return Log{}, ErrLogNotFound
		}
		if firstIdx == failedIdx {
			// Same index as the attempt that just errored: nothing was
			// truncated in between, so fetching again would not help.
			return Log{}, failedErr
		}

		// Fetch into a fresh value on every attempt so that a failed read
		// cannot leave stale fields behind in a later successful result.
		var entry Log
		if err = s.GetLog(firstIdx, &entry); err == nil {
			return entry, nil
		}

		// Remember the failure and loop to see if there is a new first index.
		failedIdx, failedErr = firstIdx, err
	}
}
// emitLogStoreMetrics periodically publishes a gauge with the age, in
// milliseconds, of the oldest log entry in s. The gauge key is prefix with
// "oldestLogAge" appended.
//
// The first sample is taken after interval has elapsed, and each later one
// interval after the previous sample completed. If the oldest log cannot be
// read, or its AppendedAt is the zero time, an age of 0 is emitted. The
// function blocks until stopCh is closed or receives a value, so it is
// intended to be run in its own goroutine.
func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, stopCh <-chan struct{}) {
	// Build the gauge key once, in a slice owned by this function. Appending
	// to prefix directly could write into spare capacity of the caller's
	// backing array and clobber (or race with) other users of that slice.
	// The capacity is exact so any append by a consumer must reallocate.
	key := make([]string, 0, len(prefix)+1)
	key = append(key, prefix...)
	key = append(key, "oldestLogAge")

	// A single reusable timer avoids allocating a new one on every iteration
	// and is released promptly when the loop exits.
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			// In error case emit 0 as the age.
			ageMs := float32(0.0)
			if l, err := oldestLog(s); err == nil && !l.AppendedAt.IsZero() {
				ageMs = float32(time.Since(l.AppendedAt).Milliseconds())
			}
			metrics.SetGauge(key, ageMs)

			// The channel has been drained above, so Reset is safe. Re-arming
			// after the work keeps the delay measured from sample completion.
			timer.Reset(interval)
		case <-stopCh:
			return
		}
	}
}