Skip to content

Commit

Permalink
Merge pull request #837 from forta-network/caner/forta-1405-inject-ch…
Browse files Browse the repository at this point in the history
…ainshard-config-into-container

Implement v2 sharding and chain ID detection
  • Loading branch information
canercidam authored Dec 20, 2023
2 parents 82f4e31 + fc58d6a commit 659bc79
Show file tree
Hide file tree
Showing 15 changed files with 608 additions and 207 deletions.
14 changes: 10 additions & 4 deletions config/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type AgentConfig struct {
}

type ShardConfig struct {
ShardID uint `yaml:"shardId" json:"shardId"`
Shards uint `yaml:"shards" json:"shards"`
Target uint `yaml:"target" json:"target"`
ShardID uint `yaml:"shardId" json:"shardId"`
Shards uint `yaml:"shards" json:"shards"`
Target uint `yaml:"target" json:"target"`
ChainID int64 `yaml:"chainId" json:"chainId"`
}

func (ac AgentConfig) ShardID() int32 {
Expand Down Expand Up @@ -111,8 +112,13 @@ func (ac AgentConfig) ContainerName() string {
_, digest := utils.SplitImageRef(ac.Image)

parts := []string{ContainerNamePrefix, "agent", utils.ShortenString(ac.ID, 8), utils.ShortenString(digest, 4)}

if ac.ProtocolVersion >= 2 {
parts = append(parts, "c"+strconv.Itoa(ac.ChainID)) // append the chain id
}

if ac.IsSharded() {
parts = append(parts, strconv.Itoa(int(ac.ShardConfig.ShardID))) // append the shard id at the end
parts = append(parts, "s"+strconv.Itoa(int(ac.ShardConfig.ShardID))) // append the shard id at the end
}
return strings.Join(parts, "-")
}
Expand Down
2 changes: 1 addition & 1 deletion config/agents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestAgentConfig_ContainerNameSharded(t *testing.T) {
Target: 4,
},
}
assert.Equal(t, "forta-agent-0x04f65c-de86-3", cfg.ContainerName())
assert.Equal(t, "forta-agent-0x04f65c-de86-s3", cfg.ContainerName())
}

func TestAgentConfig_Equal(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
EnvFortaBotID = "FORTA_BOT_ID"
EnvFortaBotOwner = "FORTA_BOT_OWNER"
EnvFortaChainID = "FORTA_CHAIN_ID"
EnvFortaShardID = "FORTA_SHARD_ID"
EnvFortaShardCount = "FORTA_SHARD_COUNT"
)

// EnvDefaults contain default values for one env.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible
require (
github.com/docker/docker v1.6.2
github.com/docker/go-connections v0.4.0
github.com/forta-network/forta-core-go v0.0.0-20231213130038-116dfba0ee5c
github.com/forta-network/forta-core-go v0.0.0-20231213214626-dcfc36f15fa9
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwU
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/forta-network/forta-core-go v0.0.0-20231213130038-116dfba0ee5c h1:UHD7Mf60j95HucfsKDMBgHus9RQGeyrSJ10SbR4NBJ4=
github.com/forta-network/forta-core-go v0.0.0-20231213130038-116dfba0ee5c/go.mod h1:F4O7Yxhs0WSB9iFLiCVSYMPI1wOKuLMZIk8dbVg+J/Y=
github.com/forta-network/forta-core-go v0.0.0-20231213214626-dcfc36f15fa9 h1:jnkVaA/1kRYDKe5f8qNSRNT3c3SdPoIcP1IDsRRm9zE=
github.com/forta-network/forta-core-go v0.0.0-20231213214626-dcfc36f15fa9/go.mod h1:F4O7Yxhs0WSB9iFLiCVSYMPI1wOKuLMZIk8dbVg+J/Y=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
Expand Down
40 changes: 24 additions & 16 deletions services/components/containers/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,35 @@ func NewBotContainerConfig(
) docker.ContainerConfig {
limits := config.GetAgentResourceLimits(resourcesConfig)

env := map[string]string{
config.EnvJsonRpcHost: config.DockerJSONRPCProxyContainerName,
config.EnvJsonRpcPort: config.DefaultJSONRPCProxyPort,
config.EnvJWTProviderHost: config.DockerJWTProviderContainerName,
config.EnvJWTProviderPort: config.DefaultJWTProviderPort,
config.EnvPublicAPIProxyHost: config.DockerPublicAPIProxyContainerName,
config.EnvPublicAPIProxyPort: config.DefaultPublicAPIProxyPort,
config.EnvAgentGrpcPort: botConfig.GrpcPort(),
config.EnvFortaBotID: botConfig.ID,
config.EnvFortaBotOwner: botConfig.Owner,
}
if botConfig.ChainID > 0 {
env[config.EnvFortaChainID] = fmt.Sprintf("%d", botConfig.ChainID)
}
if botConfig.IsSharded() {
env[config.EnvFortaShardID] = fmt.Sprintf("%d", botConfig.ShardID())
env[config.EnvFortaShardCount] = fmt.Sprintf("%d", botConfig.ShardConfig.Shards)
}

return docker.ContainerConfig{
Name: botConfig.ContainerName(),
Image: botConfig.Image,
NetworkID: networkID,
LinkNetworkIDs: []string{},
Env: map[string]string{
config.EnvJsonRpcHost: config.DockerJSONRPCProxyContainerName,
config.EnvJsonRpcPort: config.DefaultJSONRPCProxyPort,
config.EnvJWTProviderHost: config.DockerJWTProviderContainerName,
config.EnvJWTProviderPort: config.DefaultJWTProviderPort,
config.EnvPublicAPIProxyHost: config.DockerPublicAPIProxyContainerName,
config.EnvPublicAPIProxyPort: config.DefaultPublicAPIProxyPort,
config.EnvAgentGrpcPort: botConfig.GrpcPort(),
config.EnvFortaBotID: botConfig.ID,
config.EnvFortaBotOwner: botConfig.Owner,
config.EnvFortaChainID: fmt.Sprintf("%d", botConfig.ChainID),
},
MaxLogFiles: logConfig.MaxLogFiles,
MaxLogSize: logConfig.MaxLogSize,
CPUQuota: limits.CPUQuota,
Memory: limits.Memory,
Env: env,
MaxLogFiles: logConfig.MaxLogFiles,
MaxLogSize: logConfig.MaxLogSize,
CPUQuota: limits.CPUQuota,
Memory: limits.Memory,
Labels: map[string]string{
docker.LabelFortaIsBot: LabelValueFortaIsBot,
docker.LabelFortaSupervisorStrategyVersion: LabelValueStrategyVersion,
Expand Down
50 changes: 50 additions & 0 deletions services/components/containers/definitions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package containers

import (
"testing"

"github.com/forta-network/forta-node/config"
"github.com/stretchr/testify/require"
)

func TestContainerEnvVar_ChainID(t *testing.T) {
r := require.New(t)

botConfig := config.AgentConfig{
ChainID: 0,
}
containerConfig := NewBotContainerConfig("", botConfig, config.LogConfig{}, config.ResourcesConfig{})
env := containerConfig.Env
r.Equal("", env[config.EnvFortaChainID])

botConfig = config.AgentConfig{
ChainID: 137,
}
containerConfig = NewBotContainerConfig("", botConfig, config.LogConfig{}, config.ResourcesConfig{})
env = containerConfig.Env
r.Equal("137", env[config.EnvFortaChainID])
}

func TestContainerEnvVar_Sharding(t *testing.T) {
r := require.New(t)

botConfig := config.AgentConfig{
ShardConfig: nil,
}
containerConfig := NewBotContainerConfig("", botConfig, config.LogConfig{}, config.ResourcesConfig{})
env := containerConfig.Env
r.Equal("", env[config.EnvFortaShardID])
r.Equal("", env[config.EnvFortaShardCount])

botConfig = config.AgentConfig{
ShardConfig: &config.ShardConfig{
ShardID: 0,
Shards: 2,
Target: 3,
},
}
containerConfig = NewBotContainerConfig("", botConfig, config.LogConfig{}, config.ResourcesConfig{})
env = containerConfig.Env
r.Equal("0", env[config.EnvFortaShardID])
r.Equal("2", env[config.EnvFortaShardCount])
}
6 changes: 6 additions & 0 deletions services/components/lifecycle/bot_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ func (blm *botLifecycleManager) ExitInactiveBots(ctx context.Context) error {
blm.lifecycleMetrics.StatusInactive(config.AgentConfig{ID: inactiveBotID})
continue
}

// TODO: Do not support inactive bot restarts until v2 health check support.
if botConfig.ProtocolVersion >= 2 {
continue
}

inactiveCfgs = append(inactiveCfgs, botConfig)
logger.Info("killing inactive bot for reinitialization")
if err := blm.botClient.StopBot(ctx, botConfig); err != nil {
Expand Down
78 changes: 13 additions & 65 deletions store/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/forta-network/forta-core-go/registry"
"github.com/forta-network/forta-core-go/utils"
"github.com/forta-network/forta-node/config"
"github.com/forta-network/forta-node/store/sharding"
)

var (
Expand All @@ -25,9 +26,6 @@ var (
)

const (
keyDefaultChainSetting = "default"
minShardCount = 1

// This is the force reload interval that helps us ignore the on-chain assignment
// list hash. This helps avoid getting stuck with bad state.
//
Expand Down Expand Up @@ -148,18 +146,6 @@ func (rs *registryStore) FindAgentGlobally(agentID string) (*config.AgentConfig,
return botCfg, err
}

// returns shard id for an index, distributed evenly in an increased order.
// Example:
// Target: 6, Shards: 3
// should be [0,0,0,0,0,0,1,1,1,1,1,1,2,2,2,2,2,2]
func calculateShardID(target, idx uint) uint {
if target == 0 {
return 0
}

return idx / target
}

func (rs *registryStore) getLoadedBot(manifest string) (config.AgentConfig, bool) {
for _, loadedBot := range rs.loadedBots {
if manifest == loadedBot.Manifest {
Expand Down Expand Up @@ -217,7 +203,17 @@ func (rs *registryStore) loadAssignment(assignment *registry.Assignment) (*confi
}

botCfg.Owner = assignment.AgentOwner
botCfg.ShardConfig = populateShardConfig(assignment, agentData, rs.cfg.ChainID)

if botCfg.ProtocolVersion >= 2 {
var ok bool
botCfg.ShardConfig, ok = sharding.CalculateShardConfigV2(assignment, agentData)
if !ok {
return nil, fmt.Errorf("%w: invalid sharding config", errInvalidBot)
}
botCfg.ChainID = int(botCfg.ShardConfig.ChainID)
} else {
botCfg.ShardConfig = sharding.CalculateShardConfig(assignment, agentData, rs.cfg.ChainID)
}

return botCfg, nil
}
Expand Down Expand Up @@ -266,54 +262,6 @@ func NewRegistryStore(ctx context.Context, cfg config.Config) (*registryStore, e
}, nil
}

func populateShardConfig(assignment *registry.Assignment, agentManifest *manifest.SignedAgentManifest, chainID int) *config.ShardConfig {
var (
target, shards uint
)

// check if there is a default chain setting
chainSetting, ok := agentManifest.Manifest.ChainSettings[keyDefaultChainSetting]
// if not a sharded bot, shard is always 0
if ok {
target = chainSetting.Target
shards = chainSetting.Shards
}

// check if there is a chain setting for the scanner's chain
chainIDStr := strconv.FormatInt(int64(chainID), 10)
chainSetting, ok = agentManifest.Manifest.ChainSettings[chainIDStr]
// if not a sharded bot, shard is always 0
if ok {
target = chainSetting.Target
shards = chainSetting.Shards
}

// if no sharding specified, shard count is 1 and target is total assigns
if shards == 0 {
target = uint(assignment.AssignedScanners)

return createShardConfig(0, minShardCount, target)
}

// fallback for target, calculate it from shard to assign ratio.
// target defaults to total assigns / shards
if target == 0 && shards != 0 {
target = uint(uint64(assignment.AssignedScanners) / uint64(shards))
}

shardID := calculateShardID(target, uint(assignment.ScannerIndex))

return createShardConfig(shardID, shards, target)
}

func createShardConfig(shardID, shards, target uint) *config.ShardConfig {
return &config.ShardConfig{
ShardID: shardID,
Target: target,
Shards: shards,
}
}

type privateRegistryStore struct {
ctx context.Context
cfg config.Config
Expand Down Expand Up @@ -367,7 +315,7 @@ func (rs *privateRegistryStore) GetAgentsIfChanged(scanner string) ([]config.Age
shardConfig := &config.ShardConfig{
Shards: shardedBot.Shards,
Target: shardedBot.Target,
ShardID: calculateShardID(shardedBot.Target, botIdx),
ShardID: sharding.CalculateShardID(shardedBot.Target, botIdx),
}

agentID := strconv.Itoa(len(agentConfigs) + i + 1)
Expand Down
Loading

0 comments on commit 659bc79

Please sign in to comment.