Skip to content

Commit

Permalink
Handle race conditions in log event trigger service
Browse files Browse the repository at this point in the history
  • Loading branch information
kidambisrinivas committed Sep 30, 2024
1 parent 3be3488 commit de4485c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
2 changes: 1 addition & 1 deletion core/capabilities/testutils/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (th *EVMBackendTH) SetupCoreServices(t *testing.T) (logpoller.HeadTracker,
return ht, lp
}

func (th *EVMBackendTH) NewContractReader(t *testing.T, ctx context.Context, cfg []byte) (types.ContractReader, error) {
func (th *EVMBackendTH) NewContractReader(ctx context.Context, t *testing.T, cfg []byte) (types.ContractReader, error) {
crCfg := &evmrelaytypes.ChainReaderConfig{}
if err := json.Unmarshal(cfg, crCfg); err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/testutils/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/test-go/testify/require"

Check failure on line 11 in core/capabilities/testutils/chain_reader.go

View workflow job for this annotation

GitHub Actions / lint

import 'github.com/test-go/testify/require' is not allowed from list 'main': Use github.com/stretchr/testify/require instead (depguard)

commoncaps "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
commonvalues "github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/test-go/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/triggers/logevent"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter"
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -94,7 +94,7 @@ func NewContractReaderTH(t *testing.T) *ContractReaderTH {

// Create a new contract reader to return from mock relayer
ctx := coretestutils.Context(t)
contractReader, err := backendTH.NewContractReader(t, ctx, contractReaderCfgBytes)
contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes)
require.NoError(t, err)

return &ContractReaderTH{
Expand Down
27 changes: 19 additions & 8 deletions core/capabilities/triggers/logevent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ type Input struct {
// Log Event Trigger Capabilities Manager
// Manages different log event triggers using an underlying triggerStore
type TriggerService struct {
services.StateMachine
capabilities.CapabilityInfo
capabilities.Validator[RequestConfig, Input, capabilities.TriggerResponse]
lggr logger.Logger
triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]
relayer core.Relayer
logEventConfig Config
stopCh services.StopChan
}

// Common capability level config across all workflows
Expand Down Expand Up @@ -63,6 +65,7 @@ func NewTriggerService(ctx context.Context, p Params) (*TriggerService, error) {
triggers: logEventStore,
relayer: p.Relayer,
logEventConfig: p.LogEventConfig,
stopCh: make(services.StopChan),
}
var err error
s.CapabilityInfo, err = s.Info(ctx)
Expand Down Expand Up @@ -93,14 +96,19 @@ func (s *TriggerService) RegisterTrigger(ctx context.Context, req capabilities.T
}
// Add log event trigger with Contract details to CapabilitiesStore
var respCh chan capabilities.TriggerResponse
respCh, err = s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) {
l, ch, tErr := newLogEventTrigger(ctx, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer)
if tErr != nil {
ok := s.IfNotStopped(func() {
respCh, err = s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) {
l, ch, tErr := newLogEventTrigger(ctx, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer)
if tErr != nil {
return l, ch, tErr
}
tErr = l.Start(ctx)
return l, ch, tErr
}
tErr = l.Start(ctx)
return l, ch, tErr
})
})
if !ok {
return nil, fmt.Errorf("cannot create new trigger since LogEventTriggerService has been stopped")
}
if err != nil {
return nil, fmt.Errorf("create new trigger failed %w", err)
}
Expand Down Expand Up @@ -133,8 +141,11 @@ func (s *TriggerService) Start(ctx context.Context) error {
// After this call the Service cannot be started again,
// The service will need to be re-built to start scheduling again.
func (s *TriggerService) Close() error {
triggers := s.triggers.ReadAll()
return services.MultiCloser(triggers).Close()
return s.StopOnce("Log Event Trigger Capability Service", func() error {
s.lggr.Infow("Stopping LogEventTrigger Capability Service")
triggers := s.triggers.ReadAll()
return services.MultiCloser(triggers).Close()
})
}

func (s *TriggerService) Ready() error {
Expand Down

0 comments on commit de4485c

Please sign in to comment.