From 38346128d8bbf0317c15725959c9c93e5c041d0e Mon Sep 17 00:00:00 2001 From: Michael Tsitrin Date: Thu, 22 Jun 2023 20:36:50 +0300 Subject: [PATCH] added subscription limits check --- rpc/client/client.go | 19 +++++++++++++++---- rpc/json/service.go | 5 ++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/rpc/client/client.go b/rpc/client/client.go index 8a0ca70b1..28dae2c06 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -3,10 +3,13 @@ package client import ( "context" "errors" + "fmt" "sort" "time" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" + rconfig "github.com/dymensionxyz/dymint/config" abciconv "github.com/dymensionxyz/dymint/conv/abci" "github.com/dymensionxyz/dymint/mempool" @@ -97,10 +100,8 @@ func (c *Client) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.Re // This code is a local client, so we can assume that subscriber is "" subscriber := "" //ctx.RemoteAddr() - if c.EventBus.NumClients() >= c.config.MaxSubscriptionClients { - return nil, fmt.Errorf("max_subscription_clients %d reached", c.config.MaxSubscriptionClients) - } else if c.EventBus.NumClientSubscriptions(subscriber) >= c.config.MaxSubscriptionsPerClient { - return nil, fmt.Errorf("max_subscriptions_per_client %d reached", c.config.MaxSubscriptionsPerClient) + if err := c.IsSubscriptionAllowed(subscriber); err != nil { + return nil, sdkerrors.Wrap(err, "subscription not allowed") } // Subscribe to tx being committed in block. @@ -878,6 +879,16 @@ func (c *Client) normalizeHeight(height *int64) uint64 { return heightValue } +func (c *Client) IsSubscriptionAllowed(subscriber string) error { + if c.EventBus.NumClients() >= c.config.MaxSubscriptionClients { + return fmt.Errorf("max_subscription_clients %d reached", c.config.MaxSubscriptionClients) + } else if c.EventBus.NumClientSubscriptions(subscriber) >= c.config.MaxSubscriptionsPerClient { + return fmt.Errorf("max_subscriptions_per_client %d reached", c.config.MaxSubscriptionsPerClient) + } + + return nil +} + func validatePerPage(perPagePtr *int) int { if perPagePtr == nil { // no per_page parameter return defaultPerPage diff --git a/rpc/json/service.go b/rpc/json/service.go index e99ed98be..2ca67b739 100644 --- a/rpc/json/service.go +++ b/rpc/json/service.go @@ -9,6 +9,7 @@ import ( "strconv" "time" + "cosmossdk.io/errors" "github.com/gorilla/rpc/v2/json2" "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" @@ -91,7 +92,9 @@ func newService(c *client.Client, l log.Logger) *service { func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsConn, subscriptionID []byte) (*ctypes.ResultSubscribe, error) { addr := req.RemoteAddr - // TODO(tzdybal): pass config and check subscriptions limits + if err := s.client.IsSubscriptionAllowed(addr); err != nil { + return nil, errors.Wrap(err, "subscription not allowed") + } q, err := tmquery.New(args.Query) if err != nil {