Skip to content

Commit

Permalink
Implemented Start/Close pattern in logEventTrigger and used stopChan …
Browse files Browse the repository at this point in the history
…to track listener
  • Loading branch information
kidambisrinivas committed Sep 30, 2024
1 parent 31954ea commit c48b80d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 27 deletions.
23 changes: 8 additions & 15 deletions core/capabilities/triggers/logevent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand Down Expand Up @@ -35,8 +34,6 @@ type TriggerService struct {
triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]
relayer core.Relayer
logEventConfig Config
stopChan services.StopChan
wg sync.WaitGroup
}

// Common capability level config across all workflows
Expand Down Expand Up @@ -72,7 +69,6 @@ func NewTriggerService(p Params) *TriggerService {
triggers: logEventStore,
relayer: p.Relayer,
logEventConfig: p.LogEventConfig,
stopChan: make(services.StopChan),
}
s.CapabilityInfo, _ = s.Info(context.Background())
s.Validator = capabilities.NewValidator[RequestConfig, Input, capabilities.TriggerResponse](capabilities.ValidatorArgs{Info: s.CapabilityInfo})
Expand Down Expand Up @@ -100,13 +96,11 @@ func (s *TriggerService) RegisterTrigger(ctx context.Context, req capabilities.T
// Add log event trigger with Contract details to CapabilitiesStore
var respCh chan capabilities.TriggerResponse
respCh, err = s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) {
ctx, cancel := s.stopChan.NewCtx()
l, ch, err := newLogEventTrigger(ctx, cancel, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer)
s.wg.Add(1)
go func() {
defer s.wg.Done()
l.Listen(ctx) // Blocking call, until ctx.Done is issued
}()
l, ch, err := newLogEventTrigger(ctx, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer)
if err != nil {
return l, ch, err
}
err = l.Start(ctx)
return l, ch, err
})
if err != nil {
Expand All @@ -122,7 +116,7 @@ func (s *TriggerService) UnregisterTrigger(ctx context.Context, req capabilities
return fmt.Errorf("triggerId %s not found", req.TriggerID)
}
// Close callback channel and stop log event trigger listener
trigger.Stop()
trigger.Close()
// Remove from triggers context
s.triggers.Delete(req.TriggerID)
s.lggr.Infow("UnregisterTrigger", "triggerId", req.TriggerID, "WorkflowID", req.Metadata.WorkflowID)
Expand All @@ -138,9 +132,8 @@ func (s *TriggerService) Start(ctx context.Context) error {
// After this call the Service cannot be started again,
// The service will need to be re-built to start scheduling again.
func (s *TriggerService) Close() error {
close(s.stopChan)
s.wg.Wait()
return nil
triggers := s.triggers.ReadAll()
return services.MultiCloser(triggers).Close()
}

func (s *TriggerService) Ready() error {
Expand Down
10 changes: 7 additions & 3 deletions core/capabilities/triggers/logevent/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type RegisterCapabilityFn[T any, Resp any] func() (*T, chan Resp, error)
// Interface of the capabilities store
type CapabilitiesStore[T any, Resp any] interface {
Read(capabilityID string) (value *T, ok bool)
ReadAll() (values map[string]*T)
ReadAll() (values []*T)
Write(capabilityID string, value *T)
InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error)
Delete(capabilityID string)
Expand Down Expand Up @@ -38,10 +38,14 @@ func (cs *capabilitiesStore[T, Resp]) Read(capabilityID string) (value *T, ok bo
return trigger, ok
}

func (cs *capabilitiesStore[T, Resp]) ReadAll() (values map[string]*T) {
func (cs *capabilitiesStore[T, Resp]) ReadAll() (values []*T) {
cs.mu.RLock()
defer cs.mu.RUnlock()
return cs.capabilities
vals := make([]*T, 0)
for _, v := range cs.capabilities {
vals = append(vals, v)
}
return vals
}

func (cs *capabilitiesStore[T, Resp]) Write(capabilityID string, value *T) {
Expand Down
29 changes: 20 additions & 9 deletions core/capabilities/triggers/logevent/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
Expand Down Expand Up @@ -42,12 +43,12 @@ type logEventTrigger struct {
// Log Event Trigger config with pollPeriod and lookbackBlocks
logEventConfig Config
ticker *time.Ticker
cancel context.CancelFunc
stopChan services.StopChan
done chan bool
}

// Construct for logEventTrigger struct
func newLogEventTrigger(ctx context.Context,
cancel context.CancelFunc,
lggr logger.Logger,
workflowID string,
reqConfig *RequestConfig,
Expand Down Expand Up @@ -103,14 +104,22 @@ func newLogEventTrigger(ctx context.Context,

logEventConfig: logEventConfig,
ticker: ticker,
cancel: cancel,
stopChan: make(services.StopChan),
done: make(chan bool),
}
return l, callbackCh, nil
}

// Listen to contract events and trigger workflow runs
func (l *logEventTrigger) Listen(ctx context.Context) {
defer l.cancel()
func (l *logEventTrigger) Start(ctx context.Context) error {
go l.listen()
return nil
}

// Start to contract events and trigger workflow runs
func (l *logEventTrigger) listen() {
ctx, cancel := l.stopChan.NewCtx()
defer cancel()
defer close(l.done)

// Listen for events from lookbackPeriod
var logs []types.Sequence
Expand Down Expand Up @@ -186,11 +195,13 @@ func createTriggerResponse(log types.Sequence, version string) capabilities.Trig
}
}

// Stop contract event listener for the current contract
// Close contract event listener for the current contract
// This function is called when UnregisterTrigger is called individually
// for a specific ContractAddress and EventName
// When the whole capability service is stopped, stopChan of the service
// is closed, which would stop all triggers
func (l *logEventTrigger) Stop() {
l.cancel()
func (l *logEventTrigger) Close() error {
close(l.stopChan)
<-l.done
return nil
}

0 comments on commit c48b80d

Please sign in to comment.