150 lines
2.7 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package raft
import (
"context"
"io"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
)
// snapshotRestoreMonitorInterval is the period of the ticker that drives
// progress reporting in snapshotRestoreMonitor.run.
const snapshotRestoreMonitorInterval = 10 * time.Second
// snapshotRestoreMonitor periodically logs how many bytes of a snapshot have
// been read. It owns a single background goroutine that is started by
// startSnapshotRestoreMonitor and shut down by StopAndWait.
type snapshotRestoreMonitor struct {
	// logger receives the progress messages.
	logger hclog.Logger
	// cr supplies the running count of bytes read so far.
	cr CountingReader
	// size is the expected total number of bytes; it is the denominator of
	// the logged percentage.
	size int64
	// networkTransfer selects the "network transfer" wording of the message.
	networkTransfer bool

	// once ensures the shutdown sequence in StopAndWait runs a single time.
	once sync.Once
	// cancel cancels the context handed to run.
	cancel func()
	// doneCh is closed by run when the goroutine returns.
	doneCh chan struct{}
}
// startSnapshotRestoreMonitor builds a monitor that reports progress of cr
// against an expected total of size bytes, launches its reporting goroutine,
// and returns it. networkTransfer only changes the wording of the logged
// message. Call StopAndWait on the result to stop the goroutine.
func startSnapshotRestoreMonitor(
	logger hclog.Logger,
	cr CountingReader,
	size int64,
	networkTransfer bool,
) *snapshotRestoreMonitor {
	monitor := &snapshotRestoreMonitor{
		logger:          logger,
		cr:              cr,
		size:            size,
		networkTransfer: networkTransfer,
		doneCh:          make(chan struct{}),
	}

	// The goroutine's context is rooted at Background, so the stored cancel
	// func is the only thing that ends it.
	ctx, stop := context.WithCancel(context.Background())
	monitor.cancel = stop

	go monitor.run(ctx)
	return monitor
}
// run is the body of the monitor goroutine. It reports progress on every
// ticker interval until ctx is cancelled, and closes doneCh on the way out.
// If cancellation arrives before any tick has fired, one final report is
// emitted so that at least one progress line is always logged.
func (m *snapshotRestoreMonitor) run(ctx context.Context) {
	defer close(m.doneCh)

	tick := time.NewTicker(snapshotRestoreMonitorInterval)
	defer tick.Stop()

	// reported flips to true in the post statement, which is only reached
	// after the ticker branch has completed an iteration.
	for reported := false; ; reported = true {
		select {
		case <-ctx.Done():
			if !reported {
				m.runOnce()
			}
			return
		case <-tick.C:
			m.runOnce()
		}
	}
}
// runOnce logs a single progress line containing the number of bytes read so
// far and, when the expected size is known, the percentage complete. The
// message wording depends on whether the monitor tracks a network transfer.
func (m *snapshotRestoreMonitor) runOnce() {
	readBytes := m.cr.Count()

	message := "snapshot restore progress"
	if m.networkTransfer {
		message = "snapshot network transfer progress"
	}

	// With a zero or negative size the division below would yield NaN or
	// ±Inf, so report only the byte count in that case.
	if m.size <= 0 {
		m.logger.Info(message, "read-bytes", readBytes)
		return
	}

	// Multiply in float64 so that 100*readBytes cannot overflow int64.
	pct := 100 * float64(readBytes) / float64(m.size)

	m.logger.Info(message,
		"read-bytes", readBytes,
		"percent-complete", hclog.Fmt("%0.2f%%", pct),
	)
}
// StopAndWait cancels the monitor goroutine and blocks until it has closed
// doneCh. The shutdown is guarded by a sync.Once, so calls after the first
// do not cancel or wait again.
func (m *snapshotRestoreMonitor) StopAndWait() {
	shutdown := func() {
		m.cancel()
		<-m.doneCh
	}
	m.once.Do(shutdown)
}
// CountingReader is an io.Reader that can also report a byte count via Count.
type CountingReader interface {
	io.Reader

	// Count returns the byte count tracked by the reader.
	Count() int64
}
// countingReader wraps an io.Reader and keeps a running total of the bytes
// read through it. The total is guarded by mu so that Count can be called
// while another goroutine is reading.
type countingReader struct {
	reader io.Reader

	mu    sync.Mutex
	bytes int64
}

// newCountingReader returns a countingReader over r with a count of zero.
func newCountingReader(r io.Reader) *countingReader {
	cr := new(countingReader)
	cr.reader = r
	return cr
}

// Read forwards to the wrapped reader and adds the number of bytes it
// reported to the running total, whether or not an error was also returned.
func (r *countingReader) Read(p []byte) (n int, err error) {
	n, err = r.reader.Read(p)

	r.mu.Lock()
	defer r.mu.Unlock()
	r.bytes += int64(n)

	return n, err
}

// Count returns the total number of bytes read so far.
func (r *countingReader) Count() int64 {
	r.mu.Lock()
	total := r.bytes
	r.mu.Unlock()
	return total
}
// countingReadCloser pairs a countingReader with the io.ReadCloser it reads
// from, so that counting, reading and closing are available on one value.
type countingReadCloser struct {
	// The embedded countingReader supplies Read and Count.
	*countingReader
	// readCloser is the stream passed to newCountingReadCloser.
	readCloser io.ReadCloser
}

// newCountingReadCloser wraps rc so that reads through the result are counted
// and Close is passed through to rc.
func newCountingReadCloser(rc io.ReadCloser) *countingReadCloser {
	wrapped := &countingReadCloser{readCloser: rc}
	wrapped.countingReader = newCountingReader(rc)
	return wrapped
}

// Close closes the wrapped io.ReadCloser and returns its error.
func (c countingReadCloser) Close() error {
	inner := c.readCloser
	return inner.Close()
}

// WrappedReadCloser returns the io.ReadCloser that was passed to
// newCountingReadCloser.
func (c countingReadCloser) WrappedReadCloser() io.ReadCloser {
	inner := c.readCloser
	return inner
}
// ReadCloserWrapper is implemented by io.ReadCloser wrappers that can hand
// back the io.ReadCloser they wrap.
type ReadCloserWrapper interface {
	io.ReadCloser

	// WrappedReadCloser returns the underlying io.ReadCloser.
	WrappedReadCloser() io.ReadCloser
}
// Compile-time check that *countingReadCloser implements ReadCloserWrapper.
var _ ReadCloserWrapper = (*countingReadCloser)(nil)