Skip to content

Commit

Permalink
Improve node setup pt. 3 (#1435)
Browse files Browse the repository at this point in the history
This PR cleans up the way we create `bus`, `worker` and `autopilot` and
combine them into a node. It should now be a lot easier for 3rd party
devs to follow along with how nodes are being constructed and
configured, as well as use our library in their own codebase.

Fixes #1394
  • Loading branch information
ChrisSchinnerl authored Aug 15, 2024
2 parents 37dc91a + e3c9f37 commit 4ada222
Show file tree
Hide file tree
Showing 46 changed files with 1,814 additions and 1,739 deletions.
32 changes: 17 additions & 15 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.sia.tech/renterd/autopilot/contractor"
"go.sia.tech/renterd/autopilot/scanner"
"go.sia.tech/renterd/build"
"go.sia.tech/renterd/config"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
Expand Down Expand Up @@ -123,31 +124,32 @@ type Autopilot struct {
}

// New initializes an Autopilot.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (_ *Autopilot, err error) {
func New(cfg config.Autopilot, bus Bus, workers []Worker, logger *zap.Logger) (_ *Autopilot, err error) {
logger = logger.Named("autopilot").Named(cfg.ID)
shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background())
ap := &Autopilot{
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
id: id,
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", cfg.ID)),
id: cfg.ID,
bus: bus,
logger: logger.Sugar().Named("autopilot").Named(id),
logger: logger.Sugar(),
workers: newWorkerPool(workers),

shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,

tickerDuration: heartbeat,
tickerDuration: cfg.Heartbeat,

pruningAlertIDs: make(map[types.FileContractID]types.Hash256),
}

ap.s, err = scanner.New(ap.bus, scannerBatchSize, scannerNumThreads, scannerScanInterval, ap.logger)
ap.s, err = scanner.New(ap.bus, cfg.ScannerBatchSize, cfg.ScannerNumThreads, cfg.ScannerInterval, logger)
if err != nil {
return
}

ap.c = contractor.New(bus, bus, ap.logger, revisionSubmissionBuffer, revisionBroadcastInterval)
ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker)
ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval, revisionSubmissionBuffer)
ap.c = contractor.New(bus, bus, ap.logger, cfg.RevisionSubmissionBuffer, cfg.RevisionBroadcastInterval)
ap.m = newMigrator(ap, cfg.MigrationHealthCutoff, cfg.MigratorParallelSlabsPerWorker)
ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, cfg.AccountsRefillInterval, cfg.RevisionSubmissionBuffer)

return ap, nil
}
Expand Down Expand Up @@ -209,11 +211,11 @@ func (ap *Autopilot) configHandlerPOST(jc jape.Context) {
jc.Encode(res)
}

func (ap *Autopilot) Run() error {
func (ap *Autopilot) Run() {
ap.startStopMu.Lock()
if ap.isRunning() {
ap.startStopMu.Unlock()
return errors.New("already running")
return
}
ap.startTime = time.Now()
ap.triggerChan = make(chan bool, 1)
Expand All @@ -226,15 +228,15 @@ func (ap *Autopilot) Run() error {
// block until the autopilot is online
if online := ap.blockUntilOnline(); !online {
ap.logger.Error("autopilot stopped before it was able to come online")
return nil
return
}

// schedule a trigger when the wallet receives its first deposit
if err := ap.tryScheduleTriggerWhenFunded(); err != nil {
if !errors.Is(err, context.Canceled) {
ap.logger.Error(err)
}
return nil
return
}

var forceScan bool
Expand Down Expand Up @@ -342,15 +344,15 @@ func (ap *Autopilot) Run() error {

select {
case <-ap.shutdownCtx.Done():
return nil
return
case forceScan = <-ap.triggerChan:
ap.logger.Info("autopilot iteration triggered")
ap.ticker.Reset(ap.tickerDuration)
case <-ap.ticker.C:
case <-tickerFired:
}
}
return nil
return
}

// Shutdown shuts down the autopilot.
Expand Down
5 changes: 3 additions & 2 deletions autopilot/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type (
}
)

func New(hs HostStore, scanBatchSize, scanThreads uint64, scanMinInterval time.Duration, logger *zap.SugaredLogger) (Scanner, error) {
func New(hs HostStore, scanBatchSize, scanThreads uint64, scanMinInterval time.Duration, logger *zap.Logger) (Scanner, error) {
logger = logger.Named("scanner")
if scanBatchSize == 0 {
return nil, errors.New("scanner batch size has to be greater than zero")
}
Expand All @@ -80,7 +81,7 @@ func New(hs HostStore, scanBatchSize, scanThreads uint64, scanMinInterval time.D
scanInterval: scanMinInterval,

statsHostPingMS: utils.NewDataPoints(0),
logger: logger.Named("scanner"),
logger: logger.Sugar(),

interruptChan: make(chan struct{}),
shutdownChan: make(chan struct{}),
Expand Down
2 changes: 1 addition & 1 deletion autopilot/scanner/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestScanner(t *testing.T) {
hs := &mockHostStore{hosts: test.NewHosts(100)}

// create test scanner
s, err := New(hs, testBatchSize, testNumThreads, time.Minute, zap.NewNop().Sugar())
s, err := New(hs, testBatchSize, testNumThreads, time.Minute, zap.NewNop())
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 4ada222

Please sign in to comment.