From de4485cbda77de5f551afb60314d1033ddddedc6 Mon Sep 17 00:00:00 2001 From: Sri Kidambi <1702865+kidambisrinivas@users.noreply.github.com> Date: Mon, 30 Sep 2024 16:22:49 +0100 Subject: [PATCH] Handle race conditions in log event trigger service --- core/capabilities/testutils/backend.go | 2 +- core/capabilities/testutils/chain_reader.go | 6 ++--- .../capabilities/triggers/logevent/service.go | 27 +++++++++++++------ 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/core/capabilities/testutils/backend.go b/core/capabilities/testutils/backend.go index 6efa3048758..f41bcc6838d 100644 --- a/core/capabilities/testutils/backend.go +++ b/core/capabilities/testutils/backend.go @@ -103,7 +103,7 @@ func (th *EVMBackendTH) SetupCoreServices(t *testing.T) (logpoller.HeadTracker, return ht, lp } -func (th *EVMBackendTH) NewContractReader(t *testing.T, ctx context.Context, cfg []byte) (types.ContractReader, error) { +func (th *EVMBackendTH) NewContractReader(ctx context.Context, t *testing.T, cfg []byte) (types.ContractReader, error) { crCfg := &evmrelaytypes.ChainReaderConfig{} if err := json.Unmarshal(cfg, crCfg); err != nil { return nil, err diff --git a/core/capabilities/testutils/chain_reader.go b/core/capabilities/testutils/chain_reader.go index cf9e0ea5c71..976099c2a73 100644 --- a/core/capabilities/testutils/chain_reader.go +++ b/core/capabilities/testutils/chain_reader.go @@ -8,11 +8,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/test-go/testify/require" + commoncaps "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" commonvalues "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/test-go/testify/require" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/triggers/logevent" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter" coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -94,7 +94,7 @@ func NewContractReaderTH(t *testing.T) *ContractReaderTH { // Create a new contract reader to return from mock relayer ctx := coretestutils.Context(t) - contractReader, err := backendTH.NewContractReader(t, ctx, contractReaderCfgBytes) + contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes) require.NoError(t, err) return &ContractReaderTH{ diff --git a/core/capabilities/triggers/logevent/service.go b/core/capabilities/triggers/logevent/service.go index 399c40fac8d..36b6b467d53 100644 --- a/core/capabilities/triggers/logevent/service.go +++ b/core/capabilities/triggers/logevent/service.go @@ -22,12 +22,14 @@ type Input struct { // Log Event Trigger Capabilities Manager // Manages different log event triggers using an underlying triggerStore type TriggerService struct { + services.StateMachine capabilities.CapabilityInfo capabilities.Validator[RequestConfig, Input, capabilities.TriggerResponse] lggr logger.Logger triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse] relayer core.Relayer logEventConfig Config + stopCh services.StopChan } // Common capability level config across all workflows @@ -63,6 +65,7 @@ func NewTriggerService(ctx context.Context, p Params) (*TriggerService, error) { triggers: logEventStore, relayer: p.Relayer, logEventConfig: p.LogEventConfig, + stopCh: make(services.StopChan), } var err error s.CapabilityInfo, err = s.Info(ctx) @@ -93,14 +96,19 @@ func (s *TriggerService) RegisterTrigger(ctx context.Context, req capabilities.T } // Add log event trigger with Contract details to CapabilitiesStore var respCh chan capabilities.TriggerResponse - respCh, err = s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) { - l, ch, tErr := newLogEventTrigger(ctx, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer) - if tErr != nil { + ok := s.IfNotStopped(func() { + respCh, err = s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) { + l, ch, tErr := newLogEventTrigger(ctx, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer) + if tErr != nil { + return l, ch, tErr + } + tErr = l.Start(ctx) return l, ch, tErr - } - tErr = l.Start(ctx) - return l, ch, tErr + }) }) + if !ok { + return nil, fmt.Errorf("cannot create new trigger since LogEventTriggerService has been stopped") + } if err != nil { return nil, fmt.Errorf("create new trigger failed %w", err) } @@ -133,8 +141,11 @@ func (s *TriggerService) Start(ctx context.Context) error { // After this call the Service cannot be started again, // The service will need to be re-built to start scheduling again. func (s *TriggerService) Close() error { - triggers := s.triggers.ReadAll() - return services.MultiCloser(triggers).Close() + return s.StopOnce("Log Event Trigger Capability Service", func() error { + s.lggr.Infow("Stopping LogEventTrigger Capability Service") + triggers := s.triggers.ReadAll() + return services.MultiCloser(triggers).Close() + }) } func (s *TriggerService) Ready() error {