-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Log Event Trigger Capability (#14308)
* Log Event Trigger Capability * Minor refactoring * Moved main script to plugins/cmd * Added initial implementation for UnregisterTrigger * Create NewContractReader in RegisterTrigger flow of the trigger capability * Refactoring to integrate with ChainReader QueryKey API * Integrate with ChainReader QueryKey interface * Minor changes * Send cursor in QueryKey in subsequent calls * Test utils for LOOP capability * Happy path test for log event trigger capability * Float64 fix in values * Happy path integration test for Log Event Trigger Capability * Fix code lint annotations * Addressed PR comments * Added changeset * Addressed Lint errors * Addressed PR comments * Addressed more lint issues * Simplified trigger ctx creation and cancel flows * Added comment * Addressed PR comments * Implemented Start/Close pattern in logEventTrigger and used stopChan to track listener * Addressed more PR comments * Handled errors from Info and Close methods * Fixed lint errors and pass ctx to Info * Handle race conditions in log event trigger service * Fixed lint errors * Minor change * Test fix and lint fixes * Move EVM specific tests out of chain-agnostic capability * Set block time * Check existence of trigger in slow path * Complete usage of services.Service with StartOnce and StopOnce with tests updated * Wait for all goroutines to exit in test * Cleanup logpoller and headtracker after test
- Loading branch information
1 parent
02472a6
commit 3e9e058
Showing
9 changed files
with
950 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"chainlink": minor | ||
--- | ||
|
||
#added log-event-trigger LOOPP capability, using ChainReader |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
package logevent | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
"github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
"github.com/smartcontractkit/chainlink-common/pkg/services" | ||
"github.com/smartcontractkit/chainlink-common/pkg/types/core" | ||
) | ||
|
||
const ID = "log-event-trigger-%s-%[email protected]" | ||
|
||
const defaultSendChannelBufferSize = 1000 | ||
|
||
// Log Event Trigger Capability Input | ||
type Input struct { | ||
} | ||
|
||
// Log Event Trigger Capabilities Manager | ||
// Manages different log event triggers using an underlying triggerStore | ||
type TriggerService struct { | ||
services.StateMachine | ||
capabilities.CapabilityInfo | ||
capabilities.Validator[RequestConfig, Input, capabilities.TriggerResponse] | ||
lggr logger.Logger | ||
triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse] | ||
relayer core.Relayer | ||
logEventConfig Config | ||
stopCh services.StopChan | ||
} | ||
|
||
// Common capability level config across all workflows | ||
type Config struct { | ||
ChainID string `json:"chainId"` | ||
Network string `json:"network"` | ||
LookbackBlocks uint64 `json:"lookbakBlocks"` | ||
PollPeriod uint32 `json:"pollPeriod"` | ||
} | ||
|
||
func (config Config) Version(capabilityVersion string) string { | ||
return fmt.Sprintf(capabilityVersion, config.Network, config.ChainID) | ||
} | ||
|
||
var _ capabilities.TriggerCapability = (*TriggerService)(nil) | ||
var _ services.Service = &TriggerService{} | ||
|
||
// Creates a new Cron Trigger Service. | ||
// Scheduling will commence on calling .Start() | ||
func NewTriggerService(ctx context.Context, | ||
lggr logger.Logger, | ||
relayer core.Relayer, | ||
logEventConfig Config) (*TriggerService, error) { | ||
l := logger.Named(lggr, "LogEventTriggerCapabilityService") | ||
|
||
logEventStore := NewCapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]() | ||
|
||
s := &TriggerService{ | ||
lggr: l, | ||
triggers: logEventStore, | ||
relayer: relayer, | ||
logEventConfig: logEventConfig, | ||
stopCh: make(services.StopChan), | ||
} | ||
var err error | ||
s.CapabilityInfo, err = s.Info(ctx) | ||
if err != nil { | ||
return s, err | ||
} | ||
s.Validator = capabilities.NewValidator[RequestConfig, Input, capabilities.TriggerResponse](capabilities.ValidatorArgs{Info: s.CapabilityInfo}) | ||
return s, nil | ||
} | ||
|
||
func (s *TriggerService) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { | ||
return capabilities.NewCapabilityInfo( | ||
s.logEventConfig.Version(ID), | ||
capabilities.CapabilityTypeTrigger, | ||
"A trigger that listens for specific contract log events and starts a workflow run.", | ||
) | ||
} | ||
|
||
// Register a new trigger | ||
// Can register triggers before the service is actively scheduling | ||
func (s *TriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { | ||
if req.Config == nil { | ||
return nil, errors.New("config is required to register a log event trigger") | ||
} | ||
reqConfig, err := s.ValidateConfig(req.Config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// Add log event trigger with Contract details to CapabilitiesStore | ||
var respCh chan capabilities.TriggerResponse | ||
ok := s.IfNotStopped(func() { | ||
respCh, err = s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) { | ||
l, ch, tErr := newLogEventTrigger(ctx, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer) | ||
if tErr != nil { | ||
return l, ch, tErr | ||
} | ||
tErr = l.Start(ctx) | ||
return l, ch, tErr | ||
}) | ||
}) | ||
if !ok { | ||
return nil, fmt.Errorf("cannot create new trigger since LogEventTriggerService has been stopped") | ||
} | ||
if err != nil { | ||
return nil, fmt.Errorf("create new trigger failed %w", err) | ||
} | ||
s.lggr.Infow("RegisterTrigger", "triggerId", req.TriggerID, "WorkflowID", req.Metadata.WorkflowID) | ||
return respCh, nil | ||
} | ||
|
||
func (s *TriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error { | ||
trigger, ok := s.triggers.Read(req.TriggerID) | ||
if !ok { | ||
return fmt.Errorf("triggerId %s not found", req.TriggerID) | ||
} | ||
// Close callback channel and stop log event trigger listener | ||
err := trigger.Close() | ||
if err != nil { | ||
return fmt.Errorf("error closing trigger %s (chainID %s): %w", req.TriggerID, s.logEventConfig.ChainID, err) | ||
} | ||
// Remove from triggers context | ||
s.triggers.Delete(req.TriggerID) | ||
s.lggr.Infow("UnregisterTrigger", "triggerId", req.TriggerID, "WorkflowID", req.Metadata.WorkflowID) | ||
return nil | ||
} | ||
|
||
// Start the service. | ||
func (s *TriggerService) Start(ctx context.Context) error { | ||
return s.StartOnce("LogEventTriggerCapabilityService", func() error { | ||
s.lggr.Info("Starting LogEventTriggerCapabilityService") | ||
return nil | ||
}) | ||
} | ||
|
||
// Close stops the Service. | ||
// After this call the Service cannot be started again, | ||
// The service will need to be re-built to start scheduling again. | ||
func (s *TriggerService) Close() error { | ||
return s.StopOnce("LogEventTriggerCapabilityService", func() error { | ||
s.lggr.Infow("Stopping LogEventTriggerCapabilityService") | ||
triggers := s.triggers.ReadAll() | ||
return services.MultiCloser(triggers).Close() | ||
}) | ||
} | ||
|
||
func (s *TriggerService) HealthReport() map[string]error { | ||
return map[string]error{s.Name(): s.Healthy()} | ||
} | ||
|
||
func (s *TriggerService) Name() string { | ||
return s.lggr.Name() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package logevent | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
) | ||
|
||
type RegisterCapabilityFn[T any, Resp any] func() (*T, chan Resp, error) | ||
|
||
// Interface of the capabilities store | ||
type CapabilitiesStore[T any, Resp any] interface { | ||
Read(capabilityID string) (value *T, ok bool) | ||
ReadAll() (values []*T) | ||
Write(capabilityID string, value *T) | ||
InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error) | ||
Delete(capabilityID string) | ||
} | ||
|
||
// Implementation for the CapabilitiesStore interface | ||
type capabilitiesStore[T any, Resp any] struct { | ||
mu sync.RWMutex | ||
capabilities map[string]*T | ||
} | ||
|
||
var _ CapabilitiesStore[string, string] = (CapabilitiesStore[string, string])(nil) | ||
|
||
// Constructor for capabilitiesStore struct implementing CapabilitiesStore interface | ||
func NewCapabilitiesStore[T any, Resp any]() CapabilitiesStore[T, Resp] { | ||
return &capabilitiesStore[T, Resp]{ | ||
capabilities: map[string]*T{}, | ||
} | ||
} | ||
|
||
func (cs *capabilitiesStore[T, Resp]) Read(capabilityID string) (value *T, ok bool) { | ||
cs.mu.RLock() | ||
defer cs.mu.RUnlock() | ||
trigger, ok := cs.capabilities[capabilityID] | ||
return trigger, ok | ||
} | ||
|
||
func (cs *capabilitiesStore[T, Resp]) ReadAll() (values []*T) { | ||
cs.mu.RLock() | ||
defer cs.mu.RUnlock() | ||
vals := make([]*T, 0) | ||
for _, v := range cs.capabilities { | ||
vals = append(vals, v) | ||
} | ||
return vals | ||
} | ||
|
||
func (cs *capabilitiesStore[T, Resp]) Write(capabilityID string, value *T) { | ||
cs.mu.Lock() | ||
defer cs.mu.Unlock() | ||
cs.capabilities[capabilityID] = value | ||
} | ||
|
||
func (cs *capabilitiesStore[T, Resp]) InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error) { | ||
cs.mu.RLock() | ||
_, ok := cs.capabilities[capabilityID] | ||
cs.mu.RUnlock() | ||
if ok { | ||
return nil, fmt.Errorf("capabilityID %v already exists", capabilityID) | ||
} | ||
cs.mu.Lock() | ||
defer cs.mu.Unlock() | ||
_, ok = cs.capabilities[capabilityID] | ||
if ok { | ||
return nil, fmt.Errorf("capabilityID %v already exists", capabilityID) | ||
} | ||
value, respCh, err := fn() | ||
if err != nil { | ||
return nil, fmt.Errorf("error registering capability: %v", err) | ||
} | ||
cs.capabilities[capabilityID] = value | ||
return respCh, nil | ||
} | ||
|
||
func (cs *capabilitiesStore[T, Resp]) Delete(capabilityID string) { | ||
cs.mu.Lock() | ||
defer cs.mu.Unlock() | ||
delete(cs.capabilities, capabilityID) | ||
} |
Oops, something went wrong.