From 49375514bc81de627f55dd4e0fee64ea744077c9 Mon Sep 17 00:00:00 2001
From: PJ
Date: Fri, 1 Nov 2024 12:56:18 +0100
Subject: [PATCH 1/4] worker: use new hosts endpoint

---
 api/contract.go                  |   8 +++
 api/host.go                      |   5 +-
 bus/bus.go                       |  14 ++--
 bus/routes.go                    |   8 ++-
 cmd/renterd/config.go            |   3 +-
 cmd/renterd/node.go              |   2 +-
 config/config.go                 |   3 +-
 internal/test/e2e/cluster.go     |   2 +-
 internal/worker/accounts.go      | 101 +++++++++++---------------
 internal/worker/accounts_test.go |   7 +-
 internal/worker/cache.go         | 120 ++++++------------------------
 internal/worker/cache_test.go    |  38 +++++-----
 stores/hostdb.go                 |   4 +-
 stores/hostdb_test.go            |  20 +++++-
 stores/metadata_test.go          |   3 +
 stores/sql/database.go           |   2 +-
 stores/sql/main.go               |  16 +++--
 stores/sql/mysql/main.go         |   4 +-
 stores/sql/sqlite/main.go        |   4 +-
 worker/bench_test.go             |   2 +-
 worker/download.go               |  49 ++++++-------
 worker/downloader.go             |   6 +-
 worker/downloader_test.go        |   2 +-
 worker/host.go                   |  87 ++++++++++++----------
 worker/host_test.go              |  12 +++-
 worker/migrations.go             |  45 ++++++------
 worker/mocks_test.go             |  20 ++++++
 worker/pricetables.go            |   2 +-
 worker/upload.go                 |   2 +-
 worker/upload_test.go            |  28 ++++----
 worker/uploader.go               |   2 +-
 worker/worker.go                 |  51 +++++++------
 worker/worker_test.go            |  11 +++
 33 files changed, 341 insertions(+), 342 deletions(-)

diff --git a/api/contract.go b/api/contract.go
index 486304be9..322dd782f 100644
--- a/api/contract.go
+++ b/api/contract.go
@@ -246,3 +246,11 @@ func (cm ContractMetadata) InSet(set string) bool {
 	}
 	return false
 }
+
+func (cm ContractMetadata) HostInfo() HostInfo {
+	return HostInfo{
+		PublicKey:  cm.HostKey,
+		ContractID: cm.ID,
+		SiamuxAddr: cm.SiamuxAddr,
+	}
+}
diff --git a/api/host.go b/api/host.go
index f5056d89c..ebb386486 100644
--- a/api/host.go
+++ b/api/host.go
@@ -127,8 +127,9 @@ type (
 	}
 
 	HostInfo struct {
-		PublicKey  types.PublicKey `json:"publicKey"`
-		SiamuxAddr string          `json:"siamuxAddr"`
+		ContractID types.FileContractID `json:"contractID"`
+		PublicKey  types.PublicKey      `json:"publicKey"`
+		SiamuxAddr string               `json:"siamuxAddr"`
 	}
 
 	HostInteractions struct {
diff --git a/bus/bus.go b/bus/bus.go
index 38b6e4537..0701bb1c4 100644
--- a/bus/bus.go
+++ b/bus/bus.go
@@ -212,7 +212,7 @@ type (
 		UpdateHostAllowlistEntries(ctx context.Context, add, remove []types.PublicKey, clear bool) error
 		UpdateHostBlocklistEntries(ctx context.Context, add, remove []string, clear bool) error
 		UpdateHostCheck(ctx context.Context, autopilotID string, hk types.PublicKey, check api.HostCheck) error
-		UsableHosts(ctx context.Context, offset, limit int) ([]api.HostInfo, error)
+		UsableHosts(ctx context.Context, minWindowStart uint64, offset, limit int) ([]api.HostInfo, error)
 	}
 
 	// A MetadataStore stores information about contracts and objects.
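Editor's sketch, not part of the patch: the hunks above add a ContractID field to api.HostInfo and a ContractMetadata.HostInfo() helper that collapses a contract's host-related fields into it. A minimal, self-contained illustration of the helper, assuming the renterd module layout; all sample values are made up:

package main

import (
	"fmt"

	"go.sia.tech/core/types"
	"go.sia.tech/renterd/api"
)

func main() {
	// made-up contract metadata; only the three fields HostInfo() carries matter here
	md := api.ContractMetadata{
		ID:         types.FileContractID{1},
		HostKey:    types.PublicKey{2},
		SiamuxAddr: "127.0.0.1:9983",
	}
	// the helper is what lets callers below pass a single api.HostInfo instead
	// of separate (hk, fcid, siamuxAddr) arguments
	hi := md.HostInfo()
	fmt.Println(hi.ContractID, hi.PublicKey, hi.SiamuxAddr)
}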
@@ -314,8 +314,9 @@ type (
 )
 
 type Bus struct {
-	startTime time.Time
-	masterKey utils.MasterKey
+	startTime                time.Time
+	masterKey                utils.MasterKey
+	revisionSubmissionBuffer uint64
 
 	alerts   alerts.Alerter
 	alertMgr AlertManager
@@ -340,13 +341,14 @@ type Bus struct {
 }
 
 // New returns a new Bus
-func New(ctx context.Context, masterKey [32]byte, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, announcementMaxAge time.Duration, explorerURL string, l *zap.Logger) (_ *Bus, err error) {
+func New(ctx context.Context, masterKey [32]byte, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, announcementMaxAge time.Duration, explorerURL string, revisionSubmissionBuffer uint64, l *zap.Logger) (_ *Bus, err error) {
 	l = l.Named("bus")
 
 	dialer := rhp.NewFallbackDialer(store, net.Dialer{}, l)
 	b := &Bus{
-		startTime: time.Now(),
-		masterKey: masterKey,
+		startTime:                time.Now(),
+		masterKey:                masterKey,
+		revisionSubmissionBuffer: revisionSubmissionBuffer,
 
 		s:  s,
 		cm: cm,
diff --git a/bus/routes.go b/bus/routes.go
index 28141ed46..5484af145 100644
--- a/bus/routes.go
+++ b/bus/routes.go
@@ -501,7 +501,13 @@ func (b *Bus) hostsHandlerGET(jc jape.Context) {
 		return
 	}
 
-	hosts, err := b.store.UsableHosts(jc.Request.Context(), offset, limit)
+	cs, err := b.consensusState(jc.Request.Context())
+	if jc.Check("couldn't fetch consensus state", err) != nil {
+		return
+	}
+	minWindowStart := cs.BlockHeight + b.revisionSubmissionBuffer
+
+	hosts, err := b.store.UsableHosts(jc.Request.Context(), minWindowStart, offset, limit)
 	if jc.Check("couldn't fetch hosts", err) != nil {
 		return
 	}
diff --git a/cmd/renterd/config.go b/cmd/renterd/config.go
index 56a5437a7..68827c115 100644
--- a/cmd/renterd/config.go
+++ b/cmd/renterd/config.go
@@ -89,8 +89,9 @@ func defaultConfig() config.Config {
 			AnnouncementMaxAgeHours:       24 * 7 * 52, // 1 year
 			Bootstrap:                     true,
 			GatewayAddr:                   ":9981",
-			UsedUTXOExpiry:                24 * time.Hour,
+			RevisionSubmissionBuffer:      150, // 144 + 6 blocks leeway
 			SlabBufferCompletionThreshold: 1 << 12,
+			UsedUTXOExpiry:                24 * time.Hour,
 		},
 		Worker: config.Worker{
 			Enabled: true,
diff --git a/cmd/renterd/node.go b/cmd/renterd/node.go
index 2cbd522c4..db19297e3 100644
--- a/cmd/renterd/node.go
+++ b/cmd/renterd/node.go
@@ -386,7 +386,7 @@ func newBus(ctx context.Context, cfg config.Config, pk types.PrivateKey, network
 
 	// create bus
 	announcementMaxAgeHours := time.Duration(cfg.Bus.AnnouncementMaxAgeHours) * time.Hour
-	b, err := bus.New(ctx, masterKey, alertsMgr, wh, cm, s, w, sqlStore, announcementMaxAgeHours, explorerURL, logger)
+	b, err := bus.New(ctx, masterKey, alertsMgr, wh, cm, s, w, sqlStore, announcementMaxAgeHours, explorerURL, cfg.Bus.RevisionSubmissionBuffer, logger)
 	if err != nil {
 		return nil, nil, fmt.Errorf("failed to create bus: %w", err)
 	}
diff --git a/config/config.go b/config/config.go
index f74a765e8..d97bfff60 100644
--- a/config/config.go
+++ b/config/config.go
@@ -59,8 +59,9 @@ type (
 		GatewayAddr    string `yaml:"gatewayAddr,omitempty"`
 		RemoteAddr     string `yaml:"remoteAddr,omitempty"`
 		RemotePassword string `yaml:"remotePassword,omitempty"`
-		UsedUTXOExpiry time.Duration `yaml:"usedUtxoExpiry,omitempty"`
+		RevisionSubmissionBuffer      uint64        `yaml:"revisionSubmissionBuffer,omitempty"`
 		SlabBufferCompletionThreshold int64         `yaml:"slabBufferCompleionThreshold,omitempty"`
+		UsedUTXOExpiry                time.Duration `yaml:"usedUtxoExpiry,omitempty"`
 	}
 
 	// LogFile configures the file output of the logger.
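Editor's sketch, not part of the patch: the bus now derives a minimum window start from the current block height plus the configured RevisionSubmissionBuffer and pushes it into the UsableHosts query, replacing the per-account check the first patch removes from internal/worker/accounts.go. A minimal illustration of that predicate; names here are illustrative only:

package main

import "fmt"

// usableAt mirrors the SQL clause `c.window_start > ?` added in stores/sql/main.go:
// a contract whose proof window starts at or before blockHeight+buffer can no
// longer be safely revised, so its host is filtered out of the usable set.
func usableAt(windowStart, blockHeight, revisionSubmissionBuffer uint64) bool {
	return windowStart > blockHeight+revisionSubmissionBuffer
}

func main() {
	const buffer = 150 // the default above: 144 + 6 blocks leeway
	fmt.Println(usableAt(500, 300, buffer)) // true: window is comfortably ahead
	fmt.Println(usableAt(450, 300, buffer)) // false: too close to the proof window
}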
diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go
index ce8e5ff6e..7f154798b 100644
--- a/internal/test/e2e/cluster.go
+++ b/internal/test/e2e/cluster.go
@@ -617,7 +617,7 @@ func newTestBus(ctx context.Context, cm *chain.Manager, genesisBlock types.Block
 
 	// create bus
 	announcementMaxAgeHours := time.Duration(cfg.AnnouncementMaxAgeHours) * time.Hour
-	b, err := bus.New(ctx, masterKey, alertsMgr, wh, cm, s, w, sqlStore, announcementMaxAgeHours, "", logger)
+	b, err := bus.New(ctx, masterKey, alertsMgr, wh, cm, s, w, sqlStore, announcementMaxAgeHours, "", cfg.RevisionSubmissionBuffer, logger)
 	if err != nil {
 		return nil, nil, nil, nil, err
 	}
diff --git a/internal/worker/accounts.go b/internal/worker/accounts.go
index f73c9f529..7f938ea8e 100644
--- a/internal/worker/accounts.go
+++ b/internal/worker/accounts.go
@@ -33,11 +33,11 @@ var (
 
 type (
 	AccountFunder interface {
-		FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, desired types.Currency) error
+		FundAccount(ctx context.Context, hi api.HostInfo, desired types.Currency) error
 	}
 
 	AccountSyncer interface {
-		SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error
+		SyncAccount(ctx context.Context, hi api.HostInfo) error
 	}
 
 	AccountStore interface {
@@ -49,27 +49,26 @@ type (
 		ConsensusState(ctx context.Context) (api.ConsensusState, error)
 	}
 
-	DownloadContracts interface {
-		DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error)
+	HostStore interface {
+		UsableHosts(ctx context.Context) ([]api.HostInfo, error)
 	}
 )
 
 type (
 	AccountMgr struct {
-		alerts                   alerts.Alerter
-		funder                   AccountFunder
-		syncer                   AccountSyncer
-		dc                       DownloadContracts
-		cs                       ConsensusState
-		s                        AccountStore
-		key                      utils.AccountsKey
-		logger                   *zap.SugaredLogger
-		owner                    string
-		refillInterval           time.Duration
-		revisionSubmissionBuffer uint64
-		shutdownCtx              context.Context
-		shutdownCancel           context.CancelFunc
-		wg                       sync.WaitGroup
+		alerts         alerts.Alerter
+		funder         AccountFunder
+		syncer         AccountSyncer
+		cs             ConsensusState
+		s              AccountStore
+		hs             HostStore
+		key            utils.AccountsKey
+		logger         *zap.SugaredLogger
+		owner          string
+		refillInterval time.Duration
+		shutdownCtx    context.Context
+		shutdownCancel context.CancelFunc
+		wg             sync.WaitGroup
 
 		mu   sync.Mutex
 		byID map[rhpv3.Account]*Account
@@ -92,7 +91,7 @@ type (
 // NewAccountManager creates a new account manager. It will load all accounts
 // from the given store and mark the shutdown as unclean. When Shutdown is
 // called it will save all accounts.
-func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alerter, funder AccountFunder, syncer AccountSyncer, cs ConsensusState, dc DownloadContracts, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) {
+func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alerter, funder AccountFunder, syncer AccountSyncer, cs ConsensusState, hs HostStore, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) {
 	logger := l.Named("accounts").Sugar()
 
 	shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
@@ -101,7 +100,7 @@ func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alert
 		funder: funder,
 		syncer: syncer,
 		cs:     cs,
-		dc:     dc,
+		hs:     hs,
 		s:      s,
 		key:    key,
 		logger: logger,
@@ -309,40 +308,33 @@ func (a *AccountMgr) markRefillDone(hk types.PublicKey) {
 // goroutine from a previous call, refillWorkerAccounts will skip that account
 // until the previously launched goroutine returns.
 func (a *AccountMgr) refillAccounts() {
-	// fetch config
-	cs, err := a.cs.ConsensusState(a.shutdownCtx)
+	// fetch all usable hosts
+	hosts, err := a.hs.UsableHosts(a.shutdownCtx)
 	if err != nil {
-		a.logger.Errorw(fmt.Sprintf("failed to fetch consensus state for refill: %v", err))
+		a.logger.Errorw(fmt.Sprintf("failed to fetch usable hosts: %v", err))
 		return
-	}
-
-	// fetch all contracts
-	contracts, err := a.dc.DownloadContracts(a.shutdownCtx)
-	if err != nil {
-		a.logger.Errorw(fmt.Sprintf("failed to fetch contracts for refill: %v", err))
-		return
-	} else if len(contracts) == 0 {
+	} else if len(hosts) == 0 {
 		return
 	}
 
 	// refill accounts in separate goroutines
-	for _, c := range contracts {
+	for _, hi := range hosts {
 		// launch refill if not already in progress
-		if a.markRefillInProgress(c.HostKey) {
-			go func(contract api.ContractMetadata) {
-				defer a.markRefillDone(contract.HostKey)
+		if a.markRefillInProgress(hi.PublicKey) {
+			go func() {
+				defer a.markRefillDone(hi.PublicKey)
 				rCtx, cancel := context.WithTimeout(a.shutdownCtx, 5*time.Minute)
 				defer cancel()
 
 				// refill
-				refilled, err := a.refillAccount(rCtx, c, cs.BlockHeight, a.revisionSubmissionBuffer)
+				refilled, err := a.refillAccount(rCtx, hi)
 
 				// determine whether to log something
 				shouldLog := true
 				a.mu.Lock()
-				if t, exists := a.lastLoggedRefillErr[contract.HostKey]; !exists || err == nil {
-					a.lastLoggedRefillErr[contract.HostKey] = time.Now()
+				if t, exists := a.lastLoggedRefillErr[hi.PublicKey]; !exists || err == nil {
+					a.lastLoggedRefillErr[hi.PublicKey] = time.Now()
 				} else if time.Since(t) < time.Hour {
 					// only log error once per hour per account
 					shouldLog = false
@@ -350,34 +342,27 @@ func (a *AccountMgr) refillAccounts() {
 				a.mu.Unlock()
 
 				if err != nil && shouldLog {
-					a.logger.Error("failed to refill account for host", zap.Stringer("hostKey", contract.HostKey), zap.Error(err))
+					a.logger.Error("failed to refill account for host", zap.Stringer("hostKey", hi.PublicKey), zap.Error(err))
 				} else if refilled {
-					a.logger.Infow("successfully refilled account for host", zap.Stringer("hostKey", contract.HostKey), zap.Error(err))
+					a.logger.Infow("successfully refilled account for host", zap.Stringer("hostKey", hi.PublicKey), zap.Error(err))
 				}
-			}(c)
+			}()
 		}
 	}
 }
 
-func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMetadata, bh, revisionSubmissionBuffer uint64) (bool, error) {
+func (a *AccountMgr) refillAccount(ctx context.Context, hi api.HostInfo) (bool, error) {
 	// fetch the account
-	account := a.Account(contract.HostKey)
-
-	// check if the contract is too close to the proof window to be revised,
-	// trying to refill the account would result in the host not returning the
-	// revision and returning an obfuscated error
-	if (bh + revisionSubmissionBuffer) > contract.WindowStart {
-		return false, fmt.Errorf("contract %v is too close to the proof window to be revised", contract.ID)
-	}
+	account := a.Account(hi.PublicKey)
 
 	// check if a host is potentially cheating before refilling.
 	// We only check against the max drift if the account's drift is
 	// negative because we don't care if we have more money than
 	// expected.
 	if account.Drift.Cmp(maxNegDrift) < 0 {
-		alert := newAccountRefillAlert(account.ID, contract, errMaxDriftExceeded,
+		alert := newAccountRefillAlert(account.ID, hi.PublicKey, hi.ContractID, errMaxDriftExceeded,
 			"accountID", account.ID.String(),
-			"hostKey", contract.HostKey.String(),
+			"hostKey", hi.PublicKey.String(),
 			"balance", account.Balance.String(),
 			"drift", account.Drift.String(),
 		)
@@ -390,13 +375,13 @@ func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMet
 	// check if a resync is needed
 	if account.RequiresSync {
 		// sync the account
-		err := a.syncer.SyncAccount(ctx, contract.ID, contract.HostKey, contract.SiamuxAddr)
+		err := a.syncer.SyncAccount(ctx, hi)
 		if err != nil {
 			return false, fmt.Errorf("failed to sync account's balance: %w", err)
 		}
 
 		// refetch the account after syncing
-		account = a.Account(contract.HostKey)
+		account = a.Account(hi.PublicKey)
 	}
 
 	// check if refill is needed
@@ -405,7 +390,7 @@ func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMet
 	}
 
 	// fund the account
-	err := a.funder.FundAccount(ctx, contract.ID, contract.HostKey, maxBalance)
+	err := a.funder.FundAccount(ctx, hi, maxBalance)
 	if err != nil {
 		return false, fmt.Errorf("failed to fund account: %w", err)
 	}
@@ -593,12 +578,12 @@ func (a *Account) setBalance(balance *big.Int) {
 		zap.Stringer("drift", drift))
 }
 
-func newAccountRefillAlert(id rhpv3.Account, contract api.ContractMetadata, err error, keysAndValues ...string) alerts.Alert {
+func newAccountRefillAlert(id rhpv3.Account, hk types.PublicKey, fcid types.FileContractID, err error, keysAndValues ...string) alerts.Alert {
 	data := map[string]interface{}{
 		"error":     err.Error(),
 		"accountID": id.String(),
-		"contractID": contract.ID.String(),
-		"hostKey":    contract.HostKey.String(),
+		"contractID": fcid.String(),
+		"hostKey":    hk.String(),
 	}
 	for i := 0; i < len(keysAndValues); i += 2 {
 		data[keysAndValues[i]] = keysAndValues[i+1]
diff --git a/internal/worker/accounts_test.go b/internal/worker/accounts_test.go
index d33e67200..643e80721 100644
--- a/internal/worker/accounts_test.go
+++ b/internal/worker/accounts_test.go
@@ -30,10 +30,10 @@ func (b *mockAccountMgrBackend) RegisterAlert(context.Context, alerts.Alert) err
 	return nil
 }
 
-func (b *mockAccountMgrBackend) FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, balance types.Currency) error {
+func (b *mockAccountMgrBackend) FundAccount(ctx context.Context, hi api.HostInfo, balance types.Currency) error {
 	return nil
 }
-func (b *mockAccountMgrBackend) SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error {
+func (b *mockAccountMgrBackend) SyncAccount(ctx context.Context, hi api.HostInfo) error {
 	return nil
 }
 func (b *mockAccountMgrBackend) Accounts(context.Context, string) ([]api.Account, error) {
@@ -48,6 +48,9 @@ func (b *mockAccountMgrBackend) ConsensusState(ctx context.Context) (api.Consens
 func (b *mockAccountMgrBackend) DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error) {
 	return nil, nil
 }
+func (b *mockAccountMgrBackend) UsableHosts(ctx context.Context) ([]api.HostInfo, error) {
+	return nil, nil
+}
 
 func TestAccounts(t *testing.T) {
 	// create a manager with an account for a single host
diff --git a/internal/worker/cache.go b/internal/worker/cache.go
index d357293ff..2ea434928 100644
--- a/internal/worker/cache.go
+++ b/internal/worker/cache.go
@@ -18,8 +18,8 @@ import (
 )
 
 const (
-	cacheKeyDownloadContracts = "downloadcontracts"
-	cacheKeyGougingParams     = "gougingparams"
+	cacheKeyGougingParams = "gougingparams"
+	cacheKeyUsableHosts   = "usablehosts"
 
 	cacheEntryExpiry = 5 * time.Minute
 )
@@ -83,12 +83,12 @@ func (c *memoryCache) Invalidate(key string) {
 
 type (
 	Bus interface {
-		Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error)
+		UsableHosts(ctx context.Context, opts api.UsableHostOptions) ([]api.HostInfo, error)
 		GougingParams(ctx context.Context) (api.GougingParams, error)
 	}
 
 	WorkerCache interface {
-		DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error)
+		UsableHosts(ctx context.Context) ([]api.HostInfo, error)
 		GougingParams(ctx context.Context) (api.GougingParams, error)
 		HandleEvent(event webhooks.Event) error
 		Subscribe(e EventSubscriber) error
@@ -115,28 +115,28 @@ func NewCache(b Bus, logger *zap.Logger) WorkerCache {
 	}
 }
 
-func (c *cache) DownloadContracts(ctx context.Context) (contracts []api.ContractMetadata, err error) {
+func (c *cache) UsableHosts(ctx context.Context) (hosts []api.HostInfo, err error) {
 	// fetch directly from bus if the cache is not ready
 	if !c.isReady() {
 		c.logger.Warn(errCacheNotReady)
-		contracts, err = c.b.Contracts(ctx, api.ContractsOpts{})
+		hosts, err = c.b.UsableHosts(ctx, api.UsableHostOptions{})
 		return
 	}
 
 	// fetch from bus if it's not cached or expired
-	value, found, expired := c.cache.Get(cacheKeyDownloadContracts)
+	value, found, expired := c.cache.Get(cacheKeyUsableHosts)
 	if !found || expired {
-		contracts, err = c.b.Contracts(ctx, api.ContractsOpts{})
+		hosts, err = c.b.UsableHosts(ctx, api.UsableHostOptions{})
 		if err == nil {
-			c.cache.Set(cacheKeyDownloadContracts, contracts)
+			c.cache.Set(cacheKeyUsableHosts, hosts)
 		}
-		if expired && !contractsEqual(value.([]api.ContractMetadata), contracts) {
-			c.logger.Warn(fmt.Errorf("%w: key %v", errCacheOutdated, cacheKeyDownloadContracts))
+		if expired && !hostsEqual(value.([]api.HostInfo), hosts) {
+			c.logger.Warn(fmt.Errorf("%w: key %v", errCacheOutdated, cacheKeyUsableHosts))
 		}
 		return
 	}
 
-	return value.([]api.ContractMetadata), nil
+	return value.([]api.HostInfo), nil
 }
 
 func (c *cache) GougingParams(ctx context.Context) (gp api.GougingParams, err error) {
@@ -178,21 +178,14 @@ func (c *cache) HandleEvent(event webhooks.Event) (err error) {
 	case api.EventConsensusUpdate:
 		log = log.With("bh", e.BlockHeight, "ts", e.Timestamp)
 		c.handleConsensusUpdate(e)
+	case api.EventSettingUpdate:
+		log = log.With("gouging", e.GougingSettings != nil, "pinned", e.PinnedSettings != nil, "upload", e.UploadSettings != nil, "ts", e.Timestamp)
+		c.handleSettingUpdate(e)
 	case api.EventContractAdd:
-		log = log.With("fcid", e.Added.ID, "ts", e.Timestamp)
-		c.handleContractAdd(e)
 	case api.EventContractArchive:
-		log = log.With("fcid", e.ContractID, "ts", e.Timestamp)
-		c.handleContractArchive(e)
 	case api.EventContractRenew:
-		log = log.With("fcid", e.Renewal.ID, "renewedFrom", e.Renewal.RenewedFrom, "ts", e.Timestamp)
-		c.handleContractRenew(e)
 	case api.EventHostUpdate:
-		log = log.With("hk", e.HostKey, "ts", e.Timestamp)
-		c.handleHostUpdate(e)
-	case api.EventSettingUpdate:
-		log = log.With("gouging", e.GougingSettings != nil, "pinned", e.PinnedSettings != nil, "upload", e.UploadSettings != nil, "ts", e.Timestamp)
-		c.handleSettingUpdate(e)
+		c.cache.Invalidate(cacheKeyUsableHosts)
 	default:
 		log.Info("unhandled event", e)
 		return
@@ -243,79 +236,6 @@ func (c *cache) handleConsensusUpdate(event api.EventConsensusUpdate) {
 	c.cache.Set(cacheKeyGougingParams, gp)
 }
 
-func (c *cache) handleContractAdd(event api.EventContractAdd) {
-	// return early if the cache doesn't have contracts
-	value, found, _ := c.cache.Get(cacheKeyDownloadContracts)
-	if !found {
-		return
-	}
-	contracts := value.([]api.ContractMetadata)
-
-	// add the contract to the cache
-	for _, contract := range contracts {
-		if contract.ID == event.Added.ID {
-			return
-		}
-	}
-	contracts = append(contracts, event.Added)
-	c.cache.Set(cacheKeyDownloadContracts, contracts)
-}
-
-func (c *cache) handleContractArchive(event api.EventContractArchive) {
-	// return early if the cache doesn't have contracts
-	value, found, _ := c.cache.Get(cacheKeyDownloadContracts)
-	if !found {
-		return
-	}
-	contracts := value.([]api.ContractMetadata)
-
-	// remove the contract from the cache
-	for i, contract := range contracts {
-		if contract.ID == event.ContractID {
-			contracts = append(contracts[:i], contracts[i+1:]...)
-			break
-		}
-	}
-	c.cache.Set(cacheKeyDownloadContracts, contracts)
-}
-
-func (c *cache) handleContractRenew(event api.EventContractRenew) {
-	// return early if the cache doesn't have contracts
-	value, found, _ := c.cache.Get(cacheKeyDownloadContracts)
-	if !found {
-		return
-	}
-	contracts := value.([]api.ContractMetadata)
-
-	// update the renewed contract in the cache
-	for i, contract := range contracts {
-		if contract.ID == event.Renewal.RenewedFrom {
-			contracts[i] = event.Renewal
-			break
-		}
-	}
-
-	c.cache.Set(cacheKeyDownloadContracts, contracts)
-}
-
-func (c *cache) handleHostUpdate(e api.EventHostUpdate) {
-	// return early if the cache doesn't have contracts
-	value, found, _ := c.cache.Get(cacheKeyDownloadContracts)
-	if !found {
-		return
-	}
-	contracts := value.([]api.ContractMetadata)
-
-	// update the host's IP in the cache
-	for i, contract := range contracts {
-		if contract.HostKey == e.HostKey {
-			contracts[i].HostIP = e.NetAddr
-		}
-	}
-
-	c.cache.Set(cacheKeyDownloadContracts, contracts)
-}
-
 func (c *cache) handleSettingUpdate(e api.EventSettingUpdate) {
 	// return early if the cache doesn't have gouging params to update
 	value, found, _ := c.cache.Get(cacheKeyGougingParams)
@@ -334,14 +254,14 @@ func (c *cache) handleSettingUpdate(e api.EventSettingUpdate) {
 	c.cache.Set(cacheKeyGougingParams, gp)
 }
 
-func contractsEqual(x, y []api.ContractMetadata) bool {
+func hostsEqual(x, y []api.HostInfo) bool {
 	if len(x) != len(y) {
 		return false
 	}
-	sort.Slice(x, func(i, j int) bool { return x[i].ID.String() < x[j].ID.String() })
-	sort.Slice(y, func(i, j int) bool { return y[i].ID.String() < y[j].ID.String() })
+	sort.Slice(x, func(i, j int) bool { return x[i].PublicKey.String() < x[j].PublicKey.String() })
+	sort.Slice(y, func(i, j int) bool { return y[i].PublicKey.String() < y[j].PublicKey.String() })
 	for i, c := range x {
-		if c.ID.String() != y[i].ID.String() {
+		if c.PublicKey.String() != y[i].PublicKey.String() || c.SiamuxAddr != y[i].SiamuxAddr {
 			return false
 		}
 	}
diff --git a/internal/worker/cache_test.go b/internal/worker/cache_test.go
index 2c2bc5d21..3b408408b 100644
--- a/internal/worker/cache_test.go
+++ b/internal/worker/cache_test.go
@@ -20,13 +20,17 @@ type mockBus struct {
 	gougingParams api.GougingParams
 }
 
-func (m *mockBus) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) {
-	return m.contracts, nil
-}
 func (m *mockBus) GougingParams(ctx context.Context) (api.GougingParams, error) {
 	return m.gougingParams, nil
 }
 
+func (m *mockBus) UsableHosts(ctx context.Context, opts api.UsableHostOptions) (hosts []api.HostInfo, _ error) {
+	for _, c := range m.contracts {
+		hosts = append(hosts, c.HostInfo())
+	}
+	return
+}
+
 type mockEventSubscriber struct {
 	readyChan chan struct{}
 }
@@ -78,11 +82,11 @@ func TestWorkerCache(t *testing.T) {
 	c.Subscribe(m)
 
 	// assert using cache before it's ready prints a warning
-	contracts, err := c.DownloadContracts(context.Background())
+	hosts, err := c.UsableHosts(context.Background())
 	if err != nil {
 		t.Fatal(err)
-	} else if len(contracts) != 3 {
-		t.Fatal("expected 3 contracts, got", len(contracts))
+	} else if len(hosts) != 3 {
+		t.Fatal("expected 3 hosts, got", len(hosts))
 	}
 	gp, err := c.GougingParams(context.Background())
 	if err != nil {
@@ -105,8 +109,8 @@ func TestWorkerCache(t *testing.T) {
 	// close the ready channel
 	close(m.readyChan)
 
-	// fetch contracts & gouging params so they're cached
-	_, err = c.DownloadContracts(context.Background())
+	// fetch usable hosts & gouging params so they're cached
+	_, err = c.UsableHosts(context.Background())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -117,25 +121,25 @@ func TestWorkerCache(t *testing.T) {
 
 	// update bus contracts & expire cache entry manually
 	b.contracts = append(b.contracts, testContractMetadata(4))
-	contracts, err = c.DownloadContracts(context.Background())
+	hosts, err = c.UsableHosts(context.Background())
 	if err != nil {
 		t.Fatal(err)
-	} else if len(contracts) != 3 {
-		t.Fatal("expected 3 contracts, got", len(contracts))
+	} else if len(hosts) != 3 {
+		t.Fatal("expected 3 hosts, got", len(hosts))
 	}
 	mc.mu.Lock()
-	mc.items[cacheKeyDownloadContracts].expiry = time.Now().Add(-1 * time.Minute)
+	mc.items[cacheKeyUsableHosts].expiry = time.Now().Add(-1 * time.Minute)
 	mc.mu.Unlock()
 
-	// fetch contracts again, assert we have 4 now and we printed a warning to indicate the cache entry was invalid
-	contracts, err = c.DownloadContracts(context.Background())
+	// fetch hosts again, assert we have 4 now and we printed a warning to indicate the cache entry was invalid
+	hosts, err = c.UsableHosts(context.Background())
 	if err != nil {
 		t.Fatal(err)
-	} else if len(contracts) != 4 {
-		t.Fatal("expected 4 contracts, got", len(contracts))
+	} else if len(hosts) != 4 {
+		t.Fatal("expected 4 hosts, got", len(hosts))
 	} else if logs := observedLogs.FilterLevelExact(zap.WarnLevel); logs.Len() != 1 {
 		t.Fatal("expected 1 warning, got", logs.Len(), logs.All())
-	} else if lines := observedLogs.TakeAll(); !strings.Contains(lines[0].Message, errCacheOutdated.Error()) || !strings.Contains(lines[0].Message, cacheKeyDownloadContracts) {
+	} else if lines := observedLogs.TakeAll(); !strings.Contains(lines[0].Message, errCacheOutdated.Error()) || !strings.Contains(lines[0].Message, cacheKeyUsableHosts) {
 		t.Fatal("expected error message to contain 'cache is outdated', got", lines[0].Message)
 	}
diff --git a/stores/hostdb.go b/stores/hostdb.go
index 606b66930..cdadc0ce2 100644
--- a/stores/hostdb.go
+++ b/stores/hostdb.go
@@ -117,9 +117,9 @@ func (s *SQLStore) RecordPriceTables(ctx context.Context, priceTableUpdate []api
 	})
 }
 
-func (s *SQLStore) UsableHosts(ctx context.Context, offset, limit int) (hosts []api.HostInfo, err error) {
+func (s *SQLStore) UsableHosts(ctx context.Context, minWindowStart uint64, offset, limit int) (hosts []api.HostInfo, err error) {
 	err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
-		hosts, err = tx.UsableHosts(ctx, offset, limit)
+		hosts, err = tx.UsableHosts(ctx, minWindowStart, offset, limit)
 		return err
 	})
 	return
diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go
index 94a5318f5..512aaba15 100644
--- a/stores/hostdb_test.go
+++ b/stores/hostdb_test.go
@@ -511,7 +511,7 @@ func TestUsableHosts(t *testing.T) {
 	}
 
 	// assert h1 and h2 are usable and ordered by score
-	hosts, err := ss.UsableHosts(ctx, 0, -1)
+	hosts, err := ss.UsableHosts(ctx, 0, 0, -1)
 	if err != nil {
 		t.Fatal(err)
 	} else if len(hosts) != 2 {
@@ -523,7 +523,7 @@ func TestUsableHosts(t *testing.T) {
 	}
 
 	// assert offset and limit
-	hosts, err = ss.UsableHosts(ctx, 1, 1)
+	hosts, err = ss.UsableHosts(ctx, 0, 1, 1)
 	if err != nil {
 		t.Fatal(err)
 	} else if len(hosts) != 1 {
@@ -531,7 +531,21 @@ func TestUsableHosts(t *testing.T) {
 	} else if hosts[0].PublicKey != hks[0] {
 		t.Fatal("unexpected", hosts)
 	}
-	hosts, err = ss.UsableHosts(ctx, 2, 1)
+	hosts, err = ss.UsableHosts(ctx, 0, 2, 1)
+	if err != nil {
+		t.Fatal(err)
+	} else if len(hosts) != 0 {
+		t.Fatal("unexpected", len(hosts))
+	}
+
+	// assert minWindowStart is taken into account
+	hosts, err = ss.UsableHosts(ctx, 9, 0, -1)
+	if err != nil {
+		t.Fatal(err)
+	} else if len(hosts) != 2 {
+		t.Fatal("unexpected", len(hosts))
+	}
+	hosts, err = ss.UsableHosts(ctx, 10, 0, -1)
 	if err != nil {
 		t.Fatal(err)
 	} else if len(hosts) != 0 {
diff --git a/stores/metadata_test.go b/stores/metadata_test.go
index 0bac13260..6d03b85b4 100644
--- a/stores/metadata_test.go
+++ b/stores/metadata_test.go
@@ -660,6 +660,7 @@ func newTestContract(fcid types.FileContractID, hk types.PublicKey) api.Contract
 		State:              api.ContractStatePending,
 		ContractPrice:      types.NewCurrency64(1),
 		InitialRenterFunds: types.NewCurrency64(2),
+		WindowStart:        10,
 	}
 }
 
@@ -840,6 +841,7 @@ func TestSQLMetadataStore(t *testing.T) {
 		State:              api.ContractStatePending,
 		ContractPrice:      types.NewCurrency64(1),
 		InitialRenterFunds: types.NewCurrency64(2),
+		WindowStart:        10,
 	}
 
 	expectedObjSlab2 := object.Slab{
@@ -863,6 +865,7 @@ func TestSQLMetadataStore(t *testing.T) {
 		State:              api.ContractStatePending,
 		ContractPrice:      types.NewCurrency64(1),
 		InitialRenterFunds: types.NewCurrency64(2),
+		WindowStart:        10,
 	}
 
 	// compare slabs
diff --git a/stores/sql/database.go b/stores/sql/database.go
index d223b94ef..cabebd5cd 100644
--- a/stores/sql/database.go
+++ b/stores/sql/database.go
@@ -370,7 +370,7 @@ type (
 		// UsableHosts returns a list of hosts that are ready to be used. That
 		// means they are deemed usable by the autopilot, they are not gouging,
 		// not blocked, not offline, etc.
-		UsableHosts(ctx context.Context, offset, limit int) ([]api.HostInfo, error)
+		UsableHosts(ctx context.Context, minWindowStart uint64, offset, limit int) ([]api.HostInfo, error)
 
 		// WalletEvents returns all wallet events in the database.
WalletEvents(ctx context.Context, offset, limit int) ([]wallet.Event, error) diff --git a/stores/sql/main.go b/stores/sql/main.go index 8abf06779..dba798755 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -2159,7 +2159,7 @@ func UnspentSiacoinElements(ctx context.Context, tx sql.Tx) (elements []types.Si return } -func UsableHosts(ctx context.Context, tx sql.Tx, offset, limit int) ([]api.HostInfo, error) { +func UsableHosts(ctx context.Context, tx sql.Tx, minWindowStart uint64, offset, limit int) ([]api.HostInfo, error) { // handle input parameters if offset < 0 { return nil, ErrNegativeOffset @@ -2167,8 +2167,11 @@ func UsableHosts(ctx context.Context, tx sql.Tx, offset, limit int) ([]api.HostI limit = math.MaxInt64 } - // only include allowed hosts + // only include contracts with a window start greater than the given value var whereExprs []string + whereExprs = append(whereExprs, "c.window_start > ?") + + // only include allowed hosts var hasAllowlist bool if err := tx.QueryRow(ctx, "SELECT EXISTS (SELECT 1 FROM host_allowlist_entries)").Scan(&hasAllowlist); err != nil { return nil, fmt.Errorf("failed to check for allowlist: %w", err) @@ -2212,14 +2215,15 @@ EXISTS ( SELECT h.public_key, COALESCE(h.net_address, ""), - COALESCE(h.settings->>'$.siamuxport', "") AS siamux_port + COALESCE(h.settings->>'$.siamuxport', "") AS siamux_port, + MAX(c.fcid) FROM hosts h INNER JOIN contracts c on c.host_id = h.id and c.archival_reason IS NULL INNER JOIN host_checks hc on hc.db_host_id = h.id and hc.db_autopilot_id = ? WHERE %s GROUP by h.id ORDER BY MAX(hc.score_age) * MAX(hc.score_collateral) * MAX(hc.score_interactions) * MAX(hc.score_storage_remaining) * MAX(hc.score_uptime) * MAX(hc.score_version) * MAX(hc.score_prices) DESC - LIMIT ? OFFSET ?`, strings.Join(whereExprs, "AND")), autopilotID, autopilotID, limit, offset) + LIMIT ? 
OFFSET ?`, strings.Join(whereExprs, " AND ")), autopilotID, minWindowStart, autopilotID, limit, offset) if err != nil { return nil, fmt.Errorf("failed to fetch hosts: %w", err) } @@ -2229,7 +2233,8 @@ EXISTS ( for rows.Next() { var hk PublicKey var addr, port string - err := rows.Scan(&hk, &addr, &port) + var fcid FileContractID + err := rows.Scan(&hk, &addr, &port, &fcid) if err != nil { return nil, fmt.Errorf("failed to scan host: %w", err) } @@ -2238,6 +2243,7 @@ EXISTS ( continue } hosts = append(hosts, api.HostInfo{ + ContractID: types.FileContractID(fcid), PublicKey: types.PublicKey(hk), SiamuxAddr: net.JoinHostPort(host, port), }) diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index db18795b3..f1e7408a6 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -1242,8 +1242,8 @@ func (tx *MainDatabaseTx) UpdateSlabHealth(ctx context.Context, limit int64, min return res.RowsAffected() } -func (tx *MainDatabaseTx) UsableHosts(ctx context.Context, offset, limit int) ([]api.HostInfo, error) { - return ssql.UsableHosts(ctx, tx, offset, limit) +func (tx *MainDatabaseTx) UsableHosts(ctx context.Context, minWindowStart uint64, offset, limit int) ([]api.HostInfo, error) { + return ssql.UsableHosts(ctx, tx, minWindowStart, offset, limit) } func (tx *MainDatabaseTx) WalletEvents(ctx context.Context, offset, limit int) ([]wallet.Event, error) { diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 877a48648..8cdcee7ad 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -1242,8 +1242,8 @@ func (tx *MainDatabaseTx) UpdateSlabHealth(ctx context.Context, limit int64, min return res.RowsAffected() } -func (tx *MainDatabaseTx) UsableHosts(ctx context.Context, offset, limit int) ([]api.HostInfo, error) { - return ssql.UsableHosts(ctx, tx, offset, limit) +func (tx *MainDatabaseTx) UsableHosts(ctx context.Context, minWindowStart uint64, offset, limit int) ([]api.HostInfo, error) { + return ssql.UsableHosts(ctx, tx, minWindowStart, offset, limit) } func (tx *MainDatabaseTx) WalletEvents(ctx context.Context, offset, limit int) ([]wallet.Event, error) { diff --git a/worker/bench_test.go b/worker/bench_test.go index 3cde1792a..860164a8f 100644 --- a/worker/bench_test.go +++ b/worker/bench_test.go @@ -45,7 +45,7 @@ func BenchmarkDownloaderSingleObject(b *testing.B) { b.SetBytes(o.Size) b.ResetTimer() for i := 0; i < b.N; i++ { - err = w.downloadManager.DownloadObject(context.Background(), io.Discard, *o.Object, 0, uint64(o.Size), w.Contracts()) + err = w.downloadManager.DownloadObject(context.Background(), io.Discard, *o.Object, 0, uint64(o.Size), w.UsableHosts()) if err != nil { b.Fatal(err) } diff --git a/worker/download.go b/worker/download.go index ca8d9a213..cd02c5979 100644 --- a/worker/download.go +++ b/worker/download.go @@ -168,7 +168,7 @@ func newDownloadManager(ctx context.Context, uploadKey *utils.UploadKey, hm Host } } -func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o object.Object, offset, length uint64, contracts []api.ContractMetadata) (err error) { +func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o object.Object, offset, length uint64, hosts []api.HostInfo) (err error) { // calculate what slabs we need var ss []slabSlice for _, s := range o.Slabs { @@ -200,12 +200,12 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o } // refresh the downloaders - mgr.refreshDownloaders(contracts) + mgr.refreshDownloaders(hosts) - // build a map 
to count available shards later - hosts := make(map[types.PublicKey]struct{}) - for _, c := range contracts { - hosts[c.HostKey] = struct{}{} + // map available hosts + available := make(map[types.PublicKey]struct{}) + for _, h := range hosts { + available[h.PublicKey] = struct{}{} } // create the cipher writer @@ -263,14 +263,14 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o } // check if we have enough downloaders - var available uint8 + var numAvailable uint8 for _, s := range next.Shards { - if isSectorAvailable(s, hosts) { - available++ + if isSectorAvailable(s, available) { + numAvailable++ } } - if available < next.MinShards { - responseChan <- &slabDownloadResponse{err: fmt.Errorf("%w: %v/%v", errDownloadNotEnoughHosts, available, next.MinShards)} + if numAvailable < next.MinShards { + responseChan <- &slabDownloadResponse{err: fmt.Errorf("%w: %v/%v", errDownloadNotEnoughHosts, numAvailable, next.MinShards)} return } @@ -374,14 +374,14 @@ outer: return nil } -func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, bool, error) { +func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, hosts []api.HostInfo) ([][]byte, bool, error) { // refresh the downloaders - mgr.refreshDownloaders(contracts) + mgr.refreshDownloaders(hosts) - // grab available hosts + // map available hosts available := make(map[types.PublicKey]struct{}) - for _, c := range contracts { - available[c.HostKey] = struct{}{} + for _, h := range hosts { + available[h.PublicKey] = struct{}{} } // count how many shards we can download (best-case) @@ -487,14 +487,14 @@ func (mgr *downloadManager) fetchPartialSlab(ctx context.Context, key object.Enc return data, nil, nil } -func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata) { +func (mgr *downloadManager) refreshDownloaders(hosts []api.HostInfo) { mgr.mu.Lock() defer mgr.mu.Unlock() // build map - want := make(map[types.PublicKey]api.ContractMetadata) - for _, c := range contracts { - want[c.HostKey] = c + want := make(map[types.PublicKey]string) + for _, h := range hosts { + want[h.PublicKey] = h.SiamuxAddr } // prune downloaders @@ -510,12 +510,9 @@ func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata) } // update downloaders - for _, c := range want { - // create a host - host := mgr.hm.Host(c.HostKey, c.ID, c.SiamuxAddr) - downloader := newDownloader(mgr.shutdownCtx, host) - mgr.downloaders[c.HostKey] = downloader - go downloader.processQueue() + for hk, siamuxAddr := range want { + mgr.downloaders[hk] = newDownloader(mgr.shutdownCtx, mgr.hm.Downloader(hk, siamuxAddr)) + go mgr.downloaders[hk].processQueue() } } diff --git a/worker/downloader.go b/worker/downloader.go index 9720237a1..00403da32 100644 --- a/worker/downloader.go +++ b/worker/downloader.go @@ -25,7 +25,7 @@ var ( type ( downloader struct { - host Host + host SectorDownloader statsDownloadSpeedBytesPerMS *utils.DataPoints // keep track of this separately for stats (no decay is applied) statsSectorDownloadEstimateInMS *utils.DataPoints @@ -41,9 +41,9 @@ type ( } ) -func newDownloader(ctx context.Context, host Host) *downloader { +func newDownloader(ctx context.Context, h SectorDownloader) *downloader { return &downloader{ - host: host, + host: h, statsSectorDownloadEstimateInMS: utils.NewDataPoints(10 * time.Minute), statsDownloadSpeedBytesPerMS: utils.NewDataPoints(0), diff --git a/worker/downloader_test.go 
b/worker/downloader_test.go index 8097b8304..ca004582b 100644 --- a/worker/downloader_test.go +++ b/worker/downloader_test.go @@ -14,7 +14,7 @@ func TestDownloaderStopped(t *testing.T) { dm := w.downloadManager h := hosts[0] - dm.refreshDownloaders(w.Contracts()) + dm.refreshDownloaders(w.UsableHosts()) dl := w.downloadManager.downloaders[h.PublicKey()] dl.Stop() diff --git a/worker/host.go b/worker/host.go index 2ecd95233..18db3c3cc 100644 --- a/worker/host.go +++ b/worker/host.go @@ -17,10 +17,14 @@ import ( ) type ( + SectorDownloader interface { + DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error + PublicKey() types.PublicKey + } + Host interface { PublicKey() types.PublicKey - DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error PriceTable(ctx context.Context, rev *types.FileContractRevision) (api.HostPriceTable, types.Currency, error) @@ -32,11 +36,21 @@ type ( } HostManager interface { - Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr string) Host + Downloader(hk types.PublicKey, siamuxAddr string) SectorDownloader + Host(hi api.HostInfo) Host } ) type ( + sectorDownloader struct { + hk types.PublicKey + siamuxAddr string + + acc *worker.Account + pts *priceTables + rhp3 *rhp3.Client + } + host struct { hk types.PublicKey renterKey types.PrivateKey @@ -57,50 +71,34 @@ var ( _ HostManager = (*Worker)(nil) ) -func (w *Worker) Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr string) Host { +func (w *Worker) Downloader(hk types.PublicKey, siamuxAddr string) SectorDownloader { + return §orDownloader{ + hk: hk, + siamuxAddr: siamuxAddr, + + acc: w.accounts.ForHost(hk), + pts: w.priceTables, + rhp3: w.rhp3Client, + } +} + +func (w *Worker) Host(hi api.HostInfo) Host { return &host{ client: w.rhp3Client, - hk: hk, - acc: w.accounts.ForHost(hk), + hk: hi.PublicKey, + acc: w.accounts.ForHost(hi.PublicKey), bus: w.bus, contractSpendingRecorder: w.contractSpendingRecorder, - logger: w.logger.Named(hk.String()[:4]), - fcid: fcid, - siamuxAddr: siamuxAddr, - renterKey: w.deriveRenterKey(hk), + logger: w.logger.Named(hi.PublicKey.String()[:4]), + fcid: hi.ContractID, + siamuxAddr: hi.SiamuxAddr, + renterKey: w.deriveRenterKey(hi.PublicKey), priceTables: w.priceTables, } } func (h *host) PublicKey() types.PublicKey { return h.hk } -func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) (err error) { - var amount types.Currency - return h.acc.WithWithdrawal(func() (types.Currency, error) { - pt, uptc, err := h.priceTables.fetch(ctx, h.hk, nil) - if err != nil { - return types.ZeroCurrency, err - } - hpt := pt.HostPriceTable - amount = uptc - - // check for download gouging specifically - gc, err := GougingCheckerFromContext(ctx, overpay) - if err != nil { - return amount, err - } - if breakdown := gc.Check(nil, &hpt); breakdown.DownloadErr != "" { - return amount, fmt.Errorf("%w: %v", gouging.ErrPriceTableGouging, breakdown.DownloadErr) - } - - cost, err := h.client.ReadSector(ctx, offset, length, root, w, h.hk, h.siamuxAddr, h.acc.ID(), h.acc.Key(), hpt) - if err != nil { - return amount, err - } - return amount.Add(cost), nil - }) -} - func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev 
types.FileContractRevision) error { // fetch price table var pt rhpv3.HostPriceTable @@ -242,3 +240,20 @@ func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) } return pt.HostPriceTable, cost, nil } + +func (d *sectorDownloader) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) (err error) { + return d.acc.WithWithdrawal(func() (types.Currency, error) { + pt, ptc, err := d.pts.fetch(ctx, d.hk, nil) + if err != nil { + return types.ZeroCurrency, err + } + + cost, err := d.rhp3.ReadSector(ctx, offset, length, root, w, d.hk, d.siamuxAddr, d.acc.ID(), d.acc.Key(), pt.HostPriceTable) + if err != nil { + return ptc, err + } + return ptc.Add(cost), nil + }) +} + +func (d *sectorDownloader) PublicKey() types.PublicKey { return d.hk } diff --git a/worker/host_test.go b/worker/host_test.go index f6ea236cd..1805b756a 100644 --- a/worker/host_test.go +++ b/worker/host_test.go @@ -38,7 +38,7 @@ func newTestHostManager(t test.TestingCommon) *testHostManager { return &testHostManager{tt: test.NewTT(t), hosts: make(map[types.PublicKey]*testHost)} } -func (hm *testHostManager) Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr string) Host { +func (hm *testHostManager) Downloader(hk types.PublicKey, siamuxAddr string) SectorDownloader { hm.mu.Lock() defer hm.mu.Unlock() @@ -48,6 +48,16 @@ func (hm *testHostManager) Host(hk types.PublicKey, fcid types.FileContractID, s return hm.hosts[hk] } +func (hm *testHostManager) Host(hi api.HostInfo) Host { + hm.mu.Lock() + defer hm.mu.Unlock() + + if _, ok := hm.hosts[hi.PublicKey]; !ok { + hm.tt.Fatal("host not found") + } + return hm.hosts[hi.PublicKey] +} + func (hm *testHostManager) addHost(h *testHost) { hm.mu.Lock() defer hm.mu.Unlock() diff --git a/worker/migrations.go b/worker/migrations.go index 13b31aa52..7756c1766 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -11,44 +11,39 @@ import ( "go.uber.org/zap" ) -func (w *Worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) error { - // make a map of good hosts - goodHosts := make(map[types.PublicKey]map[types.FileContractID]bool) - for _, c := range ulContracts { - if goodHosts[c.HostKey] == nil { - goodHosts[c.HostKey] = make(map[types.FileContractID]bool) - } - goodHosts[c.HostKey][c.ID] = false +func (w *Worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlHosts []api.HostInfo, ulContracts []api.ContractMetadata, bh uint64) error { + // map usable hosts + usableHosts := make(map[types.PublicKey]struct{}) + for _, h := range dlHosts { + usableHosts[h.PublicKey] = struct{}{} } - // make a map of hosts we can download from - h2c := make(map[types.PublicKey]types.FileContractID) - for _, c := range append(dlContracts, ulContracts...) 
{ - h2c[c.HostKey] = c.ID + // map usable contracts + usableContracts := make(map[types.FileContractID]struct{}) + for _, c := range ulContracts { + usableContracts[c.ID] = struct{}{} } // collect indices of shards that need to be migrated - usedMap := make(map[types.PublicKey]struct{}) + seen := make(map[types.PublicKey]struct{}) var shardIndices []int SHARDS: for i, shard := range s.Shards { for hk, fcids := range shard.Contracts { for _, fcid := range fcids { // bad host - if _, exists := goodHosts[hk]; !exists { + if _, ok := usableHosts[hk]; !ok { continue } // bad contract - if _, exists := goodHosts[hk][fcid]; !exists { + if _, ok := usableContracts[fcid]; !ok { continue } // reused host - _, exists := usedMap[hk] - if exists { + if _, used := seen[hk]; used { continue } - goodHosts[hk][fcid] = true - usedMap[hk] = struct{}{} + seen[hk] = struct{}{} continue SHARDS } } @@ -65,7 +60,7 @@ SHARDS: // which we have a contract (so hosts from which we can download) missingShards := len(shardIndices) for _, si := range shardIndices { - if _, exists := h2c[s.Shards[si].LatestHost]; exists { + if _, exists := usableHosts[s.Shards[si].LatestHost]; exists { missingShards-- continue } @@ -87,7 +82,7 @@ SHARDS: defer mem.Release() // download the slab - shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts) + shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, s, dlHosts) if err != nil { w.logger.Debugw("slab migration failed", zap.Error(err), @@ -107,10 +102,10 @@ SHARDS: // filter upload contracts to the ones we haven't used yet var allowed []api.ContractMetadata - for c := range ulContracts { - if _, exists := usedMap[ulContracts[c].HostKey]; !exists { - allowed = append(allowed, ulContracts[c]) - usedMap[ulContracts[c].HostKey] = struct{}{} + for _, c := range ulContracts { + if _, used := seen[c.HostKey]; !used { + allowed = append(allowed, c) + seen[c.HostKey] = struct{}{} } } diff --git a/worker/mocks_test.go b/worker/mocks_test.go index d93e18c9e..e96536d6e 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math" + "net" "sync" "time" @@ -271,6 +272,25 @@ func (hs *hostStoreMock) Host(ctx context.Context, hostKey types.PublicKey) (api return h.hi, nil } +func (hs *hostStoreMock) UsableHosts(ctx context.Context, opts api.UsableHostOptions) (hosts []api.HostInfo, _ error) { + hs.mu.Lock() + defer hs.mu.Unlock() + + for _, h := range hs.hosts { + host, _, err := net.SplitHostPort(h.hi.NetAddress) + if err != nil || host == "" { + continue + } + + hosts = append(hosts, api.HostInfo{ + PublicKey: h.hk, + SiamuxAddr: net.JoinHostPort(host, h.hi.Settings.SiaMuxPort), + }) + } + + return +} + func (hs *hostStoreMock) RecordHostScans(ctx context.Context, scans []api.HostScan) error { return nil } diff --git a/worker/pricetables.go b/worker/pricetables.go index fc3901f67..0bf3e3c9b 100644 --- a/worker/pricetables.go +++ b/worker/pricetables.go @@ -171,7 +171,7 @@ func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision) } // otherwise fetch it - h := p.hm.Host(p.hk, types.FileContractID{}, host.Settings.SiamuxAddr()) + h := p.hm.Host(api.HostInfo{PublicKey: p.hk, SiamuxAddr: host.Settings.SiamuxAddr()}) hpt, cost, err = h.PriceTable(ctx, rev) // record it in the background diff --git a/worker/upload.go b/worker/upload.go index 8fdfefe2e..2b07963ea 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -347,7 +347,7 @@ func (mgr *uploadManager) newUploader(os ObjectStore, cl 
ContractLocker, cs Cont statsSectorUploadSpeedBytesPerMS: utils.NewDataPoints(0), // covered by mutex - host: hm.Host(c.HostKey, c.ID, c.SiamuxAddr), + host: hm.Host(c.HostInfo()), fcid: c.ID, endHeight: c.WindowEnd, queue: make([]*sectorUploadReq, 0), diff --git a/worker/upload_test.go b/worker/upload_test.go index fd9a68251..da6cdea20 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -60,7 +60,7 @@ func TestUpload(t *testing.T) { // download the data and assert it matches var buf bytes.Buffer - err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), w.Contracts()) + err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), w.UsableHosts()) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { @@ -69,17 +69,17 @@ func TestUpload(t *testing.T) { // filter contracts to have (at most) min shards used contracts var n int - var filtered []api.ContractMetadata + var filtered []api.HostInfo for _, md := range w.Contracts() { // add unused contracts if _, used := used[md.HostKey]; !used { - filtered = append(filtered, md) + filtered = append(filtered, md.HostInfo()) continue } // add min shards used contracts if n < int(params.rs.MinShards) { - filtered = append(filtered, md) + filtered = append(filtered, md.HostInfo()) n++ } } @@ -94,8 +94,8 @@ func TestUpload(t *testing.T) { } // filter out one contract - expect download to fail - for i, md := range filtered { - if _, used := used[md.HostKey]; used { + for i, h := range filtered { + if _, used := used[h.PublicKey]; used { filtered = append(filtered[:i], filtered[i+1:]...) break } @@ -165,7 +165,7 @@ func TestUploadPackedSlab(t *testing.T) { // download the data and assert it matches var buf bytes.Buffer - err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), w.Contracts()) + err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), w.UsableHosts()) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { @@ -201,7 +201,7 @@ func TestUploadPackedSlab(t *testing.T) { // download the data again and assert it matches buf.Reset() - err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), w.Contracts()) + err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), w.UsableHosts()) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { @@ -314,7 +314,7 @@ func TestMigrateLostSector(t *testing.T) { } // download the slab - shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) + shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.UsableHosts()) if err != nil { t.Fatal(err) } @@ -417,7 +417,7 @@ func TestUploadShards(t *testing.T) { } // download the slab - shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) + shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.UsableHosts()) if err != nil { t.Fatal(err) } @@ -470,16 +470,16 @@ func TestUploadShards(t *testing.T) { } // create download contracts - contracts = contracts[:0] + var hosts []api.HostInfo for _, c := range w.Contracts() { if _, bad := badHosts[c.HostKey]; !bad { - contracts = append(contracts, c) + hosts = append(hosts, c.HostInfo()) } } // download the data and assert it matches var buf bytes.Buffer - err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), contracts) + err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), hosts) if err 
!= nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { @@ -621,7 +621,7 @@ func TestUploadRegression(t *testing.T) { // download data for good measure var buf bytes.Buffer - err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), w.Contracts()) + err = dl.DownloadObject(context.Background(), &buf, *o.Object, 0, uint64(o.Size), w.UsableHosts()) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { diff --git a/worker/uploader.go b/worker/uploader.go index 410760b1a..30a298ff1 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -77,7 +77,7 @@ func (u *uploader) Refresh(c api.ContractMetadata) { u.mu.Lock() defer u.mu.Unlock() - u.host = u.hm.Host(c.HostKey, c.ID, c.SiamuxAddr) + u.host = u.hm.Host(c.HostInfo()) u.fcid = c.ID u.siamuxAddr = c.SiamuxAddr u.endHeight = c.WindowEnd diff --git a/worker/worker.go b/worker/worker.go index eece75ce7..0c7e6a776 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -101,6 +101,7 @@ type ( RecordContractSpending(ctx context.Context, records []api.ContractSpendingRecord) error Host(ctx context.Context, hostKey types.PublicKey) (api.Host, error) + UsableHosts(ctx context.Context, opts api.UsableHostOptions) ([]api.HostInfo, error) } ObjectStore interface { @@ -206,10 +207,9 @@ func (w *Worker) isStopped() bool { return false } -func (w *Worker) withRevision(ctx context.Context, fetchTimeout time.Duration, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string, lockPriority int, fn func(rev types.FileContractRevision) error) error { - return w.withContractLock(ctx, fcid, lockPriority, func() error { - h := w.Host(hk, fcid, siamuxAddr) - rev, err := h.FetchRevision(ctx, fetchTimeout) +func (w *Worker) withRevision(ctx context.Context, hi api.HostInfo, fetchTimeout time.Duration, lockPriority int, fn func(rev types.FileContractRevision) error) error { + return w.withContractLock(ctx, hi.ContractID, lockPriority, func() error { + rev, err := w.Host(hi).FetchRevision(ctx, fetchTimeout) if err != nil { return err } @@ -270,7 +270,7 @@ func (w *Worker) fetchContracts(ctx context.Context, metadatas []api.ContractMet worker := func() { for md := range reqs { var revision types.FileContractRevision - err := w.withRevision(ctx, timeout, md.ID, md.HostKey, md.SiamuxAddr, lockingPriorityActiveContractRevision, func(rev types.FileContractRevision) error { + err := w.withRevision(ctx, md.HostInfo(), timeout, lockingPriorityActiveContractRevision, func(rev types.FileContractRevision) error { revision = rev return nil }) @@ -343,7 +343,7 @@ func (w *Worker) rhpPriceTableHandler(jc jape.Context) { defer cancel() } - hpt, err = w.Host(rptr.HostKey, types.FileContractID{}, rptr.SiamuxAddr).PriceTableUnpaid(ctx) + hpt, err = w.Host(api.HostInfo{PublicKey: rptr.HostKey, SiamuxAddr: rptr.SiamuxAddr}).PriceTableUnpaid(ctx) if jc.Check("could not get price table", err) != nil { return } @@ -395,22 +395,20 @@ func (w *Worker) slabMigrateHandler(jc jape.Context) { // attach gouging checker to the context ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - // fetch all contracts - dlContracts, err := w.bus.Contracts(ctx, api.ContractsOpts{}) - if jc.Check("couldn't fetch contracts from bus", err) != nil { + // fetch hosts + dlHosts, err := w.bus.UsableHosts(ctx, api.UsableHostOptions{}) + if jc.Check("couldn't fetch hosts from bus", err) != nil { return } - // filter upload contracts - var ulContracts []api.ContractMetadata - for _, c := range dlContracts { - if c.InSet(up.ContractSet) { - ulContracts 
= append(ulContracts, c) - } + // fetch contracts + ulContracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) + if jc.Check("couldn't fetch contracts from bus", err) != nil { + return } // migrate the slab and handle alerts - err = w.migrate(ctx, slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight) + err = w.migrate(ctx, slab, up.ContractSet, dlHosts, ulContracts, up.CurrentHeight) if err != nil && !utils.IsErr(err, api.ErrSlabNotFound) { var objects []api.ObjectMetadata if res, err := w.bus.Objects(ctx, "", api.ListObjectOptions{SlabEncryptionKey: slab.EncryptionKey}); err != nil { @@ -1156,9 +1154,9 @@ func (w *Worker) headObject(ctx context.Context, bucket, key string, onlyMetadat }, res, nil } -func (w *Worker) FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, desired types.Currency) error { +func (w *Worker) FundAccount(ctx context.Context, hi api.HostInfo, desired types.Currency) error { // calculate the deposit amount - acc := w.accounts.ForHost(hk) + acc := w.accounts.ForHost(hi.PublicKey) return acc.WithDeposit(func(balance types.Currency) (types.Currency, error) { // return early if we have the desired balance if balance.Cmp(desired) >= 0 { @@ -1168,7 +1166,7 @@ func (w *Worker) FundAccount(ctx context.Context, fcid types.FileContractID, hk // fund the account var err error - deposit, err = w.bus.FundAccount(ctx, acc.ID(), fcid, desired.Sub(balance)) + deposit, err = w.bus.FundAccount(ctx, acc.ID(), hi.ContractID, desired.Sub(balance)) if err != nil { if rhp3.IsBalanceMaxExceeded(err) { acc.ScheduleSync() @@ -1208,8 +1206,8 @@ func (w *Worker) GetObject(ctx context.Context, bucket, key string, opts api.Dow return nil, fmt.Errorf("couldn't fetch gouging parameters from bus: %w", err) } - // fetch all contracts - contracts, err := w.cache.DownloadContracts(ctx) + // fetch usable hosts + hosts, err := w.cache.UsableHosts(ctx) if err != nil { return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) } @@ -1224,13 +1222,13 @@ func (w *Worker) GetObject(ctx context.Context, bucket, key string, opts api.Dow // otherwise return a pipe reader downloadFn := func(wr io.Writer, offset, length int64) error { ctx = WithGougingChecker(ctx, w.bus, gp) - err = w.downloadManager.DownloadObject(ctx, wr, obj, uint64(offset), uint64(length), contracts) + err = w.downloadManager.DownloadObject(ctx, wr, obj, uint64(offset), uint64(length), hosts) if err != nil { w.logger.Error(err) if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errDownloadCancelled) && !errors.Is(err, io.ErrClosedPipe) { - w.registerAlert(newDownloadFailedAlert(bucket, key, offset, length, int64(len(contracts)), err)) + w.registerAlert(newDownloadFailedAlert(bucket, key, offset, length, int64(len(hosts)), err)) } return fmt.Errorf("failed to download object: %w", err) } @@ -1255,7 +1253,7 @@ func (w *Worker) HeadObject(ctx context.Context, bucket, key string, opts api.He return res, err } -func (w *Worker) SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error { +func (w *Worker) SyncAccount(ctx context.Context, hi api.HostInfo) error { // attach gouging checker gp, err := w.cache.GougingParams(ctx) if err != nil { @@ -1264,9 +1262,8 @@ func (w *Worker) SyncAccount(ctx context.Context, fcid types.FileContractID, hk ctx = WithGougingChecker(ctx, w.bus, gp) // sync the account - h := w.Host(hk, fcid, siamuxAddr) - err = w.withRevision(ctx, defaultRevisionFetchTimeout, fcid, hk, siamuxAddr, 
From c73e4bbff92b531034bd5f36f3862caf69931d0f Mon Sep 17 00:00:00 2001
From: PJ
Date: Mon, 4 Nov 2024 13:54:21 +0100
Subject: [PATCH 3/4] worker: add ACL update webhook

---
 api/events.go                     | 24 ++++++++++++++++++++++++
 bus/routes.go                     | 28 ++++++++++++++++++++++++++++
 internal/test/e2e/cluster_test.go | 22 +++++++---------------
 internal/test/e2e/events_test.go  |  8 ++++++++
 internal/worker/cache.go          |  2 ++
 internal/worker/events.go         |  1 +
 internal/worker/events_test.go    |  4 ++--
 worker/worker.go                  |  2 +-
 8 files changed, 73 insertions(+), 18 deletions(-)

diff --git a/api/events.go b/api/events.go
index 85fe857d9..ea237cade 100644
--- a/api/events.go
+++ b/api/events.go
@@ -11,6 +11,7 @@ import (
 )
 
 const (
+	ModuleACL         = "acl"
 	ModuleConsensus   = "consensus"
 	ModuleContract    = "contract"
 	ModuleContractSet = "contract_set"
@@ -28,6 +29,12 @@ var (
 )
 
 type (
+	EventACLUpdate struct {
+		Allowlist []types.PublicKey `json:"allowlist"`
+		Blocklist []string          `json:"blocklist"`
+		Timestamp time.Time         `json:"timestamp"`
+	}
+
 	EventConsensusUpdate struct {
 		ConsensusState
 		TransactionFee types.Currency `json:"transactionFee"`
@@ -73,6 +80,15 @@ type (
 )
 
 var (
+	WebhookACLUpdate = func(url string, headers map[string]string) webhooks.Webhook {
+		return webhooks.Webhook{
+			Event:   EventUpdate,
+			Headers: headers,
+			Module:  ModuleACL,
+			URL:     url,
+		}
+	}
+
 	WebhookConsensusUpdate = func(url string, headers map[string]string) webhooks.Webhook {
 		return webhooks.Webhook{
 			Event:   EventUpdate,
@@ -143,6 +159,14 @@ func ParseEventWebhook(event webhooks.Event) (interface{}, error) {
 		return nil, err
 	}
 	switch event.Module {
+	case ModuleACL:
+		if event.Event == EventUpdate {
+			var e EventACLUpdate
+			if err := json.Unmarshal(bytes, &e); err != nil {
+				return nil, err
+			}
+			return e, nil
+		}
 	case ModuleContract:
 		switch event.Event {
 		case EventAdd:
diff --git a/bus/routes.go b/bus/routes.go
index d03f3c98f..ee589dcea 100644
--- a/bus/routes.go
+++ b/bus/routes.go
@@ -700,6 +700,20 @@ func (b *Bus) hostsAllowlistHandlerPUT(jc jape.Context) {
 			return
 		} else if jc.Check("couldn't update allowlist entries", b.store.UpdateHostAllowlistEntries(ctx, req.Add, req.Remove, req.Clear)) != nil {
 			return
+		} else {
+			if allowlist, err := b.store.HostAllowlist(ctx); jc.Check("couldn't fetch allowlist", err) == nil {
+				if blocklist, err := b.store.HostBlocklist(ctx); jc.Check("couldn't fetch blocklist", err) == nil {
+					b.broadcastAction(webhooks.Event{
+						Module: api.ModuleACL,
+						Event:  api.EventUpdate,
+						Payload: api.EventACLUpdate{
+							Allowlist: allowlist,
+							Blocklist: blocklist,
+							Timestamp: time.Now().UTC(),
+						},
+					})
+				}
+			}
 		}
 	}
 }
@@ -720,6 +734,20 @@ func (b *Bus) hostsBlocklistHandlerPUT(jc jape.Context) {
 			return
 		} else if jc.Check("couldn't update blocklist entries", b.store.UpdateHostBlocklistEntries(ctx, req.Add, req.Remove, req.Clear)) != nil {
 			return
+		} else {
+			if allowlist, err := b.store.HostAllowlist(ctx); jc.Check("couldn't fetch allowlist", err) == nil {
+				if blocklist, err := b.store.HostBlocklist(ctx); jc.Check("couldn't fetch blocklist", err) == nil {
+					b.broadcastAction(webhooks.Event{
+						Module: api.ModuleACL,
+						Event:  api.EventUpdate,
+						Payload: api.EventACLUpdate{
+							Allowlist: allowlist,
+							Blocklist: blocklist,
+							Timestamp: time.Now().UTC(),
+						},
+					})
+				}
+			}
 		}
 	}
 }
diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go
index 32526785a..1a0e019e3 100644
--- a/internal/test/e2e/cluster_test.go
+++ b/internal/test/e2e/cluster_test.go
@@ -2616,13 +2616,10 @@ func TestDownloadAllHosts(t *testing.T) {
 		t.SkipNow()
 	}
 
-	// get rid of redundancy
-	rs := test.RedundancySettings
-	rs.MinShards = rs.TotalShards
-
 	// create a test cluster
 	cluster := newTestCluster(t, testClusterOptions{
-		hosts:         rs.TotalShards,
+		logger:        newTestLogger(false),
+		hosts:         test.RedundancySettings.TotalShards,
 		uploadPacking: false, // make sure data is uploaded
 	})
 	defer cluster.Shutdown()
@@ -2631,12 +2628,6 @@ func TestDownloadAllHosts(t *testing.T) {
 	w := cluster.Worker
 	tt := cluster.tt
 
-	// update redundancy settings
-	us, err := b.UploadSettings(context.Background())
-	us.Redundancy = rs
-	tt.OK(err)
-	tt.OK(b.UpdateUploadSettings(context.Background(), us))
-
 	// prepare a file
 	data := make([]byte, 128)
 	tt.OKAll(frand.Read(data))
@@ -2658,13 +2649,10 @@ func TestDownloadAllHosts(t *testing.T) {
 			}
 		}
 	}
-	if len(usedHosts) != rs.TotalShards {
+	if len(usedHosts) != test.RedundancySettings.TotalShards {
 		t.Fatalf("unexpected number of used hosts %d", len(usedHosts))
 	}
 
-	// add a host
-	cluster.AddHosts(1)
-
 	// grab random used host
 	var randomHost string
 	for _, host := range cluster.hosts {
@@ -2677,6 +2665,10 @@ func TestDownloadAllHosts(t *testing.T) {
 	// add it to the blocklist
 	tt.OK(b.UpdateHostBlocklist(context.Background(), []string{randomHost}, nil, false))
 
+	// add a host
+	cluster.AddHostsBlocking(1)
+	cluster.WaitForAccounts()
+
 	// wait until we migrated away from that host
 	var newHost types.PublicKey
 	tt.Retry(100, 100*time.Millisecond, func() error {
diff --git a/internal/test/e2e/events_test.go b/internal/test/e2e/events_test.go
index 5fa7dd768..b87170b89 100644
--- a/internal/test/e2e/events_test.go
+++ b/internal/test/e2e/events_test.go
@@ -21,6 +21,7 @@ import (
 func TestEvents(t *testing.T) {
 	// list all webhooks
 	allEvents := []func(string, map[string]string) webhooks.Webhook{
+		api.WebhookACLUpdate,
 		api.WebhookConsensusUpdate,
 		api.WebhookContractArchive,
 		api.WebhookContractRenew,
@@ -132,6 +133,9 @@ func TestEvents(t *testing.T) {
 	settings.NetAddress = "127.0.0.1:0"
 	tt.OK(h.UpdateSettings(settings))
 
+	// update ACL
+	tt.OK(b.UpdateHostAllowlist(context.Background(), []types.PublicKey{h.PublicKey()}, nil, false))
+
 	// wait until we received the events
 	tt.Retry(100, 100*time.Millisecond, func() error {
 		mu.Lock()
@@ -148,6 +152,10 @@ func TestEvents(t *testing.T) {
 			event, err := api.ParseEventWebhook(r)
 			tt.OK(err)
 			switch e := event.(type) {
+			case api.EventACLUpdate:
+				if e.Timestamp.IsZero() || len(e.Allowlist) != 1 {
+					t.Fatalf("unexpected event %+v", e)
+				}
 			case api.EventContractRenew:
 				if e.Renewal.ID != renewed.ID || e.Renewal.RenewedFrom != c.ID || e.Timestamp.IsZero() {
 					t.Fatalf("unexpected event %+v", e)
diff --git a/internal/worker/cache.go b/internal/worker/cache.go
index 2de374bae..4bab86a43 100644
--- a/internal/worker/cache.go
+++ b/internal/worker/cache.go
@@ -178,9 +178,11 @@ func (c *cache) HandleEvent(event webhooks.Event) (err error) {
 	case api.EventConsensusUpdate:
 		log = log.With("bh", e.BlockHeight, "ts", e.Timestamp)
 		c.handleConsensusUpdate(e)
+		c.cache.Invalidate(cacheKeyUsableHosts)
 	case api.EventSettingUpdate:
 		log = log.With("gouging", e.GougingSettings != nil, "pinned", e.PinnedSettings != nil, "upload", e.UploadSettings != nil, "ts", e.Timestamp)
 		c.handleSettingUpdate(e)
+	case api.EventACLUpdate:
 	case api.EventContractAdd:
 	case api.EventContractArchive:
 	case api.EventContractRenew:
diff --git a/internal/worker/events.go b/internal/worker/events.go
index e0960fd5c..a0aa2debc 100644
--- a/internal/worker/events.go
+++ b/internal/worker/events.go
@@ -111,6 +111,7 @@ func (e *eventSubscriber) Register(ctx context.Context, eventsURL string, opts .
 	// prepare webhooks
 	webhooks := []webhooks.Webhook{
+		api.WebhookACLUpdate(eventsURL, headers),
 		api.WebhookConsensusUpdate(eventsURL, headers),
 		api.WebhookContractAdd(eventsURL, headers),
 		api.WebhookContractArchive(eventsURL, headers),
diff --git a/internal/worker/events_test.go b/internal/worker/events_test.go
index 95a74da91..2ade704c0 100644
--- a/internal/worker/events_test.go
+++ b/internal/worker/events_test.go
@@ -156,8 +156,8 @@ func TestEventSubscriber(t *testing.T) {
 	time.Sleep(testRegisterInterval)
 
 	// assert webhook was registered
-	if webhooks := w.Webhooks(); len(webhooks) != 6 {
-		t.Fatal("expected 6 webhooks, got", len(webhooks))
+	if webhooks := w.Webhooks(); len(webhooks) != 7 {
+		t.Fatal("expected 7 webhooks, got", len(webhooks))
 	}
 
 	// send the same event again
diff --git a/worker/worker.go b/worker/worker.go
index 4f48e7e76..942243cae 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -334,7 +334,7 @@ func (w *Worker) slabMigrateHandler(jc jape.Context) {
 	if err != nil && !utils.IsErr(err, api.ErrSlabNotFound) {
 		var objects []api.ObjectMetadata
 		if res, err := w.bus.Objects(ctx, "", api.ListObjectOptions{SlabEncryptionKey: slab.EncryptionKey}); err != nil {
-			w.logger.Errorf("failed to list objects for slab key; %w", err)
+			w.logger.Errorf("failed to list objects for slab key; %v", err)
 		} else {
 			objects = res.Objects
 		}
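Note on the patch above: both ACL handlers re-read the full allowlist and blocklist after a successful update and broadcast them in one event, so subscribers always see a consistent snapshot rather than a delta. A hedged sketch of what a consumer of this webhook could look like — the envelope field names ("module", "event", "payload") and the handler itself are assumptions for illustration, not renterd code; only the payload shape mirrors EventACLUpdate from the diff:

package main

import (
	"encoding/json"
	"log"
	"net/http"
	"time"
)

// aclPayload mirrors EventACLUpdate's JSON shape from the diff.
type aclPayload struct {
	Allowlist []string  `json:"allowlist"` // hex-encoded public keys
	Blocklist []string  `json:"blocklist"` // blocked net addresses
	Timestamp time.Time `json:"timestamp"`
}

// eventEnvelope is an assumed wrapper for the webhook body.
type eventEnvelope struct {
	Module  string          `json:"module"`
	Event   string          `json:"event"`
	Payload json.RawMessage `json:"payload"`
}

func handleWebhook(w http.ResponseWriter, r *http.Request) {
	var env eventEnvelope
	if err := json.NewDecoder(r.Body).Decode(&env); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if env.Module == "acl" && env.Event == "update" {
		var p aclPayload
		if err := json.Unmarshal(env.Payload, &p); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// e.g. drop any cached host lists so blocked hosts stop being used
		log.Printf("ACL updated at %s: %d allowed, %d blocked",
			p.Timestamp, len(p.Allowlist), len(p.Blocklist))
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/events", handleWebhook)
	log.Fatal(http.ListenAndServe("127.0.0.1:8080", nil))
}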
From 4d00c8315ffb6e19b27924bf6377c41829088b6b Mon Sep 17 00:00:00 2001
From: PJ
Date: Wed, 6 Nov 2024 16:27:40 +0100
Subject: [PATCH 4/4] cache: fix race

---
 internal/worker/cache.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/internal/worker/cache.go b/internal/worker/cache.go
index 4bab86a43..ea6a42776 100644
--- a/internal/worker/cache.go
+++ b/internal/worker/cache.go
@@ -217,6 +217,9 @@ func (c *cache) Subscribe(e EventSubscriber) (err error) {
 }
 
 func (c *cache) isReady() bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	select {
 	case <-c.readyChan:
 		return true
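Note on the patch above: the race is between Subscribe, which assigns c.readyChan under c.mu, and isReady, which previously read the channel without holding the lock. Guarding the read closes it. A minimal, runnable sketch of the pattern with simplified stand-in types (assuming, as the hunk context suggests, that readyChan is created in Subscribe):

package main

import (
	"fmt"
	"sync"
)

type cache struct {
	mu        sync.Mutex
	readyChan chan struct{}
}

func (c *cache) Subscribe() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.readyChan = make(chan struct{}) // write guarded by c.mu
}

func (c *cache) markReady() {
	c.mu.Lock()
	defer c.mu.Unlock()
	close(c.readyChan)
}

func (c *cache) isReady() bool {
	c.mu.Lock() // the fix: guard the read of c.readyChan too
	defer c.mu.Unlock()

	if c.readyChan == nil {
		return false
	}
	select {
	case <-c.readyChan:
		return true
	default:
		return false
	}
}

func main() {
	c := &cache{}
	c.Subscribe()
	fmt.Println(c.isReady()) // false
	c.markReady()
	fmt.Println(c.isReady()) // true
}

Without the lock in isReady, `go test -race` flags the unsynchronized read against Subscribe's write even though channel operations themselves are synchronized; it is the field assignment, not the channel, that races.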