diff --git a/clients/blocksdata/r2_client.go b/clients/blocksdata/r2_client.go new file mode 100644 index 00000000..fd5b5912 --- /dev/null +++ b/clients/blocksdata/r2_client.go @@ -0,0 +1,122 @@ +package blocksdata + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/url" + "time" + + "github.com/andybalholm/brotli" + backoff "github.com/cenkalti/backoff/v4" + "github.com/forta-network/forta-core-go/protocol" + "github.com/forta-network/forta-core-go/utils/httpclient" + "google.golang.org/protobuf/proto" +) + +const ( + minBackoff = 1 * time.Second + maxBackoff = 10 * time.Second + maxElapsedTime = 5 * time.Minute +) + +type blocksDataClient struct { + dispatcherURL *url.URL +} + +func NewBlocksDataClient(dispatcherURL string) *blocksDataClient { + u, _ := url.Parse(dispatcherURL) + + return &blocksDataClient{ + dispatcherURL: u, + } +} + +type PresignedURLItem struct { + Bucket int64 `json:"bucket"` + PresignedURL string `json:"presignedURL"` + ExpiresAt int64 `json:"expiresAt"` +} + +func (c *blocksDataClient) GetBlocksData(bucket int64) (_ *protocol.BlocksData, err error) { + dispatcherUrl, err := url.JoinPath(c.dispatcherURL.String(), fmt.Sprintf("%d", bucket)) + if err != nil { + return nil, err + } + + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = minBackoff + bo.MaxInterval = maxBackoff + bo.MaxElapsedTime = maxElapsedTime + + var item PresignedURLItem + + err = backoff.Retry(func() error { + resp, err := httpclient.Default.Get(dispatcherUrl) + if err != nil { + return err + } + + defer resp.Body.Close() + + b, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode == 404 && bytes.Contains(b, []byte("too old")) { + return fmt.Errorf("%s", b) + } + + if resp.StatusCode != 200 { + return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, b) + } + + err = json.Unmarshal(b, &item) + if err != nil { + return err + } + + if item.ExpiresAt < time.Now().Unix() { + return backoff.Permanent(fmt.Errorf("presigned URL expired")) + } + + return nil + }, bo) + + if err != nil { + return nil, err + } + + var blocks protocol.BlocksData + + err = backoff.Retry(func() error { + resp, err := httpclient.Default.Get(item.PresignedURL) + if err != nil { + return err + } + + if resp.StatusCode != 200 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + b, err := io.ReadAll(brotli.NewReader(resp.Body)) + if err != nil { + return err + } + + err = proto.Unmarshal(b, &blocks) + if err != nil { + return backoff.Permanent(err) + } + + return nil + }, bo) + + if err != nil { + return nil, err + } + + return &blocks, nil +} diff --git a/clients/interfaces.go b/clients/interfaces.go index 68671499..0c550456 100644 --- a/clients/interfaces.go +++ b/clients/interfaces.go @@ -7,6 +7,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/events" "github.com/forta-network/forta-core-go/domain" + "github.com/forta-network/forta-core-go/protocol" "github.com/forta-network/forta-node/clients/docker" "github.com/forta-network/forta-node/config" "github.com/golang/protobuf/proto" @@ -68,3 +69,7 @@ type IPAuthenticator interface { FindContainerNameFromRemoteAddr(ctx context.Context, hostPort string) (string, error) FindAgentByContainerName(containerName string) (*config.AgentConfig, error) } + +type BlocksDataClient interface { + GetBlocksData(bucket int64) (*protocol.BlocksData, error) +} diff --git a/clients/mocks/mock_clients.go b/clients/mocks/mock_clients.go index 20daf87a..85ef8ce4 100644 --- a/clients/mocks/mock_clients.go +++ b/clients/mocks/mock_clients.go @@ -12,6 +12,7 @@ import ( types "github.com/docker/docker/api/types" events "github.com/docker/docker/api/types/events" domain "github.com/forta-network/forta-core-go/domain" + protocol "github.com/forta-network/forta-core-go/protocol" docker "github.com/forta-network/forta-node/clients/docker" config "github.com/forta-network/forta-node/config" gomock "github.com/golang/mock/gomock" @@ -708,3 +709,41 @@ func (mr *MockIPAuthenticatorMockRecorder) FindContainerNameFromRemoteAddr(ctx, mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindContainerNameFromRemoteAddr", reflect.TypeOf((*MockIPAuthenticator)(nil).FindContainerNameFromRemoteAddr), ctx, hostPort) } + +// MockBlocksDataClient is a mock of BlocksDataClient interface. +type MockBlocksDataClient struct { + ctrl *gomock.Controller + recorder *MockBlocksDataClientMockRecorder +} + +// MockBlocksDataClientMockRecorder is the mock recorder for MockBlocksDataClient. +type MockBlocksDataClientMockRecorder struct { + mock *MockBlocksDataClient +} + +// NewMockBlocksDataClient creates a new mock instance. +func NewMockBlocksDataClient(ctrl *gomock.Controller) *MockBlocksDataClient { + mock := &MockBlocksDataClient{ctrl: ctrl} + mock.recorder = &MockBlocksDataClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBlocksDataClient) EXPECT() *MockBlocksDataClientMockRecorder { + return m.recorder +} + +// GetBlocksData mocks base method. +func (m *MockBlocksDataClient) GetBlocksData(bucket int64) (*protocol.BlocksData, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBlocksData", bucket) + ret0, _ := ret[0].(*protocol.BlocksData) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetBlocksData indicates an expected call of GetBlocksData. +func (mr *MockBlocksDataClientMockRecorder) GetBlocksData(bucket interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlocksData", reflect.TypeOf((*MockBlocksDataClient)(nil).GetBlocksData), bucket) +} diff --git a/cmd/json-rpc/main.go b/cmd/json-rpc/main.go index ebe86633..37a780b5 100644 --- a/cmd/json-rpc/main.go +++ b/cmd/json-rpc/main.go @@ -8,29 +8,52 @@ import ( "github.com/forta-network/forta-node/config" "github.com/forta-network/forta-node/healthutils" "github.com/forta-network/forta-node/services" + "github.com/forta-network/forta-node/services/components/registry" jrp "github.com/forta-network/forta-node/services/json-rpc" + jrpcache "github.com/forta-network/forta-node/services/json-rpc/cache" ) func initJsonRpcProxy(ctx context.Context, cfg config.Config) (*jrp.JsonRpcProxy, error) { return jrp.NewJsonRpcProxy(ctx, cfg) } +func initJsonRpcCache(ctx context.Context, cfg config.Config, botRegistry registry.BotRegistry) (*jrpcache.JsonRpcCache, error) { + return jrpcache.NewJsonRpcCache(ctx, cfg.JsonRpcCache, botRegistry) +} + func initServices(ctx context.Context, cfg config.Config) ([]services.Service, error) { // can't dial localhost - need to dial host gateway from container cfg.Scan.JsonRpc.Url = utils.ConvertToDockerHostURL(cfg.Scan.JsonRpc.Url) cfg.JsonRpcProxy.JsonRpc.Url = utils.ConvertToDockerHostURL(cfg.JsonRpcProxy.JsonRpc.Url) + cfg.Registry.JsonRpc.Url = utils.ConvertToDockerHostURL(cfg.Registry.JsonRpc.Url) proxy, err := initJsonRpcProxy(ctx, cfg) if err != nil { return nil, err } + key, err := config.LoadKeyInContainer(cfg) + if err != nil { + return nil, err + } + + botRegistry, err := registry.New(cfg, key.Address) + if err != nil { + return nil, err + } + + cache, err := initJsonRpcCache(ctx, cfg, botRegistry) + if err != nil { + return nil, err + } + return []services.Service{ health.NewService( ctx, "", healthutils.DefaultHealthServerErrHandler, health.CheckerFrom(summarizeReports, proxy), ), proxy, + cache, }, nil } diff --git a/config/config.go b/config/config.go index 8041d6c7..525f5c04 100644 --- a/config/config.go +++ b/config/config.go @@ -44,6 +44,11 @@ type JsonRpcProxyConfig struct { RateLimitConfig *RateLimitConfig `yaml:"rateLimit" json:"rateLimit"` } +type JsonRpcCacheConfig struct { + DispatcherURL string `yaml:"dispatcherUrl" json:"dispatcherUrl" default:"https://dispatcher.forta.network/batch" validate:"omitempty,url"` + CacheExpirePeriodSeconds int `yaml:"cacheExpirePeriodSeconds" json:"cacheExpirePeriodSeconds" default:"300"` +} + type LogConfig struct { Level string `yaml:"level" json:"level" default:"info" ` MaxLogSize string `yaml:"maxLogSize" json:"maxLogSize" default:"50m" ` @@ -227,6 +232,7 @@ type Config struct { Registry RegistryConfig `yaml:"registry" json:"registry"` Publish PublisherConfig `yaml:"publish" json:"publish"` JsonRpcProxy JsonRpcProxyConfig `yaml:"jsonRpcProxy" json:"jsonRpcProxy"` + JsonRpcCache JsonRpcCacheConfig `yaml:"jsonRpcCache" json:"jsonRpcCache"` PublicAPIProxy PublicAPIProxyConfig `yaml:"publicApiProxy" json:"publicApiProxy"` Log LogConfig `yaml:"log" json:"log"` ResourcesConfig ResourcesConfig `yaml:"resources" json:"resources"` diff --git a/config/defaults.go b/config/defaults.go index 70145b46..3987b401 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -14,5 +14,6 @@ const ( DefaultPublicAPIProxyPort = "8535" DefaultJSONRPCProxyPort = "8545" DefaultBotHealthCheckPort = "8565" + DefaultBotJSONRPCCachePort = "8575" DefaultFortaNodeBinaryPath = "/forta-node" // the path for the common binary in the container image ) diff --git a/config/env.go b/config/env.go index 493a1347..cb8f4364 100644 --- a/config/env.go +++ b/config/env.go @@ -6,8 +6,6 @@ const ( EnvReleaseInfo = "FORTA_RELEASE_INFO" // Agent env vars - EnvJsonRpcHost = "JSON_RPC_HOST" - EnvJsonRpcPort = "JSON_RPC_PORT" EnvJWTProviderHost = "FORTA_JWT_PROVIDER_HOST" EnvJWTProviderPort = "FORTA_JWT_PROVIDER_PORT" EnvPublicAPIProxyHost = "FORTA_PUBLIC_API_PROXY_HOST" @@ -20,4 +18,11 @@ const ( EnvFortaShardID = "FORTA_SHARD_ID" EnvFortaShardCount = "FORTA_SHARD_COUNT" EnvFortaTokenExchangeURL = "FORTA_TOKEN_EXCHANGE_URL" + + EnvJsonRpcHost = "JSON_RPC_HOST" + EnvJsonRpcPort = "JSON_RPC_PORT" + EnvCacheJsonRpcCachePort = "JSON_RPC_CACHE_PORT" + EnvCacheRequestTimeout = "JSON_RPC_CACHE_TIMEOUT" + EnvCacheRequestInterval = "JSON_RPC_CACHE_INTERVAL" + EnvCacheSupportedChains = "JSON_RPC_CACHE_SUPPORTED_CHAINS" ) diff --git a/go.mod b/go.mod index 875a5eef..fd31eba8 100644 --- a/go.mod +++ b/go.mod @@ -39,12 +39,15 @@ require ( replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible require ( + github.com/andybalholm/brotli v1.1.0 + github.com/cenkalti/backoff/v4 v4.1.3 github.com/docker/docker v1.6.2 github.com/docker/go-connections v0.4.0 - github.com/forta-network/forta-core-go v0.0.0-20240306085049-a1ac54ae90f5 + github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.39.0 + google.golang.org/protobuf v1.28.1 ) require ( @@ -60,7 +63,6 @@ require ( github.com/blang/semver/v4 v4.0.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect - github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.0.4 // indirect github.com/coreos/go-systemd/v22 v22.4.0 // indirect @@ -286,7 +288,6 @@ require ( golang.org/x/tools v0.2.0 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect - google.golang.org/protobuf v1.28.1 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 1c11dbbe..02c846d2 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= @@ -329,8 +331,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= -github.com/forta-network/forta-core-go v0.0.0-20240306085049-a1ac54ae90f5 h1:bj2OqjhoCRKUYlSKySH3kWanC77QnlWZsQrSCaw7FDg= -github.com/forta-network/forta-core-go v0.0.0-20240306085049-a1ac54ae90f5/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= +github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2 h1:v+snSZVsMUPtPX6pI5oxULTWiAfJ1igeE0Iqilma7/Y= +github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= diff --git a/services/components/containers/definitions.go b/services/components/containers/definitions.go index 83fd07ad..af8ef816 100644 --- a/services/components/containers/definitions.go +++ b/services/components/containers/definitions.go @@ -5,6 +5,7 @@ import ( "github.com/forta-network/forta-node/clients/docker" "github.com/forta-network/forta-node/config" + jrpcache "github.com/forta-network/forta-node/services/json-rpc/cache" ) // Label values @@ -44,6 +45,10 @@ func NewBotContainerConfig( config.EnvFortaBotOwner: botConfig.Owner, config.EnvFortaHealthCheckPort: config.DefaultBotHealthCheckPort, config.EnvFortaTokenExchangeURL: tokenExchangeURL, + config.EnvCacheJsonRpcCachePort: config.DefaultBotJSONRPCCachePort, + config.EnvCacheRequestTimeout: jrpcache.BotCacheRequestTimeoutSeconds, + config.EnvCacheRequestInterval: jrpcache.BotCacheRequestIntervalSeconds, + config.EnvCacheSupportedChains: jrpcache.BotCacheSupportedChains, } if botConfig.ChainID > 0 { env[config.EnvFortaChainID] = fmt.Sprintf("%d", botConfig.ChainID) diff --git a/services/components/metrics/metrics.go b/services/components/metrics/metrics.go index fa0e540f..638e352b 100644 --- a/services/components/metrics/metrics.go +++ b/services/components/metrics/metrics.go @@ -42,6 +42,18 @@ func CreateAgentMetricV2(agt config.AgentConfig, metric string, value float64, c } } +func CreateDetailedAgentMetricV2(agt config.AgentConfig, metric string, value float64, details string, chainID int64) *protocol.AgentMetric { + return &protocol.AgentMetric{ + AgentId: agt.ID, + Timestamp: time.Now().Format(time.RFC3339), + Name: metric, + Value: value, + ShardId: agt.ShardID(), + ChainId: chainID, + Details: details, + } +} + func CreateEventMetric(t time.Time, id string, metric string, details string) *protocol.AgentMetric { return &protocol.AgentMetric{ AgentId: id, @@ -52,6 +64,16 @@ func CreateEventMetric(t time.Time, id string, metric string, details string) *p } } +func CreateSystemMetric(metric string, value float64, details string) *protocol.AgentMetric { + return &protocol.AgentMetric{ + AgentId: "system", + Timestamp: time.Now().Format(time.RFC3339), + Name: metric, + Value: value, + Details: details, + } +} + func CreateAgentResourcesMetric(agt config.AgentConfig, t time.Time, metric string, value float64) *protocol.AgentMetric { return &protocol.AgentMetric{ AgentId: agt.ID, diff --git a/services/json-rpc/cache/cache.go b/services/json-rpc/cache/cache.go new file mode 100644 index 00000000..194754e7 --- /dev/null +++ b/services/json-rpc/cache/cache.go @@ -0,0 +1,96 @@ +package json_rpc_cache + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/forta-network/forta-core-go/domain" + "github.com/forta-network/forta-core-go/protocol" + "github.com/patrickmn/go-cache" + log "github.com/sirupsen/logrus" +) + +type inMemory struct { + cache *cache.Cache +} + +func NewCache(expire time.Duration) *inMemory { + return &inMemory{ + cache: cache.New(expire, expire), + } +} + +func (c *inMemory) Append(blocksData *protocol.BlocksData) { + for _, event := range blocksData.Blocks { + chainID := event.ChainID + + // eth_blockNumber + method := "eth_blockNumber" + params := "[]" + + if val, ok := c.cache.Get(cacheKey(chainID, method, params)); ok { + blockNumber, ok := val.(string) + + // if the new block number is later than the cached one, update the inMemory + if ok && isLater(blockNumber, event.Block.Number) { + c.cache.SetDefault(cacheKey(chainID, method, params), event.Block.Number) + } + } else { + c.cache.SetDefault(cacheKey(chainID, method, params), event.Block.Number) + } + + log.Debugf("caching block number. chainID: %d method: %s params: %s", chainID, method, params) + + // eth_getBlockByNumber + method = "eth_getBlockByNumber" + params = fmt.Sprintf(`["%s",true]`, event.Block.Number) + log.Debugf("caching block. chainID: %d method: %s params: %s", chainID, method, params) + + block := domain.BlockFromBlockData(event) + c.cache.SetDefault(cacheKey(chainID, method, params), block) + + // eth_getLogs + method = "eth_getLogs" + params = fmt.Sprintf(`[{"fromBlock":"%s","toBlock":"%s"}]`, event.Block.Number, event.Block.Number) + + log.Debugf("caching logs. chainID: %d method: %s params: %s", chainID, method, params) + + logs := domain.LogsFromBlockData(event) + c.cache.SetDefault(cacheKey(chainID, method, params), logs) + + // trace_block + method = "trace_block" + params = fmt.Sprintf(`["%s"]`, event.Block.Number) + + log.Debugf("caching traces. chainID: %d method: %s params: %s", chainID, method, params) + + traces, err := domain.TracesFromBlockData(event) + if err == nil { + c.cache.SetDefault(cacheKey(chainID, method, params), traces) + } + } +} + +func (c *inMemory) Get(chainId uint64, method string, params string) (interface{}, bool) { + return c.cache.Get(cacheKey(chainId, method, params)) +} + +func cacheKey(chainId uint64, method, params string) string { + return fmt.Sprintf("%d-%s-%s", chainId, method, params) +} + +func isLater(actual, new string) bool { + actualInt, err := strconv.ParseInt(strings.Replace(actual, "0x", "", -1), 16, 64) + if err != nil { + return false + } + + newInt, err := strconv.ParseInt(strings.Replace(new, "0x", "", -1), 16, 64) + if err != nil { + return false + } + + return newInt > actualInt +} diff --git a/services/json-rpc/cache/cache_test.go b/services/json-rpc/cache/cache_test.go new file mode 100644 index 00000000..12434451 --- /dev/null +++ b/services/json-rpc/cache/cache_test.go @@ -0,0 +1,105 @@ +package json_rpc_cache + +import ( + "testing" + "time" + + "github.com/forta-network/forta-core-go/protocol" + "github.com/stretchr/testify/assert" +) + +func TestCache(t *testing.T) { + cache := NewCache(time.Millisecond * 500) + + cache.Append(blocks) + + blockNumber, ok := cache.Get(1, "eth_blockNumber", "[]") + assert.True(t, ok) + assert.Equal(t, "1", blockNumber) + + blockNumber, ok = cache.Get(2, "eth_blockNumber", "[]") + assert.True(t, ok) + assert.Equal(t, "101", blockNumber) + + time.Sleep(time.Second) + + blockNumber, ok = cache.Get(1, "eth_blockNumber", "[]") + assert.False(t, ok) + assert.Empty(t, blockNumber) +} + +var blocks = &protocol.BlocksData{ + Blocks: []*protocol.BlockData{ + { + ChainID: 1, + Block: &protocol.BlockWithTransactions{ + Hash: "0xaaaa", + Number: "1", + Transactions: []*protocol.Transaction{ + { + Hash: "0xbbbb", + From: "0xcccc", + }, + }, + Uncles: []string{"0xdddd"}, + }, + Logs: []*protocol.LogEntry{ + { + Address: "0xcccc", + Topics: []string{"0xeeee"}, + }, + }, + Traces: []*protocol.Trace{ + { + Action: &protocol.TraceAction{ + From: "0xcccc", + }, + Result: &protocol.TraceResult{ + Address: "0xcccc", + }, + TraceAddress: []int64{1}, + }, + }, + }, + { + ChainID: 2, + Block: &protocol.BlockWithTransactions{ + Hash: "0xffff", + Number: "100", + Transactions: []*protocol.Transaction{ + { + Hash: "0x1111", + From: "0x2222", + }, + }, + Uncles: []string{"0x3333"}, + }, + Logs: []*protocol.LogEntry{}, + Traces: []*protocol.Trace{ + { + TraceAddress: []int64{2}, + }, + }, + }, + { + ChainID: 2, + Block: &protocol.BlockWithTransactions{ + Hash: "0xfffd", + Number: "101", + Transactions: []*protocol.Transaction{ + { + Hash: "0x1112", + From: "0x2223", + }, + }, + Uncles: []string{"0x3333"}, + }, + Logs: []*protocol.LogEntry{}, + Traces: []*protocol.Trace{ + { + TraceAddress: []int64{1}, + }, + }, + }, + }, +} diff --git a/services/json-rpc/cache/json_rpc_cache.go b/services/json-rpc/cache/json_rpc_cache.go new file mode 100644 index 00000000..3f4c9c6f --- /dev/null +++ b/services/json-rpc/cache/json_rpc_cache.go @@ -0,0 +1,205 @@ +package json_rpc_cache + +import ( + "context" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/forta-network/forta-core-go/domain" + "github.com/forta-network/forta-core-go/protocol" + "github.com/forta-network/forta-core-go/utils" + "github.com/forta-network/forta-node/clients" + "github.com/forta-network/forta-node/clients/blocksdata" + "github.com/forta-network/forta-node/clients/messaging" + "github.com/forta-network/forta-node/config" + "github.com/forta-network/forta-node/services/components/metrics" + "github.com/forta-network/forta-node/services/components/registry" + log "github.com/sirupsen/logrus" +) + +const ( + ChainIDHeader = "X-Forta-Chain-ID" + + // These values will be injected into the agent container to configure bot cache + // BotCacheRequestTimeout timeout until the bot must fallback to the RPC Node + // Value in seconds and can be a float. + BotCacheRequestTimeoutSeconds = "20" + // BotCacheRequestInterval interval between bot requests + // Value in seconds and can be a float. + BotCacheRequestIntervalSeconds = "1" + // BotCacheSupportedChains comma separated list of supported chains + // Chains' data not filtered on the cache side. + BotCacheSupportedChains = "1,137,56,43114,42161,10,250,8453" +) + +type JsonRpcCache struct { + ctx context.Context + cfg config.JsonRpcCacheConfig + botAuthenticator clients.IPAuthenticator + botRegistry registry.BotRegistry + msgClient clients.MessageClient + + server *http.Server + + cache *inMemory + + blocksDataClient clients.BlocksDataClient +} + +func NewJsonRpcCache(ctx context.Context, cfg config.JsonRpcCacheConfig, botRegistry registry.BotRegistry) (*JsonRpcCache, error) { + botAuthenticator, err := clients.NewBotAuthenticator(ctx) + if err != nil { + return nil, err + } + + return &JsonRpcCache{ + ctx: ctx, + cfg: cfg, + botAuthenticator: botAuthenticator, + botRegistry: botRegistry, + msgClient: messaging.NewClient("json-rpc-cache", fmt.Sprintf("%s:%s", config.DockerNatsContainerName, config.DefaultNatsPort)), + }, nil +} + +func (c *JsonRpcCache) Start() error { + c.cache = NewCache(time.Duration(c.cfg.CacheExpirePeriodSeconds) * time.Second) + + c.server = &http.Server{ + Addr: ":8575", + Handler: c.Handler(), + } + + c.blocksDataClient = blocksdata.NewBlocksDataClient(c.cfg.DispatcherURL) + + utils.GoListenAndServe(c.server) + + go c.pollBlocksData() + + return nil +} + +func (p *JsonRpcCache) Stop() error { + if p.server != nil { + return p.server.Close() + } + return nil +} + +func (p *JsonRpcCache) Name() string { + return "json-rpc-cache" +} + +func (c *JsonRpcCache) Handler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + t := time.Now() + req, err := decodeBody(r) + if err != nil { + writeBadRequest(w, req, err) + return + } + + agentConfig, err := c.botAuthenticator.FindAgentFromRemoteAddr(r.RemoteAddr) + if agentConfig == nil || err != nil { + writeUnauthorized(w, req) + return + } + + defer func() { + if err != nil { + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + metrics.CreateAgentMetricV1(*agentConfig, domain.MetricJSONRPCCachePollError, 1), + }, + }, + ) + } + }() + + chainID, err := strconv.ParseInt(r.Header.Get(ChainIDHeader), 10, 64) + if err != nil { + writeBadRequest(w, req, fmt.Errorf("missing or invalid chain id header")) + return + } + + details := fmt.Sprintf("chainID: %d method: %s params: %s", chainID, req.Method, string(req.Params)) + log.Debugf("jsonrpc cache request. %s", details) + + result, ok := c.cache.Get(uint64(chainID), req.Method, string(req.Params)) + if !ok { + log.Debugf("cache miss. %s", details) + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + metrics.CreateDetailedAgentMetricV2(*agentConfig, domain.MetricJSONRPCCacheMiss, 1, details, chainID), + }, + }, + ) + writeNotFound(w, req) + return + } + + err = writeJsonResponse(w, req, result) + if err != nil { + log.WithError(err).Error("failed to write jsonrpc response body") + writeInternalError(w, req, err) + return + } + + since := float64(time.Since(t).Milliseconds()) + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + metrics.CreateDetailedAgentMetricV2(*agentConfig, domain.MetricJSONRPCCacheHit, 1, details, chainID), + metrics.CreateDetailedAgentMetricV2(*agentConfig, domain.MetricJSONRPCCacheLatency, since, details, chainID), + }, + }, + ) + }) +} + +func (c *JsonRpcCache) pollBlocksData() { + bucket := time.Now().Truncate(time.Second * 10).Unix() + + for { + // wait for the next bucket + <-time.After(time.Duration(bucket-time.Now().Unix()) * time.Second) + + agents, err := c.botRegistry.LoadAssignedBots() + if err == nil && len(agents) == 0 { + log.Warn("No agents assigned to the scanner, skipping polling for BlocksData") + bucket += 10 // 10 seconds + continue + } + + log.Infof("Polling BlocksData from dispatcher. bucket: %d", bucket) + + // blocksDataClient internally retries on failure and to not block on the retry, we run it in a goroutine + go func(b int64) { + blocksData, err := c.blocksDataClient.GetBlocksData(b) + if err != nil { + c.msgClient.PublishProto(messaging.SubjectMetricAgent, + metrics.CreateEventMetric(time.Now(), "system", domain.MetricJSONRPCCachePollError, err.Error())) + log.WithError(err).Errorf("Failed to get BlocksData from dispatcher. bucket: %d", b) + return + } + + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + metrics.CreateSystemMetric(domain.MetricJSONRPCCachePollSuccess, float64(len(blocksData.Blocks)), fmt.Sprintf("%d", b)), + metrics.CreateSystemMetric(domain.MetricJSONRPCCacheSize, float64(c.cache.cache.ItemCount()), ""), + }, + }, + ) + + log.Infof("Added BlocksData to local cache. bucket: %d blocksData: %d", b, len(blocksData.Blocks)) + c.cache.Append(blocksData) + }(bucket) + + bucket += 10 // 10 seconds + } +} diff --git a/services/json-rpc/cache/json_rpc_cache_test.go b/services/json-rpc/cache/json_rpc_cache_test.go new file mode 100644 index 00000000..ef162cc9 --- /dev/null +++ b/services/json-rpc/cache/json_rpc_cache_test.go @@ -0,0 +1,116 @@ +package json_rpc_cache + +import ( + "bytes" + "context" + "encoding/json" + "net/http/httptest" + "testing" + "time" + + mock_clients "github.com/forta-network/forta-node/clients/mocks" + "github.com/forta-network/forta-node/config" + mock_registry "github.com/forta-network/forta-node/services/components/registry/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + testBotCfg = &config.AgentConfig{Owner: "test-owner", ID: "test-id"} +) + +func TestJsonRpcCache(t *testing.T) { + ctrl := gomock.NewController(t) + blocksDataClient := mock_clients.NewMockBlocksDataClient(ctrl) + authenticator := mock_clients.NewMockIPAuthenticator(ctrl) + botRegistry := mock_registry.NewMockBotRegistry(ctrl) + msgClient := mock_clients.NewMockMessageClient(ctrl) + + botRegistry.EXPECT().LoadAssignedBots().Return([]config.AgentConfig{*testBotCfg}, nil).AnyTimes() + msgClient.EXPECT().PublishProto(gomock.Any(), gomock.Any()).AnyTimes() + + count := 0 + appended := make(chan struct{}) + blocksDataClient.EXPECT().GetBlocksData(gomock.Any()).Return(blocks, nil).Do(func(any) { + count++ + if count == 2 { + close(appended) + } + }).AnyTimes() + + botRemoteAddr := "1.1.1.1:1111" + + authenticator.EXPECT().FindAgentFromRemoteAddr(botRemoteAddr).Return(testBotCfg, nil) + + jrpCache := JsonRpcCache{ + ctx: context.TODO(), + botAuthenticator: authenticator, + blocksDataClient: blocksDataClient, + cfg: config.JsonRpcCacheConfig{ + CacheExpirePeriodSeconds: 300, + }, + cache: NewCache(300 * time.Second), + botRegistry: botRegistry, + msgClient: msgClient, + } + + go jrpCache.pollBlocksData() + + <-appended + + jrpReq := jsonRpcReq{ + ID: json.RawMessage("1"), + Method: "eth_blockNumber", + Params: json.RawMessage("[]"), + } + b, err := json.Marshal(jrpReq) + require.NoError(t, err) + + r := httptest.NewRequest("POST", "/", bytes.NewBuffer(b)) + r.RemoteAddr = botRemoteAddr + r.Header.Set("X-Forta-Chain-ID", "1") + rw := httptest.NewRecorder() + + jrpCache.Handler().ServeHTTP(rw, r) + + require.Equal(t, 200, rw.Code) + + b = rw.Body.Bytes() + var resp jsonRpcResp + require.NoError(t, json.Unmarshal(b, &resp)) + require.Nil(t, resp.Error) + + assert.Equal(t, jrpReq.ID, resp.ID) + assert.Equal(t, json.RawMessage(`"1"`), resp.Result) +} + +func TestJsonRpcCache_NoAgentsAssigned(t *testing.T) { + ctrl := gomock.NewController(t) + blocksDataClient := mock_clients.NewMockBlocksDataClient(ctrl) + authenticator := mock_clients.NewMockIPAuthenticator(ctrl) + botRegistry := mock_registry.NewMockBotRegistry(ctrl) + + loaded := make(chan struct{}) + botRegistry.EXPECT().LoadAssignedBots().Return([]config.AgentConfig{}, nil).AnyTimes().Do(func() { + close(loaded) + }) + blocksDataClient.EXPECT().GetBlocksData(gomock.Any()).Return(blocks, nil).Times(0) + + jrpCache := JsonRpcCache{ + ctx: context.TODO(), + botAuthenticator: authenticator, + blocksDataClient: blocksDataClient, + cfg: config.JsonRpcCacheConfig{ + CacheExpirePeriodSeconds: 300, + }, + cache: NewCache(300 * time.Second), + botRegistry: botRegistry, + } + + go jrpCache.pollBlocksData() + <-loaded + + require.Equal(t, 0, jrpCache.cache.cache.ItemCount()) + +} diff --git a/services/json-rpc/cache/jsonrpc.go b/services/json-rpc/cache/jsonrpc.go new file mode 100644 index 00000000..bcabefc4 --- /dev/null +++ b/services/json-rpc/cache/jsonrpc.go @@ -0,0 +1,122 @@ +package json_rpc_cache + +import ( + "encoding/json" + "fmt" + "net/http" + + log "github.com/sirupsen/logrus" +) + +type jsonRpcReq struct { + ID json.RawMessage `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +type jsonRpcResp struct { + ID json.RawMessage `json:"id"` + JsonRPC string `json:"jsonrpc"` + Result json.RawMessage `json:"result"` + Error *jsonRpcError `json:"error,omitempty"` +} + +type errorResponse struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id"` + Error jsonRpcError `json:"error"` +} + +type jsonRpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func decodeBody(req *http.Request) (*jsonRpcReq, error) { + var decodedBody jsonRpcReq + if err := json.NewDecoder(req.Body).Decode(&decodedBody); err != nil { + return nil, fmt.Errorf("failed to decode json-rpc request body") + } + return &decodedBody, nil +} + +func writeJsonResponse(w http.ResponseWriter, req *jsonRpcReq, result any) error { + b, err := json.Marshal(result) + if err != nil { + return err + } + + w.WriteHeader(http.StatusOK) + + return json.NewEncoder(w).Encode(&jsonRpcResp{ + ID: req.ID, + JsonRPC: "2.0", + Result: b, + }) +} + +func writeBadRequest(w http.ResponseWriter, req *jsonRpcReq, err error) { + if req == nil { + http.Error(w, "bad request", http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusBadRequest) + + if err := json.NewEncoder(w).Encode(&errorResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: jsonRpcError{ + Code: -32600, + Message: err.Error(), + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} + +func writeUnauthorized(w http.ResponseWriter, req *jsonRpcReq) { + w.WriteHeader(http.StatusUnauthorized) + + if err := json.NewEncoder(w).Encode(&errorResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: jsonRpcError{ + Code: -32000, + Message: "unauthorized", + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} + +func writeNotFound(w http.ResponseWriter, req *jsonRpcReq) { + w.WriteHeader(http.StatusOK) + + if err := json.NewEncoder(w).Encode(&jsonRpcResp{ + ID: req.ID, + JsonRPC: "2.0", + Result: nil, + Error: &jsonRpcError{ + Code: -32603, + Message: "result not found in cache", + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} + +func writeInternalError(w http.ResponseWriter, req *jsonRpcReq, err error) { + w.WriteHeader(http.StatusInternalServerError) + + if err := json.NewEncoder(w).Encode(&errorResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: jsonRpcError{ + Code: -32603, + Message: err.Error(), + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} diff --git a/services/json-rpc/decode_test.go b/services/json-rpc/decode_test.go index 72600377..818975b8 100644 --- a/services/json-rpc/decode_test.go +++ b/services/json-rpc/decode_test.go @@ -23,5 +23,6 @@ func TestDecodeAndReplaceBody(t *testing.T) { // still can read body because it was replaced b, err := io.ReadAll(req.Body) + r.NoError(err) r.Equal(bodyStr, string(b)) } diff --git a/services/supervisor/services.go b/services/supervisor/services.go index ccb94d0c..319be1e6 100644 --- a/services/supervisor/services.go +++ b/services/supervisor/services.go @@ -299,6 +299,9 @@ func (sup *SupervisorService) start() error { Ports: map[string]string{ "": config.DefaultHealthPort, // random host port }, + Files: map[string][]byte{ + "passphrase": []byte(sup.config.Passphrase), + }, DialHost: true, NetworkID: nodeNetworkID, LinkNetworkIDs: []string{natsNetworkID},