Skip to content

Commit

Permalink
Merge branch 'develop' into BCFR-945-improve-contract-reader-logs
Browse files Browse the repository at this point in the history
  • Loading branch information
silaslenihan authored Sep 30, 2024
2 parents 8783ee8 + 19690b0 commit 192c823
Show file tree
Hide file tree
Showing 15 changed files with 319 additions and 31 deletions.
5 changes: 5 additions & 0 deletions .changeset/honest-cameras-cross.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Implementing evm specific token data encoder for CCIP #internal
8 changes: 8 additions & 0 deletions .changeset/moody-rules-agree.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": patch
---

- register polling subscription to avoid subscription leaking when rpc client gets closed.
- add a temporary special treatment for SubscribeNewHead before we replace it with SubscribeToHeads. Add a goroutine that forwards new head from poller to caller channel.
- fix a deadlock in poller, by using a new lock for subs slice in rpc client.
#bugfix
38 changes: 38 additions & 0 deletions core/capabilities/ccip/ccipevm/tokendata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package ccipevm

import (
"context"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
)

type usdcAttestationPayload struct {
Message []byte
Attestation []byte
}

func (m usdcAttestationPayload) AbiString() string {
return `
[{
"components": [
{"name": "message", "type": "bytes"},
{"name": "attestation", "type": "bytes"}
],
"type": "tuple"
}]`
}

type EVMTokenDataEncoder struct{}

func NewEVMTokenDataEncoder() EVMTokenDataEncoder {
return EVMTokenDataEncoder{}
}

func (e EVMTokenDataEncoder) EncodeUSDC(_ context.Context, message cciptypes.Bytes, attestation cciptypes.Bytes) (cciptypes.Bytes, error) {
return abihelpers.EncodeAbiStruct(usdcAttestationPayload{
Message: message,
Attestation: attestation,
})
}
73 changes: 73 additions & 0 deletions core/capabilities/ccip/ccipevm/tokendata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ccipevm

import (
"testing"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/stretchr/testify/require"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
)

func Test_EVMTokenDataEncoder(t *testing.T) {
var empty usdcAttestationPayload
encoder := NewEVMTokenDataEncoder()

//https://testnet.snowtrace.io/tx/0xeeb0ad6b26bacd1570a9361724a36e338f4aacf1170dec64399220b7483b7eed/eventlog?chainid=43113
//https://iris-api-sandbox.circle.com/v1/attestations/0x69fb1b419d648cf6c9512acad303746dc85af3b864af81985c76764aba60bf6b
realMessage, err := cciptypes.NewBytesFromString("0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000f8000000000000000100000006000000000004ac0d000000000000000000000000eb08f243e5d3fcff26a9e38ae5520a669f4019d00000000000000000000000009f3b8679c73c2fef8b59b4f3444d4e156fb70aa5000000000000000000000000c08835adf4884e51ff076066706e407506826d9d000000000000000000000000000000005425890298aed601595a70ab815c96711a31bc650000000000000000000000004f32ae7f112c26b109357785e5c66dc5d747fbce00000000000000000000000000000000000000000000000000000000000000640000000000000000000000007a4d8f8c18762d362e64b411d7490fba112811cd0000000000000000")
require.NoError(t, err)
realAttestation, err := cciptypes.NewBytesFromString("0xee466fbd340596aa56e3e40d249869573e4008d84d795b4f2c3cba8649083d08653d38190d0df7e0ee12ae685df2f806d100a03b3716ab1ff2013c7201f1c2d01c9af959b55a4b52dbd0319eed69ce9ace25259830e0b1bff79faf0c9c5d1b5e6d6304e824d657db38f802bcff3e97d0bd30f2ffc62b62381f52c1668ceaa5a73a1b")
require.NoError(t, err)

tt := []struct {
name string
message []byte
attestation []byte
}{
{
name: "empty both fields",
message: nil,
attestation: []byte{},
},
{
name: "empty attestation",
message: []byte("message"),
attestation: nil,
},
{
name: "both attestation and message are set",
message: realMessage,
attestation: realAttestation,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
got, err := encoder.EncodeUSDC(tests.Context(t), tc.message, tc.attestation)
require.NoError(t, err)

decoded, err := abihelpers.ABIDecode(empty.AbiString(), got)
require.NoError(t, err)

converted := abi.ConvertType(decoded[0], &empty)
casted, ok := converted.(*usdcAttestationPayload)
require.True(t, ok)

if tc.message == nil {
require.Empty(t, casted.Message)
} else {
require.Equal(t, tc.message, casted.Message)
}

if tc.attestation == nil {
require.Empty(t, casted.Attestation)
} else {
require.Equal(t, tc.attestation, casted.Attestation)
}
})
}
}
4 changes: 2 additions & 2 deletions core/capabilities/ccip/oraclecreator/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3confighelper"

"github.com/smartcontractkit/chainlink-ccip/execute/tokendata"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/ccipevm"
evmconfig "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/configs/evm"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/ocrimpls"
Expand All @@ -33,6 +32,7 @@ import (
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"

"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
Expand Down Expand Up @@ -242,7 +242,7 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter(
ccipevm.NewExecutePluginCodecV1(),
ccipevm.NewMessageHasherV1(),
i.homeChainReader,
&tokendata.NoopTokenDataObserver{},
ccipevm.NewEVMTokenDataEncoder(),
ccipevm.NewGasEstimateProvider(),
contractReaders,
chainWriters,
Expand Down
47 changes: 42 additions & 5 deletions core/chains/evm/client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ type rpcClient struct {
ws rawclient
http *rawclient

stateMu sync.RWMutex // protects state* fields
stateMu sync.RWMutex // protects state* fields
subsSliceMu sync.RWMutex // protects subscription slice

// Need to track subscriptions because closing the RPC does not (always?)
// close the underlying subscription
Expand Down Expand Up @@ -317,8 +318,8 @@ func (r *rpcClient) getRPCDomain() string {

// registerSub adds the sub to the rpcClient list
func (r *rpcClient) registerSub(sub ethereum.Subscription, stopInFLightCh chan struct{}) error {
r.stateMu.Lock()
defer r.stateMu.Unlock()
r.subsSliceMu.Lock()
defer r.subsSliceMu.Unlock()
// ensure that the `sub` belongs to current life cycle of the `rpcClient` and it should not be killed due to
// previous `DisconnectAll` call.
select {
Expand All @@ -335,12 +336,16 @@ func (r *rpcClient) registerSub(sub ethereum.Subscription, stopInFLightCh chan s
// DisconnectAll disconnects all clients connected to the rpcClient
func (r *rpcClient) DisconnectAll() {
r.stateMu.Lock()
defer r.stateMu.Unlock()
if r.ws.rpc != nil {
r.ws.rpc.Close()
}
r.cancelInflightRequests()
r.stateMu.Unlock()

r.subsSliceMu.Lock()
r.unsubscribeAll()
r.subsSliceMu.Unlock()

r.chainInfoLock.Lock()
r.latestChainInfo = commonclient.ChainInfo{}
r.chainInfoLock.Unlock()
Expand Down Expand Up @@ -496,11 +501,30 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp
if r.newHeadsPollInterval > 0 {
interval := r.newHeadsPollInterval
timeout := interval
poller, _ := commonclient.NewPoller[*evmtypes.Head](interval, r.latestBlock, timeout, r.rpcLog)
poller, pollerCh := commonclient.NewPoller[*evmtypes.Head](interval, r.latestBlock, timeout, r.rpcLog)
if err = poller.Start(ctx); err != nil {
return nil, err
}

// NOTE this is a temporary special treatment for SubscribeNewHead before we refactor head tracker to use SubscribeToHeads
// as we need to forward new head from the poller channel to the channel passed from caller.
go func() {
for head := range pollerCh {
select {
case channel <- head:
// forwarding new head to the channel passed from caller
case <-poller.Err():
// return as poller returns error
return
}
}
}()

err = r.registerSub(&poller, chStopInFlight)
if err != nil {
return nil, err
}

lggr.Debugf("Polling new heads over http")
return &poller, nil
}
Expand Down Expand Up @@ -547,6 +571,11 @@ func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H
return nil, nil, err
}

err = r.registerSub(&poller, chStopInFlight)
if err != nil {
return nil, nil, err
}

lggr.Debugf("Polling new heads over http")
return channel, &poller, nil
}
Expand Down Expand Up @@ -579,6 +608,8 @@ func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H
}

func (r *rpcClient) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *evmtypes.Head, commontypes.Subscription, error) {
ctx, cancel, chStopInFlight, _, _ := r.acquireQueryCtx(ctx, r.rpcTimeout)
defer cancel()
interval := r.finalizedBlockPollInterval
if interval == 0 {
return nil, nil, errors.New("FinalizedBlockPollInterval is 0")
Expand All @@ -588,6 +619,12 @@ func (r *rpcClient) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *evmt
if err := poller.Start(ctx); err != nil {
return nil, nil, err
}

err := r.registerSub(&poller, chStopInFlight)
if err != nil {
return nil, nil, err
}

return channel, &poller, nil
}

Expand Down
Loading

0 comments on commit 192c823

Please sign in to comment.