From 643895b6e73b8eb99b1b4954692b9845748618d9 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 28 Oct 2024 16:13:43 +0100 Subject: [PATCH] rhp4: add client --- go.mod | 11 ++-- go.sum | 22 ++++--- internal/rhp/v4/rhp.go | 44 ++++++++++++++ internal/rhp/v4/transport.go | 111 +++++++++++++++++++++++++++++++++++ internal/test/e2e/host.go | 6 +- 5 files changed, 173 insertions(+), 21 deletions(-) create mode 100644 internal/rhp/v4/rhp.go create mode 100644 internal/rhp/v4/transport.go diff --git a/go.mod b/go.mod index 9c47a08c9..b97e9ada9 100644 --- a/go.mod +++ b/go.mod @@ -14,10 +14,10 @@ require ( github.com/mattn/go-sqlite3 v1.14.24 github.com/montanaflynn/stats v0.7.1 github.com/shopspring/decimal v1.4.0 - go.sia.tech/core v0.5.0 - go.sia.tech/coreutils v0.5.0 + go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268 + go.sia.tech/coreutils v0.5.1-0.20241028141200-29e49cd3cb7e go.sia.tech/gofakes3 v0.0.5 - go.sia.tech/hostd v1.1.3-0.20241007200428-58d2cc50a7cd + go.sia.tech/hostd v1.1.3-0.20241028141324-e5eeff133133 go.sia.tech/jape v0.12.1 go.sia.tech/mux v1.3.0 go.sia.tech/web/renterd v0.65.0 @@ -31,7 +31,7 @@ require ( require ( filippo.io/edwards25519 v1.1.0 // indirect - github.com/cloudflare/cloudflare-go v0.104.0 // indirect + github.com/cloudflare/cloudflare-go v0.107.0 // indirect github.com/goccy/go-json v0.10.3 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect @@ -45,7 +45,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/text v0.19.0 // indirect - golang.org/x/time v0.6.0 // indirect + golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.23.0 // indirect - nhooyr.io/websocket v1.8.17 // indirect ) diff --git a/go.sum b/go.sum index 55115cd4f..38dc3a531 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= -github.com/cloudflare/cloudflare-go v0.104.0 h1:R/lB0dZupaZbOgibAH/BRrkFbZ6Acn/WsKg2iX2xXuY= -github.com/cloudflare/cloudflare-go v0.104.0/go.mod h1:pfUQ4PIG4ISI0/Mmc21Bp86UnFU0ktmPf3iTgbSL+cM= +github.com/cloudflare/cloudflare-go v0.107.0 h1:cMDIw2tzt6TXCJyMFVyP+BPOVkIfMvcKjhMNSNvuEPc= +github.com/cloudflare/cloudflare-go v0.107.0/go.mod h1:5cYGzVBqNTLxMYSLdVjuSs5LJL517wJDSvMPWUrzHzc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -55,14 +55,14 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= -go.sia.tech/core v0.5.0 h1:feLC7DSCF+PhU157s/94106hFKyiGrGQ9HC3/dF/l7E= -go.sia.tech/core v0.5.0/go.mod h1:P3C1BWa/7J4XgdzWuaYHBvLo2RzZ0UBaJM4TG1GWB2g= -go.sia.tech/coreutils v0.5.0 h1:/xKxdw83iZy0jjLzI2NGHyG4azyjK5DJscxpkr6nIGQ= -go.sia.tech/coreutils v0.5.0/go.mod h1:VYM4FcmlhVrpDGvglLHjRW+gitoaxPNLvp5mL2quilo= +go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268 h1:Afh3x9rg6pI183LQVYIGQ3quhrRgHWez4987JSKmTpk= +go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268/go.mod h1:P3C1BWa/7J4XgdzWuaYHBvLo2RzZ0UBaJM4TG1GWB2g= +go.sia.tech/coreutils v0.5.1-0.20241028141200-29e49cd3cb7e h1:vhqvgo9X6VWiJblgXuTooUhercsPEhizHVGWfh2E5VE= +go.sia.tech/coreutils v0.5.1-0.20241028141200-29e49cd3cb7e/go.mod h1:YlsVY5XQRfTEp2FiaOiOq0KpmWGzJ5stnuESJR/pktg= go.sia.tech/gofakes3 v0.0.5 h1:vFhVBUFbKE9ZplvLE2w4TQxFMQyF8qvgxV4TaTph+Vw= go.sia.tech/gofakes3 v0.0.5/go.mod h1:LXEzwGw+OHysWLmagleCttX93cJZlT9rBu/icOZjQ54= -go.sia.tech/hostd v1.1.3-0.20241007200428-58d2cc50a7cd h1:kcw3S5gVN8uOnIaVBQMt5UNpRe3z/+xwpmQT5B6+Zbs= -go.sia.tech/hostd v1.1.3-0.20241007200428-58d2cc50a7cd/go.mod h1:ZDezsW1NNGPhodtAWMuzhxQdPg2fcDOgZPi/eSSuPBc= +go.sia.tech/hostd v1.1.3-0.20241028141324-e5eeff133133 h1:tUoU+0VXeYSjzzXLo3Y8olaGVlPzTLxCupmAhTEASG8= +go.sia.tech/hostd v1.1.3-0.20241028141324-e5eeff133133/go.mod h1:xjTKgRZjou9qzvVjuwfAEO6qCJRBU0BSYcXfzntlRmU= go.sia.tech/jape v0.12.1 h1:xr+o9V8FO8ScRqbSaqYf9bjj1UJ2eipZuNcI1nYousU= go.sia.tech/jape v0.12.1/go.mod h1:wU+h6Wh5olDjkPXjF0tbZ1GDgoZ6VTi4naFw91yyWC4= go.sia.tech/mux v1.3.0 h1:hgR34IEkqvfBKUJkAzGi31OADeW2y7D6Bmy/Jcbop9c= @@ -97,8 +97,8 @@ golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= -golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= +golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20190829051458-42f498d34c4d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= @@ -114,5 +114,3 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= lukechampine.com/frand v1.5.1 h1:fg0eRtdmGFIxhP5zQJzM1lFDbD6CUfu/f+7WgAZd5/w= lukechampine.com/frand v1.5.1/go.mod h1:4VstaWc2plN4Mjr10chUD46RAVGWhpkZ5Nja8+Azp0Q= -nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= -nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/internal/rhp/v4/rhp.go b/internal/rhp/v4/rhp.go new file mode 100644 index 000000000..08d46526a --- /dev/null +++ b/internal/rhp/v4/rhp.go @@ -0,0 +1,44 @@ +package rhp + +import ( + "context" + "errors" + "net" + + rhp4 "go.sia.tech/core/rhp/v4" + "go.sia.tech/core/types" + rhp "go.sia.tech/coreutils/rhp/v4" +) + +var ( + // errDialTransport is returned when the worker could not dial the host. + ErrDialTransport = errors.New("could not dial transport") +) + +type ( + Dialer interface { + Dial(ctx context.Context, hk types.PublicKey, address string) (net.Conn, error) + } +) + +type Client struct { + tpool *transportPool +} + +func New(dialer Dialer) *Client { + return &Client{ + tpool: newTransportPool(dialer), + } +} + +func (c *Client) Settings(ctx context.Context, hk types.PublicKey, addr string) (hs rhp4.HostSettings, _ error) { + err := c.tpool.withTransport(hk, addr, func(t *transport) error { + client, err := t.Dial(ctx) + if err != nil { + return err + } + hs, err = rhp.RPCSettings(ctx, client) + return err + }) + return hs, err +} diff --git a/internal/rhp/v4/transport.go b/internal/rhp/v4/transport.go new file mode 100644 index 000000000..7aea8c6e2 --- /dev/null +++ b/internal/rhp/v4/transport.go @@ -0,0 +1,111 @@ +package rhp + +import ( + "context" + "fmt" + "sync" + "time" + + "go.sia.tech/core/types" + rhp "go.sia.tech/coreutils/rhp/v4" +) + +type transportPool struct { + dialer Dialer + + mu sync.Mutex + pool map[string]*transport +} + +func newTransportPool(dialer Dialer) *transportPool { + return &transportPool{ + dialer: dialer, + pool: make(map[string]*transport), + } +} + +func (p *transportPool) withTransport(hk types.PublicKey, addr string, fn func(*transport) error) (err error) { + // fetch or create transport + p.mu.Lock() + t, found := p.pool[addr] + if !found { + t = &transport{ + dialer: p.dialer, + hk: hk, + addr: addr, + } + p.pool[addr] = t + } + t.refCount++ + p.mu.Unlock() + + // execute function + err = func() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic (withTransport): %v", r) + } + }() + return fn(t) + }() + + // Decrement refcounter again and clean up pool. + p.mu.Lock() + t.refCount-- + if t.refCount == 0 { + // Cleanup + if t.t != nil { + _ = t.t.Close() + t.t = nil + } + delete(p.pool, addr) + } + p.mu.Unlock() + return err +} + +// transportPool is a pool of rhpv4.TransportClients which allows for reusing +// them. +func dialTransport(ctx context.Context, dialer Dialer, addr string, hk types.PublicKey) (rhp.TransportClient, error) { + // Dial host. + conn, err := dialer.Dial(ctx, hk, addr) + if err != nil { + return nil, err + } + + // Upgrade to rhpv4.Transport. + return rhp.UpgradeConn(ctx, conn, hk) +} + +type transport struct { + dialer Dialer + refCount uint64 // locked by pool + + mu sync.Mutex + hk types.PublicKey + addr string + t rhp.TransportClient +} + +// DialStream dials a new stream on the transport. +func (t *transport) Dial(ctx context.Context) (rhp.TransportClient, error) { + t.mu.Lock() + defer t.mu.Unlock() + if t.t == nil { + start := time.Now() + + // dial host + conn, err := t.dialer.Dial(ctx, t.hk, t.addr) + if err != nil { + return nil, err + } + + // upgrade conn + newTransport, err := rhp.UpgradeConn(ctx, conn, t.hk) + if err != nil { + return nil, fmt.Errorf("UpgradeConn: %w: %w (%v)", ErrDialTransport, err, time.Since(start)) + } + t.t = newTransport + } + return t.t, nil +} diff --git a/internal/test/e2e/host.go b/internal/test/e2e/host.go index 1ba6b5acf..a1c99a722 100644 --- a/internal/test/e2e/host.go +++ b/internal/test/e2e/host.go @@ -283,7 +283,7 @@ func NewHost(privKey types.PrivateKey, dir string, network *consensus.Network, g return nil, fmt.Errorf("failed to create rhp3 listener: %w", err) } - settings, err := settings.NewConfigManager(privKey, db, cm, s, wallet, settings.WithValidateNetAddress(false)) + settings, err := settings.NewConfigManager(privKey, db, cm, s, wallet, storage, settings.WithValidateNetAddress(false)) if err != nil { return nil, fmt.Errorf("failed to create settings manager: %w", err) } @@ -296,13 +296,13 @@ func NewHost(privKey types.PrivateKey, dir string, network *consensus.Network, g registry := registry.NewManager(privKey, db, zap.NewNop()) accounts := accounts.NewManager(db, settings) - rhpv2, err := rhpv2.NewSessionHandler(rhp2Listener, privKey, rhp3Listener.Addr().String(), cm, s, wallet, contracts, settings, storage) + rhpv2, err := rhpv2.NewSessionHandler(rhp2Listener, privKey, rhp3Listener.Addr().String(), cm, s, wallet, contracts, settings, storage, log) if err != nil { return nil, fmt.Errorf("failed to create rhpv2 session handler: %w", err) } go rhpv2.Serve() - rhpv3, err := rhpv3.NewSessionHandler(rhp3Listener, privKey, cm, s, wallet, accounts, contracts, registry, storage, settings) + rhpv3, err := rhpv3.NewSessionHandler(rhp3Listener, privKey, cm, s, wallet, accounts, contracts, registry, storage, settings, log) if err != nil { return nil, fmt.Errorf("failed to create rhpv3 session handler: %w", err) }