From 3017789e54f74565df4c82afaa9a88c2cb1c9760 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Mon, 5 Aug 2024 18:10:40 +1000 Subject: [PATCH] fix: address issues raised from code review --- cmd/lotus-gateway/main.go | 18 ++-- gateway/handler.go | 182 +++++++++++++++++++++++--------------- gateway/handler_test.go | 2 +- gateway/node.go | 109 ++++++++++++----------- gateway/node_test.go | 4 +- gateway/proxy_eth.go | 16 ++-- gateway/proxy_fil.go | 12 +-- itests/gateway_test.go | 150 ++++++++++++++++--------------- 8 files changed, 274 insertions(+), 219 deletions(-) diff --git a/cmd/lotus-gateway/main.go b/cmd/lotus-gateway/main.go index 92194d16a55..2e7a1efb431 100644 --- a/cmd/lotus-gateway/main.go +++ b/cmd/lotus-gateway/main.go @@ -124,12 +124,12 @@ var runCmd = &cli.Command{ &cli.DurationFlag{ Name: "api-max-lookback", Usage: "maximum duration allowable for tipset lookbacks", - Value: gateway.DefaultLookbackCap, + Value: gateway.DefaultMaxLookbackDuration, }, &cli.Int64Flag{ Name: "api-wait-lookback-limit", Usage: "maximum number of blocks to search back through for message inclusion", - Value: int64(gateway.DefaultStateWaitLookbackLimit), + Value: int64(gateway.DefaultMaxMessageLookbackEpochs), }, &cli.Int64Flag{ Name: "rate-limit", @@ -154,7 +154,7 @@ var runCmd = &cli.Command{ }, &cli.Int64Flag{ Name: "conn-per-minute", - Usage: "A hard limit on the number of incomming connections (requests) to accept per remote host per minute. Use 0 to disable", + Usage: "A hard limit on the number of incoming connections (requests) to accept per remote host per minute. Use 0 to disable", Value: 0, }, &cli.IntFlag{ @@ -212,13 +212,19 @@ var runCmd = &cli.Command{ gwapi := gateway.NewNode( api, gateway.WithEthSubHandler(subHnd), - gateway.WithLookbackCap(lookbackCap), - gateway.WithStateWaitLookbackLimit(waitLookback), + gateway.WithMaxLookbackDuration(lookbackCap), + gateway.WithMaxMessageLookbackEpochs(waitLookback), gateway.WithRateLimit(globalRateLimit), gateway.WithRateLimitTimeout(rateLimitTimeout), gateway.WithEthMaxFiltersPerConn(maxFiltersPerConn), ) - handler, err := gateway.Handler(gwapi, api, perConnectionRateLimit, perHostConnectionsPerMinute, serverOptions...) + handler, err := gateway.Handler( + gwapi, + api, + gateway.WithPerConnectionAPIRateLimit(perConnectionRateLimit), + gateway.WithPerHostConnectionsPerMinute(perHostConnectionsPerMinute), + gateway.WithJsonrpcServerOptions(serverOptions...), + ) if err != nil { return xerrors.Errorf("failed to set up gateway HTTP handler") } diff --git a/gateway/handler.go b/gateway/handler.go index eef8473288f..78310edb4fd 100644 --- a/gateway/handler.go +++ b/gateway/handler.go @@ -37,33 +37,62 @@ type ShutdownHandler interface { Shutdown(ctx context.Context) error } -var _ ShutdownHandler = &statefulCallHandler{} -var _ ShutdownHandler = &RateLimitHandler{} +var _ ShutdownHandler = (*statefulCallHandler)(nil) +var _ ShutdownHandler = (*RateLimitHandler)(nil) -// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is -// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its -// Shutdown method. +// handlerOptions holds the options for the Handler function. +type handlerOptions struct { + perConnectionAPIRateLimit int + perHostConnectionsPerMinute int + jsonrpcServerOptions []jsonrpc.ServerOption +} + +// HandlerOption is a functional option for configuring the Handler. 
+type HandlerOption func(*handlerOptions) + +// WithPerConnectionAPIRateLimit sets the per connection API rate limit. // // The handler will limit the number of API calls per minute within a single WebSocket connection // (where API calls are weighted by their relative expense), and the number of connections per // minute from a single host. +func WithPerConnectionAPIRateLimit(limit int) HandlerOption { + return func(opts *handlerOptions) { + opts.perConnectionAPIRateLimit = limit + } +} + +// WithPerHostConnectionsPerMinute sets the per host connections per minute limit. // -// Connection limiting is a hard limit that will reject requests with a 429 status code if the limit -// is exceeded. API call limiting is a soft limit that will delay requests if the limit is exceeded. -func Handler( - gwapi lapi.Gateway, - api lapi.FullNode, - perConnectionAPIRateLimit int, - perHostConnectionsPerMinute int, - opts ...jsonrpc.ServerOption, -) (ShutdownHandler, error) { +// Connection limiting is a hard limit that will reject requests with a http.StatusTooManyRequests +// status code if the limit is exceeded. API call limiting is a soft limit that will delay requests +// if the limit is exceeded. +func WithPerHostConnectionsPerMinute(limit int) HandlerOption { + return func(opts *handlerOptions) { + opts.perHostConnectionsPerMinute = limit + } +} - m := mux.NewRouter() +// WithJsonrpcServerOptions sets the JSON-RPC server options. +func WithJsonrpcServerOptions(options ...jsonrpc.ServerOption) HandlerOption { + return func(opts *handlerOptions) { + opts.jsonrpcServerOptions = options + } +} - opts = append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors)) +// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is +// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its +// Shutdown method. +func Handler(gwapi lapi.Gateway, api lapi.FullNode, options ...HandlerOption) (ShutdownHandler, error) { + opts := &handlerOptions{} + for _, option := range options { + option(opts) + } + + m := mux.NewRouter() + rpcopts := append(opts.jsonrpcServerOptions, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors)) serveRpc := func(path string, hnd interface{}) { - rpcServer := jsonrpc.NewServer(opts...) + rpcServer := jsonrpc.NewServer(rpcopts...) 
rpcServer.Register("Filecoin", hnd) rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover") @@ -91,11 +120,11 @@ func Handler( m.PathPrefix("/").Handler(http.DefaultServeMux) handler := &statefulCallHandler{m} - if perConnectionAPIRateLimit > 0 && perHostConnectionsPerMinute > 0 { + if opts.perConnectionAPIRateLimit > 0 || opts.perHostConnectionsPerMinute > 0 { return NewRateLimitHandler( handler, - perConnectionAPIRateLimit, - perHostConnectionsPerMinute, + opts.perConnectionAPIRateLimit, + opts.perHostConnectionsPerMinute, connectionLimiterCleanupInterval, ), nil } @@ -125,14 +154,15 @@ type hostLimiter struct { } type RateLimitHandler struct { - cancelFunc context.CancelFunc - mu sync.Mutex - limiters map[string]*hostLimiter - perConnectionAPILimit rate.Limit - perHostConnectionsPerMinute int - next http.Handler - cleanupInterval time.Duration - expiryDuration time.Duration + cancelFunc context.CancelFunc + limiters map[string]*hostLimiter + limitersLk sync.Mutex + perConnectionAPILimit rate.Limit + perHostConnectionsLimit rate.Limit + perHostConnectionsLimitBurst int + next http.Handler + cleanupInterval time.Duration + expiryDuration time.Duration } // NewRateLimitHandler creates a new RateLimitHandler that wraps the @@ -149,85 +179,95 @@ func NewRateLimitHandler( ctx, cancel := context.WithCancel(context.Background()) h := &RateLimitHandler{ - cancelFunc: cancel, - limiters: make(map[string]*hostLimiter), - perConnectionAPILimit: rate.Inf, - perHostConnectionsPerMinute: perHostConnectionsPerMinute, - next: next, - cleanupInterval: cleanupInterval, - expiryDuration: 5 * cleanupInterval, + cancelFunc: cancel, + limiters: make(map[string]*hostLimiter), + perConnectionAPILimit: rate.Inf, + perHostConnectionsLimit: rate.Inf, + next: next, + cleanupInterval: cleanupInterval, + expiryDuration: 5 * cleanupInterval, } if perConnectionAPIRateLimit > 0 { h.perConnectionAPILimit = rate.Every(time.Second / time.Duration(perConnectionAPIRateLimit)) } + if perHostConnectionsPerMinute > 0 { + h.perHostConnectionsLimit = rate.Every(time.Minute / time.Duration(perHostConnectionsPerMinute)) + h.perHostConnectionsLimitBurst = perHostConnectionsPerMinute + } go h.cleanupExpiredLimiters(ctx) return h } -func (h *RateLimitHandler) getLimits(host string) *hostLimiter { - h.mu.Lock() - defer h.mu.Unlock() - - entry, exists := h.limiters[host] - if !exists { - var limiter *rate.Limiter - if h.perHostConnectionsPerMinute > 0 { - requestLimit := rate.Every(time.Minute / time.Duration(h.perHostConnectionsPerMinute)) - limiter = rate.NewLimiter(requestLimit, h.perHostConnectionsPerMinute) - } - entry = &hostLimiter{ - limiter: limiter, - lastAccess: time.Now(), +func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if h.perHostConnectionsLimit != rate.Inf { + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return } - h.limiters[host] = entry - } else { - entry.lastAccess = time.Now() - } - - return entry -} -func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - host, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } + h.limitersLk.Lock() + entry, exists := h.limiters[host] + if !exists { + entry = &hostLimiter{ + limiter: rate.NewLimiter(h.perHostConnectionsLimit, h.perHostConnectionsLimitBurst), + lastAccess: time.Now(), + } + h.limiters[host] = entry + } else { + entry.lastAccess = time.Now() + } + 
h.limitersLk.Unlock() - limits := h.getLimits(host) - if limits.limiter != nil && !limits.limiter.Allow() { - w.WriteHeader(http.StatusTooManyRequests) - return + if !entry.limiter.Allow() { + w.WriteHeader(http.StatusTooManyRequests) + return + } } if h.perConnectionAPILimit != rate.Inf { // new rate limiter for each connection, to throttle a single WebSockets connection; // allow for a burst of MaxRateLimitTokens apiLimiter := rate.NewLimiter(h.perConnectionAPILimit, MaxRateLimitTokens) - r = r.WithContext(context.WithValue(r.Context(), perConnectionAPIRateLimiterKey, apiLimiter)) + r = r.WithContext(setPerConnectionAPIRateLimiter(r.Context(), apiLimiter)) } h.next.ServeHTTP(w, r) } +// setPerConnectionAPIRateLimiter sets the rate limiter in the context. +func setPerConnectionAPIRateLimiter(ctx context.Context, limiter *rate.Limiter) context.Context { + return context.WithValue(ctx, perConnectionAPIRateLimiterKey, limiter) +} + +// getPerConnectionAPIRateLimiter retrieves the rate limiter from the context. +func getPerConnectionAPIRateLimiter(ctx context.Context) (*rate.Limiter, bool) { + limiter, ok := ctx.Value(perConnectionAPIRateLimiterKey).(*rate.Limiter) + return limiter, ok +} + +// cleanupExpiredLimiters periodically checks for limiters that have expired and removes them. func (h *RateLimitHandler) cleanupExpiredLimiters(ctx context.Context) { if h.cleanupInterval == 0 { return } - for { + ticker := time.NewTicker(h.cleanupInterval) + defer ticker.Stop() + + for ctx.Err() == nil { select { case <-ctx.Done(): return - case <-time.After(h.cleanupInterval): - h.mu.Lock() + case <-ticker.C: + h.limitersLk.Lock() now := time.Now() for host, entry := range h.limiters { if now.Sub(entry.lastAccess) > h.expiryDuration { delete(h.limiters, host) } } - h.mu.Unlock() + h.limitersLk.Unlock() } } } diff --git a/gateway/handler_test.go b/gateway/handler_test.go index 65e56836c0a..4599df96f8f 100644 --- a/gateway/handler_test.go +++ b/gateway/handler_test.go @@ -13,7 +13,7 @@ import ( func TestRequestRateLimiterHandler(t *testing.T) { var callCount int h := gateway.NewRateLimitHandler( - http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { + http.HandlerFunc(func(http.ResponseWriter, *http.Request) { callCount++ }), 0, // api rate diff --git a/gateway/node.go b/gateway/node.go index 0fd0fa57ea7..e3e2c6a4715 100644 --- a/gateway/node.go +++ b/gateway/node.go @@ -35,17 +35,17 @@ import ( var log = logger.Logger("gateway") const ( - DefaultLookbackCap = time.Hour * 24 - DefaultStateWaitLookbackLimit = abi.ChainEpoch(20) - DefaultRateLimitTimeout = time.Second * 5 - DefaultEthMaxFiltersPerConn = 16 - basicRateLimitTokens = 1 - walletRateLimitTokens = 1 - chainRateLimitTokens = 2 - stateRateLimitTokens = 3 - - // MaxRateLimitTokens is the number of tokens consumed for the most expensive types of operations - MaxRateLimitTokens = stateRateLimitTokens + DefaultMaxLookbackDuration = time.Hour * 24 // Default duration that a gateway request can look back in chain history + DefaultMaxMessageLookbackEpochs = abi.ChainEpoch(20) // Default number of epochs that a gateway message lookup can look back in chain history + DefaultRateLimitTimeout = time.Second * 5 // Default timeout for rate limiting requests; where a request would take longer to wait than this value, it will be retjected + DefaultEthMaxFiltersPerConn = 16 // Default maximum number of ETH filters and subscriptions per websocket connection + + basicRateLimitTokens = 1 + walletRateLimitTokens = 1 + chainRateLimitTokens = 2 + 
stateRateLimitTokens = 3 + + MaxRateLimitTokens = stateRateLimitTokens // Number of tokens consumed for the most expensive types of operations ) // TargetAPI defines the API methods that the Node depends on @@ -164,14 +164,14 @@ type TargetAPI interface { var _ TargetAPI = *new(api.FullNode) // gateway depends on latest type Node struct { - target TargetAPI - subHnd *EthSubHandler - lookbackCap time.Duration - stateWaitLookbackLimit abi.ChainEpoch - rateLimiter *rate.Limiter - rateLimitTimeout time.Duration - ethMaxFiltersPerConn int - errLookback error + target TargetAPI + subHnd *EthSubHandler + maxLookbackDuration time.Duration + maxMessageLookbackEpochs abi.ChainEpoch + rateLimiter *rate.Limiter + rateLimitTimeout time.Duration + ethMaxFiltersPerConn int + errLookback error } var ( @@ -182,43 +182,43 @@ var ( _ full.StateModuleAPI = (*Node)(nil) ) -type Options struct { - subHandler *EthSubHandler - lookbackCap time.Duration - stateWaitLookbackLimit abi.ChainEpoch - rateLimit int - rateLimitTimeout time.Duration - ethMaxFiltersPerConn int +type options struct { + subHandler *EthSubHandler + maxLookbackDuration time.Duration + maxMessageLookbackEpochs abi.ChainEpoch + rateLimit int + rateLimitTimeout time.Duration + ethMaxFiltersPerConn int } -type Option func(*Options) +type Option func(*options) // WithEthSubHandler sets the Ethereum subscription handler for the gateway node. This is used for // the RPC reverse handler for EthSubscribe calls. func WithEthSubHandler(subHandler *EthSubHandler) Option { - return func(opts *Options) { + return func(opts *options) { opts.subHandler = subHandler } } -// WithLookbackCap sets the maximum lookback duration (time) for state queries. -func WithLookbackCap(lookbackCap time.Duration) Option { - return func(opts *Options) { - opts.lookbackCap = lookbackCap +// WithMaxLookbackDuration sets the maximum lookback duration (time) for state queries. +func WithMaxLookbackDuration(maxLookbackDuration time.Duration) Option { + return func(opts *options) { + opts.maxLookbackDuration = maxLookbackDuration } } -// WithStateWaitLookbackLimit sets the maximum lookback (epochs) for state queries. -func WithStateWaitLookbackLimit(stateWaitLookbackLimit abi.ChainEpoch) Option { - return func(opts *Options) { - opts.stateWaitLookbackLimit = stateWaitLookbackLimit +// WithMaxMessageLookbackEpochs sets the maximum lookback (epochs) for state queries. +func WithMaxMessageLookbackEpochs(maxMessageLookbackEpochs abi.ChainEpoch) Option { + return func(opts *options) { + opts.maxMessageLookbackEpochs = maxMessageLookbackEpochs } } // WithRateLimit sets the maximum number of requests per second globally that will be allowed // before the gateway starts to rate limit requests. func WithRateLimit(rateLimit int) Option { - return func(opts *Options) { + return func(opts *options) { opts.rateLimit = rateLimit } } @@ -226,7 +226,7 @@ func WithRateLimit(rateLimit int) Option { // WithRateLimitTimeout sets the timeout for rate limiting requests such that when rate limiting is // being applied, if the timeout is reached the request will be allowed. func WithRateLimitTimeout(rateLimitTimeout time.Duration) Option { - return func(opts *Options) { + return func(opts *options) { opts.rateLimitTimeout = rateLimitTimeout } } @@ -234,18 +234,18 @@ func WithRateLimitTimeout(rateLimitTimeout time.Duration) Option { // WithEthMaxFiltersPerConn sets the maximum number of Ethereum filters and subscriptions that can // be maintained per websocket connection. 
func WithEthMaxFiltersPerConn(ethMaxFiltersPerConn int) Option { - return func(opts *Options) { + return func(opts *options) { opts.ethMaxFiltersPerConn = ethMaxFiltersPerConn } } // NewNode creates a new gateway node. func NewNode(api TargetAPI, opts ...Option) *Node { - options := &Options{ - lookbackCap: DefaultLookbackCap, - stateWaitLookbackLimit: DefaultStateWaitLookbackLimit, - rateLimitTimeout: DefaultRateLimitTimeout, - ethMaxFiltersPerConn: DefaultEthMaxFiltersPerConn, + options := &options{ + maxLookbackDuration: DefaultMaxLookbackDuration, + maxMessageLookbackEpochs: DefaultMaxMessageLookbackEpochs, + rateLimitTimeout: DefaultRateLimitTimeout, + ethMaxFiltersPerConn: DefaultEthMaxFiltersPerConn, } for _, opt := range opts { opt(options) @@ -256,14 +256,14 @@ func NewNode(api TargetAPI, opts ...Option) *Node { limit = rate.Every(time.Second / time.Duration(options.rateLimit)) } return &Node{ - target: api, - subHnd: options.subHandler, - lookbackCap: options.lookbackCap, - stateWaitLookbackLimit: options.stateWaitLookbackLimit, - rateLimiter: rate.NewLimiter(limit, MaxRateLimitTokens), // allow for a burst of MaxRateLimitTokens - rateLimitTimeout: options.rateLimitTimeout, - errLookback: fmt.Errorf("lookbacks of more than %s are disallowed", options.lookbackCap), - ethMaxFiltersPerConn: options.ethMaxFiltersPerConn, + target: api, + subHnd: options.subHandler, + maxLookbackDuration: options.maxLookbackDuration, + maxMessageLookbackEpochs: options.maxMessageLookbackEpochs, + rateLimiter: rate.NewLimiter(limit, MaxRateLimitTokens), // allow for a burst of MaxRateLimitTokens + rateLimitTimeout: options.rateLimitTimeout, + errLookback: fmt.Errorf("lookbacks of more than %s are disallowed", options.maxLookbackDuration), + ethMaxFiltersPerConn: options.ethMaxFiltersPerConn, } } @@ -303,7 +303,7 @@ func (gw *Node) checkTipsetHeight(ts *types.TipSet, h abi.ChainEpoch) error { } func (gw *Node) checkTimestamp(at time.Time) error { - if time.Since(at) > gw.lookbackCap { + if time.Since(at) > gw.maxLookbackDuration { return gw.errLookback } return nil @@ -312,7 +312,8 @@ func (gw *Node) checkTimestamp(at time.Time) error { func (gw *Node) limit(ctx context.Context, tokens int) error { ctx2, cancel := context.WithTimeout(ctx, gw.rateLimitTimeout) defer cancel() - if perConnLimiter, ok := ctx2.Value(perConnectionAPIRateLimiterKey).(*rate.Limiter); ok { + + if perConnLimiter, ok := getPerConnectionAPIRateLimiter(ctx); ok { err := perConnLimiter.WaitN(ctx2, tokens) if err != nil { return fmt.Errorf("connection limited. 
%w", err) diff --git a/gateway/node_test.go b/gateway/node_test.go index 99c46d8aa8d..fd70f8f214b 100644 --- a/gateway/node_test.go +++ b/gateway/node_test.go @@ -23,7 +23,7 @@ import ( func TestGatewayAPIChainGetTipSetByHeight(t *testing.T) { ctx := context.Background() - lookbackTimestamp := uint64(time.Now().Unix()) - uint64(DefaultLookbackCap.Seconds()) + lookbackTimestamp := uint64(time.Now().Unix()) - uint64(DefaultMaxLookbackDuration.Seconds()) type args struct { h abi.ChainEpoch tskh abi.ChainEpoch @@ -264,7 +264,7 @@ func TestGatewayLimitTokensRate(t *testing.T) { ctx := context.Background() mock := &mockGatewayDepsAPI{} tokens := 3 - var rateLimit = 200 + rateLimit := 200 rateLimitTimeout := time.Second / time.Duration(rateLimit/3) // large enough to not be hit a := NewNode(mock, WithRateLimit(rateLimit), WithRateLimitTimeout(rateLimitTimeout)) diff --git a/gateway/proxy_eth.go b/gateway/proxy_eth.go index 323499deae0..5d823b2c010 100644 --- a/gateway/proxy_eth.go +++ b/gateway/proxy_eth.go @@ -23,7 +23,7 @@ import ( "github.com/filecoin-project/lotus/chain/types/ethtypes" ) -var ErrTooManyFilters = errors.New("too many subscriptions and filters for this connection") +var ErrTooManyFilters = errors.New("too many subscriptions and filters per connection") func (gw *Node) EthAccounts(ctx context.Context) ([]ethtypes.EthAddress, error) { // gateway provides public API, so it can't hold user accounts @@ -209,10 +209,10 @@ func (gw *Node) EthGetTransactionByHashLimited(ctx context.Context, txHash *etht return nil, err } if limit == api.LookbackNoLimit { - limit = gw.stateWaitLookbackLimit + limit = gw.maxMessageLookbackEpochs } - if gw.stateWaitLookbackLimit != api.LookbackNoLimit && limit > gw.stateWaitLookbackLimit { - limit = gw.stateWaitLookbackLimit + if gw.maxMessageLookbackEpochs != api.LookbackNoLimit && limit > gw.maxMessageLookbackEpochs { + limit = gw.maxMessageLookbackEpochs } return gw.target.EthGetTransactionByHashLimited(ctx, txHash, limit) @@ -255,10 +255,10 @@ func (gw *Node) EthGetTransactionReceiptLimited(ctx context.Context, txHash etht return nil, err } if limit == api.LookbackNoLimit { - limit = gw.stateWaitLookbackLimit + limit = gw.maxMessageLookbackEpochs } - if gw.stateWaitLookbackLimit != api.LookbackNoLimit && limit > gw.stateWaitLookbackLimit { - limit = gw.stateWaitLookbackLimit + if gw.maxMessageLookbackEpochs != api.LookbackNoLimit && limit > gw.maxMessageLookbackEpochs { + limit = gw.maxMessageLookbackEpochs } return gw.target.EthGetTransactionReceiptLimited(ctx, txHash, limit) @@ -524,6 +524,7 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) ok, err := gw.target.EthUninstallFilter(ctx, id) if err != nil { // don't delete the filter, it's "stuck" so should still count towards the limit + log.Warnf("error uninstalling filter: %v", err) return false, err } @@ -608,6 +609,7 @@ func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionI ok, err := gw.target.EthUnsubscribe(ctx, id) if err != nil { // don't delete the subscription, it's "stuck" so should still count towards the limit + log.Warnf("error unsubscribing: %v", err) return false, err } diff --git a/gateway/proxy_fil.go b/gateway/proxy_fil.go index 9daa0796d3a..59fa511b506 100644 --- a/gateway/proxy_fil.go +++ b/gateway/proxy_fil.go @@ -413,10 +413,10 @@ func (gw *Node) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg ci return nil, err } if limit == api.LookbackNoLimit { - limit = gw.stateWaitLookbackLimit + limit = 
gw.maxMessageLookbackEpochs } - if gw.stateWaitLookbackLimit != api.LookbackNoLimit && limit > gw.stateWaitLookbackLimit { - limit = gw.stateWaitLookbackLimit + if gw.maxMessageLookbackEpochs != api.LookbackNoLimit && limit > gw.maxMessageLookbackEpochs { + limit = gw.maxMessageLookbackEpochs } if err := gw.checkTipsetKey(ctx, from); err != nil { return nil, err @@ -429,10 +429,10 @@ func (gw *Node) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64 return nil, err } if limit == api.LookbackNoLimit { - limit = gw.stateWaitLookbackLimit + limit = gw.maxMessageLookbackEpochs } - if gw.stateWaitLookbackLimit != api.LookbackNoLimit && limit > gw.stateWaitLookbackLimit { - limit = gw.stateWaitLookbackLimit + if gw.maxMessageLookbackEpochs != api.LookbackNoLimit && limit > gw.maxMessageLookbackEpochs { + limit = gw.maxMessageLookbackEpochs } return gw.target.StateWaitMsg(ctx, msg, confidence, limit, allowReplaced) } diff --git a/itests/gateway_test.go b/itests/gateway_test.go index 340546ca902..b0f3157efe8 100644 --- a/itests/gateway_test.go +++ b/itests/gateway_test.go @@ -37,8 +37,8 @@ import ( ) const ( - maxLookbackCap = time.Duration(math.MaxInt64) - maxStateWaitLookbackLimit = stmgr.LookbackNoLimit + maxLookbackCap = time.Duration(math.MaxInt64) + maxMessageLookbackEpochs = stmgr.LookbackNoLimit ) // TestGatewayWalletMsig tests that API calls to wallet and msig can be made on a lite @@ -206,23 +206,23 @@ type testNodes struct { } type startOptions struct { - blocktime time.Duration - lookbackCap time.Duration - stateWaitLookbackLimit abi.ChainEpoch - fund bool - perConnectionAPIRateLimit int - perHostRequestsPerMinute int - nodeOpts []kit.NodeOpt + blockTime time.Duration + lookbackCap time.Duration + maxMessageLookbackEpochs abi.ChainEpoch + fund bool + perConnectionAPIRateLimit int + perHostConnectionsPerMinute int + nodeOpts []kit.NodeOpt } type startOption func(*startOptions) -func applyStartOptions(opts ...startOption) startOptions { +func newStartOptions(opts ...startOption) startOptions { o := startOptions{ - blocktime: 5 * time.Millisecond, - lookbackCap: maxLookbackCap, - stateWaitLookbackLimit: maxStateWaitLookbackLimit, - fund: false, + blockTime: 5 * time.Millisecond, + lookbackCap: maxLookbackCap, + maxMessageLookbackEpochs: maxMessageLookbackEpochs, + fund: false, } for _, opt := range opts { opt(&o) @@ -244,7 +244,7 @@ func withPerConnectionAPIRateLimit(rateLimit int) startOption { func withPerHostRequestsPerMinute(rateLimit int) startOption { return func(opts *startOptions) { - opts.perHostRequestsPerMinute = rateLimit + opts.perHostConnectionsPerMinute = rateLimit } } @@ -255,7 +255,7 @@ func withNodeOpts(nodeOpts ...kit.NodeOpt) startOption { } func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNodes { - options := applyStartOptions(opts...) + options := newStartOptions(opts...) var ( full *kit.TestFullNode @@ -271,7 +271,7 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod // create the full node and the miner. 
var ens *kit.Ensemble full, miner, ens = kit.EnsembleMinimal(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(options.blocktime) + ens.InterconnectAll().BeginMining(options.blockTime) api.RunningNodeType = api.NodeFull // Create a gateway server in front of the full node @@ -279,10 +279,15 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod gwapi := gateway.NewNode( full, gateway.WithEthSubHandler(ethSubHandler), - gateway.WithLookbackCap(options.lookbackCap), - gateway.WithStateWaitLookbackLimit(options.stateWaitLookbackLimit), + gateway.WithMaxLookbackDuration(options.lookbackCap), + gateway.WithMaxMessageLookbackEpochs(options.maxMessageLookbackEpochs), + ) + handler, err := gateway.Handler( + gwapi, + full, + gateway.WithPerConnectionAPIRateLimit(options.perConnectionAPIRateLimit), + gateway.WithPerHostConnectionsPerMinute(options.perHostConnectionsPerMinute), ) - handler, err := gateway.Handler(gwapi, full, options.perConnectionAPIRateLimit, options.perHostRequestsPerMinute) t.Cleanup(func() { _ = handler.Shutdown(ctx) }) require.NoError(t, err) @@ -293,14 +298,14 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod // Create a gateway client API that connects to the gateway server var gapi api.Gateway - var _closer jsonrpc.ClientCloser - gapi, _closer, err = client.NewGatewayRPCV1(ctx, "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil, + var rpcCloser jsonrpc.ClientCloser + gapi, rpcCloser, err = client.NewGatewayRPCV1(ctx, "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil, jsonrpc.WithClientHandler("Filecoin", ethSubHandler), jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"), ) require.NoError(t, err) var closeOnce sync.Once - closer := func() { closeOnce.Do(_closer) } + closer := func() { closeOnce.Do(rpcCloser) } t.Cleanup(closer) nodeOpts := append([]kit.NodeOpt{ @@ -373,13 +378,13 @@ func TestGatewayRateLimits(t *testing.T) { withPerHostRequestsPerMinute(requestsPerMinute), ) - time.Sleep(time.Second) + nodes.full.WaitTillChain(ctx, kit.HeightAtLeast(10)) // let's get going first // ChainHead uses chainRateLimitTokens=2. // But we're also competing with the paymentChannelSettler which listens to the chain uses // ChainGetBlockMessages on each change, which also uses chainRateLimitTokens=2. // So each loop should be 4 tokens. 
- loops := 10 + loops := 25 tokensPerLoop := 4 start := time.Now() for i := 0; i < loops; i++ { @@ -393,37 +398,40 @@ func TestGatewayRateLimits(t *testing.T) { req.WithinDuration(expectedEnd, time.Now(), allowPad) client := &http.Client{} - url := fmt.Sprintf("http://%s/rpc/v1", nodes.gatewayAddr) jsonPayload := []byte(`{"method":"Filecoin.ChainHead","params":[],"id":1,"jsonrpc":"2.0"}`) var failed bool for i := 0; i < requestsPerMinute*2 && !failed; i++ { - func() { - request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonPayload)) - req.NoError(err) - request.Header.Set("Content-Type", "application/json") - response, err := client.Do(request) - req.NoError(err) - defer func() { _ = response.Body.Close() }() - if http.StatusOK == response.StatusCode { - body, err := io.ReadAll(response.Body) - req.NoError(err) - result := map[string]interface{}{} - req.NoError(json.Unmarshal(body, &result)) - req.NoError(err) - req.NotNil(result["result"]) - height, ok := result["result"].(map[string]interface{})["Height"].(float64) - req.True(ok) - req.Greater(int(height), 0) - } else { - req.Equal(http.StatusTooManyRequests, response.StatusCode) - req.LessOrEqual(i, requestsPerMinute+1) - failed = true - } - }() + status, body := makeManualRpcCall(t, client, nodes.gatewayAddr, string(jsonPayload)) + if http.StatusOK == status { + result := map[string]interface{}{} + req.NoError(json.Unmarshal([]byte(body), &result)) + req.NotNil(result["result"]) + height, ok := result["result"].(map[string]interface{})["Height"].(float64) + req.True(ok) + req.Greater(int(height), 0) + } else { + req.Equal(http.StatusTooManyRequests, status) + req.LessOrEqual(i, requestsPerMinute+1) + failed = true + } } req.True(failed, "expected requests to fail due to rate limiting") } +func makeManualRpcCall(t *testing.T, client *http.Client, gatewayAddr, payload string) (int, string) { + // not available over plain http + url := fmt.Sprintf("http://%s/rpc/v1", gatewayAddr) + request, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(payload))) + require.NoError(t, err) + request.Header.Set("Content-Type", "application/json") + response, err := client.Do(request) + require.NoError(t, err) + defer func() { _ = response.Body.Close() }() + body, err := io.ReadAll(response.Body) + require.NoError(t, err) + return response.StatusCode, string(body) +} + func TestStatefulCallHandling(t *testing.T) { req := require.New(t) @@ -431,21 +439,6 @@ func TestStatefulCallHandling(t *testing.T) { ctx := context.Background() nodes := startNodes(ctx, t) - httpReq := func(payload string) (int, string) { - // not available over plain http - client := &http.Client{} - url := fmt.Sprintf("http://%s/rpc/v1", nodes.gatewayAddr) - request, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(payload))) - req.NoError(err) - request.Header.Set("Content-Type", "application/json") - response, err := client.Do(request) - req.NoError(err) - defer func() { _ = response.Body.Close() }() - body, err := io.ReadAll(response.Body) - req.NoError(err) - return response.StatusCode, string(body) - } - t.Logf("Testing stateful call handling rejection via plain http") for _, typ := range []string{ "EthNewBlockFilter", @@ -470,7 +463,12 @@ func TestStatefulCallHandling(t *testing.T) { expErr = "EthSubscribe not supported: connection doesn't support callbacks" } - status, body := httpReq(`{"method":"Filecoin.` + typ + `","params":[` + params + `],"id":1,"jsonrpc":"2.0"}`) + status, body := makeManualRpcCall( + t, + &http.Client{}, + nodes.gatewayAddr, 
+ `{"method":"Filecoin.`+typ+`","params":[`+params+`],"id":1,"jsonrpc":"2.0"}`, + ) req.Equal(http.StatusOK, status, "not ok for "+typ) req.Contains(body, `{"error":{"code":1,"message":"`+expErr+`"},"id":1,"jsonrpc":"2.0"}`, "unexpected response for "+typ) @@ -532,25 +530,33 @@ func TestStatefulCallHandling(t *testing.T) { t.Logf("Testing 'too many filters' rejection") _, err = nodes.lite.EthNewBlockFilter(ctx) - require.ErrorContains(t, err, "too many subscriptions and filters for this connection") + require.ErrorContains(t, err, "too many subscriptions and filters per connection") _, err = nodes.lite.EthNewPendingTransactionFilter(ctx) - require.ErrorContains(t, err, "too many subscriptions and filters for this connection") + require.ErrorContains(t, err, "too many subscriptions and filters per connection") _, err = nodes.lite.EthNewFilter(ctx, ðtypes.EthFilterSpec{}) - require.ErrorContains(t, err, "too many subscriptions and filters for this connection") + require.ErrorContains(t, err, "too many subscriptions and filters per connection") _, err = nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError)) - require.ErrorContains(t, err, "too many subscriptions and filters for this connection") + require.ErrorContains(t, err, "too many subscriptions and filters per connection") t.Logf("Shutting down the lite node") req.NoError(nodes.lite.Stop(ctx)) - nodes.rpcCloser() // once the websocke connection is closed, the server should clean up the filters for us - time.Sleep(time.Second) // unfortunately we have no other way to check for completeness of shutdown and cleanup + nodes.rpcCloser() + // Once the websocket connection is closed, the server should clean up the filters for us. + // Unfortunately we have no other way to check for completeness of shutdown and cleanup. + // * Asynchronously the rpcCloser() call will end the client websockets connection to the gateway. + // * When the gateway recognises the end of the HTTP connection, it will asynchronously make calls + // to the fullnode to clean up the filters. + // * The fullnode will then uninstall the filters and we can finally move on to check it directly + // that this has happened. + // This should happen quickly, but we have no way to synchronously check for it. So we just wait a bit. + time.Sleep(time.Second) t.Logf("Checking that all filters and subs were cleared up by directly talking to full node") ok, err = nodes.full.EthUnsubscribe(ctx, subId2) // unsub on full node, already done req.NoError(err) - req.False(ok) // already unsubbed because of auto-cleanup + req.False(ok) // already unsubscribed because of auto-cleanup for _, fid := range blockFilterIds { _, err = nodes.full.EthGetFilterChanges(ctx, fid)