From b2a8133d9c343966c90ea821a03faa0a35eaec4b Mon Sep 17 00:00:00 2001 From: dkeysil Date: Fri, 8 Mar 2024 10:23:59 +0100 Subject: [PATCH] JsonRPC Cache Add cache test Add json rpc cache to executable Make cache config optional, debug logs in cache, fix dispatcher url path Refactor json rpc cache Fix cache keys Use mappers from the forta-core-go Move jsonrpc changes of proxy to cache package Tweak events polling in cache Use go-cache package Rename cbe (Combined Block Event) to BlockData Add retries to r2_client Check 404 error body, check if node has assigned bots Inject sdk cache configuration to agents Refactor Fix registry usage Add logging Update forta-core-go Metrics draft Fix nil panic Fix params string Add log on cache miss Improve polling Increase bucket time if no bots assigned Decrease max backoff to 10 seconds Fix bug in r2 client Retry on 'Block too old' Clean metrics in jsonrpc cache --- clients/blocksdata/r2_client.go | 122 +++++++++++ clients/interfaces.go | 5 + clients/mocks/mock_clients.go | 39 ++++ cmd/json-rpc/main.go | 23 ++ config/config.go | 6 + config/defaults.go | 1 + config/env.go | 9 +- go.mod | 7 +- go.sum | 6 +- services/components/containers/definitions.go | 5 + services/components/metrics/metrics.go | 22 ++ services/json-rpc/cache/cache.go | 96 ++++++++ services/json-rpc/cache/cache_test.go | 105 +++++++++ services/json-rpc/cache/json_rpc_cache.go | 205 ++++++++++++++++++ .../json-rpc/cache/json_rpc_cache_test.go | 116 ++++++++++ services/json-rpc/cache/jsonrpc.go | 122 +++++++++++ services/json-rpc/decode_test.go | 1 + services/supervisor/services.go | 3 + 18 files changed, 886 insertions(+), 7 deletions(-) create mode 100644 clients/blocksdata/r2_client.go create mode 100644 services/json-rpc/cache/cache.go create mode 100644 services/json-rpc/cache/cache_test.go create mode 100644 services/json-rpc/cache/json_rpc_cache.go create mode 100644 services/json-rpc/cache/json_rpc_cache_test.go create mode 100644 services/json-rpc/cache/jsonrpc.go diff --git a/clients/blocksdata/r2_client.go b/clients/blocksdata/r2_client.go new file mode 100644 index 00000000..fd5b5912 --- /dev/null +++ b/clients/blocksdata/r2_client.go @@ -0,0 +1,122 @@ +package blocksdata + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/url" + "time" + + "github.com/andybalholm/brotli" + backoff "github.com/cenkalti/backoff/v4" + "github.com/forta-network/forta-core-go/protocol" + "github.com/forta-network/forta-core-go/utils/httpclient" + "google.golang.org/protobuf/proto" +) + +const ( + minBackoff = 1 * time.Second + maxBackoff = 10 * time.Second + maxElapsedTime = 5 * time.Minute +) + +type blocksDataClient struct { + dispatcherURL *url.URL +} + +func NewBlocksDataClient(dispatcherURL string) *blocksDataClient { + u, _ := url.Parse(dispatcherURL) + + return &blocksDataClient{ + dispatcherURL: u, + } +} + +type PresignedURLItem struct { + Bucket int64 `json:"bucket"` + PresignedURL string `json:"presignedURL"` + ExpiresAt int64 `json:"expiresAt"` +} + +func (c *blocksDataClient) GetBlocksData(bucket int64) (_ *protocol.BlocksData, err error) { + dispatcherUrl, err := url.JoinPath(c.dispatcherURL.String(), fmt.Sprintf("%d", bucket)) + if err != nil { + return nil, err + } + + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = minBackoff + bo.MaxInterval = maxBackoff + bo.MaxElapsedTime = maxElapsedTime + + var item PresignedURLItem + + err = backoff.Retry(func() error { + resp, err := httpclient.Default.Get(dispatcherUrl) + if err != nil { + return err + } + + defer resp.Body.Close() + + b, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode == 404 && bytes.Contains(b, []byte("too old")) { + return fmt.Errorf("%s", b) + } + + if resp.StatusCode != 200 { + return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, b) + } + + err = json.Unmarshal(b, &item) + if err != nil { + return err + } + + if item.ExpiresAt < time.Now().Unix() { + return backoff.Permanent(fmt.Errorf("presigned URL expired")) + } + + return nil + }, bo) + + if err != nil { + return nil, err + } + + var blocks protocol.BlocksData + + err = backoff.Retry(func() error { + resp, err := httpclient.Default.Get(item.PresignedURL) + if err != nil { + return err + } + + if resp.StatusCode != 200 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + b, err := io.ReadAll(brotli.NewReader(resp.Body)) + if err != nil { + return err + } + + err = proto.Unmarshal(b, &blocks) + if err != nil { + return backoff.Permanent(err) + } + + return nil + }, bo) + + if err != nil { + return nil, err + } + + return &blocks, nil +} diff --git a/clients/interfaces.go b/clients/interfaces.go index 68671499..0c550456 100644 --- a/clients/interfaces.go +++ b/clients/interfaces.go @@ -7,6 +7,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/events" "github.com/forta-network/forta-core-go/domain" + "github.com/forta-network/forta-core-go/protocol" "github.com/forta-network/forta-node/clients/docker" "github.com/forta-network/forta-node/config" "github.com/golang/protobuf/proto" @@ -68,3 +69,7 @@ type IPAuthenticator interface { FindContainerNameFromRemoteAddr(ctx context.Context, hostPort string) (string, error) FindAgentByContainerName(containerName string) (*config.AgentConfig, error) } + +type BlocksDataClient interface { + GetBlocksData(bucket int64) (*protocol.BlocksData, error) +} diff --git a/clients/mocks/mock_clients.go b/clients/mocks/mock_clients.go index 20daf87a..85ef8ce4 100644 --- a/clients/mocks/mock_clients.go +++ b/clients/mocks/mock_clients.go @@ -12,6 +12,7 @@ import ( types "github.com/docker/docker/api/types" events "github.com/docker/docker/api/types/events" domain "github.com/forta-network/forta-core-go/domain" + protocol "github.com/forta-network/forta-core-go/protocol" docker "github.com/forta-network/forta-node/clients/docker" config "github.com/forta-network/forta-node/config" gomock "github.com/golang/mock/gomock" @@ -708,3 +709,41 @@ func (mr *MockIPAuthenticatorMockRecorder) FindContainerNameFromRemoteAddr(ctx, mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindContainerNameFromRemoteAddr", reflect.TypeOf((*MockIPAuthenticator)(nil).FindContainerNameFromRemoteAddr), ctx, hostPort) } + +// MockBlocksDataClient is a mock of BlocksDataClient interface. +type MockBlocksDataClient struct { + ctrl *gomock.Controller + recorder *MockBlocksDataClientMockRecorder +} + +// MockBlocksDataClientMockRecorder is the mock recorder for MockBlocksDataClient. +type MockBlocksDataClientMockRecorder struct { + mock *MockBlocksDataClient +} + +// NewMockBlocksDataClient creates a new mock instance. +func NewMockBlocksDataClient(ctrl *gomock.Controller) *MockBlocksDataClient { + mock := &MockBlocksDataClient{ctrl: ctrl} + mock.recorder = &MockBlocksDataClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBlocksDataClient) EXPECT() *MockBlocksDataClientMockRecorder { + return m.recorder +} + +// GetBlocksData mocks base method. +func (m *MockBlocksDataClient) GetBlocksData(bucket int64) (*protocol.BlocksData, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBlocksData", bucket) + ret0, _ := ret[0].(*protocol.BlocksData) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetBlocksData indicates an expected call of GetBlocksData. +func (mr *MockBlocksDataClientMockRecorder) GetBlocksData(bucket interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlocksData", reflect.TypeOf((*MockBlocksDataClient)(nil).GetBlocksData), bucket) +} diff --git a/cmd/json-rpc/main.go b/cmd/json-rpc/main.go index ebe86633..37a780b5 100644 --- a/cmd/json-rpc/main.go +++ b/cmd/json-rpc/main.go @@ -8,29 +8,52 @@ import ( "github.com/forta-network/forta-node/config" "github.com/forta-network/forta-node/healthutils" "github.com/forta-network/forta-node/services" + "github.com/forta-network/forta-node/services/components/registry" jrp "github.com/forta-network/forta-node/services/json-rpc" + jrpcache "github.com/forta-network/forta-node/services/json-rpc/cache" ) func initJsonRpcProxy(ctx context.Context, cfg config.Config) (*jrp.JsonRpcProxy, error) { return jrp.NewJsonRpcProxy(ctx, cfg) } +func initJsonRpcCache(ctx context.Context, cfg config.Config, botRegistry registry.BotRegistry) (*jrpcache.JsonRpcCache, error) { + return jrpcache.NewJsonRpcCache(ctx, cfg.JsonRpcCache, botRegistry) +} + func initServices(ctx context.Context, cfg config.Config) ([]services.Service, error) { // can't dial localhost - need to dial host gateway from container cfg.Scan.JsonRpc.Url = utils.ConvertToDockerHostURL(cfg.Scan.JsonRpc.Url) cfg.JsonRpcProxy.JsonRpc.Url = utils.ConvertToDockerHostURL(cfg.JsonRpcProxy.JsonRpc.Url) + cfg.Registry.JsonRpc.Url = utils.ConvertToDockerHostURL(cfg.Registry.JsonRpc.Url) proxy, err := initJsonRpcProxy(ctx, cfg) if err != nil { return nil, err } + key, err := config.LoadKeyInContainer(cfg) + if err != nil { + return nil, err + } + + botRegistry, err := registry.New(cfg, key.Address) + if err != nil { + return nil, err + } + + cache, err := initJsonRpcCache(ctx, cfg, botRegistry) + if err != nil { + return nil, err + } + return []services.Service{ health.NewService( ctx, "", healthutils.DefaultHealthServerErrHandler, health.CheckerFrom(summarizeReports, proxy), ), proxy, + cache, }, nil } diff --git a/config/config.go b/config/config.go index 8041d6c7..525f5c04 100644 --- a/config/config.go +++ b/config/config.go @@ -44,6 +44,11 @@ type JsonRpcProxyConfig struct { RateLimitConfig *RateLimitConfig `yaml:"rateLimit" json:"rateLimit"` } +type JsonRpcCacheConfig struct { + DispatcherURL string `yaml:"dispatcherUrl" json:"dispatcherUrl" default:"https://dispatcher.forta.network/batch" validate:"omitempty,url"` + CacheExpirePeriodSeconds int `yaml:"cacheExpirePeriodSeconds" json:"cacheExpirePeriodSeconds" default:"300"` +} + type LogConfig struct { Level string `yaml:"level" json:"level" default:"info" ` MaxLogSize string `yaml:"maxLogSize" json:"maxLogSize" default:"50m" ` @@ -227,6 +232,7 @@ type Config struct { Registry RegistryConfig `yaml:"registry" json:"registry"` Publish PublisherConfig `yaml:"publish" json:"publish"` JsonRpcProxy JsonRpcProxyConfig `yaml:"jsonRpcProxy" json:"jsonRpcProxy"` + JsonRpcCache JsonRpcCacheConfig `yaml:"jsonRpcCache" json:"jsonRpcCache"` PublicAPIProxy PublicAPIProxyConfig `yaml:"publicApiProxy" json:"publicApiProxy"` Log LogConfig `yaml:"log" json:"log"` ResourcesConfig ResourcesConfig `yaml:"resources" json:"resources"` diff --git a/config/defaults.go b/config/defaults.go index 70145b46..3987b401 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -14,5 +14,6 @@ const ( DefaultPublicAPIProxyPort = "8535" DefaultJSONRPCProxyPort = "8545" DefaultBotHealthCheckPort = "8565" + DefaultBotJSONRPCCachePort = "8575" DefaultFortaNodeBinaryPath = "/forta-node" // the path for the common binary in the container image ) diff --git a/config/env.go b/config/env.go index 493a1347..cb8f4364 100644 --- a/config/env.go +++ b/config/env.go @@ -6,8 +6,6 @@ const ( EnvReleaseInfo = "FORTA_RELEASE_INFO" // Agent env vars - EnvJsonRpcHost = "JSON_RPC_HOST" - EnvJsonRpcPort = "JSON_RPC_PORT" EnvJWTProviderHost = "FORTA_JWT_PROVIDER_HOST" EnvJWTProviderPort = "FORTA_JWT_PROVIDER_PORT" EnvPublicAPIProxyHost = "FORTA_PUBLIC_API_PROXY_HOST" @@ -20,4 +18,11 @@ const ( EnvFortaShardID = "FORTA_SHARD_ID" EnvFortaShardCount = "FORTA_SHARD_COUNT" EnvFortaTokenExchangeURL = "FORTA_TOKEN_EXCHANGE_URL" + + EnvJsonRpcHost = "JSON_RPC_HOST" + EnvJsonRpcPort = "JSON_RPC_PORT" + EnvCacheJsonRpcCachePort = "JSON_RPC_CACHE_PORT" + EnvCacheRequestTimeout = "JSON_RPC_CACHE_TIMEOUT" + EnvCacheRequestInterval = "JSON_RPC_CACHE_INTERVAL" + EnvCacheSupportedChains = "JSON_RPC_CACHE_SUPPORTED_CHAINS" ) diff --git a/go.mod b/go.mod index 875a5eef..fd31eba8 100644 --- a/go.mod +++ b/go.mod @@ -39,12 +39,15 @@ require ( replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible require ( + github.com/andybalholm/brotli v1.1.0 + github.com/cenkalti/backoff/v4 v4.1.3 github.com/docker/docker v1.6.2 github.com/docker/go-connections v0.4.0 - github.com/forta-network/forta-core-go v0.0.0-20240306085049-a1ac54ae90f5 + github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.39.0 + google.golang.org/protobuf v1.28.1 ) require ( @@ -60,7 +63,6 @@ require ( github.com/blang/semver/v4 v4.0.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect - github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.0.4 // indirect github.com/coreos/go-systemd/v22 v22.4.0 // indirect @@ -286,7 +288,6 @@ require ( golang.org/x/tools v0.2.0 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect - google.golang.org/protobuf v1.28.1 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 1c11dbbe..02c846d2 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= @@ -329,8 +331,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= -github.com/forta-network/forta-core-go v0.0.0-20240306085049-a1ac54ae90f5 h1:bj2OqjhoCRKUYlSKySH3kWanC77QnlWZsQrSCaw7FDg= -github.com/forta-network/forta-core-go v0.0.0-20240306085049-a1ac54ae90f5/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= +github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2 h1:v+snSZVsMUPtPX6pI5oxULTWiAfJ1igeE0Iqilma7/Y= +github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= diff --git a/services/components/containers/definitions.go b/services/components/containers/definitions.go index 83fd07ad..af8ef816 100644 --- a/services/components/containers/definitions.go +++ b/services/components/containers/definitions.go @@ -5,6 +5,7 @@ import ( "github.com/forta-network/forta-node/clients/docker" "github.com/forta-network/forta-node/config" + jrpcache "github.com/forta-network/forta-node/services/json-rpc/cache" ) // Label values @@ -44,6 +45,10 @@ func NewBotContainerConfig( config.EnvFortaBotOwner: botConfig.Owner, config.EnvFortaHealthCheckPort: config.DefaultBotHealthCheckPort, config.EnvFortaTokenExchangeURL: tokenExchangeURL, + config.EnvCacheJsonRpcCachePort: config.DefaultBotJSONRPCCachePort, + config.EnvCacheRequestTimeout: jrpcache.BotCacheRequestTimeoutSeconds, + config.EnvCacheRequestInterval: jrpcache.BotCacheRequestIntervalSeconds, + config.EnvCacheSupportedChains: jrpcache.BotCacheSupportedChains, } if botConfig.ChainID > 0 { env[config.EnvFortaChainID] = fmt.Sprintf("%d", botConfig.ChainID) diff --git a/services/components/metrics/metrics.go b/services/components/metrics/metrics.go index fa0e540f..638e352b 100644 --- a/services/components/metrics/metrics.go +++ b/services/components/metrics/metrics.go @@ -42,6 +42,18 @@ func CreateAgentMetricV2(agt config.AgentConfig, metric string, value float64, c } } +func CreateDetailedAgentMetricV2(agt config.AgentConfig, metric string, value float64, details string, chainID int64) *protocol.AgentMetric { + return &protocol.AgentMetric{ + AgentId: agt.ID, + Timestamp: time.Now().Format(time.RFC3339), + Name: metric, + Value: value, + ShardId: agt.ShardID(), + ChainId: chainID, + Details: details, + } +} + func CreateEventMetric(t time.Time, id string, metric string, details string) *protocol.AgentMetric { return &protocol.AgentMetric{ AgentId: id, @@ -52,6 +64,16 @@ func CreateEventMetric(t time.Time, id string, metric string, details string) *p } } +func CreateSystemMetric(metric string, value float64, details string) *protocol.AgentMetric { + return &protocol.AgentMetric{ + AgentId: "system", + Timestamp: time.Now().Format(time.RFC3339), + Name: metric, + Value: value, + Details: details, + } +} + func CreateAgentResourcesMetric(agt config.AgentConfig, t time.Time, metric string, value float64) *protocol.AgentMetric { return &protocol.AgentMetric{ AgentId: agt.ID, diff --git a/services/json-rpc/cache/cache.go b/services/json-rpc/cache/cache.go new file mode 100644 index 00000000..194754e7 --- /dev/null +++ b/services/json-rpc/cache/cache.go @@ -0,0 +1,96 @@ +package json_rpc_cache + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/forta-network/forta-core-go/domain" + "github.com/forta-network/forta-core-go/protocol" + "github.com/patrickmn/go-cache" + log "github.com/sirupsen/logrus" +) + +type inMemory struct { + cache *cache.Cache +} + +func NewCache(expire time.Duration) *inMemory { + return &inMemory{ + cache: cache.New(expire, expire), + } +} + +func (c *inMemory) Append(blocksData *protocol.BlocksData) { + for _, event := range blocksData.Blocks { + chainID := event.ChainID + + // eth_blockNumber + method := "eth_blockNumber" + params := "[]" + + if val, ok := c.cache.Get(cacheKey(chainID, method, params)); ok { + blockNumber, ok := val.(string) + + // if the new block number is later than the cached one, update the inMemory + if ok && isLater(blockNumber, event.Block.Number) { + c.cache.SetDefault(cacheKey(chainID, method, params), event.Block.Number) + } + } else { + c.cache.SetDefault(cacheKey(chainID, method, params), event.Block.Number) + } + + log.Debugf("caching block number. chainID: %d method: %s params: %s", chainID, method, params) + + // eth_getBlockByNumber + method = "eth_getBlockByNumber" + params = fmt.Sprintf(`["%s",true]`, event.Block.Number) + log.Debugf("caching block. chainID: %d method: %s params: %s", chainID, method, params) + + block := domain.BlockFromBlockData(event) + c.cache.SetDefault(cacheKey(chainID, method, params), block) + + // eth_getLogs + method = "eth_getLogs" + params = fmt.Sprintf(`[{"fromBlock":"%s","toBlock":"%s"}]`, event.Block.Number, event.Block.Number) + + log.Debugf("caching logs. chainID: %d method: %s params: %s", chainID, method, params) + + logs := domain.LogsFromBlockData(event) + c.cache.SetDefault(cacheKey(chainID, method, params), logs) + + // trace_block + method = "trace_block" + params = fmt.Sprintf(`["%s"]`, event.Block.Number) + + log.Debugf("caching traces. chainID: %d method: %s params: %s", chainID, method, params) + + traces, err := domain.TracesFromBlockData(event) + if err == nil { + c.cache.SetDefault(cacheKey(chainID, method, params), traces) + } + } +} + +func (c *inMemory) Get(chainId uint64, method string, params string) (interface{}, bool) { + return c.cache.Get(cacheKey(chainId, method, params)) +} + +func cacheKey(chainId uint64, method, params string) string { + return fmt.Sprintf("%d-%s-%s", chainId, method, params) +} + +func isLater(actual, new string) bool { + actualInt, err := strconv.ParseInt(strings.Replace(actual, "0x", "", -1), 16, 64) + if err != nil { + return false + } + + newInt, err := strconv.ParseInt(strings.Replace(new, "0x", "", -1), 16, 64) + if err != nil { + return false + } + + return newInt > actualInt +} diff --git a/services/json-rpc/cache/cache_test.go b/services/json-rpc/cache/cache_test.go new file mode 100644 index 00000000..12434451 --- /dev/null +++ b/services/json-rpc/cache/cache_test.go @@ -0,0 +1,105 @@ +package json_rpc_cache + +import ( + "testing" + "time" + + "github.com/forta-network/forta-core-go/protocol" + "github.com/stretchr/testify/assert" +) + +func TestCache(t *testing.T) { + cache := NewCache(time.Millisecond * 500) + + cache.Append(blocks) + + blockNumber, ok := cache.Get(1, "eth_blockNumber", "[]") + assert.True(t, ok) + assert.Equal(t, "1", blockNumber) + + blockNumber, ok = cache.Get(2, "eth_blockNumber", "[]") + assert.True(t, ok) + assert.Equal(t, "101", blockNumber) + + time.Sleep(time.Second) + + blockNumber, ok = cache.Get(1, "eth_blockNumber", "[]") + assert.False(t, ok) + assert.Empty(t, blockNumber) +} + +var blocks = &protocol.BlocksData{ + Blocks: []*protocol.BlockData{ + { + ChainID: 1, + Block: &protocol.BlockWithTransactions{ + Hash: "0xaaaa", + Number: "1", + Transactions: []*protocol.Transaction{ + { + Hash: "0xbbbb", + From: "0xcccc", + }, + }, + Uncles: []string{"0xdddd"}, + }, + Logs: []*protocol.LogEntry{ + { + Address: "0xcccc", + Topics: []string{"0xeeee"}, + }, + }, + Traces: []*protocol.Trace{ + { + Action: &protocol.TraceAction{ + From: "0xcccc", + }, + Result: &protocol.TraceResult{ + Address: "0xcccc", + }, + TraceAddress: []int64{1}, + }, + }, + }, + { + ChainID: 2, + Block: &protocol.BlockWithTransactions{ + Hash: "0xffff", + Number: "100", + Transactions: []*protocol.Transaction{ + { + Hash: "0x1111", + From: "0x2222", + }, + }, + Uncles: []string{"0x3333"}, + }, + Logs: []*protocol.LogEntry{}, + Traces: []*protocol.Trace{ + { + TraceAddress: []int64{2}, + }, + }, + }, + { + ChainID: 2, + Block: &protocol.BlockWithTransactions{ + Hash: "0xfffd", + Number: "101", + Transactions: []*protocol.Transaction{ + { + Hash: "0x1112", + From: "0x2223", + }, + }, + Uncles: []string{"0x3333"}, + }, + Logs: []*protocol.LogEntry{}, + Traces: []*protocol.Trace{ + { + TraceAddress: []int64{1}, + }, + }, + }, + }, +} diff --git a/services/json-rpc/cache/json_rpc_cache.go b/services/json-rpc/cache/json_rpc_cache.go new file mode 100644 index 00000000..3f4c9c6f --- /dev/null +++ b/services/json-rpc/cache/json_rpc_cache.go @@ -0,0 +1,205 @@ +package json_rpc_cache + +import ( + "context" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/forta-network/forta-core-go/domain" + "github.com/forta-network/forta-core-go/protocol" + "github.com/forta-network/forta-core-go/utils" + "github.com/forta-network/forta-node/clients" + "github.com/forta-network/forta-node/clients/blocksdata" + "github.com/forta-network/forta-node/clients/messaging" + "github.com/forta-network/forta-node/config" + "github.com/forta-network/forta-node/services/components/metrics" + "github.com/forta-network/forta-node/services/components/registry" + log "github.com/sirupsen/logrus" +) + +const ( + ChainIDHeader = "X-Forta-Chain-ID" + + // These values will be injected into the agent container to configure bot cache + // BotCacheRequestTimeout timeout until the bot must fallback to the RPC Node + // Value in seconds and can be a float. + BotCacheRequestTimeoutSeconds = "20" + // BotCacheRequestInterval interval between bot requests + // Value in seconds and can be a float. + BotCacheRequestIntervalSeconds = "1" + // BotCacheSupportedChains comma separated list of supported chains + // Chains' data not filtered on the cache side. + BotCacheSupportedChains = "1,137,56,43114,42161,10,250,8453" +) + +type JsonRpcCache struct { + ctx context.Context + cfg config.JsonRpcCacheConfig + botAuthenticator clients.IPAuthenticator + botRegistry registry.BotRegistry + msgClient clients.MessageClient + + server *http.Server + + cache *inMemory + + blocksDataClient clients.BlocksDataClient +} + +func NewJsonRpcCache(ctx context.Context, cfg config.JsonRpcCacheConfig, botRegistry registry.BotRegistry) (*JsonRpcCache, error) { + botAuthenticator, err := clients.NewBotAuthenticator(ctx) + if err != nil { + return nil, err + } + + return &JsonRpcCache{ + ctx: ctx, + cfg: cfg, + botAuthenticator: botAuthenticator, + botRegistry: botRegistry, + msgClient: messaging.NewClient("json-rpc-cache", fmt.Sprintf("%s:%s", config.DockerNatsContainerName, config.DefaultNatsPort)), + }, nil +} + +func (c *JsonRpcCache) Start() error { + c.cache = NewCache(time.Duration(c.cfg.CacheExpirePeriodSeconds) * time.Second) + + c.server = &http.Server{ + Addr: ":8575", + Handler: c.Handler(), + } + + c.blocksDataClient = blocksdata.NewBlocksDataClient(c.cfg.DispatcherURL) + + utils.GoListenAndServe(c.server) + + go c.pollBlocksData() + + return nil +} + +func (p *JsonRpcCache) Stop() error { + if p.server != nil { + return p.server.Close() + } + return nil +} + +func (p *JsonRpcCache) Name() string { + return "json-rpc-cache" +} + +func (c *JsonRpcCache) Handler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + t := time.Now() + req, err := decodeBody(r) + if err != nil { + writeBadRequest(w, req, err) + return + } + + agentConfig, err := c.botAuthenticator.FindAgentFromRemoteAddr(r.RemoteAddr) + if agentConfig == nil || err != nil { + writeUnauthorized(w, req) + return + } + + defer func() { + if err != nil { + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + metrics.CreateAgentMetricV1(*agentConfig, domain.MetricJSONRPCCachePollError, 1), + }, + }, + ) + } + }() + + chainID, err := strconv.ParseInt(r.Header.Get(ChainIDHeader), 10, 64) + if err != nil { + writeBadRequest(w, req, fmt.Errorf("missing or invalid chain id header")) + return + } + + details := fmt.Sprintf("chainID: %d method: %s params: %s", chainID, req.Method, string(req.Params)) + log.Debugf("jsonrpc cache request. %s", details) + + result, ok := c.cache.Get(uint64(chainID), req.Method, string(req.Params)) + if !ok { + log.Debugf("cache miss. %s", details) + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + metrics.CreateDetailedAgentMetricV2(*agentConfig, domain.MetricJSONRPCCacheMiss, 1, details, chainID), + }, + }, + ) + writeNotFound(w, req) + return + } + + err = writeJsonResponse(w, req, result) + if err != nil { + log.WithError(err).Error("failed to write jsonrpc response body") + writeInternalError(w, req, err) + return + } + + since := float64(time.Since(t).Milliseconds()) + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + metrics.CreateDetailedAgentMetricV2(*agentConfig, domain.MetricJSONRPCCacheHit, 1, details, chainID), + metrics.CreateDetailedAgentMetricV2(*agentConfig, domain.MetricJSONRPCCacheLatency, since, details, chainID), + }, + }, + ) + }) +} + +func (c *JsonRpcCache) pollBlocksData() { + bucket := time.Now().Truncate(time.Second * 10).Unix() + + for { + // wait for the next bucket + <-time.After(time.Duration(bucket-time.Now().Unix()) * time.Second) + + agents, err := c.botRegistry.LoadAssignedBots() + if err == nil && len(agents) == 0 { + log.Warn("No agents assigned to the scanner, skipping polling for BlocksData") + bucket += 10 // 10 seconds + continue + } + + log.Infof("Polling BlocksData from dispatcher. bucket: %d", bucket) + + // blocksDataClient internally retries on failure and to not block on the retry, we run it in a goroutine + go func(b int64) { + blocksData, err := c.blocksDataClient.GetBlocksData(b) + if err != nil { + c.msgClient.PublishProto(messaging.SubjectMetricAgent, + metrics.CreateEventMetric(time.Now(), "system", domain.MetricJSONRPCCachePollError, err.Error())) + log.WithError(err).Errorf("Failed to get BlocksData from dispatcher. bucket: %d", b) + return + } + + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + metrics.CreateSystemMetric(domain.MetricJSONRPCCachePollSuccess, float64(len(blocksData.Blocks)), fmt.Sprintf("%d", b)), + metrics.CreateSystemMetric(domain.MetricJSONRPCCacheSize, float64(c.cache.cache.ItemCount()), ""), + }, + }, + ) + + log.Infof("Added BlocksData to local cache. bucket: %d blocksData: %d", b, len(blocksData.Blocks)) + c.cache.Append(blocksData) + }(bucket) + + bucket += 10 // 10 seconds + } +} diff --git a/services/json-rpc/cache/json_rpc_cache_test.go b/services/json-rpc/cache/json_rpc_cache_test.go new file mode 100644 index 00000000..ef162cc9 --- /dev/null +++ b/services/json-rpc/cache/json_rpc_cache_test.go @@ -0,0 +1,116 @@ +package json_rpc_cache + +import ( + "bytes" + "context" + "encoding/json" + "net/http/httptest" + "testing" + "time" + + mock_clients "github.com/forta-network/forta-node/clients/mocks" + "github.com/forta-network/forta-node/config" + mock_registry "github.com/forta-network/forta-node/services/components/registry/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + testBotCfg = &config.AgentConfig{Owner: "test-owner", ID: "test-id"} +) + +func TestJsonRpcCache(t *testing.T) { + ctrl := gomock.NewController(t) + blocksDataClient := mock_clients.NewMockBlocksDataClient(ctrl) + authenticator := mock_clients.NewMockIPAuthenticator(ctrl) + botRegistry := mock_registry.NewMockBotRegistry(ctrl) + msgClient := mock_clients.NewMockMessageClient(ctrl) + + botRegistry.EXPECT().LoadAssignedBots().Return([]config.AgentConfig{*testBotCfg}, nil).AnyTimes() + msgClient.EXPECT().PublishProto(gomock.Any(), gomock.Any()).AnyTimes() + + count := 0 + appended := make(chan struct{}) + blocksDataClient.EXPECT().GetBlocksData(gomock.Any()).Return(blocks, nil).Do(func(any) { + count++ + if count == 2 { + close(appended) + } + }).AnyTimes() + + botRemoteAddr := "1.1.1.1:1111" + + authenticator.EXPECT().FindAgentFromRemoteAddr(botRemoteAddr).Return(testBotCfg, nil) + + jrpCache := JsonRpcCache{ + ctx: context.TODO(), + botAuthenticator: authenticator, + blocksDataClient: blocksDataClient, + cfg: config.JsonRpcCacheConfig{ + CacheExpirePeriodSeconds: 300, + }, + cache: NewCache(300 * time.Second), + botRegistry: botRegistry, + msgClient: msgClient, + } + + go jrpCache.pollBlocksData() + + <-appended + + jrpReq := jsonRpcReq{ + ID: json.RawMessage("1"), + Method: "eth_blockNumber", + Params: json.RawMessage("[]"), + } + b, err := json.Marshal(jrpReq) + require.NoError(t, err) + + r := httptest.NewRequest("POST", "/", bytes.NewBuffer(b)) + r.RemoteAddr = botRemoteAddr + r.Header.Set("X-Forta-Chain-ID", "1") + rw := httptest.NewRecorder() + + jrpCache.Handler().ServeHTTP(rw, r) + + require.Equal(t, 200, rw.Code) + + b = rw.Body.Bytes() + var resp jsonRpcResp + require.NoError(t, json.Unmarshal(b, &resp)) + require.Nil(t, resp.Error) + + assert.Equal(t, jrpReq.ID, resp.ID) + assert.Equal(t, json.RawMessage(`"1"`), resp.Result) +} + +func TestJsonRpcCache_NoAgentsAssigned(t *testing.T) { + ctrl := gomock.NewController(t) + blocksDataClient := mock_clients.NewMockBlocksDataClient(ctrl) + authenticator := mock_clients.NewMockIPAuthenticator(ctrl) + botRegistry := mock_registry.NewMockBotRegistry(ctrl) + + loaded := make(chan struct{}) + botRegistry.EXPECT().LoadAssignedBots().Return([]config.AgentConfig{}, nil).AnyTimes().Do(func() { + close(loaded) + }) + blocksDataClient.EXPECT().GetBlocksData(gomock.Any()).Return(blocks, nil).Times(0) + + jrpCache := JsonRpcCache{ + ctx: context.TODO(), + botAuthenticator: authenticator, + blocksDataClient: blocksDataClient, + cfg: config.JsonRpcCacheConfig{ + CacheExpirePeriodSeconds: 300, + }, + cache: NewCache(300 * time.Second), + botRegistry: botRegistry, + } + + go jrpCache.pollBlocksData() + <-loaded + + require.Equal(t, 0, jrpCache.cache.cache.ItemCount()) + +} diff --git a/services/json-rpc/cache/jsonrpc.go b/services/json-rpc/cache/jsonrpc.go new file mode 100644 index 00000000..bcabefc4 --- /dev/null +++ b/services/json-rpc/cache/jsonrpc.go @@ -0,0 +1,122 @@ +package json_rpc_cache + +import ( + "encoding/json" + "fmt" + "net/http" + + log "github.com/sirupsen/logrus" +) + +type jsonRpcReq struct { + ID json.RawMessage `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +type jsonRpcResp struct { + ID json.RawMessage `json:"id"` + JsonRPC string `json:"jsonrpc"` + Result json.RawMessage `json:"result"` + Error *jsonRpcError `json:"error,omitempty"` +} + +type errorResponse struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id"` + Error jsonRpcError `json:"error"` +} + +type jsonRpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func decodeBody(req *http.Request) (*jsonRpcReq, error) { + var decodedBody jsonRpcReq + if err := json.NewDecoder(req.Body).Decode(&decodedBody); err != nil { + return nil, fmt.Errorf("failed to decode json-rpc request body") + } + return &decodedBody, nil +} + +func writeJsonResponse(w http.ResponseWriter, req *jsonRpcReq, result any) error { + b, err := json.Marshal(result) + if err != nil { + return err + } + + w.WriteHeader(http.StatusOK) + + return json.NewEncoder(w).Encode(&jsonRpcResp{ + ID: req.ID, + JsonRPC: "2.0", + Result: b, + }) +} + +func writeBadRequest(w http.ResponseWriter, req *jsonRpcReq, err error) { + if req == nil { + http.Error(w, "bad request", http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusBadRequest) + + if err := json.NewEncoder(w).Encode(&errorResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: jsonRpcError{ + Code: -32600, + Message: err.Error(), + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} + +func writeUnauthorized(w http.ResponseWriter, req *jsonRpcReq) { + w.WriteHeader(http.StatusUnauthorized) + + if err := json.NewEncoder(w).Encode(&errorResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: jsonRpcError{ + Code: -32000, + Message: "unauthorized", + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} + +func writeNotFound(w http.ResponseWriter, req *jsonRpcReq) { + w.WriteHeader(http.StatusOK) + + if err := json.NewEncoder(w).Encode(&jsonRpcResp{ + ID: req.ID, + JsonRPC: "2.0", + Result: nil, + Error: &jsonRpcError{ + Code: -32603, + Message: "result not found in cache", + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} + +func writeInternalError(w http.ResponseWriter, req *jsonRpcReq, err error) { + w.WriteHeader(http.StatusInternalServerError) + + if err := json.NewEncoder(w).Encode(&errorResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: jsonRpcError{ + Code: -32603, + Message: err.Error(), + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} diff --git a/services/json-rpc/decode_test.go b/services/json-rpc/decode_test.go index 72600377..818975b8 100644 --- a/services/json-rpc/decode_test.go +++ b/services/json-rpc/decode_test.go @@ -23,5 +23,6 @@ func TestDecodeAndReplaceBody(t *testing.T) { // still can read body because it was replaced b, err := io.ReadAll(req.Body) + r.NoError(err) r.Equal(bodyStr, string(b)) } diff --git a/services/supervisor/services.go b/services/supervisor/services.go index ccb94d0c..319be1e6 100644 --- a/services/supervisor/services.go +++ b/services/supervisor/services.go @@ -299,6 +299,9 @@ func (sup *SupervisorService) start() error { Ports: map[string]string{ "": config.DefaultHealthPort, // random host port }, + Files: map[string][]byte{ + "passphrase": []byte(sup.config.Passphrase), + }, DialHost: true, NetworkID: nodeNetworkID, LinkNetworkIDs: []string{natsNetworkID},