Skip to content

Commit

Permalink
Add RHP4 client wrapper (#1630)
Browse files Browse the repository at this point in the history
This PR adds a `rhp4Client` to both `worker` and `bus`. The client is
similar to the existing ones we have.

NOTE: The client doesn't implement any RPCs yet except for the settings
one as an example. The corresponding PRs will implement those RPCs and
make any chances to the signatures if needed.
  • Loading branch information
ChrisSchinnerl authored Oct 31, 2024
2 parents e4fe42d + de77fc0 commit 45abf51
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 34 deletions.
20 changes: 12 additions & 8 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.sia.tech/renterd/internal/rhp"
rhp2 "go.sia.tech/renterd/internal/rhp/v2"
rhp3 "go.sia.tech/renterd/internal/rhp/v3"
rhp4 "go.sia.tech/renterd/internal/rhp/v4"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/stores"
Expand Down Expand Up @@ -325,8 +326,9 @@ type Bus struct {
w Wallet
store Store

rhp2 *rhp2.Client
rhp3 *rhp3.Client
rhp2Client *rhp2.Client
rhp3Client *rhp3.Client
rhp4Client *rhp4.Client

contractLocker ContractLocker
explorer *ibus.Explorer
Expand All @@ -339,6 +341,7 @@ type Bus struct {
// New returns a new Bus
func New(ctx context.Context, masterKey [32]byte, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, announcementMaxAge time.Duration, explorerURL string, l *zap.Logger) (_ *Bus, err error) {
l = l.Named("bus")
dialer := rhp.NewFallbackDialer(store, net.Dialer{}, l)

b := &Bus{
startTime: time.Now(),
Expand All @@ -355,8 +358,9 @@ func New(ctx context.Context, masterKey [32]byte, am AlertManager, wm WebhooksMa
webhooksMgr: wm,
logger: l.Sugar(),

rhp2: rhp2.New(rhp.NewFallbackDialer(store, net.Dialer{}, l), l),
rhp3: rhp3.New(rhp.NewFallbackDialer(store, net.Dialer{}, l), l),
rhp2Client: rhp2.New(dialer, l),
rhp3Client: rhp3.New(dialer, l),
rhp4Client: rhp4.New(dialer),
}

// create contract locker
Expand Down Expand Up @@ -614,7 +618,7 @@ func (b *Bus) broadcastContract(ctx context.Context, fcid types.FileContractID)
renterKey := b.masterKey.DeriveContractKey(c.HostKey)

// fetch revision
rev, err := b.rhp2.SignedRevision(ctx, c.HostIP, c.HostKey, renterKey, fcid, time.Minute)
rev, err := b.rhp2Client.SignedRevision(ctx, c.HostIP, c.HostKey, renterKey, fcid, time.Minute)
if err != nil {
return types.TransactionID{}, fmt.Errorf("couldn't fetch revision; %w", err)
}
Expand Down Expand Up @@ -716,7 +720,7 @@ func (b *Bus) formContract(ctx context.Context, hostSettings rhpv2.HostSettings,
b.w.SignTransaction(&txn, toSign, wallet.ExplicitCoveredFields(txn))

// form the contract
contract, txnSet, err := b.rhp2.FormContract(ctx, hostKey, hostIP, renterKey, append(b.cm.UnconfirmedParents(txn), txn))
contract, txnSet, err := b.rhp2Client.FormContract(ctx, hostKey, hostIP, renterKey, append(b.cm.UnconfirmedParents(txn), txn))
if err != nil {
b.w.ReleaseInputs([]types.Transaction{txn}, nil)
return rhpv2.ContractRevision{}, err
Expand Down Expand Up @@ -866,15 +870,15 @@ func (b *Bus) renewContract(ctx context.Context, cs consensus.State, gp api.Goug
}()

// fetch the revision
rev, err := b.rhp3.Revision(ctx, c.ID, c.HostKey, c.SiamuxAddr)
rev, err := b.rhp3Client.Revision(ctx, c.ID, c.HostKey, c.SiamuxAddr)
if err != nil {
return rhpv2.ContractRevision{}, types.ZeroCurrency, types.ZeroCurrency, fmt.Errorf("couldn't fetch revision; %w", err)
}

// renew contract
gc := gouging.NewChecker(gp.GougingSettings, gp.ConsensusState, nil, nil)
prepareRenew := b.prepareRenew(cs, rev, hs.Address, b.w.Address(), renterFunds, minNewCollateral, endHeight, expectedNewStorage)
newRevision, txnSet, contractPrice, fundAmount, err := b.rhp3.Renew(ctx, gc, rev, renterKey, c.HostKey, c.SiamuxAddr, prepareRenew, b.w.SignTransaction)
newRevision, txnSet, contractPrice, fundAmount, err := b.rhp3Client.Renew(ctx, gc, rev, renterKey, c.HostKey, c.SiamuxAddr, prepareRenew, b.w.SignTransaction)
if err != nil {
return rhpv2.ContractRevision{}, types.ZeroCurrency, types.ZeroCurrency, fmt.Errorf("couldn't renew contract; %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (b *Bus) accountsFundHandler(jc jape.Context) {
defer b.contractLocker.Release(req.ContractID, lockID)

// latest revision
rev, err := b.rhp3.Revision(jc.Request.Context(), req.ContractID, cm.HostKey, cm.SiamuxAddr)
rev, err := b.rhp3Client.Revision(jc.Request.Context(), req.ContractID, cm.HostKey, cm.SiamuxAddr)
if jc.Check("failed to fetch contract revision", err) != nil {
return
}
Expand All @@ -72,7 +72,7 @@ func (b *Bus) accountsFundHandler(jc jape.Context) {
}

// price table
pt, err := b.rhp3.PriceTable(jc.Request.Context(), cm.HostKey, cm.SiamuxAddr, rhp3.PreparePriceTableContractPayment(&rev, req.AccountID, rk))
pt, err := b.rhp3Client.PriceTable(jc.Request.Context(), cm.HostKey, cm.SiamuxAddr, rhp3.PreparePriceTableContractPayment(&rev, req.AccountID, rk))
if jc.Check("failed to fetch price table", err) != nil {
return
}
Expand All @@ -92,7 +92,7 @@ func (b *Bus) accountsFundHandler(jc jape.Context) {
}

// fund the account
err = b.rhp3.FundAccount(jc.Request.Context(), &rev, cm.HostKey, cm.SiamuxAddr, deposit, req.AccountID, pt.HostPriceTable, rk)
err = b.rhp3Client.FundAccount(jc.Request.Context(), &rev, cm.HostKey, cm.SiamuxAddr, deposit, req.AccountID, pt.HostPriceTable, rk)
if jc.Check("failed to fund account", err) != nil {
return
}
Expand Down Expand Up @@ -852,7 +852,7 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) {

// prune the contract
rk := b.masterKey.DeriveContractKey(c.HostKey)
rev, spending, pruned, remaining, err := b.rhp2.PruneContract(pruneCtx, rk, gc, c.HostIP, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) {
rev, spending, pruned, remaining, err := b.rhp2Client.PruneContract(pruneCtx, rk, gc, c.HostIP, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) {
indices, err := b.store.PrunableContractRoots(ctx, fcid, roots)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2319,7 +2319,7 @@ func (b *Bus) contractsFormHandler(jc jape.Context) {
gc := gouging.NewChecker(gp.GougingSettings, gp.ConsensusState, nil, nil)

// fetch host settings
settings, err := b.rhp2.Settings(ctx, rfr.HostKey, rfr.HostIP)
settings, err := b.rhp2Client.Settings(ctx, rfr.HostKey, rfr.HostIP)
if jc.Check("couldn't fetch host settings", err) != nil {
return
}
Expand Down
11 changes: 5 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ require (
github.com/mattn/go-sqlite3 v1.14.24
github.com/montanaflynn/stats v0.7.1
github.com/shopspring/decimal v1.4.0
go.sia.tech/core v0.5.0
go.sia.tech/coreutils v0.5.0
go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268
go.sia.tech/coreutils v0.5.1-0.20241028141200-29e49cd3cb7e
go.sia.tech/gofakes3 v0.0.5
go.sia.tech/hostd v1.1.3-0.20241007200428-58d2cc50a7cd
go.sia.tech/hostd v1.1.3-0.20241028141324-e5eeff133133
go.sia.tech/jape v0.12.1
go.sia.tech/mux v1.3.0
go.sia.tech/web/renterd v0.65.0
Expand All @@ -31,7 +31,7 @@ require (

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/cloudflare/cloudflare-go v0.104.0 // indirect
github.com/cloudflare/cloudflare-go v0.107.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
Expand All @@ -45,7 +45,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/time v0.7.0 // indirect
golang.org/x/tools v0.23.0 // indirect
nhooyr.io/websocket v1.8.17 // indirect
)
22 changes: 10 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/cloudflare/cloudflare-go v0.104.0 h1:R/lB0dZupaZbOgibAH/BRrkFbZ6Acn/WsKg2iX2xXuY=
github.com/cloudflare/cloudflare-go v0.104.0/go.mod h1:pfUQ4PIG4ISI0/Mmc21Bp86UnFU0ktmPf3iTgbSL+cM=
github.com/cloudflare/cloudflare-go v0.107.0 h1:cMDIw2tzt6TXCJyMFVyP+BPOVkIfMvcKjhMNSNvuEPc=
github.com/cloudflare/cloudflare-go v0.107.0/go.mod h1:5cYGzVBqNTLxMYSLdVjuSs5LJL517wJDSvMPWUrzHzc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -55,14 +55,14 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.sia.tech/core v0.5.0 h1:feLC7DSCF+PhU157s/94106hFKyiGrGQ9HC3/dF/l7E=
go.sia.tech/core v0.5.0/go.mod h1:P3C1BWa/7J4XgdzWuaYHBvLo2RzZ0UBaJM4TG1GWB2g=
go.sia.tech/coreutils v0.5.0 h1:/xKxdw83iZy0jjLzI2NGHyG4azyjK5DJscxpkr6nIGQ=
go.sia.tech/coreutils v0.5.0/go.mod h1:VYM4FcmlhVrpDGvglLHjRW+gitoaxPNLvp5mL2quilo=
go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268 h1:Afh3x9rg6pI183LQVYIGQ3quhrRgHWez4987JSKmTpk=
go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268/go.mod h1:P3C1BWa/7J4XgdzWuaYHBvLo2RzZ0UBaJM4TG1GWB2g=
go.sia.tech/coreutils v0.5.1-0.20241028141200-29e49cd3cb7e h1:vhqvgo9X6VWiJblgXuTooUhercsPEhizHVGWfh2E5VE=
go.sia.tech/coreutils v0.5.1-0.20241028141200-29e49cd3cb7e/go.mod h1:YlsVY5XQRfTEp2FiaOiOq0KpmWGzJ5stnuESJR/pktg=
go.sia.tech/gofakes3 v0.0.5 h1:vFhVBUFbKE9ZplvLE2w4TQxFMQyF8qvgxV4TaTph+Vw=
go.sia.tech/gofakes3 v0.0.5/go.mod h1:LXEzwGw+OHysWLmagleCttX93cJZlT9rBu/icOZjQ54=
go.sia.tech/hostd v1.1.3-0.20241007200428-58d2cc50a7cd h1:kcw3S5gVN8uOnIaVBQMt5UNpRe3z/+xwpmQT5B6+Zbs=
go.sia.tech/hostd v1.1.3-0.20241007200428-58d2cc50a7cd/go.mod h1:ZDezsW1NNGPhodtAWMuzhxQdPg2fcDOgZPi/eSSuPBc=
go.sia.tech/hostd v1.1.3-0.20241028141324-e5eeff133133 h1:tUoU+0VXeYSjzzXLo3Y8olaGVlPzTLxCupmAhTEASG8=
go.sia.tech/hostd v1.1.3-0.20241028141324-e5eeff133133/go.mod h1:xjTKgRZjou9qzvVjuwfAEO6qCJRBU0BSYcXfzntlRmU=
go.sia.tech/jape v0.12.1 h1:xr+o9V8FO8ScRqbSaqYf9bjj1UJ2eipZuNcI1nYousU=
go.sia.tech/jape v0.12.1/go.mod h1:wU+h6Wh5olDjkPXjF0tbZ1GDgoZ6VTi4naFw91yyWC4=
go.sia.tech/mux v1.3.0 h1:hgR34IEkqvfBKUJkAzGi31OADeW2y7D6Bmy/Jcbop9c=
Expand Down Expand Up @@ -97,8 +97,8 @@ golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20190829051458-42f498d34c4d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
Expand All @@ -114,5 +114,3 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
lukechampine.com/frand v1.5.1 h1:fg0eRtdmGFIxhP5zQJzM1lFDbD6CUfu/f+7WgAZd5/w=
lukechampine.com/frand v1.5.1/go.mod h1:4VstaWc2plN4Mjr10chUD46RAVGWhpkZ5Nja8+Azp0Q=
nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y=
nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
102 changes: 102 additions & 0 deletions internal/rhp/v4/rhp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package rhp

import (
"context"
"errors"
"io"
"net"

"go.sia.tech/core/consensus"
rhp4 "go.sia.tech/core/rhp/v4"
"go.sia.tech/core/types"
rhp "go.sia.tech/coreutils/rhp/v4"
)

var (
// errDialTransport is returned when the worker could not dial the host.
ErrDialTransport = errors.New("could not dial transport")
)

type (
Dialer interface {
Dial(ctx context.Context, hk types.PublicKey, address string) (net.Conn, error)
}
)

type Client struct {
tpool *transportPool
}

func New(dialer Dialer) *Client {
return &Client{
tpool: newTransportPool(dialer),
}
}

func (c *Client) Settings(ctx context.Context, hk types.PublicKey, addr string) (hs rhp4.HostSettings, _ error) {
err := c.tpool.withTransport(ctx, hk, addr, func(c rhp.TransportClient) (err error) {
hs, err = rhp.RPCSettings(ctx, c)
return err
})
return hs, err
}

// ReadSector reads a sector from the host.
func (c *Client) ReadSector(ctx context.Context, prices rhp4.HostPrices, token rhp4.AccountToken, w io.Writer, root types.Hash256, offset, length uint64) (rhp.RPCReadSectorResult, error) {
panic("not implemented")
}

// WriteSector writes a sector to the host.
func (c *Client) WriteSector(ctx context.Context, prices rhp4.HostPrices, token rhp4.AccountToken, rl rhp.ReaderLen, duration uint64) (rhp.RPCWriteSectorResult, error) {
panic("not implemented")
}

// VerifySector verifies that the host is properly storing a sector
func (c *Client) VerifySector(ctx context.Context, prices rhp4.HostPrices, token rhp4.AccountToken, root types.Hash256) (rhp.RPCVerifySectorResult, error) {
panic("not implemented")
}

// FreeSectors removes sectors from a contract.
func (c *Client) FreeSectors(ctx context.Context, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract rhp.ContractRevision, indices []uint64) (rhp.RPCFreeSectorsResult, error) {
panic("not implemented")
}

// AppendSectors appends sectors a host is storing to a contract.
func (c *Client) AppendSectors(ctx context.Context, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract rhp.ContractRevision, roots []types.Hash256) (rhp.RPCAppendSectorsResult, error) {
panic("not implemented")
}

// FundAccounts funds accounts on the host.
func (c *Client) FundAccounts(ctx context.Context, cs consensus.State, signer rhp.ContractSigner, contract rhp.ContractRevision, deposits []rhp4.AccountDeposit) (rhp.RPCFundAccountResult, error) {
panic("not implemented")
}

// LatestRevision returns the latest revision of a contract.
func (c *Client) LatestRevision(ctx context.Context, contractID types.FileContractID) (types.V2FileContract, error) {
panic("not implemented")
}

// SectorRoots returns the sector roots for a contract.
func (c *Client) SectorRoots(ctx context.Context, cs consensus.State, prices rhp4.HostPrices, signer rhp.ContractSigner, contract rhp.ContractRevision, offset, length uint64) (rhp.RPCSectorRootsResult, error) {
panic("not implemented")
}

// AccountBalance returns the balance of an account.
func (c *Client) AccountBalance(ctx context.Context, account rhp4.Account) (types.Currency, error) {
panic("not implemented")
}

// FormContract forms a contract with a host
func (c *Client) FormContract(ctx context.Context, tp rhp.TxPool, signer rhp.FormContractSigner, cs consensus.State, p rhp4.HostPrices, hostKey types.PublicKey, hostAddress types.Address, params rhp4.RPCFormContractParams) (rhp.RPCFormContractResult, error) {
panic("not implemented")
}

// RenewContract renews a contract with a host.
func (c *Client) RenewContract(ctx context.Context, tp rhp.TxPool, signer rhp.FormContractSigner, cs consensus.State, p rhp4.HostPrices, existing types.V2FileContract, params rhp4.RPCRenewContractParams) (rhp.RPCRenewContractResult, error) {
panic("not implemented")
}

// RefreshContract refreshes a contract with a host.
func (c *Client) RefreshContract(ctx context.Context, tp rhp.TxPool, signer rhp.FormContractSigner, cs consensus.State, p rhp4.HostPrices, existing types.V2FileContract, params rhp4.RPCRefreshContractParams) (rhp.RPCRefreshContractResult, error) {
panic("not implemented")
}
Loading

0 comments on commit 45abf51

Please sign in to comment.