Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web API Trigger capability and handler #14580

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions core/capabilities/webapi/trigger.go

This file was deleted.

299 changes: 299 additions & 0 deletions core/capabilities/webapi/trigger/trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
package webapi
justinkaseman marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"

ethCommon "github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities"
)

const defaultSendChannelBufferSize = 1000

var webapiTriggerInfo = capabilities.MustNewCapabilityInfo(
webapicapabilities.TriggerType,
capabilities.CapabilityTypeTrigger,
"A trigger to start workflow execution from a web api call",
)

type Input struct {
}
type TriggerConfig struct {
AllowedSenders []string `toml:"allowedSenders"`
AllowedTopics []string `toml:"allowedTopics"`
// RateLimiter common.RateLimiterConfig `toml:"rateLimiter"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stray commented out line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to show what it can't be.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow. What do you mean what it can't be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It had been unacceptable, but with the float fixes it now works. Good callout.

Copy link
Contributor

@justinkaseman justinkaseman Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh I see, there's a TODO here. That's what it should be

RateLimiter *values.Map `toml:"rateLimiter"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that the float support was merged in for values library. Does this need to be *values.Map still? or can we just use common.RateLimiterConfig here and have ValidateConfig unwrap to the struct directly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that now works! Thx for the callout.

RequiredParams []string `toml:"requiredParams"`
justinkaseman marked this conversation as resolved.
Show resolved Hide resolved
}

type webapiTrigger struct {
allowedSendersMap map[string]bool
justinkaseman marked this conversation as resolved.
Show resolved Hide resolved
allowedTopicsMap map[string]bool
justinkaseman marked this conversation as resolved.
Show resolved Hide resolved
ch chan<- capabilities.TriggerResponse
config TriggerConfig
rateLimiter *common.RateLimiter
}

type triggerConnectorHandler struct {
services.StateMachine

capabilities.CapabilityInfo
capabilities.Validator[TriggerConfig, Input, capabilities.TriggerResponse]
connector connector.GatewayConnector
lggr logger.Logger
mu sync.Mutex
registeredWorkflows map[string]webapiTrigger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed these from my changes because workflow guarantees only registered workflows are executed.

Copy link
Contributor

@justinkaseman justinkaseman Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Triggers will be different from the target, because on registering a workflow it makes a new channel, then on trigger messages it sends a message down the channel. Plus there is other state to persist by trigger.

}

var _ capabilities.TriggerCapability = (*triggerConnectorHandler)(nil)
var _ services.Service = &triggerConnectorHandler{}

func NewTrigger(config string, registry core.CapabilitiesRegistry, connector connector.GatewayConnector, lggr logger.Logger) (*triggerConnectorHandler, error) {
if connector == nil {
return nil, errors.New("missing connector")
}
handler := &triggerConnectorHandler{
Validator: capabilities.NewValidator[TriggerConfig, Input, capabilities.TriggerResponse](capabilities.ValidatorArgs{Info: webapiTriggerInfo}),
connector: connector,
registeredWorkflows: map[string]webapiTrigger{},
lggr: lggr.Named("WorkflowConnectorHandler"),
}

return handler, nil
}

// Iterate over each topic, checking against senders and rateLimits, then starting event processing and responding
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go doc convention is to start comments with function name

Suggested change
// Iterate over each topic, checking against senders and rateLimits, then starting event processing and responding
// processTrigger iterates over each topic, checking against senders and rateLimits, then starting event processing and responding

func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID string, body *api.MessageBody, sender ethCommon.Address, payload webapicapabilities.TriggerRequestPayload) {
// Pass on the payload with the expectation that that is acceptable format for the executor
justinkaseman marked this conversation as resolved.
Show resolved Hide resolved
wrappedPayload, _ := values.WrapMap(payload)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log en error if it happens?

justinkaseman marked this conversation as resolved.
Show resolved Hide resolved
hasMatchedAWorkflow := false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random idea: what about instead keeping a count of matched workflows. May be an interesting metric to emit.

var response webapicapabilities.TriggerResponsePayload

for _, trigger := range h.registeredWorkflows {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's very computationally heavy to iterate O(n) through every registered workflow of a node.
Something feels off here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a map of trigger topic to trigger, grab all the matches, and then dedup the registeredTrigger to ensure only 1 invoke? What troubles me though is the potential for n:m of topic to trigger. Imagine trigger A says it's interested in topics foo and bar. trigger B says it's interested in bar and bat. There'd need to be a map of foo, bar, and bat, with bar pointing to A and B. If a message said it's for topics foo, bar, bat and/or has empty topic, need to get just 1 of A & B

Copy link
Contributor

@justinkaseman justinkaseman Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking more so about keying off of the sender.

Roughly sketched -- two data structures:

	registeredWorkflows map[string]webapiTrigger // as you have it now
        senderToTriggerIds map[string][]string // map[address][]triggerId

topics := payload.Topics
// empty topics means all topics
if len(topics) == 0 {
topics = []string{}
justinkaseman marked this conversation as resolved.
Show resolved Hide resolved
for k := range trigger.allowedTopicsMap {
topics = append(topics, k)
}
}

for _, topic := range topics {

if trigger.allowedTopicsMap[topic] {
hasMatchedAWorkflow = true

if !trigger.allowedSendersMap[sender.String()] {
h.lggr.Errorw("Unauthorized Sender")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
h.lggr.Errorw("Unauthorized Sender")
h.lggr.Errorw("Unauthorized Sender", "sender", sender.String(), "messageID", body.messageId())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, include as much info as available for debugging

h.sendResponse(ctx, gatewayID, body, webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: "Unauthorized Sender"})
return
}
if !trigger.rateLimiter.Allow(body.Sender) {
h.lggr.Errorw("request rate-limited")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
h.lggr.Errorw("request rate-limited")
h.lggr.Errorw("request rate-limited", "Unauthorized Sender", "sender", sender.String(), "messageID", body.messageId())

h.sendResponse(ctx, gatewayID, body, webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: "request rate-limited"})
return
}

TriggerEventID := body.Sender + payload.TriggerEventID
tr := capabilities.TriggerResponse{
Event: capabilities.TriggerEvent{
TriggerType: webapicapabilities.TriggerType,
ID: TriggerEventID,
Outputs: wrappedPayload,
},
}

trigger.ch <- tr
response = webapicapabilities.TriggerResponsePayload{Status: "ACCEPTED"}
// Sending n topics that match a workflow with n allowedTopics, can only be triggered once.
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not block if context is cancelled?

Suggested change
trigger.ch <- tr
response = webapicapabilities.TriggerResponsePayload{Status: "ACCEPTED"}
// Sending n topics that match a workflow with n allowedTopics, can only be triggered once.
break
select {
case <-ctx.Done() :
return
case trigger.ch <- tr:
response = webapicapabilities.TriggerResponsePayload{Status: "ACCEPTED"}
// Sending n topics that match a workflow with n allowedTopics, can only be triggered once.
break
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this, and it broke the last few tests. It seems that the channel is getting closed partway through the test. Is the context really short lived?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a solution. From https://purpleidea.com/blog/2023/02/24/deadline-context-test-cancellation-in-golang/ I found I can do this ctx, _ = context.WithDeadline(ctx, time.Now().Add(10*time.Second))

}
}
}
if !hasMatchedAWorkflow {
h.lggr.Errorw("No Matching Workflow Topics")
response = webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: "No Matching Workflow Topics"}
}
_ = h.sendResponse(ctx, gatewayID, body, response)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log error?


}

// https://gateway-us-1.chain.link/web-trigger
// {
justinkaseman marked this conversation as resolved.
Show resolved Hide resolved
// jsonrpc: "2.0",
// id: "...",
// method: "web-trigger",
// params: {
// signature: "...",
// body: {
// don_id: "workflow_123",
// payload: {
// trigger_id: "[email protected]",
// trigger_event_id: "action_1234567890",
// timestamp: 1234567890,
// topics: ["daily_price_update"],
// params: {
// bid: "101",
// ask: "102"
// }
// }
// }
// }
// }

// from Web API Trigger Doc
justinkaseman marked this conversation as resolved.
Show resolved Hide resolved
// trigger_id - ID of the trigger corresponding to the capability ID
// trigger_event_id - uniquely identifies generated event (scoped to trigger_id and sender)
// timestamp - timestamp of the event (unix time), needs to be within certain freshness to be processed
// topics - [OPTIONAL] list of topics (strings) to be started by this event (affects all topics if empty)
// workflow_owners - [OPTIONAL] list of workflow owners allowed to receive this event (affects all workflows if empty)
// params - key-value pairs that will be used as trigger output in the workflow Engine (translated to values.Map)

func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
// TODO: Validate Signature
body := &msg.Body
sender := ethCommon.HexToAddress(body.Sender)
var payload webapicapabilities.TriggerRequestPayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
h.lggr.Errorw("error decoding payload", "err", err)
h.sendResponse(ctx, gatewayID, body, webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s decoding payload", err.Error()).Error()})
return
}

switch body.Method {
case webapicapabilities.MethodWebAPITrigger:
h.processTrigger(ctx, gatewayID, body, sender, payload)

default:
h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method)
h.sendResponse(ctx, gatewayID, body, webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("unsupported method %s", body.Method).Error()})
}
}

func (h *triggerConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
cfg := req.Config
if cfg == nil {
h.lggr.Errorw("config is required to register a web api trigger")
return nil, errors.New("config is required to register a web api trigger")
}

reqConfig, err := h.ValidateConfig(cfg)
if err != nil {
h.lggr.Errorw("error unwrapping config", "err", err)
return nil, err
}

if len(reqConfig.AllowedSenders) == 0 {
h.lggr.Errorw("allowedSenders must have at least 1 entry")
return nil, errors.New("allowedSenders must have at least 1 entry")
}

h.mu.Lock()
defer h.mu.Unlock()
_, errBool := h.registeredWorkflows[req.TriggerID]
if errBool {
h.lggr.Errorf("triggerId %s already registered", req.TriggerID)
return nil, fmt.Errorf("triggerId %s already registered", req.TriggerID)
}

var rateLimiterCfg common.RateLimiterConfig
err = reqConfig.RateLimiter.UnwrapTo(&rateLimiterCfg)
if err != nil {
h.lggr.Errorw("error creating unwrapping RateLimiter", "err", err, "RateLimiter config", reqConfig.RateLimiter)
return nil, err
}

rateLimiter, err := common.NewRateLimiter(rateLimiterCfg)
if err != nil {
h.lggr.Errorw("error creating RateLimiter", "err", err, "RateLimiter config", reqConfig.RateLimiter)
return nil, err
}

allowedSendersMap := map[string]bool{}
for _, k := range reqConfig.AllowedSenders {
allowedSendersMap[k] = true
}

allowedTopicsMap := map[string]bool{}
for _, k := range reqConfig.AllowedTopics {
allowedTopicsMap[k] = true
}

ch := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize)

h.registeredWorkflows[req.TriggerID] = webapiTrigger{
allowedTopicsMap: allowedTopicsMap,
allowedSendersMap: allowedSendersMap,
ch: ch,
config: *reqConfig,
rateLimiter: rateLimiter,
}

return ch, nil
}

func (h *triggerConnectorHandler) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
h.mu.Lock()
defer h.mu.Unlock()
workflow, ok := h.registeredWorkflows[req.TriggerID]
if !ok {
return fmt.Errorf("triggerId %s not registered", req.TriggerID)
}

close(workflow.ch)
delete(h.registeredWorkflows, req.TriggerID)
return nil
}

func (h *triggerConnectorHandler) Start(ctx context.Context) error {
return h.StartOnce("GatewayConnectorServiceWrapper", func() error {
return h.connector.AddHandler([]string{"web_trigger"}, h)
})
}
func (h *triggerConnectorHandler) Close() error {
return h.StopOnce("GatewayConnectorServiceWrapper", func() error {
return nil
})
}

func (h *triggerConnectorHandler) HealthReport() map[string]error {
return map[string]error{h.Name(): h.Healthy()}
}

func (h *triggerConnectorHandler) Name() string {
return "WebAPITrigger"
}

func (h *triggerConnectorHandler) sendResponse(ctx context.Context, gatewayID string, requestBody *api.MessageBody, payload any) error {
payloadJSON, err := json.Marshal(payload)
if err != nil {
h.lggr.Errorw("error marshalling payload", "err", err)
payloadJSON, _ = json.Marshal(webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s marshalling payload", err.Error()).Error()})
}

msg := &api.Message{
Body: api.MessageBody{
MessageId: requestBody.MessageId,
DonId: requestBody.DonId,
Method: requestBody.Method,
Receiver: requestBody.Sender,
Payload: payloadJSON,
},
}

return h.connector.SendToGateway(ctx, gatewayID, msg)
}
Loading
Loading