Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log Event Trigger Capability Development: Part 1 #14308

Merged
merged 45 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a93f9e9
Log Event Trigger Capability
kidambisrinivas Sep 2, 2024
072214d
Minor refactoring
kidambisrinivas Sep 2, 2024
833c0c1
Moved main script to plugins/cmd
kidambisrinivas Sep 2, 2024
73644e6
Added initial implementation for UnregisterTrigger
kidambisrinivas Sep 2, 2024
0ff783b
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 5, 2024
1b617a6
Create NewContractReader in RegisterTrigger flow of the trigger capab…
kidambisrinivas Sep 9, 2024
b09646b
Refactoring to integrate with ChainReader QueryKey API
kidambisrinivas Sep 10, 2024
d90d860
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 10, 2024
16a87f4
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 13, 2024
a455565
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 16, 2024
4e49c39
Integrate with ChainReader QueryKey interface
kidambisrinivas Sep 17, 2024
d89020e
Minor changes
kidambisrinivas Sep 17, 2024
2967430
Send cursor in QueryKey in subsequent calls
kidambisrinivas Sep 18, 2024
5e3dec0
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 25, 2024
75904af
Test utils for LOOP capability
kidambisrinivas Sep 25, 2024
c26d024
Happy path test for log event trigger capability
kidambisrinivas Sep 26, 2024
3880a5c
Float64 fix in values
kidambisrinivas Sep 26, 2024
60976f7
Happy path integration test for Log Event Trigger Capability
kidambisrinivas Sep 26, 2024
a4a7f0e
Fix code lint annotations
kidambisrinivas Sep 26, 2024
1cc9ce0
Addressed PR comments
kidambisrinivas Sep 27, 2024
d4c5efc
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
4a56bb5
Added changeset
kidambisrinivas Sep 30, 2024
344f6eb
Addressed Lint errors
kidambisrinivas Sep 30, 2024
3c629e4
Addressed PR comments
kidambisrinivas Sep 30, 2024
842da5a
Addressed more lint issues
kidambisrinivas Sep 30, 2024
17b6c1a
Simplified trigger ctx creation and cancel flows
kidambisrinivas Sep 30, 2024
2786cef
Added comment
kidambisrinivas Sep 30, 2024
31954ea
Addressed PR comments
kidambisrinivas Sep 30, 2024
c48b80d
Implemented Start/Close pattern in logEventTrigger and used stopChan …
kidambisrinivas Sep 30, 2024
33bed46
Addressed more PR comments
kidambisrinivas Sep 30, 2024
48c6728
Handled errors from Info and Close methods
kidambisrinivas Sep 30, 2024
3be3488
Fixed lint errors and pass ctx to Info
kidambisrinivas Sep 30, 2024
de4485c
Handle race conditions in log event trigger service
kidambisrinivas Sep 30, 2024
ecf40eb
Fixed lint errors
kidambisrinivas Sep 30, 2024
6e452f7
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
e23f0d1
Minor change
kidambisrinivas Sep 30, 2024
017c1f7
Test fix and lint fixes
kidambisrinivas Sep 30, 2024
d70aff4
Move EVM specific tests out of chain-agnostic capability
kidambisrinivas Sep 30, 2024
97e18b8
Set block time
kidambisrinivas Sep 30, 2024
7f316b0
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
723b872
Check existence of trigger in slow path
kidambisrinivas Sep 30, 2024
9b6d66b
Complete usage of services.Service with StartOnce and StopOnce with t…
kidambisrinivas Sep 30, 2024
ecd045e
Wait for all goroutines to exit in test
kidambisrinivas Sep 30, 2024
7562bc1
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
cd33c86
Cleanup logpoller and headtracker after test
kidambisrinivas Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chilly-crews-retire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added log-event-trigger LOOPP capability, using ChainReader
157 changes: 157 additions & 0 deletions core/capabilities/triggers/logevent/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package logevent

import (
"context"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
)

const ID = "log-event-trigger-%s-%[email protected]"

const defaultSendChannelBufferSize = 1000

// Log Event Trigger Capability Input
type Input struct {
}

// Log Event Trigger Capabilities Manager
// Manages different log event triggers using an underlying triggerStore
type TriggerService struct {
services.StateMachine
capabilities.CapabilityInfo
capabilities.Validator[RequestConfig, Input, capabilities.TriggerResponse]
lggr logger.Logger
triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]
relayer core.Relayer
logEventConfig Config
stopCh services.StopChan
}

// Common capability level config across all workflows
type Config struct {
ChainID string `json:"chainId"`
Network string `json:"network"`
LookbackBlocks uint64 `json:"lookbakBlocks"`
PollPeriod uint32 `json:"pollPeriod"`
}

func (config Config) Version(capabilityVersion string) string {
return fmt.Sprintf(capabilityVersion, config.Network, config.ChainID)
}

var _ capabilities.TriggerCapability = (*TriggerService)(nil)
var _ services.Service = &TriggerService{}

// Creates a new Cron Trigger Service.
// Scheduling will commence on calling .Start()
func NewTriggerService(ctx context.Context,
lggr logger.Logger,
relayer core.Relayer,
logEventConfig Config) (*TriggerService, error) {
l := logger.Named(lggr, "LogEventTriggerCapabilityService")

logEventStore := NewCapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]()

s := &TriggerService{
lggr: l,
triggers: logEventStore,
relayer: relayer,
logEventConfig: logEventConfig,
stopCh: make(services.StopChan),
}
var err error
s.CapabilityInfo, err = s.Info(ctx)
if err != nil {
return s, err
}
s.Validator = capabilities.NewValidator[RequestConfig, Input, capabilities.TriggerResponse](capabilities.ValidatorArgs{Info: s.CapabilityInfo})
return s, nil
}

func (s *TriggerService) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
return capabilities.NewCapabilityInfo(
s.logEventConfig.Version(ID),
capabilities.CapabilityTypeTrigger,
"A trigger that listens for specific contract log events and starts a workflow run.",
)
}

// Register a new trigger
// Can register triggers before the service is actively scheduling
func (s *TriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
if req.Config == nil {
return nil, errors.New("config is required to register a log event trigger")
}
reqConfig, err := s.ValidateConfig(req.Config)
if err != nil {
return nil, err
}
// Add log event trigger with Contract details to CapabilitiesStore
var respCh chan capabilities.TriggerResponse
ok := s.IfNotStopped(func() {
respCh, err = s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) {
l, ch, tErr := newLogEventTrigger(ctx, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer)
if tErr != nil {
return l, ch, tErr
}
tErr = l.Start(ctx)
return l, ch, tErr
})
})
if !ok {
return nil, fmt.Errorf("cannot create new trigger since LogEventTriggerService has been stopped")
}
if err != nil {
return nil, fmt.Errorf("create new trigger failed %w", err)
}
s.lggr.Infow("RegisterTrigger", "triggerId", req.TriggerID, "WorkflowID", req.Metadata.WorkflowID)
return respCh, nil
}

func (s *TriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
trigger, ok := s.triggers.Read(req.TriggerID)
if !ok {
return fmt.Errorf("triggerId %s not found", req.TriggerID)
}
// Close callback channel and stop log event trigger listener
err := trigger.Close()
if err != nil {
return fmt.Errorf("error closing trigger %s (chainID %s): %w", req.TriggerID, s.logEventConfig.ChainID, err)
}
// Remove from triggers context
s.triggers.Delete(req.TriggerID)
s.lggr.Infow("UnregisterTrigger", "triggerId", req.TriggerID, "WorkflowID", req.Metadata.WorkflowID)
return nil
}

// Start the service.
func (s *TriggerService) Start(ctx context.Context) error {
return s.StartOnce("LogEventTriggerCapabilityService", func() error {
s.lggr.Info("Starting LogEventTriggerCapabilityService")
return nil
})
}

// Close stops the Service.
// After this call the Service cannot be started again,
// The service will need to be re-built to start scheduling again.
func (s *TriggerService) Close() error {
return s.StopOnce("LogEventTriggerCapabilityService", func() error {
s.lggr.Infow("Stopping LogEventTriggerCapabilityService")
triggers := s.triggers.ReadAll()
return services.MultiCloser(triggers).Close()
})
}

func (s *TriggerService) HealthReport() map[string]error {
return map[string]error{s.Name(): s.Healthy()}
}

func (s *TriggerService) Name() string {
return s.lggr.Name()
}
82 changes: 82 additions & 0 deletions core/capabilities/triggers/logevent/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package logevent

import (
"fmt"
"sync"
)

type RegisterCapabilityFn[T any, Resp any] func() (*T, chan Resp, error)

// Interface of the capabilities store
type CapabilitiesStore[T any, Resp any] interface {
ettec marked this conversation as resolved.
Show resolved Hide resolved
Read(capabilityID string) (value *T, ok bool)
ReadAll() (values []*T)
Write(capabilityID string, value *T)
InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error)
Delete(capabilityID string)
}

// Implementation for the CapabilitiesStore interface
type capabilitiesStore[T any, Resp any] struct {
mu sync.RWMutex
capabilities map[string]*T
}

var _ CapabilitiesStore[string, string] = (CapabilitiesStore[string, string])(nil)

// Constructor for capabilitiesStore struct implementing CapabilitiesStore interface
func NewCapabilitiesStore[T any, Resp any]() CapabilitiesStore[T, Resp] {
return &capabilitiesStore[T, Resp]{
capabilities: map[string]*T{},
}
}

func (cs *capabilitiesStore[T, Resp]) Read(capabilityID string) (value *T, ok bool) {
cs.mu.RLock()
defer cs.mu.RUnlock()
trigger, ok := cs.capabilities[capabilityID]
return trigger, ok
}

func (cs *capabilitiesStore[T, Resp]) ReadAll() (values []*T) {
cs.mu.RLock()
defer cs.mu.RUnlock()
vals := make([]*T, 0)
for _, v := range cs.capabilities {
vals = append(vals, v)
}
return vals
}

func (cs *capabilitiesStore[T, Resp]) Write(capabilityID string, value *T) {
cs.mu.Lock()
defer cs.mu.Unlock()
cs.capabilities[capabilityID] = value
}

func (cs *capabilitiesStore[T, Resp]) InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error) {
cs.mu.RLock()
_, ok := cs.capabilities[capabilityID]
cs.mu.RUnlock()
if ok {
return nil, fmt.Errorf("capabilityID %v already exists", capabilityID)
}
cs.mu.Lock()
defer cs.mu.Unlock()
_, ok = cs.capabilities[capabilityID]
if ok {
return nil, fmt.Errorf("capabilityID %v already exists", capabilityID)
}
value, respCh, err := fn()
if err != nil {
return nil, fmt.Errorf("error registering capability: %v", err)
}
cs.capabilities[capabilityID] = value
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
return respCh, nil
}

func (cs *capabilitiesStore[T, Resp]) Delete(capabilityID string) {
cs.mu.Lock()
defer cs.mu.Unlock()
delete(cs.capabilities, capabilityID)
}
Loading
Loading