Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement v2 sharding and chain ID detection #837

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions config/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type AgentConfig struct {
}

type ShardConfig struct {
ShardID uint `yaml:"shardId" json:"shardId"`
Shards uint `yaml:"shards" json:"shards"`
Target uint `yaml:"target" json:"target"`
ShardID uint `yaml:"shardId" json:"shardId"`
Shards uint `yaml:"shards" json:"shards"`
Target uint `yaml:"target" json:"target"`
ChainID int64 `yaml:"chainId" json:"chainId"`
}

func (ac AgentConfig) ShardID() int32 {
Expand Down Expand Up @@ -111,8 +112,13 @@ func (ac AgentConfig) ContainerName() string {
_, digest := utils.SplitImageRef(ac.Image)

parts := []string{ContainerNamePrefix, "agent", utils.ShortenString(ac.ID, 8), utils.ShortenString(digest, 4)}

if ac.ProtocolVersion >= 2 {
parts = append(parts, "c"+strconv.Itoa(ac.ChainID)) // append the chain id
}

if ac.IsSharded() {
parts = append(parts, strconv.Itoa(int(ac.ShardConfig.ShardID))) // append the shard id at the end
parts = append(parts, "s"+strconv.Itoa(int(ac.ShardConfig.ShardID))) // append the shard id at the end
}
return strings.Join(parts, "-")
}
Expand Down
2 changes: 1 addition & 1 deletion config/agents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestAgentConfig_ContainerNameSharded(t *testing.T) {
Target: 4,
},
}
assert.Equal(t, "forta-agent-0x04f65c-de86-3", cfg.ContainerName())
assert.Equal(t, "forta-agent-0x04f65c-de86-s3", cfg.ContainerName())
}

func TestAgentConfig_Equal(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
EnvFortaBotID = "FORTA_BOT_ID"
EnvFortaBotOwner = "FORTA_BOT_OWNER"
EnvFortaChainID = "FORTA_CHAIN_ID"
EnvFortaShardID = "FORTA_SHARD_ID"
EnvFortaShardCount = "FORTA_SHARD_COUNT"
)

// EnvDefaults contain default values for one env.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible
require (
github.com/docker/docker v1.6.2
github.com/docker/go-connections v0.4.0
github.com/forta-network/forta-core-go v0.0.0-20231213130038-116dfba0ee5c
github.com/forta-network/forta-core-go v0.0.0-20231213214626-dcfc36f15fa9
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwU
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/forta-network/forta-core-go v0.0.0-20231213130038-116dfba0ee5c h1:UHD7Mf60j95HucfsKDMBgHus9RQGeyrSJ10SbR4NBJ4=
github.com/forta-network/forta-core-go v0.0.0-20231213130038-116dfba0ee5c/go.mod h1:F4O7Yxhs0WSB9iFLiCVSYMPI1wOKuLMZIk8dbVg+J/Y=
github.com/forta-network/forta-core-go v0.0.0-20231213214626-dcfc36f15fa9 h1:jnkVaA/1kRYDKe5f8qNSRNT3c3SdPoIcP1IDsRRm9zE=
github.com/forta-network/forta-core-go v0.0.0-20231213214626-dcfc36f15fa9/go.mod h1:F4O7Yxhs0WSB9iFLiCVSYMPI1wOKuLMZIk8dbVg+J/Y=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
Expand Down
40 changes: 24 additions & 16 deletions services/components/containers/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,35 @@ func NewBotContainerConfig(
) docker.ContainerConfig {
limits := config.GetAgentResourceLimits(resourcesConfig)

env := map[string]string{
config.EnvJsonRpcHost: config.DockerJSONRPCProxyContainerName,
config.EnvJsonRpcPort: config.DefaultJSONRPCProxyPort,
config.EnvJWTProviderHost: config.DockerJWTProviderContainerName,
config.EnvJWTProviderPort: config.DefaultJWTProviderPort,
config.EnvPublicAPIProxyHost: config.DockerPublicAPIProxyContainerName,
config.EnvPublicAPIProxyPort: config.DefaultPublicAPIProxyPort,
config.EnvAgentGrpcPort: botConfig.GrpcPort(),
config.EnvFortaBotID: botConfig.ID,
config.EnvFortaBotOwner: botConfig.Owner,
}
if botConfig.ChainID > 0 {
env[config.EnvFortaChainID] = fmt.Sprintf("%d", botConfig.ChainID)
}
if botConfig.IsSharded() {
env[config.EnvFortaShardID] = fmt.Sprintf("%d", botConfig.ShardID())
env[config.EnvFortaShardCount] = fmt.Sprintf("%d", botConfig.ShardConfig.Shards)
}

return docker.ContainerConfig{
Name: botConfig.ContainerName(),
Image: botConfig.Image,
NetworkID: networkID,
LinkNetworkIDs: []string{},
Env: map[string]string{
config.EnvJsonRpcHost: config.DockerJSONRPCProxyContainerName,
config.EnvJsonRpcPort: config.DefaultJSONRPCProxyPort,
config.EnvJWTProviderHost: config.DockerJWTProviderContainerName,
config.EnvJWTProviderPort: config.DefaultJWTProviderPort,
config.EnvPublicAPIProxyHost: config.DockerPublicAPIProxyContainerName,
config.EnvPublicAPIProxyPort: config.DefaultPublicAPIProxyPort,
config.EnvAgentGrpcPort: botConfig.GrpcPort(),
config.EnvFortaBotID: botConfig.ID,
config.EnvFortaBotOwner: botConfig.Owner,
config.EnvFortaChainID: fmt.Sprintf("%d", botConfig.ChainID),
},
MaxLogFiles: logConfig.MaxLogFiles,
MaxLogSize: logConfig.MaxLogSize,
CPUQuota: limits.CPUQuota,
Memory: limits.Memory,
Env: env,
MaxLogFiles: logConfig.MaxLogFiles,
MaxLogSize: logConfig.MaxLogSize,
CPUQuota: limits.CPUQuota,
Memory: limits.Memory,
Labels: map[string]string{
docker.LabelFortaIsBot: LabelValueFortaIsBot,
docker.LabelFortaSupervisorStrategyVersion: LabelValueStrategyVersion,
Expand Down
50 changes: 50 additions & 0 deletions services/components/containers/definitions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package containers

import (
"testing"

"github.com/forta-network/forta-node/config"
"github.com/stretchr/testify/require"
)

func TestContainerEnvVar_ChainID(t *testing.T) {
r := require.New(t)

botConfig := config.AgentConfig{
ChainID: 0,
}
containerConfig := NewBotContainerConfig("", botConfig, config.LogConfig{}, config.ResourcesConfig{})
env := containerConfig.Env
r.Equal("", env[config.EnvFortaChainID])

botConfig = config.AgentConfig{
ChainID: 137,
}
containerConfig = NewBotContainerConfig("", botConfig, config.LogConfig{}, config.ResourcesConfig{})
env = containerConfig.Env
r.Equal("137", env[config.EnvFortaChainID])
}

func TestContainerEnvVar_Sharding(t *testing.T) {
r := require.New(t)

botConfig := config.AgentConfig{
ShardConfig: nil,
}
containerConfig := NewBotContainerConfig("", botConfig, config.LogConfig{}, config.ResourcesConfig{})
env := containerConfig.Env
r.Equal("", env[config.EnvFortaShardID])
r.Equal("", env[config.EnvFortaShardCount])

botConfig = config.AgentConfig{
ShardConfig: &config.ShardConfig{
ShardID: 0,
Shards: 2,
Target: 3,
},
}
containerConfig = NewBotContainerConfig("", botConfig, config.LogConfig{}, config.ResourcesConfig{})
env = containerConfig.Env
r.Equal("0", env[config.EnvFortaShardID])
r.Equal("2", env[config.EnvFortaShardCount])
}
6 changes: 6 additions & 0 deletions services/components/lifecycle/bot_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ func (blm *botLifecycleManager) ExitInactiveBots(ctx context.Context) error {
blm.lifecycleMetrics.StatusInactive(config.AgentConfig{ID: inactiveBotID})
continue
}

// TODO: Do not support inactive bot restarts until v2 health check support.
if botConfig.ProtocolVersion >= 2 {
continue
}

inactiveCfgs = append(inactiveCfgs, botConfig)
logger.Info("killing inactive bot for reinitialization")
if err := blm.botClient.StopBot(ctx, botConfig); err != nil {
Expand Down
78 changes: 13 additions & 65 deletions store/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/forta-network/forta-core-go/registry"
"github.com/forta-network/forta-core-go/utils"
"github.com/forta-network/forta-node/config"
"github.com/forta-network/forta-node/store/sharding"
)

var (
Expand All @@ -25,9 +26,6 @@ var (
)

const (
keyDefaultChainSetting = "default"
minShardCount = 1

// This is the force reload interval that helps us ignore the on-chain assignment
// list hash. This helps avoid getting stuck with bad state.
//
Expand Down Expand Up @@ -148,18 +146,6 @@ func (rs *registryStore) FindAgentGlobally(agentID string) (*config.AgentConfig,
return botCfg, err
}

// returns shard id for an index, distributed evenly in an increased order.
// Example:
// Target: 6, Shards: 3
// should be [0,0,0,0,0,0,1,1,1,1,1,1,2,2,2,2,2,2]
func calculateShardID(target, idx uint) uint {
if target == 0 {
return 0
}

return idx / target
}

func (rs *registryStore) getLoadedBot(manifest string) (config.AgentConfig, bool) {
for _, loadedBot := range rs.loadedBots {
if manifest == loadedBot.Manifest {
Expand Down Expand Up @@ -217,7 +203,17 @@ func (rs *registryStore) loadAssignment(assignment *registry.Assignment) (*confi
}

botCfg.Owner = assignment.AgentOwner
botCfg.ShardConfig = populateShardConfig(assignment, agentData, rs.cfg.ChainID)

if botCfg.ProtocolVersion >= 2 {
var ok bool
botCfg.ShardConfig, ok = sharding.CalculateShardConfigV2(assignment, agentData)
if !ok {
return nil, fmt.Errorf("%w: invalid sharding config", errInvalidBot)
}
botCfg.ChainID = int(botCfg.ShardConfig.ChainID)
} else {
botCfg.ShardConfig = sharding.CalculateShardConfig(assignment, agentData, rs.cfg.ChainID)
}

return botCfg, nil
}
Expand Down Expand Up @@ -266,54 +262,6 @@ func NewRegistryStore(ctx context.Context, cfg config.Config) (*registryStore, e
}, nil
}

func populateShardConfig(assignment *registry.Assignment, agentManifest *manifest.SignedAgentManifest, chainID int) *config.ShardConfig {
var (
target, shards uint
)

// check if there is a default chain setting
chainSetting, ok := agentManifest.Manifest.ChainSettings[keyDefaultChainSetting]
// if not a sharded bot, shard is always 0
if ok {
target = chainSetting.Target
shards = chainSetting.Shards
}

// check if there is a chain setting for the scanner's chain
chainIDStr := strconv.FormatInt(int64(chainID), 10)
chainSetting, ok = agentManifest.Manifest.ChainSettings[chainIDStr]
// if not a sharded bot, shard is always 0
if ok {
target = chainSetting.Target
shards = chainSetting.Shards
}

// if no sharding specified, shard count is 1 and target is total assigns
if shards == 0 {
target = uint(assignment.AssignedScanners)

return createShardConfig(0, minShardCount, target)
}

// fallback for target, calculate it from shard to assign ratio.
// target defaults to total assigns / shards
if target == 0 && shards != 0 {
target = uint(uint64(assignment.AssignedScanners) / uint64(shards))
}

shardID := calculateShardID(target, uint(assignment.ScannerIndex))

return createShardConfig(shardID, shards, target)
}

func createShardConfig(shardID, shards, target uint) *config.ShardConfig {
return &config.ShardConfig{
ShardID: shardID,
Target: target,
Shards: shards,
}
}

type privateRegistryStore struct {
ctx context.Context
cfg config.Config
Expand Down Expand Up @@ -367,7 +315,7 @@ func (rs *privateRegistryStore) GetAgentsIfChanged(scanner string) ([]config.Age
shardConfig := &config.ShardConfig{
Shards: shardedBot.Shards,
Target: shardedBot.Target,
ShardID: calculateShardID(shardedBot.Target, botIdx),
ShardID: sharding.CalculateShardID(shardedBot.Target, botIdx),
}

agentID := strconv.Itoa(len(agentConfigs) + i + 1)
Expand Down
Loading
Loading