Skip to content

Commit

Permalink
[CAPPL-6] Formalize trigger API (#14145)
Browse files Browse the repository at this point in the history
* [CAPPL-6-formalize-trigger-API

* Fix tests

* WIP

* WIP

* Update common

* Fix tests

* Address feedback

* Update common

* Update common

* Fix test
  • Loading branch information
cedric-cordenier authored Aug 28, 2024
1 parent 710d48e commit 567ce22
Show file tree
Hide file tree
Showing 23 changed files with 201 additions and 229 deletions.
5 changes: 5 additions & 0 deletions .changeset/wise-snakes-protect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Formalize trigger API #internal
48 changes: 21 additions & 27 deletions core/capabilities/integration_tests/mock_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package integration_tests

import (
"context"
"fmt"
"strconv"
"sync"
"testing"

Expand Down Expand Up @@ -45,7 +43,7 @@ func (r *reportsSink) Close() error {

func (r *reportsSink) sendReports(reportList []*datastreams.FeedReport) {
for _, trigger := range r.triggers {
resp, err := wrapReports(reportList, "1", 12, datastreams.SignersMetadata{})
resp, err := wrapReports(reportList, "1", 12, datastreams.Metadata{})
if err != nil {
panic(err)
}
Expand All @@ -54,7 +52,7 @@ func (r *reportsSink) sendReports(reportList []*datastreams.FeedReport) {
}

func (r *reportsSink) getNewTrigger(t *testing.T) *streamsTrigger {
trigger := streamsTrigger{t: t, toSend: make(chan capabilities.CapabilityResponse, 1000),
trigger := streamsTrigger{t: t, toSend: make(chan capabilities.TriggerResponse, 1000),
wg: &r.wg, stopCh: r.stopCh}
r.triggers = append(r.triggers, trigger)
return &trigger
Expand All @@ -63,13 +61,13 @@ func (r *reportsSink) getNewTrigger(t *testing.T) *streamsTrigger {
type streamsTrigger struct {
t *testing.T
cancel context.CancelFunc
toSend chan capabilities.CapabilityResponse
toSend chan capabilities.TriggerResponse

wg *sync.WaitGroup
stopCh services.StopChan
}

func (s *streamsTrigger) sendResponse(resp capabilities.CapabilityResponse) {
func (s *streamsTrigger) sendResponse(resp capabilities.TriggerResponse) {
s.toSend <- resp
}

Expand All @@ -81,12 +79,12 @@ func (s *streamsTrigger) Info(ctx context.Context) (capabilities.CapabilityInfo,
), nil
}

func (s *streamsTrigger) RegisterTrigger(ctx context.Context, request capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
func (s *streamsTrigger) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
if s.cancel != nil {
s.t.Fatal("trigger already registered")
}

responseCh := make(chan capabilities.CapabilityResponse)
responseCh := make(chan capabilities.TriggerResponse)

ctxWithCancel, cancel := context.WithCancel(context.Background())
s.cancel = cancel
Expand All @@ -108,7 +106,7 @@ func (s *streamsTrigger) RegisterTrigger(ctx context.Context, request capabiliti
return responseCh, nil
}

func (s *streamsTrigger) UnregisterTrigger(ctx context.Context, request capabilities.CapabilityRequest) error {
func (s *streamsTrigger) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
if s.cancel == nil {
s.t.Fatal("trigger not registered")
}
Expand All @@ -118,32 +116,28 @@ func (s *streamsTrigger) UnregisterTrigger(ctx context.Context, request capabili
return nil
}

func wrapReports(reportList []*datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.SignersMetadata) (capabilities.CapabilityResponse, error) {
val, err := values.Wrap(reportList)
if err != nil {
return capabilities.CapabilityResponse{}, err
func wrapReports(reportList []*datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.Metadata) (capabilities.TriggerResponse, error) {
rl := []datastreams.FeedReport{}
for _, r := range reportList {
rl = append(rl, *r)
}

metaVal, err := values.Wrap(meta)
outputs, err := values.WrapMap(datastreams.StreamsTriggerEvent{
Payload: rl,
Metadata: meta,
Timestamp: timestamp,
})
if err != nil {
return capabilities.CapabilityResponse{}, err
return capabilities.TriggerResponse{}, err
}

triggerEvent := capabilities.TriggerEvent{
TriggerType: triggerID,
ID: eventID,
Timestamp: strconv.FormatInt(timestamp, 10),
Metadata: metaVal,
Payload: val,
}

triggerEventMapValue, err := values.WrapMap(triggerEvent)
if err != nil {
return capabilities.CapabilityResponse{}, fmt.Errorf("failed to wrap trigger event: %w", err)
Outputs: outputs,
}

// Create a new CapabilityResponse with the MercuryTriggerEvent
return capabilities.CapabilityResponse{
Value: triggerEventMapValue,
// Create a new TriggerResponse with the MercuryTriggerEvent
return capabilities.TriggerResponse{
Event: triggerEvent,
}, nil
}
4 changes: 2 additions & 2 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type mockTrigger struct {
capabilities.CapabilityInfo
}

func (m *mockTrigger) RegisterTrigger(ctx context.Context, request capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
func (m *mockTrigger) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
return nil, nil
}

func (m *mockTrigger) UnregisterTrigger(ctx context.Context, request capabilities.CapabilityRequest) error {
func (m *mockTrigger) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
return nil
}

Expand Down
21 changes: 8 additions & 13 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type registrationKey struct {
}

type pubRegState struct {
callback <-chan commoncap.CapabilityResponse
request commoncap.CapabilityRequest
callback <-chan commoncap.TriggerResponse
request commoncap.TriggerRegistrationRequest
}

var _ types.Receiver = &triggerPublisher{}
Expand Down Expand Up @@ -94,9 +94,9 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
}

if msg.Method == types.MethodRegisterTrigger {
req, err := pb.UnmarshalCapabilityRequest(msg.Payload)
req, err := pb.UnmarshalTriggerRegistrationRequest(msg.Payload)
if err != nil {
p.lggr.Errorw("failed to unmarshal capability request", "capabilityId", p.capInfo.ID, "err", err)
p.lggr.Errorw("failed to unmarshal trigger registration request", "capabilityId", p.capInfo.ID, "err", err)
return
}
callerDon, ok := p.workflowDONs[msg.CallerDonId]
Expand Down Expand Up @@ -135,7 +135,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
p.lggr.Errorw("failed to aggregate trigger registrations", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err)
return
}
unmarshaled, err := pb.UnmarshalCapabilityRequest(aggregated)
unmarshaled, err := pb.UnmarshalTriggerRegistrationRequest(aggregated)
if err != nil {
p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err)
return
Expand Down Expand Up @@ -189,7 +189,7 @@ func (p *triggerPublisher) registrationCleanupLoop() {
}
}

func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.CapabilityResponse, key registrationKey) {
func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.TriggerResponse, key registrationKey) {
defer p.wg.Done()
for {
select {
Expand All @@ -200,14 +200,9 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.Capabili
p.lggr.Infow("triggerEventLoop channel closed", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId)
return
}
triggerEvent := capabilities.TriggerEvent{}
err := response.Value.UnwrapTo(&triggerEvent)
if err != nil {
p.lggr.Errorw("can't unwrap trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "err", err)
break
}
triggerEvent := response.Event
p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "triggerEventID", triggerEvent.ID)
marshaled, err := pb.MarshalCapabilityResponse(response)
marshaled, err := pb.MarshalTriggerResponse(response)
if err != nil {
p.lggr.Debugw("can't marshal trigger event", "err", err)
break
Expand Down
14 changes: 7 additions & 7 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ func TestTriggerPublisher_Register(t *testing.T) {
}
underlying := &testTrigger{
info: capInfo,
registrationsCh: make(chan commoncap.CapabilityRequest, 2),
registrationsCh: make(chan commoncap.TriggerRegistrationRequest, 2),
}
publisher := remote.NewTriggerPublisher(config, underlying, capInfo, capDonInfo, workflowDONs, dispatcher, lggr)
require.NoError(t, publisher.Start(ctx))

// trigger registration event
capRequest := commoncap.CapabilityRequest{
triggerRequest := commoncap.TriggerRegistrationRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
},
}
marshaled, err := pb.MarshalCapabilityRequest(capRequest)
marshaled, err := pb.MarshalTriggerRegistrationRequest(triggerRequest)
require.NoError(t, err)
regEvent := &remotetypes.MessageBody{
Sender: p1[:],
Expand All @@ -79,25 +79,25 @@ func TestTriggerPublisher_Register(t *testing.T) {
publisher.Receive(ctx, regEvent)
require.NotEmpty(t, underlying.registrationsCh)
forwarded := <-underlying.registrationsCh
require.Equal(t, capRequest.Metadata.WorkflowID, forwarded.Metadata.WorkflowID)
require.Equal(t, triggerRequest.Metadata.WorkflowID, forwarded.Metadata.WorkflowID)

require.NoError(t, publisher.Close())
}

type testTrigger struct {
info commoncap.CapabilityInfo
registrationsCh chan commoncap.CapabilityRequest
registrationsCh chan commoncap.TriggerRegistrationRequest
}

func (t *testTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error) {
return t.info, nil
}

func (t *testTrigger) RegisterTrigger(_ context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
func (t *testTrigger) RegisterTrigger(_ context.Context, request commoncap.TriggerRegistrationRequest) (<-chan commoncap.TriggerResponse, error) {
t.registrationsCh <- request
return nil, nil
}

func (t *testTrigger) UnregisterTrigger(_ context.Context, request commoncap.CapabilityRequest) error {
func (t *testTrigger) UnregisterTrigger(_ context.Context, request commoncap.TriggerRegistrationRequest) error {
return nil
}
10 changes: 5 additions & 5 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type triggerEventKey struct {
}

type subRegState struct {
callback chan commoncap.CapabilityResponse
callback chan commoncap.TriggerResponse
rawRequest []byte
}

Expand Down Expand Up @@ -98,8 +98,8 @@ func (s *triggerSubscriber) Info(ctx context.Context) (commoncap.CapabilityInfo,
return s.capInfo, nil
}

func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
rawRequest, err := pb.MarshalCapabilityRequest(request)
func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commoncap.TriggerRegistrationRequest) (<-chan commoncap.TriggerResponse, error) {
rawRequest, err := pb.MarshalTriggerRegistrationRequest(request)
if err != nil {
return nil, err
}
Expand All @@ -113,7 +113,7 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commonc
regState, ok := s.registeredWorkflows[request.Metadata.WorkflowID]
if !ok {
regState = &subRegState{
callback: make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize),
callback: make(chan commoncap.TriggerResponse, defaultSendChannelBufferSize),
rawRequest: rawRequest,
}
s.registeredWorkflows[request.Metadata.WorkflowID] = regState
Expand Down Expand Up @@ -160,7 +160,7 @@ func (s *triggerSubscriber) registrationLoop() {
}
}

func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) error {
func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commoncap.TriggerRegistrationRequest) error {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down
14 changes: 8 additions & 6 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
require.NoError(t, subscriber.Start(ctx))

req := commoncap.CapabilityRequest{
req := commoncap.TriggerRegistrationRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
},
Expand All @@ -83,11 +83,13 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
// receive trigger event
triggerEventValue, err := values.NewMap(triggerEvent1)
require.NoError(t, err)
capResponse := commoncap.CapabilityResponse{
Value: triggerEventValue,
Err: nil,
capResponse := commoncap.TriggerResponse{
Event: commoncap.TriggerEvent{
Outputs: triggerEventValue,
},
Err: nil,
}
marshaled, err := pb.MarshalCapabilityResponse(capResponse)
marshaled, err := pb.MarshalTriggerResponse(capResponse)
require.NoError(t, err)
triggerEvent := &remotetypes.MessageBody{
Sender: p1[:],
Expand All @@ -101,7 +103,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
}
subscriber.Receive(ctx, triggerEvent)
response := <-triggerEventCallbackCh
require.Equal(t, response.Value, triggerEventValue)
require.Equal(t, response.Event.Outputs, triggerEventValue)

require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Receiver interface {
}

type Aggregator interface {
Aggregate(eventID string, responses [][]byte) (commoncap.CapabilityResponse, error)
Aggregate(eventID string, responses [][]byte) (commoncap.TriggerResponse, error)
}

// NOTE: this type will become part of the Registry (KS-108)
Expand Down
8 changes: 4 additions & 4 deletions core/capabilities/remote/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ func NewDefaultModeAggregator(minIdenticalResponses uint32) *defaultModeAggregat
}
}

func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) (commoncap.CapabilityResponse, error) {
func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) (commoncap.TriggerResponse, error) {
found, err := AggregateModeRaw(responses, a.minIdenticalResponses)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to aggregate responses, err: %w", err)
return commoncap.TriggerResponse{}, fmt.Errorf("failed to aggregate responses, err: %w", err)
}

unmarshaled, err := pb.UnmarshalCapabilityResponse(found)
unmarshaled, err := pb.UnmarshalTriggerResponse(found)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to unmarshal aggregated responses, err: %w", err)
return commoncap.TriggerResponse{}, fmt.Errorf("failed to unmarshal aggregated responses, err: %w", err)
}
return unmarshaled, nil
}
Expand Down
20 changes: 12 additions & 8 deletions core/capabilities/remote/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,24 @@ func TestToPeerID(t *testing.T) {
func TestDefaultModeAggregator_Aggregate(t *testing.T) {
val, err := values.NewMap(triggerEvent1)
require.NoError(t, err)
capResponse1 := commoncap.CapabilityResponse{
Value: val,
Err: nil,
capResponse1 := commoncap.TriggerResponse{
Event: commoncap.TriggerEvent{
Outputs: val,
},
Err: nil,
}
marshaled1, err := pb.MarshalCapabilityResponse(capResponse1)
marshaled1, err := pb.MarshalTriggerResponse(capResponse1)
require.NoError(t, err)

val2, err := values.NewMap(triggerEvent2)
require.NoError(t, err)
capResponse2 := commoncap.CapabilityResponse{
Value: val2,
Err: nil,
capResponse2 := commoncap.TriggerResponse{
Event: commoncap.TriggerEvent{
Outputs: val2,
},
Err: nil,
}
marshaled2, err := pb.MarshalCapabilityResponse(capResponse2)
marshaled2, err := pb.MarshalTriggerResponse(capResponse2)
require.NoError(t, err)

agg := remote.NewDefaultModeAggregator(2)
Expand Down
6 changes: 4 additions & 2 deletions core/capabilities/streams/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type codec struct {
var _ datastreams.ReportCodec = &codec{}

func (c *codec) Unwrap(wrapped values.Value) ([]datastreams.FeedReport, error) {
dest, err := datastreams.UnwrapFeedReportList(wrapped)
dest, err := datastreams.UnwrapStreamsTriggerEventToFeedReportList(wrapped)
if err != nil {
return nil, fmt.Errorf("failed to unwrap: %v", err)
}
Expand All @@ -45,7 +45,9 @@ func (c *codec) Unwrap(wrapped values.Value) ([]datastreams.FeedReport, error) {
}

func (c *codec) Wrap(reports []datastreams.FeedReport) (values.Value, error) {
return values.Wrap(reports)
return values.Wrap(&datastreams.StreamsTriggerEvent{
Payload: reports,
})
}

func (c *codec) Validate(report datastreams.FeedReport, allowedSigners [][]byte, minRequiredSignatures int) error {
Expand Down
Loading

0 comments on commit 567ce22

Please sign in to comment.