Skip to content

Commit

Permalink
Merge branch 'develop' into CM-378-log-event-trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
kidambisrinivas committed Sep 30, 2024
2 parents ecd045e + 02472a6 commit 7562bc1
Show file tree
Hide file tree
Showing 14 changed files with 885 additions and 170 deletions.
8 changes: 5 additions & 3 deletions integration-tests/deployment/ccip/add_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (

func TestAddChainInbound(t *testing.T) {
// 4 chains where the 4th is added after initial deployment.
e := NewEnvironmentWithCRAndJobs(t, logger.TestLogger(t), 4)
require.Equal(t, len(e.Nodes), 5)
e := NewMemoryEnvironmentWithJobs(t, logger.TestLogger(t), 4)
state, err := LoadOnchainState(e.Env, e.Ab)
require.NoError(t, err)
// Take first non-home chain as the new chain.
Expand Down Expand Up @@ -127,6 +126,9 @@ func TestAddChainInbound(t *testing.T) {
ExecuteProposal(t, e.Env, chainInboundExec, state, sel)
}

replayBlocks, err := LatestBlocksByChain(testcontext.Get(t), e.Env.Chains)
require.NoError(t, err)

// Now configure the new chain using deployer key (not transferred to timelock yet).
var offRampEnables []offramp.OffRampSourceChainConfigArgs
for _, source := range initialDeploy {
Expand Down Expand Up @@ -167,7 +169,7 @@ func TestAddChainInbound(t *testing.T) {
}
// Ensure job related logs are up to date.
time.Sleep(30 * time.Second)
require.NoError(t, ReplayAllLogs(e.Nodes, e.Env.Chains))
ReplayLogs(t, e.Env.Offchain, replayBlocks)

// TODO: Send via all inbound lanes and use parallel helper
// Now that the proposal has been executed we expect to be able to send traffic to this new 4th chain.
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/deployment/ccip/add_lane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestAddLane(t *testing.T) {
// TODO: The offchain code doesn't yet support partial lane
// enablement, need to address then re-enable this test.
t.Skip()
e := NewEnvironmentWithCRAndJobs(t, logger.TestLogger(t), 3)
e := NewMemoryEnvironmentWithJobs(t, logger.TestLogger(t), 3)
// Here we have CR + nodes set up, but no CCIP contracts deployed.
state, err := LoadOnchainState(e.Env, e.Ab)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ import (
func Test0002_InitialDeploy(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := ccdeploy.Context(t)
tenv := ccdeploy.NewEnvironmentWithCRAndFeeds(t, lggr, 3)
tenv := ccdeploy.NewMemoryEnvironment(t, lggr, 3)
e := tenv.Env
nodes := tenv.Nodes
chains := e.Chains

state, err := ccdeploy.LoadOnchainState(tenv.Env, tenv.Ab)
require.NoError(t, err)
Expand Down Expand Up @@ -52,7 +50,7 @@ func Test0002_InitialDeploy(t *testing.T) {
require.NoError(t, err)

// Ensure capreg logs are up to date.
require.NoError(t, ccdeploy.ReplayAllLogs(nodes, chains))
ccdeploy.ReplayLogs(t, e.Offchain, tenv.ReplayBlocks)

// Apply the jobs.
for nodeID, jobs := range output.JobSpecs {
Expand Down
130 changes: 69 additions & 61 deletions integration-tests/deployment/ccip/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/chainlink-testing-framework/lib/utils/testcontext"
Expand Down Expand Up @@ -64,11 +65,39 @@ type DeployedEnv struct {
Env deployment.Environment
HomeChainSel uint64
FeedChainSel uint64
ReplayBlocks map[uint64]uint64
}

type DeployedTestEnvironment struct {
DeployedEnv
Nodes map[string]memory.Node
func (e *DeployedEnv) SetupJobs(t *testing.T) {
ctx := testcontext.Get(t)
jbs, err := NewCCIPJobSpecs(e.Env.NodeIDs, e.Env.Offchain)
require.NoError(t, err)
for nodeID, jobs := range jbs {
for _, job := range jobs {
// Note these auto-accept
_, err := e.Env.Offchain.ProposeJob(ctx,
&jobv1.ProposeJobRequest{
NodeId: nodeID,
Spec: job,
})
require.NoError(t, err)
}
}
// Wait for plugins to register filters?
// TODO: Investigate how to avoid.
time.Sleep(30 * time.Second)
ReplayLogs(t, e.Env.Offchain, e.ReplayBlocks)
}

func ReplayLogs(t *testing.T, oc deployment.OffchainClient, replayBlocks map[uint64]uint64) {
switch oc := oc.(type) {
case *memory.JobClient:
require.NoError(t, oc.ReplayLogs(replayBlocks))
case *devenv.JobDistributor:
require.NoError(t, oc.ReplayLogs(replayBlocks))
default:
t.Fatalf("unsupported offchain client type %T", oc)
}
}

func SetUpHomeAndFeedChains(t *testing.T, lggr logger.Logger, homeChainSel, feedChainSel uint64, chains map[uint64]deployment.Chain) (deployment.AddressBook, deployment.CapabilityRegistryConfig) {
Expand All @@ -85,11 +114,24 @@ func SetUpHomeAndFeedChains(t *testing.T, lggr logger.Logger, homeChainSel, feed
}
}

// NewEnvironmentWithCRAndFeeds creates a new CCIP environment
func LatestBlocksByChain(ctx context.Context, chains map[uint64]deployment.Chain) (map[uint64]uint64, error) {
latestBlocks := make(map[uint64]uint64)
for _, chain := range chains {
latesthdr, err := chain.Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to get latest header for chain %d", chain.Selector)
}
block := latesthdr.Number.Uint64()
latestBlocks[chain.Selector] = block
}
return latestBlocks, nil
}

// NewMemoryEnvironment creates a new CCIP environment
// with capreg, feeds and nodes set up.
func NewEnvironmentWithCRAndFeeds(t *testing.T, lggr logger.Logger, numChains int) DeployedTestEnvironment {
func NewMemoryEnvironment(t *testing.T, lggr logger.Logger, numChains int) DeployedEnv {
require.GreaterOrEqual(t, numChains, 2, "numChains must be at least 2 for home and feed chains")
ctx := Context(t)
ctx := testcontext.Get(t)
chains := memory.NewMemoryChains(t, numChains)
// Lower chainSel is home chain.
var chainSels []uint64
Expand All @@ -103,6 +145,8 @@ func NewEnvironmentWithCRAndFeeds(t *testing.T, lggr logger.Logger, numChains in
// Take lowest for determinism.
homeChainSel := chainSels[HomeChainIndex]
feedSel := chainSels[FeedChainIndex]
replayBlocks, err := LatestBlocksByChain(ctx, chains)
require.NoError(t, err)
ab, capReg := SetUpHomeAndFeedChains(t, lggr, homeChainSel, feedSel, chains)

nodes := memory.NewNodes(t, zapcore.InfoLevel, chains, 4, 1, capReg)
Expand All @@ -114,55 +158,21 @@ func NewEnvironmentWithCRAndFeeds(t *testing.T, lggr logger.Logger, numChains in
}

e := memory.NewMemoryEnvironmentFromChainsNodes(t, lggr, chains, nodes)
return DeployedTestEnvironment{
DeployedEnv: DeployedEnv{
Ab: ab,
Env: e,
HomeChainSel: homeChainSel,
FeedChainSel: feedSel,
},
Nodes: nodes,
return DeployedEnv{
Ab: ab,
Env: e,
HomeChainSel: homeChainSel,
FeedChainSel: feedSel,
ReplayBlocks: replayBlocks,
}
}

func NewEnvironmentWithCRAndJobs(t *testing.T, lggr logger.Logger, numChains int) DeployedTestEnvironment {
ctx := Context(t)
e := NewEnvironmentWithCRAndFeeds(t, lggr, numChains)
jbs, err := NewCCIPJobSpecs(e.Env.NodeIDs, e.Env.Offchain)
require.NoError(t, err)
for nodeID, jobs := range jbs {
for _, job := range jobs {
// Note these auto-accept
_, err := e.Env.Offchain.ProposeJob(ctx,
&jobv1.ProposeJobRequest{
NodeId: nodeID,
Spec: job,
})
require.NoError(t, err)
}
}
// Wait for plugins to register filters?
// TODO: Investigate how to avoid.
time.Sleep(30 * time.Second)

// Ensure job related logs are up to date.
require.NoError(t, ReplayAllLogs(e.Nodes, e.Env.Chains))
func NewMemoryEnvironmentWithJobs(t *testing.T, lggr logger.Logger, numChains int) DeployedEnv {
e := NewMemoryEnvironment(t, lggr, numChains)
e.SetupJobs(t)
return e
}

func ReplayAllLogs(nodes map[string]memory.Node, chains map[uint64]deployment.Chain) error {
blockBySel := make(map[uint64]uint64)
for sel := range chains {
blockBySel[sel] = 1
}
for _, node := range nodes {
if err := node.ReplayLogs(blockBySel); err != nil {
return err
}
}
return nil
}

func SendRequest(t *testing.T, e deployment.Environment, state CCIPOnChainState, src, dest uint64, testRouter bool) uint64 {
msg := router.ClientEVM2AnyMessage{
Receiver: common.LeftPadBytes(state.Chains[dest].Receiver.Address().Bytes(), 32),
Expand Down Expand Up @@ -228,8 +238,8 @@ func (d DeployedLocalDevEnvironment) RestartChainlinkNodes(t *testing.T) error {
return errGrp.Wait()
}

func NewDeployedLocalDevEnvironment(t *testing.T, lggr logger.Logger) DeployedLocalDevEnvironment {
ctx := Context(t)
func NewLocalDevEnvironment(t *testing.T, lggr logger.Logger) DeployedEnv {
ctx := testcontext.Get(t)
// create a local docker environment with simulated chains and job-distributor
// we cannot create the chainlink nodes yet as we need to deploy the capability registry first
envConfig, testEnv, cfg := devenv.CreateDockerEnv(t)
Expand All @@ -243,6 +253,8 @@ func NewDeployedLocalDevEnvironment(t *testing.T, lggr logger.Logger) DeployedLo
require.NotEmpty(t, homeChainSel, "homeChainSel should not be empty")
feedSel := envConfig.FeedChainSelector
require.NotEmpty(t, feedSel, "feedSel should not be empty")
replayBlocks, err := LatestBlocksByChain(ctx, chains)
require.NoError(t, err)
ab, capReg := SetUpHomeAndFeedChains(t, lggr, homeChainSel, feedSel, chains)

// start the chainlink nodes with the CR address
Expand All @@ -252,20 +264,16 @@ func NewDeployedLocalDevEnvironment(t *testing.T, lggr logger.Logger) DeployedLo
e, don, err := devenv.NewEnvironment(ctx, lggr, *envConfig)
require.NoError(t, err)
require.NotNil(t, e)
require.NotNil(t, don)
zeroLogLggr := logging.GetTestLogger(t)
// fund the nodes
devenv.FundNodes(t, zeroLogLggr, testEnv, cfg, don.PluginNodes())

return DeployedLocalDevEnvironment{
DeployedEnv: DeployedEnv{
Ab: ab,
Env: *e,
HomeChainSel: homeChainSel,
FeedChainSel: feedSel,
},
DON: don,
testEnv: testEnv,
return DeployedEnv{
Ab: ab,
Env: *e,
HomeChainSel: homeChainSel,
FeedChainSel: feedSel,
ReplayBlocks: replayBlocks,
}
}

Expand Down
32 changes: 31 additions & 1 deletion integration-tests/deployment/devenv/build_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"
"github.com/rs/zerolog"
chainselectors "github.com/smartcontractkit/chain-selectors"
"github.com/stretchr/testify/require"
"github.com/subosito/gotenv"
"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/chainlink-testing-framework/lib/utils/testcontext"

"github.com/smartcontractkit/chainlink-testing-framework/lib/utils/conversions"
"github.com/smartcontractkit/chainlink-testing-framework/seth"
Expand Down Expand Up @@ -207,7 +211,10 @@ func StartChainlinkNodes(
InternalIP: n.API.InternalIP(),
}
}
envConfig.nodeInfo = nodeInfo
if envConfig == nil {
envConfig = &EnvironmentConfig{}
}
envConfig.JDConfig.nodeInfo = nodeInfo
return nil
}

Expand Down Expand Up @@ -345,3 +352,26 @@ func CreateChainConfigFromNetworks(
}
return chains
}

// RestartChainlinkNodes restarts the chainlink nodes in the test environment
func RestartChainlinkNodes(t *testing.T, env *test_env.CLClusterTestEnv) error {
errGrp := errgroup.Group{}
if env == nil || env.ClCluster == nil {
return errors.Wrap(errors.New("no testenv or clcluster found "), "error restarting node")
}
for _, n := range env.ClCluster.Nodes {
n := n
errGrp.Go(func() error {
if err := n.Container.Terminate(testcontext.Get(t)); err != nil {
return err
}
err := n.RestartContainer()
if err != nil {
return err
}
return nil
})

}
return errGrp.Wait()
}
Loading

0 comments on commit 7562bc1

Please sign in to comment.