Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new usable hosts endpoint #1647

Open
wants to merge 7 commits into
base: pj/usable-hosts
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ func (cm ContractMetadata) InSet(set string) bool {
return false
}

func (cm ContractMetadata) HostInfo() HostInfo {
return HostInfo{
PublicKey: cm.HostKey,
ContractID: cm.ID,
SiamuxAddr: cm.SiamuxAddr,
}
}

type (
Revision struct {
ContractID types.FileContractID `json:"contractID"`
Expand Down
24 changes: 24 additions & 0 deletions api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

const (
ModuleACL = "acl"
ModuleConsensus = "consensus"
ModuleContract = "contract"
ModuleContractSet = "contract_set"
Expand All @@ -28,6 +29,12 @@ var (
)

type (
EventACLUpdate struct {
Allowlist []types.PublicKey `json:"allowlist"`
Blocklist []string `json:"blocklist"`
Timestamp time.Time `json:"timestamp"`
}

EventConsensusUpdate struct {
ConsensusState
TransactionFee types.Currency `json:"transactionFee"`
Expand Down Expand Up @@ -73,6 +80,15 @@ type (
)

var (
WebhookACLUpdate = func(url string, headers map[string]string) webhooks.Webhook {
return webhooks.Webhook{
Event: EventUpdate,
Headers: headers,
Module: ModuleACL,
URL: url,
}
}

WebhookConsensusUpdate = func(url string, headers map[string]string) webhooks.Webhook {
return webhooks.Webhook{
Event: EventUpdate,
Expand Down Expand Up @@ -143,6 +159,14 @@ func ParseEventWebhook(event webhooks.Event) (interface{}, error) {
return nil, err
}
switch event.Module {
case ModuleACL:
if event.Event == EventUpdate {
var e EventACLUpdate
if err := json.Unmarshal(bytes, &e); err != nil {
return nil, err
}
return e, nil
}
case ModuleContract:
switch event.Event {
case EventAdd:
Expand Down
5 changes: 3 additions & 2 deletions api/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ type (
}

HostInfo struct {
PublicKey types.PublicKey `json:"publicKey"`
SiamuxAddr string `json:"siamuxAddr"`
ContractID types.FileContractID `json:"contractID"`
PublicKey types.PublicKey `json:"publicKey"`
SiamuxAddr string `json:"siamuxAddr"`
}

HostInteractions struct {
Expand Down
18 changes: 10 additions & 8 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ type (
UpdateHostAllowlistEntries(ctx context.Context, add, remove []types.PublicKey, clear bool) error
UpdateHostBlocklistEntries(ctx context.Context, add, remove []string, clear bool) error
UpdateHostCheck(ctx context.Context, autopilotID string, hk types.PublicKey, check api.HostCheck) error
UsableHosts(ctx context.Context, gc gouging.Checker, offset, limit int) ([]api.HostInfo, error)
UsableHosts(ctx context.Context, gc gouging.Checker, minWindowStart uint64, offset, limit int) ([]api.HostInfo, error)
}

// A MetadataStore stores information about contracts and objects.
Expand Down Expand Up @@ -316,9 +316,10 @@ type (
)

type Bus struct {
allowPrivateIPs bool
startTime time.Time
masterKey utils.MasterKey
allowPrivateIPs bool
startTime time.Time
masterKey utils.MasterKey
revisionSubmissionBuffer uint64

alerts alerts.Alerter
alertMgr AlertManager
Expand All @@ -343,14 +344,15 @@ type Bus struct {
}

// New returns a new Bus
func New(ctx context.Context, cfg config.Bus, masterKey [32]byte, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, explorerURL string, l *zap.Logger) (_ *Bus, err error) {
func New(ctx context.Context, cfg config.Bus, masterKey [32]byte, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, explorerURL string, revisionSubmissionBuffer uint64, l *zap.Logger) (_ *Bus, err error) {
l = l.Named("bus")
dialer := rhp.NewFallbackDialer(store, net.Dialer{}, l)

b := &Bus{
allowPrivateIPs: cfg.AllowPrivateIPs,
startTime: time.Now(),
masterKey: masterKey,
allowPrivateIPs: cfg.AllowPrivateIPs,
revisionSubmissionBuffer: revisionSubmissionBuffer,
startTime: time.Now(),
masterKey: masterKey,

s: s,
cm: cm,
Expand Down
36 changes: 35 additions & 1 deletion bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,13 +501,19 @@ func (b *Bus) hostsHandlerGET(jc jape.Context) {
return
}

cs, err := b.consensusState(jc.Request.Context())
if jc.Check("couldn't fetch consensus state", err) != nil {
return
}
minWindowStart := cs.BlockHeight + b.revisionSubmissionBuffer

gp, err := b.gougingParams(jc.Request.Context())
if jc.Check("could not get gouging parameters", err) != nil {
return
}
gc := gouging.NewChecker(gp.GougingSettings, gp.ConsensusState, nil, nil)

hosts, err := b.store.UsableHosts(jc.Request.Context(), gc, offset, limit)
hosts, err := b.store.UsableHosts(jc.Request.Context(), gc, minWindowStart, offset, limit)
if jc.Check("couldn't fetch hosts", err) != nil {
return
}
Expand Down Expand Up @@ -686,6 +692,20 @@ func (b *Bus) hostsAllowlistHandlerPUT(jc jape.Context) {
return
} else if jc.Check("couldn't update allowlist entries", b.store.UpdateHostAllowlistEntries(ctx, req.Add, req.Remove, req.Clear)) != nil {
return
} else {
if allowlist, err := b.store.HostAllowlist(ctx); jc.Check("couldn't fetch allowlist", err) == nil {
if blocklist, err := b.store.HostBlocklist(ctx); jc.Check("couldn't fetch blocklist", err) == nil {
b.broadcastAction(webhooks.Event{
Module: api.ModuleACL,
Event: api.EventUpdate,
Payload: api.EventACLUpdate{
Allowlist: allowlist,
Blocklist: blocklist,
Timestamp: time.Now().UTC(),
},
})
}
}
}
}
}
Expand All @@ -706,6 +726,20 @@ func (b *Bus) hostsBlocklistHandlerPUT(jc jape.Context) {
return
} else if jc.Check("couldn't update blocklist entries", b.store.UpdateHostBlocklistEntries(ctx, req.Add, req.Remove, req.Clear)) != nil {
return
} else {
if allowlist, err := b.store.HostAllowlist(ctx); jc.Check("couldn't fetch allowlist", err) == nil {
if blocklist, err := b.store.HostBlocklist(ctx); jc.Check("couldn't fetch blocklist", err) == nil {
b.broadcastAction(webhooks.Event{
Module: api.ModuleACL,
Event: api.EventUpdate,
Payload: api.EventACLUpdate{
Allowlist: allowlist,
Blocklist: blocklist,
Timestamp: time.Now().UTC(),
},
})
}
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/renterd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ func defaultConfig() config.Config {
AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year
Bootstrap: true,
GatewayAddr: ":9981",
UsedUTXOExpiry: 24 * time.Hour,
RevisionSubmissionBuffer: 150, // 144 + 6 blocks leeway
SlabBufferCompletionThreshold: 1 << 12,
UsedUTXOExpiry: 24 * time.Hour,
},
Worker: config.Worker{
Enabled: true,
Expand Down
2 changes: 1 addition & 1 deletion cmd/renterd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func newBus(ctx context.Context, cfg config.Config, pk types.PrivateKey, network
}

// create bus
b, err := bus.New(ctx, cfg.Bus, masterKey, alertsMgr, wh, cm, s, w, sqlStore, explorerURL, logger)
b, err := bus.New(ctx, cfg.Bus, masterKey, alertsMgr, wh, cm, s, w, sqlStore, explorerURL, cfg.Bus.RevisionSubmissionBuffer, logger)
if err != nil {
return nil, nil, fmt.Errorf("failed to create bus: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ type (
GatewayAddr string `yaml:"gatewayAddr,omitempty"`
RemoteAddr string `yaml:"remoteAddr,omitempty"`
RemotePassword string `yaml:"remotePassword,omitempty"`
UsedUTXOExpiry time.Duration `yaml:"usedUtxoExpiry,omitempty"`
RevisionSubmissionBuffer uint64 `yaml:"revisionSubmissionBuffer,omitempty"`
SlabBufferCompletionThreshold int64 `yaml:"slabBufferCompleionThreshold,omitempty"`
UsedUTXOExpiry time.Duration `yaml:"usedUtxoExpiry,omitempty"`
}

// LogFile configures the file output of the logger.
Expand Down
5 changes: 4 additions & 1 deletion internal/test/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func newTestBus(ctx context.Context, cm *chain.Manager, genesisBlock types.Block
masterKey := blake2b.Sum256(append([]byte("worker"), pk...))

// create bus
b, err := bus.New(ctx, cfg, masterKey, alertsMgr, wh, cm, s, w, sqlStore, "", logger)
b, err := bus.New(ctx, cfg, masterKey, alertsMgr, wh, cm, s, w, sqlStore, "", cfg.RevisionSubmissionBuffer, logger)
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down Expand Up @@ -716,6 +716,9 @@ func (c *TestCluster) sync() {
func (c *TestCluster) WaitForAccounts() []api.Account {
c.tt.Helper()

// mine a block (ensures worker cache gets invalidated)
c.MineBlocks(1)

// build hosts map
hostsMap := make(map[types.PublicKey]struct{})
for _, host := range c.hosts {
Expand Down
22 changes: 7 additions & 15 deletions internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2618,13 +2618,10 @@ func TestDownloadAllHosts(t *testing.T) {
t.SkipNow()
}

// get rid of redundancy
rs := test.RedundancySettings
rs.MinShards = rs.TotalShards

// create a test cluster
cluster := newTestCluster(t, testClusterOptions{
hosts: rs.TotalShards,
logger: newTestLogger(false),
hosts: test.RedundancySettings.TotalShards,
uploadPacking: false, // make sure data is uploaded
})
defer cluster.Shutdown()
Expand All @@ -2633,12 +2630,6 @@ func TestDownloadAllHosts(t *testing.T) {
w := cluster.Worker
tt := cluster.tt

// update redundancy settings
us, err := b.UploadSettings(context.Background())
us.Redundancy = rs
tt.OK(err)
tt.OK(b.UpdateUploadSettings(context.Background(), us))

// prepare a file
data := make([]byte, 128)
tt.OKAll(frand.Read(data))
Expand All @@ -2660,13 +2651,10 @@ func TestDownloadAllHosts(t *testing.T) {
}
}
}
if len(usedHosts) != rs.TotalShards {
if len(usedHosts) != test.RedundancySettings.TotalShards {
t.Fatalf("unexpected number of used hosts %d", len(usedHosts))
}

// add a host
cluster.AddHosts(1)

// grab random used host
var randomHost string
for _, host := range cluster.hosts {
Expand All @@ -2679,6 +2667,10 @@ func TestDownloadAllHosts(t *testing.T) {
// add it to the blocklist
tt.OK(b.UpdateHostBlocklist(context.Background(), []string{randomHost}, nil, false))

// add a host
cluster.AddHostsBlocking(1)
cluster.WaitForAccounts()

// wait until we migrated away from that host
var newHost types.PublicKey
tt.Retry(100, 100*time.Millisecond, func() error {
Expand Down
8 changes: 8 additions & 0 deletions internal/test/e2e/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
func TestEvents(t *testing.T) {
// list all webhooks
allEvents := []func(string, map[string]string) webhooks.Webhook{
api.WebhookACLUpdate,
api.WebhookConsensusUpdate,
api.WebhookContractArchive,
api.WebhookContractRenew,
Expand Down Expand Up @@ -132,6 +133,9 @@ func TestEvents(t *testing.T) {
settings.NetAddress = "127.0.0.1:0"
tt.OK(h.UpdateSettings(settings))

// update ACL
tt.OK(b.UpdateHostAllowlist(context.Background(), []types.PublicKey{h.PublicKey()}, nil, false))

// wait until we received the events
tt.Retry(100, 100*time.Millisecond, func() error {
mu.Lock()
Expand All @@ -148,6 +152,10 @@ func TestEvents(t *testing.T) {
event, err := api.ParseEventWebhook(r)
tt.OK(err)
switch e := event.(type) {
case api.EventACLUpdate:
if e.Timestamp.IsZero() || len(e.Allowlist) != 1 {
t.Fatalf("unexpected event %+v", e)
}
case api.EventContractRenew:
if e.Renewal.ID != renewed.ID || e.Renewal.RenewedFrom != c.ID || e.Timestamp.IsZero() {
t.Fatalf("unexpected event %+v", e)
Expand Down
Loading
Loading