// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 package raft import ( "fmt" "time" metrics "github.com/hashicorp/go-metrics/compat" ) // LogType describes various types of log entries. type LogType uint8 const ( // LogCommand is applied to a user FSM. LogCommand LogType = iota // LogNoop is used to assert leadership. LogNoop // LogAddPeerDeprecated is used to add a new peer. This should only be used with // older protocol versions designed to be compatible with unversioned // Raft servers. See comments in config.go for details. LogAddPeerDeprecated // LogRemovePeerDeprecated is used to remove an existing peer. This should only be // used with older protocol versions designed to be compatible with // unversioned Raft servers. See comments in config.go for details. LogRemovePeerDeprecated // LogBarrier is used to ensure all preceding operations have been // applied to the FSM. It is similar to LogNoop, but instead of returning // once committed, it only returns once the FSM manager acks it. Otherwise, // it is possible there are operations committed but not yet applied to // the FSM. LogBarrier // LogConfiguration establishes a membership change configuration. It is // created when a server is added, removed, promoted, etc. Only used // when protocol version 1 or greater is in use. LogConfiguration ) // String returns LogType as a human readable string. func (lt LogType) String() string { switch lt { case LogCommand: return "LogCommand" case LogNoop: return "LogNoop" case LogAddPeerDeprecated: return "LogAddPeerDeprecated" case LogRemovePeerDeprecated: return "LogRemovePeerDeprecated" case LogBarrier: return "LogBarrier" case LogConfiguration: return "LogConfiguration" default: return fmt.Sprintf("%d", lt) } } // Log entries are replicated to all members of the Raft cluster // and form the heart of the replicated state machine. type Log struct { // Index holds the index of the log entry. Index uint64 // Term holds the election term of the log entry. Term uint64 // Type holds the type of the log entry. Type LogType // Data holds the log entry's type-specific data. Data []byte // Extensions holds an opaque byte slice of information for middleware. It // is up to the client of the library to properly modify this as it adds // layers and remove those layers when appropriate. This value is a part of // the log, so very large values could cause timing issues. // // N.B. It is _up to the client_ to handle upgrade paths. For instance if // using this with go-raftchunking, the client should ensure that all Raft // peers are using a version that can handle that extension before ever // actually triggering chunking behavior. It is sometimes sufficient to // ensure that non-leaders are upgraded first, then the current leader is // upgraded, but a leader changeover during this process could lead to // trouble, so gating extension behavior via some flag in the client // program is also a good idea. Extensions []byte // AppendedAt stores the time the leader first appended this log to it's // LogStore. Followers will observe the leader's time. It is not used for // coordination or as part of the replication protocol at all. It exists only // to provide operational information for example how many seconds worth of // logs are present on the leader which might impact follower's ability to // catch up after restoring a large snapshot. We should never rely on this // being in the past when appending on a follower or reading a log back since // the clock skew can mean a follower could see a log with a future timestamp. // In general too the leader is not required to persist the log before // delivering to followers although the current implementation happens to do // this. AppendedAt time.Time } // LogStore is used to provide an interface for storing // and retrieving logs in a durable fashion. type LogStore interface { // FirstIndex returns the first index written. 0 for no entries. FirstIndex() (uint64, error) // LastIndex returns the last index written. 0 for no entries. LastIndex() (uint64, error) // GetLog gets a log entry at a given index. GetLog(index uint64, log *Log) error // StoreLog stores a log entry. StoreLog(log *Log) error // StoreLogs stores multiple log entries. By default the logs stored may not be contiguous with previous logs (i.e. may have a gap in Index since the last log written). If an implementation can't tolerate this it may optionally implement `MonotonicLogStore` to indicate that this is not allowed. This changes Raft's behaviour after restoring a user snapshot to remove all previous logs instead of relying on a "gap" to signal the discontinuity between logs before the snapshot and logs after. StoreLogs(logs []*Log) error // DeleteRange deletes a range of log entries. The range is inclusive. DeleteRange(min, max uint64) error } // MonotonicLogStore is an optional interface for LogStore implementations that // cannot tolerate gaps in between the Index values of consecutive log entries. For example, // this may allow more efficient indexing because the Index values are densely populated. If true is // returned, Raft will avoid relying on gaps to trigger re-synching logs on followers after a // snapshot is restored. The LogStore must have an efficient implementation of // DeleteLogs for the case where all logs are removed, as this must be called after snapshot restore when gaps are not allowed. // We avoid deleting all records for LogStores that do not implement MonotonicLogStore // because although it's always correct to do so, it has a major negative performance impact on the BoltDB store that is currently // the most widely used. type MonotonicLogStore interface { IsMonotonic() bool } func oldestLog(s LogStore) (Log, error) { var l Log // We might get unlucky and have a truncate right between getting first log // index and fetching it so keep trying until we succeed or hard fail. var lastFailIdx uint64 var lastErr error for { firstIdx, err := s.FirstIndex() if err != nil { return l, err } if firstIdx == 0 { return l, ErrLogNotFound } if firstIdx == lastFailIdx { // Got same index as last time around which errored, don't bother trying // to fetch it again just return the error. return l, lastErr } err = s.GetLog(firstIdx, &l) if err == nil { // We found the oldest log, break the loop break } // We failed, keep trying to see if there is a new firstIndex lastFailIdx = firstIdx lastErr = err } return l, nil } func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, stopCh <-chan struct{}) { for { select { case <-time.After(interval): // In error case emit 0 as the age ageMs := float32(0.0) l, err := oldestLog(s) if err == nil && !l.AppendedAt.IsZero() { ageMs = float32(time.Since(l.AppendedAt).Milliseconds()) } metrics.SetGauge(append(prefix, "oldestLogAge"), ageMs) case <-stopCh: return } } }