From 3e9e058da4f2ba4b51286d4f05aa7efcba179e79 Mon Sep 17 00:00:00 2001 From: Sri Kidambi <1702865+kidambisrinivas@users.noreply.github.com> Date: Mon, 30 Sep 2024 21:29:56 +0100 Subject: [PATCH] Log Event Trigger Capability (#14308) * Log Event Trigger Capability * Minor refactoring * Moved main script to plugins/cmd * Added initial implementation for UnregisterTrigger * Create NewContractReader in RegisterTrigger flow of the trigger capability * Refactoring to integrate with ChainReader QueryKey API * Integrate with ChainReader QueryKey interface * Minor changes * Send cursor in QueryKey in subsequent calls * Test utils for LOOP capability * Happy path test for log event trigger capability * Float64 fix in values * Happy path integration test for Log Event Trigger Capability * Fix code lint annotations * Addressed PR comments * Added changeset * Addressed Lint errors * Addressed PR comments * Addressed more lint issues * Simplified trigger ctx creation and cancel flows * Added comment * Addressed PR comments * Implemented Start/Close pattern in logEventTrigger and used stopChan to track listener * Addressed more PR comments * Handled errors from Info and Close methods * Fixed lint errors and pass ctx to Info * Handle race conditions in log event trigger service * Fixed lint errors * Minor change * Test fix and lint fixes * Move EVM specific tests out of chain-agnostic capability * Set block time * Check existence of trigger in slow path * Complete usage of services.Service with StartOnce and StopOnce with tests updated * Wait for all goroutines to exit in test * Cleanup logpoller and headtracker after test --- .changeset/chilly-crews-retire.md | 5 + .../capabilities/triggers/logevent/service.go | 157 +++++++++++++ core/capabilities/triggers/logevent/store.go | 82 +++++++ .../capabilities/triggers/logevent/trigger.go | 210 ++++++++++++++++++ .../capabilities/log_event_trigger_test.go | 89 ++++++++ .../evm/capabilities/testutils/backend.go | 120 ++++++++++ .../capabilities/testutils/chain_reader.go | 169 ++++++++++++++ core/services/relay/evm/chain_reader.go | 1 - .../capabilities/log-event-trigger/main.go | 118 ++++++++++ 9 files changed, 950 insertions(+), 1 deletion(-) create mode 100644 .changeset/chilly-crews-retire.md create mode 100644 core/capabilities/triggers/logevent/service.go create mode 100644 core/capabilities/triggers/logevent/store.go create mode 100644 core/capabilities/triggers/logevent/trigger.go create mode 100644 core/services/relay/evm/capabilities/log_event_trigger_test.go create mode 100644 core/services/relay/evm/capabilities/testutils/backend.go create mode 100644 core/services/relay/evm/capabilities/testutils/chain_reader.go create mode 100644 plugins/cmd/capabilities/log-event-trigger/main.go diff --git a/.changeset/chilly-crews-retire.md b/.changeset/chilly-crews-retire.md new file mode 100644 index 00000000000..28b531a9ddb --- /dev/null +++ b/.changeset/chilly-crews-retire.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added log-event-trigger LOOPP capability, using ChainReader diff --git a/core/capabilities/triggers/logevent/service.go b/core/capabilities/triggers/logevent/service.go new file mode 100644 index 00000000000..7ed4855e097 --- /dev/null +++ b/core/capabilities/triggers/logevent/service.go @@ -0,0 +1,157 @@ +package logevent + +import ( + "context" + "errors" + "fmt" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" +) + +const ID = "log-event-trigger-%s-%s@1.0.0" + +const defaultSendChannelBufferSize = 1000 + +// Log Event Trigger Capability Input +type Input struct { +} + +// Log Event Trigger Capabilities Manager +// Manages different log event triggers using an underlying triggerStore +type TriggerService struct { + services.StateMachine + capabilities.CapabilityInfo + capabilities.Validator[RequestConfig, Input, capabilities.TriggerResponse] + lggr logger.Logger + triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse] + relayer core.Relayer + logEventConfig Config + stopCh services.StopChan +} + +// Common capability level config across all workflows +type Config struct { + ChainID string `json:"chainId"` + Network string `json:"network"` + LookbackBlocks uint64 `json:"lookbakBlocks"` + PollPeriod uint32 `json:"pollPeriod"` +} + +func (config Config) Version(capabilityVersion string) string { + return fmt.Sprintf(capabilityVersion, config.Network, config.ChainID) +} + +var _ capabilities.TriggerCapability = (*TriggerService)(nil) +var _ services.Service = &TriggerService{} + +// Creates a new Cron Trigger Service. +// Scheduling will commence on calling .Start() +func NewTriggerService(ctx context.Context, + lggr logger.Logger, + relayer core.Relayer, + logEventConfig Config) (*TriggerService, error) { + l := logger.Named(lggr, "LogEventTriggerCapabilityService") + + logEventStore := NewCapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]() + + s := &TriggerService{ + lggr: l, + triggers: logEventStore, + relayer: relayer, + logEventConfig: logEventConfig, + stopCh: make(services.StopChan), + } + var err error + s.CapabilityInfo, err = s.Info(ctx) + if err != nil { + return s, err + } + s.Validator = capabilities.NewValidator[RequestConfig, Input, capabilities.TriggerResponse](capabilities.ValidatorArgs{Info: s.CapabilityInfo}) + return s, nil +} + +func (s *TriggerService) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { + return capabilities.NewCapabilityInfo( + s.logEventConfig.Version(ID), + capabilities.CapabilityTypeTrigger, + "A trigger that listens for specific contract log events and starts a workflow run.", + ) +} + +// Register a new trigger +// Can register triggers before the service is actively scheduling +func (s *TriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { + if req.Config == nil { + return nil, errors.New("config is required to register a log event trigger") + } + reqConfig, err := s.ValidateConfig(req.Config) + if err != nil { + return nil, err + } + // Add log event trigger with Contract details to CapabilitiesStore + var respCh chan capabilities.TriggerResponse + ok := s.IfNotStopped(func() { + respCh, err = s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) { + l, ch, tErr := newLogEventTrigger(ctx, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer) + if tErr != nil { + return l, ch, tErr + } + tErr = l.Start(ctx) + return l, ch, tErr + }) + }) + if !ok { + return nil, fmt.Errorf("cannot create new trigger since LogEventTriggerService has been stopped") + } + if err != nil { + return nil, fmt.Errorf("create new trigger failed %w", err) + } + s.lggr.Infow("RegisterTrigger", "triggerId", req.TriggerID, "WorkflowID", req.Metadata.WorkflowID) + return respCh, nil +} + +func (s *TriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error { + trigger, ok := s.triggers.Read(req.TriggerID) + if !ok { + return fmt.Errorf("triggerId %s not found", req.TriggerID) + } + // Close callback channel and stop log event trigger listener + err := trigger.Close() + if err != nil { + return fmt.Errorf("error closing trigger %s (chainID %s): %w", req.TriggerID, s.logEventConfig.ChainID, err) + } + // Remove from triggers context + s.triggers.Delete(req.TriggerID) + s.lggr.Infow("UnregisterTrigger", "triggerId", req.TriggerID, "WorkflowID", req.Metadata.WorkflowID) + return nil +} + +// Start the service. +func (s *TriggerService) Start(ctx context.Context) error { + return s.StartOnce("LogEventTriggerCapabilityService", func() error { + s.lggr.Info("Starting LogEventTriggerCapabilityService") + return nil + }) +} + +// Close stops the Service. +// After this call the Service cannot be started again, +// The service will need to be re-built to start scheduling again. +func (s *TriggerService) Close() error { + return s.StopOnce("LogEventTriggerCapabilityService", func() error { + s.lggr.Infow("Stopping LogEventTriggerCapabilityService") + triggers := s.triggers.ReadAll() + return services.MultiCloser(triggers).Close() + }) +} + +func (s *TriggerService) HealthReport() map[string]error { + return map[string]error{s.Name(): s.Healthy()} +} + +func (s *TriggerService) Name() string { + return s.lggr.Name() +} diff --git a/core/capabilities/triggers/logevent/store.go b/core/capabilities/triggers/logevent/store.go new file mode 100644 index 00000000000..ac9d3741cd1 --- /dev/null +++ b/core/capabilities/triggers/logevent/store.go @@ -0,0 +1,82 @@ +package logevent + +import ( + "fmt" + "sync" +) + +type RegisterCapabilityFn[T any, Resp any] func() (*T, chan Resp, error) + +// Interface of the capabilities store +type CapabilitiesStore[T any, Resp any] interface { + Read(capabilityID string) (value *T, ok bool) + ReadAll() (values []*T) + Write(capabilityID string, value *T) + InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error) + Delete(capabilityID string) +} + +// Implementation for the CapabilitiesStore interface +type capabilitiesStore[T any, Resp any] struct { + mu sync.RWMutex + capabilities map[string]*T +} + +var _ CapabilitiesStore[string, string] = (CapabilitiesStore[string, string])(nil) + +// Constructor for capabilitiesStore struct implementing CapabilitiesStore interface +func NewCapabilitiesStore[T any, Resp any]() CapabilitiesStore[T, Resp] { + return &capabilitiesStore[T, Resp]{ + capabilities: map[string]*T{}, + } +} + +func (cs *capabilitiesStore[T, Resp]) Read(capabilityID string) (value *T, ok bool) { + cs.mu.RLock() + defer cs.mu.RUnlock() + trigger, ok := cs.capabilities[capabilityID] + return trigger, ok +} + +func (cs *capabilitiesStore[T, Resp]) ReadAll() (values []*T) { + cs.mu.RLock() + defer cs.mu.RUnlock() + vals := make([]*T, 0) + for _, v := range cs.capabilities { + vals = append(vals, v) + } + return vals +} + +func (cs *capabilitiesStore[T, Resp]) Write(capabilityID string, value *T) { + cs.mu.Lock() + defer cs.mu.Unlock() + cs.capabilities[capabilityID] = value +} + +func (cs *capabilitiesStore[T, Resp]) InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error) { + cs.mu.RLock() + _, ok := cs.capabilities[capabilityID] + cs.mu.RUnlock() + if ok { + return nil, fmt.Errorf("capabilityID %v already exists", capabilityID) + } + cs.mu.Lock() + defer cs.mu.Unlock() + _, ok = cs.capabilities[capabilityID] + if ok { + return nil, fmt.Errorf("capabilityID %v already exists", capabilityID) + } + value, respCh, err := fn() + if err != nil { + return nil, fmt.Errorf("error registering capability: %v", err) + } + cs.capabilities[capabilityID] = value + return respCh, nil +} + +func (cs *capabilitiesStore[T, Resp]) Delete(capabilityID string) { + cs.mu.Lock() + defer cs.mu.Unlock() + delete(cs.capabilities, capabilityID) +} diff --git a/core/capabilities/triggers/logevent/trigger.go b/core/capabilities/triggers/logevent/trigger.go new file mode 100644 index 00000000000..9a0e1d036c7 --- /dev/null +++ b/core/capabilities/triggers/logevent/trigger.go @@ -0,0 +1,210 @@ +package logevent + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink-common/pkg/values" +) + +// Log Event Trigger Capability Request Config Details +type RequestConfig struct { + ContractName string `json:"contractName"` + ContractAddress string `json:"contractAddress"` + ContractEventName string `json:"contractEventName"` + // Log Event Trigger capability takes in a []byte as ContractReaderConfig + // to not depend on evm ChainReaderConfig type and be chain agnostic + ContractReaderConfig map[string]any `json:"contractReaderConfig"` +} + +// LogEventTrigger struct to listen for Contract events using ContractReader gRPC client +// in a loop with a periodic delay of pollPeriod milliseconds, which is specified in +// the job spec +type logEventTrigger struct { + ch chan<- capabilities.TriggerResponse + lggr logger.Logger + + // Contract address and Event Signature to monitor for + reqConfig *RequestConfig + contractReader types.ContractReader + relayer core.Relayer + startBlockNum uint64 + + // Log Event Trigger config with pollPeriod and lookbackBlocks + logEventConfig Config + ticker *time.Ticker + stopChan services.StopChan + done chan bool +} + +// Construct for logEventTrigger struct +func newLogEventTrigger(ctx context.Context, + lggr logger.Logger, + workflowID string, + reqConfig *RequestConfig, + logEventConfig Config, + relayer core.Relayer) (*logEventTrigger, chan capabilities.TriggerResponse, error) { + jsonBytes, err := json.Marshal(reqConfig.ContractReaderConfig) + if err != nil { + return nil, nil, err + } + + // Create a New Contract Reader client, which brings a corresponding ContractReader gRPC service + // in Chainlink Core service + contractReader, err := relayer.NewContractReader(ctx, jsonBytes) + if err != nil { + return nil, nil, + fmt.Errorf("error fetching contractReader for chainID %s from relayerSet: %w", logEventConfig.ChainID, err) + } + + // Bind Contract in ContractReader + boundContracts := []types.BoundContract{{Name: reqConfig.ContractName, Address: reqConfig.ContractAddress}} + err = contractReader.Bind(ctx, boundContracts) + if err != nil { + return nil, nil, err + } + + // Get current block HEAD/tip of the blockchain to start polling from + latestHead, err := relayer.LatestHead(ctx) + if err != nil { + return nil, nil, fmt.Errorf("error getting latestHead from relayer client: %w", err) + } + height, err := strconv.ParseUint(latestHead.Height, 10, 64) + if err != nil { + return nil, nil, fmt.Errorf("invalid height in latestHead from relayer client: %w", err) + } + startBlockNum := uint64(0) + if height > logEventConfig.LookbackBlocks { + startBlockNum = height - logEventConfig.LookbackBlocks + } + + // Setup callback channel, logger and ticker to poll ContractReader + callbackCh := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize) + ticker := time.NewTicker(time.Duration(logEventConfig.PollPeriod) * time.Millisecond) + + // Initialise a Log Event Trigger + l := &logEventTrigger{ + ch: callbackCh, + lggr: logger.Named(lggr, fmt.Sprintf("LogEventTrigger.%s", workflowID)), + + reqConfig: reqConfig, + contractReader: contractReader, + relayer: relayer, + startBlockNum: startBlockNum, + + logEventConfig: logEventConfig, + ticker: ticker, + stopChan: make(services.StopChan), + done: make(chan bool), + } + return l, callbackCh, nil +} + +func (l *logEventTrigger) Start(ctx context.Context) error { + go l.listen() + return nil +} + +// Start to contract events and trigger workflow runs +func (l *logEventTrigger) listen() { + ctx, cancel := l.stopChan.NewCtx() + defer cancel() + defer close(l.done) + + // Listen for events from lookbackPeriod + var logs []types.Sequence + var err error + logData := make(map[string]any) + cursor := "" + limitAndSort := query.LimitAndSort{ + SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, + } + for { + select { + case <-ctx.Done(): + l.lggr.Infow("Closing trigger server for (waiting for waitGroup)", "ChainID", l.logEventConfig.ChainID, + "ContractName", l.reqConfig.ContractName, + "ContractAddress", l.reqConfig.ContractAddress, + "ContractEventName", l.reqConfig.ContractEventName) + return + case t := <-l.ticker.C: + l.lggr.Infow("Polling event logs from ContractReader using QueryKey at", "time", t, + "startBlockNum", l.startBlockNum, + "cursor", cursor) + if cursor != "" { + limitAndSort.Limit = query.Limit{Cursor: cursor} + } + logs, err = l.contractReader.QueryKey( + ctx, + types.BoundContract{Name: l.reqConfig.ContractName, Address: l.reqConfig.ContractAddress}, + query.KeyFilter{ + Key: l.reqConfig.ContractEventName, + Expressions: []query.Expression{ + query.Confidence(primitives.Finalized), + query.Block(fmt.Sprintf("%d", l.startBlockNum), primitives.Gte), + }, + }, + limitAndSort, + &logData, + ) + if err != nil { + l.lggr.Errorw("QueryKey failure", "err", err) + continue + } + // ChainReader QueryKey API provides logs including the cursor value and not + // after the cursor value. If the response only consists of the log corresponding + // to the cursor and no log after it, then we understand that there are no new + // logs + if len(logs) == 1 && logs[0].Cursor == cursor { + l.lggr.Infow("No new logs since", "cursor", cursor) + continue + } + for _, log := range logs { + if log.Cursor == cursor { + continue + } + triggerResp := createTriggerResponse(log, l.logEventConfig.Version(ID)) + l.ch <- triggerResp + cursor = log.Cursor + } + } + } +} + +// Create log event trigger capability response +func createTriggerResponse(log types.Sequence, version string) capabilities.TriggerResponse { + wrappedPayload, err := values.WrapMap(log) + if err != nil { + return capabilities.TriggerResponse{ + Err: fmt.Errorf("error wrapping trigger event: %s", err), + } + } + return capabilities.TriggerResponse{ + Event: capabilities.TriggerEvent{ + TriggerType: version, + ID: log.Cursor, + Outputs: wrappedPayload, + }, + } +} + +// Close contract event listener for the current contract +// This function is called when UnregisterTrigger is called individually +// for a specific ContractAddress and EventName +// When the whole capability service is stopped, stopChan of the service +// is closed, which would stop all triggers +func (l *logEventTrigger) Close() error { + close(l.stopChan) + <-l.done + return nil +} diff --git a/core/services/relay/evm/capabilities/log_event_trigger_test.go b/core/services/relay/evm/capabilities/log_event_trigger_test.go new file mode 100644 index 00000000000..f2104529b7f --- /dev/null +++ b/core/services/relay/evm/capabilities/log_event_trigger_test.go @@ -0,0 +1,89 @@ +package logevent_test + +import ( + "math/big" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + commonmocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/triggers/logevent" + coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils" +) + +// Test for Log Event Trigger Capability happy path for EVM +func TestLogEventTriggerEVMHappyPath(t *testing.T) { + th := testutils.NewContractReaderTH(t) + + logEventConfig := logevent.Config{ + ChainID: th.BackendTH.ChainID.String(), + Network: "evm", + LookbackBlocks: 1000, + PollPeriod: 1000, + } + + // Create a new contract reader to return from mock relayer + ctx := coretestutils.Context(t) + + // Fetch latest head from simulated backend to return from mock relayer + height, err := th.BackendTH.EVMClient.LatestBlockHeight(ctx) + require.NoError(t, err) + block, err := th.BackendTH.EVMClient.BlockByNumber(ctx, height) + require.NoError(t, err) + + // Mock relayer to return a New ContractReader instead of gRPC client of a ContractReader + relayer := commonmocks.NewRelayer(t) + relayer.On("NewContractReader", mock.Anything, th.LogEmitterContractReaderCfg).Return(th.LogEmitterContractReader, nil).Once() + relayer.On("LatestHead", mock.Anything).Return(commontypes.Head{ + Height: height.String(), + Hash: block.Hash().Bytes(), + Timestamp: block.Time(), + }, nil).Once() + + // Create Log Event Trigger Service and register trigger + logEventTriggerService, err := logevent.NewTriggerService(ctx, + th.BackendTH.Lggr, + relayer, + logEventConfig) + require.NoError(t, err) + + // Start the service + servicetest.Run(t, logEventTriggerService) + + log1Ch, err := logEventTriggerService.RegisterTrigger(ctx, th.LogEmitterRegRequest) + require.NoError(t, err) + + expectedLogVal := int64(10) + + // Send a blockchain transaction that emits logs + done := make(chan struct{}) + t.Cleanup(func() { <-done }) + go func() { + defer close(done) + _, err = + th.LogEmitterContract.EmitLog1(th.BackendTH.ContractsOwner, []*big.Int{big.NewInt(expectedLogVal)}) + assert.NoError(t, err) + th.BackendTH.Backend.Commit() + th.BackendTH.Backend.Commit() + th.BackendTH.Backend.Commit() + }() + + // Wait for logs with a timeout + _, output, err := testutils.WaitForLog(th.BackendTH.Lggr, log1Ch, 15*time.Second) + require.NoError(t, err) + th.BackendTH.Lggr.Infow("EmitLog", "output", output) + // Verify if valid cursor is returned + cursor, err := testutils.GetStrVal(output, "Cursor") + require.NoError(t, err) + require.True(t, len(cursor) > 60) + // Verify if Arg0 is correct + actualLogVal, err := testutils.GetBigIntValL2(output, "Data", "Arg0") + require.NoError(t, err) + require.Equal(t, expectedLogVal, actualLogVal.Int64()) +} diff --git a/core/services/relay/evm/capabilities/testutils/backend.go b/core/services/relay/evm/capabilities/testutils/backend.go new file mode 100644 index 00000000000..ef5761b3e4c --- /dev/null +++ b/core/services/relay/evm/capabilities/testutils/backend.go @@ -0,0 +1,120 @@ +package testutils + +import ( + "context" + "encoding/json" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/accounts/abi/bind/backends" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" + evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" +) + +// Test harness with EVM backend and chainlink core services like +// Log Poller and Head Tracker +type EVMBackendTH struct { + // Backend details + Lggr logger.Logger + ChainID *big.Int + Backend *backends.SimulatedBackend + EVMClient evmclient.Client + + ContractsOwner *bind.TransactOpts + ContractsOwnerKey ethkey.KeyV2 + + HeadTracker logpoller.HeadTracker + LogPoller logpoller.LogPoller +} + +// Test harness to create a simulated backend for testing a LOOPCapability +func NewEVMBackendTH(t *testing.T) *EVMBackendTH { + lggr := logger.TestLogger(t) + + ownerKey := cltest.MustGenerateRandomKey(t) + contractsOwner, err := bind.NewKeyedTransactorWithChainID(ownerKey.ToEcdsaPrivKey(), testutils.SimulatedChainID) + require.NoError(t, err) + + // Setup simulated go-ethereum EVM backend + genesisData := core.GenesisAlloc{ + contractsOwner.From: {Balance: assets.Ether(100000).ToInt()}, + } + chainID := testutils.SimulatedChainID + gasLimit := uint32(ethconfig.Defaults.Miner.GasCeil) //nolint:gosec + backend := cltest.NewSimulatedBackend(t, genesisData, gasLimit) + blockTime := time.UnixMilli(int64(backend.Blockchain().CurrentHeader().Time)) //nolint:gosec + err = backend.AdjustTime(time.Since(blockTime) - 24*time.Hour) + require.NoError(t, err) + backend.Commit() + + // Setup backend client + client := evmclient.NewSimulatedBackendClient(t, backend, chainID) + + th := &EVMBackendTH{ + Lggr: lggr, + ChainID: chainID, + Backend: backend, + EVMClient: client, + + ContractsOwner: contractsOwner, + ContractsOwnerKey: ownerKey, + } + th.HeadTracker, th.LogPoller = th.SetupCoreServices(t) + + return th +} + +// Setup core services like log poller and head tracker for the simulated backend +func (th *EVMBackendTH) SetupCoreServices(t *testing.T) (logpoller.HeadTracker, logpoller.LogPoller) { + db := pgtest.NewSqlxDB(t) + const finalityDepth = 2 + ht := headtracker.NewSimulatedHeadTracker(th.EVMClient, false, finalityDepth) + lp := logpoller.NewLogPoller( + logpoller.NewORM(testutils.SimulatedChainID, db, th.Lggr), + th.EVMClient, + th.Lggr, + ht, + logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + }, + ) + require.NoError(t, ht.Start(testutils.Context(t))) + require.NoError(t, lp.Start(testutils.Context(t))) + t.Cleanup(func() { ht.Close() }) + t.Cleanup(func() { lp.Close() }) + return ht, lp +} + +func (th *EVMBackendTH) NewContractReader(ctx context.Context, t *testing.T, cfg []byte) (types.ContractReader, error) { + crCfg := &evmrelaytypes.ChainReaderConfig{} + if err := json.Unmarshal(cfg, crCfg); err != nil { + return nil, err + } + + svc, err := evm.NewChainReaderService(ctx, th.Lggr, th.LogPoller, th.HeadTracker, th.EVMClient, *crCfg) + if err != nil { + return nil, err + } + + return svc, svc.Start(ctx) +} diff --git a/core/services/relay/evm/capabilities/testutils/chain_reader.go b/core/services/relay/evm/capabilities/testutils/chain_reader.go new file mode 100644 index 00000000000..3f0bf82da81 --- /dev/null +++ b/core/services/relay/evm/capabilities/testutils/chain_reader.go @@ -0,0 +1,169 @@ +package testutils + +import ( + "encoding/json" + "fmt" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + + commoncaps "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + commonvalues "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/triggers/logevent" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter" + coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" +) + +// Test harness with EVM backend and chainlink core services like +// Log Poller and Head Tracker +type ContractReaderTH struct { + BackendTH *EVMBackendTH + + LogEmitterAddress *common.Address + LogEmitterContract *log_emitter.LogEmitter + LogEmitterContractReader commontypes.ContractReader + LogEmitterRegRequest commoncaps.TriggerRegistrationRequest + LogEmitterContractReaderCfg []byte +} + +// Creates a new test harness for Contract Reader tests +func NewContractReaderTH(t *testing.T) *ContractReaderTH { + backendTH := NewEVMBackendTH(t) + + // Deploy a test contract LogEmitter for testing ContractReader + logEmitterAddress, _, _, err := + log_emitter.DeployLogEmitter(backendTH.ContractsOwner, backendTH.Backend) + require.NoError(t, err) + logEmitter, err := log_emitter.NewLogEmitter(logEmitterAddress, backendTH.Backend) + require.NoError(t, err) + + // Create new contract reader + reqConfig := logevent.RequestConfig{ + ContractName: "LogEmitter", + ContractAddress: logEmitterAddress.Hex(), + ContractEventName: "Log1", + } + contractReaderCfg := evmtypes.ChainReaderConfig{ + Contracts: map[string]evmtypes.ChainContractReader{ + reqConfig.ContractName: { + ContractPollingFilter: evmtypes.ContractPollingFilter{ + GenericEventNames: []string{reqConfig.ContractEventName}, + }, + ContractABI: log_emitter.LogEmitterABI, + Configs: map[string]*evmtypes.ChainReaderDefinition{ + reqConfig.ContractEventName: { + ChainSpecificName: reqConfig.ContractEventName, + ReadType: evmtypes.Event, + }, + }, + }, + }, + } + + // Encode contractReaderConfig as JSON and decode it into a map[string]any for + // the capability request config. Log Event Trigger capability takes in a + // []byte as ContractReaderConfig to not depend on evm ChainReaderConfig type + // and be chain agnostic + contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) + require.NoError(t, err) + contractReaderCfgMap := make(map[string]any) + err = json.Unmarshal(contractReaderCfgBytes, &contractReaderCfgMap) + require.NoError(t, err) + // Encode the config map as JSON to specify in the expected call in mocked object + // The LogEventTrigger Capability receives a config map, encodes it and + // calls NewContractReader with it + contractReaderCfgBytes, err = json.Marshal(contractReaderCfgMap) + require.NoError(t, err) + + reqConfig.ContractReaderConfig = contractReaderCfgMap + + config, err := commonvalues.WrapMap(reqConfig) + require.NoError(t, err) + req := commoncaps.TriggerRegistrationRequest{ + TriggerID: "logeventtrigger_log1", + Config: config, + Metadata: commoncaps.RequestMetadata{ + ReferenceID: "logeventtrigger", + }, + } + + // Create a new contract reader to return from mock relayer + ctx := coretestutils.Context(t) + contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes) + require.NoError(t, err) + + return &ContractReaderTH{ + BackendTH: backendTH, + + LogEmitterAddress: &logEmitterAddress, + LogEmitterContract: logEmitter, + LogEmitterContractReader: contractReader, + LogEmitterRegRequest: req, + LogEmitterContractReaderCfg: contractReaderCfgBytes, + } +} + +// Wait for a specific log to be emitted to a response channel by ChainReader +func WaitForLog(lggr logger.Logger, logCh <-chan commoncaps.TriggerResponse, timeout time.Duration) ( + *commoncaps.TriggerResponse, map[string]any, error) { + select { + case <-time.After(timeout): + return nil, nil, fmt.Errorf("timeout waiting for Log1 event from ContractReader") + case log := <-logCh: + lggr.Infow("Received log from ContractReader", "event", log.Event.ID) + if log.Err != nil { + return nil, nil, fmt.Errorf("error listening for Log1 event from ContractReader: %v", log.Err) + } + v := make(map[string]any) + err := log.Event.Outputs.UnwrapTo(&v) + if err != nil { + return nil, nil, fmt.Errorf("error unwrapping log to map: (log %v) %v", log.Event.Outputs, log.Err) + } + return &log, v, nil + } +} + +// Get the string value of a key from a generic map[string]any +func GetStrVal(m map[string]any, k string) (string, error) { + v, ok := m[k] + if !ok { + return "", fmt.Errorf("key %s not found", k) + } + vstr, ok := v.(string) + if !ok { + return "", fmt.Errorf("key %s not a string (%T)", k, v) + } + return vstr, nil +} + +// Get int value of a key from a generic map[string]any +func GetBigIntVal(m map[string]any, k string) (*big.Int, error) { + v, ok := m[k] + if !ok { + return nil, fmt.Errorf("key %s not found", k) + } + val, ok := v.(*big.Int) + if !ok { + return nil, fmt.Errorf("key %s not a *big.Int (%T)", k, v) + } + return val, nil +} + +// Get the int value from a map[string]map[string]any +func GetBigIntValL2(m map[string]any, level1Key string, level2Key string) (*big.Int, error) { + v, ok := m[level1Key] + if !ok { + return nil, fmt.Errorf("key %s not found", level1Key) + } + level2Map, ok := v.(map[string]any) + if !ok { + return nil, fmt.Errorf("key %s not a map[string]any (%T)", level1Key, v) + } + return GetBigIntVal(level2Map, level2Key) +} diff --git a/core/services/relay/evm/chain_reader.go b/core/services/relay/evm/chain_reader.go index e5041f5486a..6b9f9411789 100644 --- a/core/services/relay/evm/chain_reader.go +++ b/core/services/relay/evm/chain_reader.go @@ -19,7 +19,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" - evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" diff --git a/plugins/cmd/capabilities/log-event-trigger/main.go b/plugins/cmd/capabilities/log-event-trigger/main.go new file mode 100644 index 00000000000..8abecf54aeb --- /dev/null +++ b/plugins/cmd/capabilities/log-event-trigger/main.go @@ -0,0 +1,118 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/hashicorp/go-plugin" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/triggers/logevent" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" +) + +const ( + serviceName = "LogEventTriggerCapability" +) + +type LogEventTriggerGRPCService struct { + trigger capabilities.TriggerCapability + s *loop.Server + config logevent.Config +} + +func main() { + s := loop.MustNewStartedServer(serviceName) + defer s.Stop() + + s.Logger.Infof("Starting %s", serviceName) + + stopCh := make(chan struct{}) + defer close(stopCh) + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: loop.StandardCapabilitiesHandshakeConfig(), + Plugins: map[string]plugin.Plugin{ + loop.PluginStandardCapabilitiesName: &loop.StandardCapabilitiesLoop{ + PluginServer: &LogEventTriggerGRPCService{ + s: s, + }, + BrokerConfig: loop.BrokerConfig{Logger: s.Logger, StopCh: stopCh, GRPCOpts: s.GRPCOpts}, + }, + }, + GRPCServer: s.GRPCOpts.NewServer, + }) +} + +func (cs *LogEventTriggerGRPCService) Start(ctx context.Context) error { + return nil +} + +func (cs *LogEventTriggerGRPCService) Close() error { + return nil +} + +func (cs *LogEventTriggerGRPCService) Ready() error { + return nil +} + +func (cs *LogEventTriggerGRPCService) HealthReport() map[string]error { + return nil +} + +func (cs *LogEventTriggerGRPCService) Name() string { + return serviceName +} + +func (cs *LogEventTriggerGRPCService) Infos(ctx context.Context) ([]capabilities.CapabilityInfo, error) { + triggerInfo, err := cs.trigger.Info(ctx) + if err != nil { + return nil, err + } + + return []capabilities.CapabilityInfo{ + triggerInfo, + }, nil +} + +func (cs *LogEventTriggerGRPCService) Initialise( + ctx context.Context, + config string, + telemetryService core.TelemetryService, + store core.KeyValueStore, + capabilityRegistry core.CapabilitiesRegistry, + errorLog core.ErrorLog, + pipelineRunner core.PipelineRunnerService, + relayerSet core.RelayerSet, +) error { + cs.s.Logger.Debugf("Initialising %s", serviceName) + + var logEventConfig logevent.Config + err := json.Unmarshal([]byte(config), &logEventConfig) + if err != nil { + return fmt.Errorf("error decoding log_event_trigger config: %v", err) + } + + relayID := types.NewRelayID(logEventConfig.Network, logEventConfig.ChainID) + relayer, err := relayerSet.Get(ctx, relayID) + if err != nil { + return fmt.Errorf("error fetching relayer for chainID %s from relayerSet: %v", logEventConfig.ChainID, err) + } + + // Set relayer and trigger in LogEventTriggerGRPCService + cs.config = logEventConfig + cs.trigger, err = logevent.NewTriggerService(ctx, cs.s.Logger, relayer, logEventConfig) + if err != nil { + return fmt.Errorf("error creating new trigger for chainID %s: %v", logEventConfig.ChainID, err) + } + + if err := capabilityRegistry.Add(ctx, cs.trigger); err != nil { + return fmt.Errorf("error when adding cron trigger to the registry: %w", err) + } + + return nil +}