Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log Event Trigger Capability Development: Part 1 #14308

Merged
merged 45 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a93f9e9
Log Event Trigger Capability
kidambisrinivas Sep 2, 2024
072214d
Minor refactoring
kidambisrinivas Sep 2, 2024
833c0c1
Moved main script to plugins/cmd
kidambisrinivas Sep 2, 2024
73644e6
Added initial implementation for UnregisterTrigger
kidambisrinivas Sep 2, 2024
0ff783b
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 5, 2024
1b617a6
Create NewContractReader in RegisterTrigger flow of the trigger capab…
kidambisrinivas Sep 9, 2024
b09646b
Refactoring to integrate with ChainReader QueryKey API
kidambisrinivas Sep 10, 2024
d90d860
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 10, 2024
16a87f4
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 13, 2024
a455565
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 16, 2024
4e49c39
Integrate with ChainReader QueryKey interface
kidambisrinivas Sep 17, 2024
d89020e
Minor changes
kidambisrinivas Sep 17, 2024
2967430
Send cursor in QueryKey in subsequent calls
kidambisrinivas Sep 18, 2024
5e3dec0
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 25, 2024
75904af
Test utils for LOOP capability
kidambisrinivas Sep 25, 2024
c26d024
Happy path test for log event trigger capability
kidambisrinivas Sep 26, 2024
3880a5c
Float64 fix in values
kidambisrinivas Sep 26, 2024
60976f7
Happy path integration test for Log Event Trigger Capability
kidambisrinivas Sep 26, 2024
a4a7f0e
Fix code lint annotations
kidambisrinivas Sep 26, 2024
1cc9ce0
Addressed PR comments
kidambisrinivas Sep 27, 2024
d4c5efc
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
4a56bb5
Added changeset
kidambisrinivas Sep 30, 2024
344f6eb
Addressed Lint errors
kidambisrinivas Sep 30, 2024
3c629e4
Addressed PR comments
kidambisrinivas Sep 30, 2024
842da5a
Addressed more lint issues
kidambisrinivas Sep 30, 2024
17b6c1a
Simplified trigger ctx creation and cancel flows
kidambisrinivas Sep 30, 2024
2786cef
Added comment
kidambisrinivas Sep 30, 2024
31954ea
Addressed PR comments
kidambisrinivas Sep 30, 2024
c48b80d
Implemented Start/Close pattern in logEventTrigger and used stopChan …
kidambisrinivas Sep 30, 2024
33bed46
Addressed more PR comments
kidambisrinivas Sep 30, 2024
48c6728
Handled errors from Info and Close methods
kidambisrinivas Sep 30, 2024
3be3488
Fixed lint errors and pass ctx to Info
kidambisrinivas Sep 30, 2024
de4485c
Handle race conditions in log event trigger service
kidambisrinivas Sep 30, 2024
ecf40eb
Fixed lint errors
kidambisrinivas Sep 30, 2024
6e452f7
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
e23f0d1
Minor change
kidambisrinivas Sep 30, 2024
017c1f7
Test fix and lint fixes
kidambisrinivas Sep 30, 2024
d70aff4
Move EVM specific tests out of chain-agnostic capability
kidambisrinivas Sep 30, 2024
97e18b8
Set block time
kidambisrinivas Sep 30, 2024
7f316b0
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
723b872
Check existence of trigger in slow path
kidambisrinivas Sep 30, 2024
9b6d66b
Complete usage of services.Service with StartOnce and StopOnce with t…
kidambisrinivas Sep 30, 2024
ecd045e
Wait for all goroutines to exit in test
kidambisrinivas Sep 30, 2024
7562bc1
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
cd33c86
Cleanup logpoller and headtracker after test
kidambisrinivas Sep 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions core/capabilities/triggers/logevent/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package logevent

import (
"context"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
)

const ID = "log-event-trigger-%s-%[email protected]"

const defaultSendChannelBufferSize = 1000

var logEventTriggerInfo = capabilities.MustNewCapabilityInfo(
ID,
capabilities.CapabilityTypeTrigger,
"A trigger that listens for specific contract log events and starts a workflow run.",
)

// Log Event Trigger Capability Input
type Input struct {
}

// Log Event Trigger Capability Payload
type Payload struct {
// Time that Log Event Trigger's task execution occurred (RFC3339Nano formatted)
ActualExecutionTime string
}
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved

// Log Event Trigger Capability Response
type Response struct {
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
capabilities.TriggerEvent
Metadata struct{}
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
Payload Payload
}

// Log Event Trigger Capabilities Manager
// Manages different log event triggers using an underlying triggerStore
type LogEventTriggerService struct {
capabilities.CapabilityInfo
capabilities.Validator[RequestConfig, Input, capabilities.TriggerResponse]
lggr logger.Logger
triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]
relayer core.Relayer
logEventConfig LogEventConfig
}

// Common capability level config across all workflows
type LogEventConfig struct {
ChainId uint64 `json:"chainId"`
Network string `json:"network"`
LookbackBlocks uint64 `json:"lookbakBlocks"`
PollPeriod uint64 `json:"pollPeriod"`
}

type Params struct {
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
Logger logger.Logger
Relayer core.Relayer
RelayerSet core.RelayerSet
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
LogEventConfig LogEventConfig
}

var _ capabilities.TriggerCapability = (*LogEventTriggerService)(nil)
var _ services.Service = &LogEventTriggerService{}

// Creates a new Cron Trigger Service.
// Scheduling will commence on calling .Start()
func NewLogEventTriggerService(p Params) *LogEventTriggerService {
l := logger.Named(p.Logger, "LogEventTriggerCapabilityService: ")

logEventStore := NewCapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]()

return &LogEventTriggerService{
CapabilityInfo: logEventTriggerInfo,
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
lggr: l,
triggers: logEventStore,
relayer: p.Relayer,
logEventConfig: p.LogEventConfig,
}
}

func (s *LogEventTriggerService) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
return capabilities.NewCapabilityInfo(
fmt.Sprintf(ID, s.logEventConfig.Network, s.logEventConfig.ChainId),
capabilities.CapabilityTypeTrigger,
"A trigger that listens for specific contract log events and starts a workflow run.",
)
}

// Register a new trigger
// Can register triggers before the service is actively scheduling
func (s *LogEventTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
if req.Config == nil {
return nil, errors.New("config is required to register a log event trigger")
}
reqConfig, err := s.ValidateConfig(req.Config)
if err != nil {
return nil, err
}
// Add log event trigger with Contract details to CapabilitiesStore
respCh, err := s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) {
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
return newLogEventTrigger(ctx, reqConfig, s.logEventConfig, s.relayer)
})
if err != nil {
return nil, fmt.Errorf("LogEventTrigger %v", err)
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
}
s.lggr.Debugw("RegisterTrigger", "triggerId", req.TriggerID)
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
return respCh, nil
}

func (s *LogEventTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
trigger, ok := s.triggers.Read(req.TriggerID)
if !ok {
return fmt.Errorf("triggerId %s not found", req.TriggerID)
}
// Close callback channel and stop log event trigger listener
trigger.Stop()
// Remove from triggers context
s.triggers.Delete(req.TriggerID)
s.lggr.Debugw("UnregisterTrigger", "triggerId", req.TriggerID)
return nil
}

// Start the service.
func (s *LogEventTriggerService) Start(ctx context.Context) error {
if s.relayer == nil {
cedric-cordenier marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("service has shutdown, it must be built again to restart")
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
}

// Close stops the Service.
// After this call the Service cannot be started again,
// The service will need to be re-built to start scheduling again.
func (s *LogEventTriggerService) Close() error {
return nil
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *LogEventTriggerService) Ready() error {
return nil
}

func (s *LogEventTriggerService) HealthReport() map[string]error {
return map[string]error{s.Name(): nil}
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *LogEventTriggerService) Name() string {
return "Service"
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
}
71 changes: 71 additions & 0 deletions core/capabilities/triggers/logevent/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package logevent

import (
"fmt"
"sync"
)

type RegisterCapabilityFn[T any, Resp any] func() (*T, chan Resp, error)

// Interface of the capabilities store
type CapabilitiesStore[T any, Resp any] interface {
ettec marked this conversation as resolved.
Show resolved Hide resolved
Read(capabilityID string) (value *T, ok bool)
ReadAll() (values map[string]*T)
Write(capabilityID string, value *T)
InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error)
Delete(capabilityID string)
}

// Implementation for the CapabilitiesStore interface
type capabilitiesStore[T any, Resp any] struct {
mu sync.RWMutex
capabilities map[string]*T
}

var _ CapabilitiesStore[string, string] = (CapabilitiesStore[string, string])(nil)

// Constructor for capabilitiesStore struct implementing CapabilitiesStore interface
func NewCapabilitiesStore[T any, Resp any]() CapabilitiesStore[T, Resp] {
return &capabilitiesStore[T, Resp]{
capabilities: map[string]*T{},
}
}

func (cs *capabilitiesStore[T, Resp]) Read(capabilityID string) (value *T, ok bool) {
cs.mu.RLock()
defer cs.mu.RUnlock()
trigger, ok := cs.capabilities[capabilityID]
return trigger, ok
}

func (cs *capabilitiesStore[T, Resp]) ReadAll() (values map[string]*T) {
cs.mu.RLock()
defer cs.mu.RUnlock()
return cs.capabilities
}

func (cs *capabilitiesStore[T, Resp]) Write(capabilityID string, value *T) {
cs.mu.Lock()
defer cs.mu.Unlock()
cs.capabilities[capabilityID] = value
}

func (cs *capabilitiesStore[T, Resp]) InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error) {
cs.mu.Lock()
defer cs.mu.Unlock()
if _, ok := cs.capabilities[capabilityID]; ok {
return nil, fmt.Errorf("capabilityID %v already exists", capabilityID)
}
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
value, respCh, err := fn()
if err != nil {
return nil, fmt.Errorf("error registering capability: %v", err)
}
cs.capabilities[capabilityID] = value
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
return respCh, nil
}

func (cs *capabilitiesStore[T, Resp]) Delete(capabilityID string) {
cs.mu.Lock()
defer cs.mu.Unlock()
delete(cs.capabilities, capabilityID)
}
Loading