Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web API Trigger capability and handler #14580

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 271 additions & 10 deletions core/capabilities/webapi/trigger.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,279 @@
package webapi
package trigger

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"

ethCommon "github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities"
)

func NewTrigger(config string, registry core.CapabilitiesRegistry, connector connector.GatewayConnector, lggr logger.Logger) (job.ServiceCtx, error) {
// TODO (CAPPL-22, CAPPL-24):
// - decode config
// - create an implementation of the capability API and add it to the Registry
// - create a handler and register it with Gateway Connector
// - manage trigger subscriptions
// - process incoming trigger events and related metadata
return nil, nil
const defaultSendChannelBufferSize = 1000

const TriggerType = "[email protected]"

var webapiTriggerInfo = capabilities.MustNewCapabilityInfo(
TriggerType,
capabilities.CapabilityTypeTrigger,
"A trigger to start workflow execution from a web api call",
)

type Input struct {
}
type Config struct {
AllowedSenders []string `toml:"allowedSenders"`
AllowedTopics []string `toml:"allowedTopics"`
RateLimiter common.RateLimiterConfig `toml:"rateLimiter"`
// RequiredParams is advisory to the web trigger message sender it is not enforced.
RequiredParams []string `toml:"requiredParams"`
}

type webapiTrigger struct {
allowedSenders map[string]bool
allowedTopics map[string]bool
ch chan<- capabilities.TriggerResponse
config Config
rateLimiter *common.RateLimiter
}

type triggerConnectorHandler struct {
services.StateMachine

capabilities.CapabilityInfo
capabilities.Validator[Config, Input, capabilities.TriggerResponse]
connector connector.GatewayConnector
lggr logger.Logger
mu sync.Mutex
registeredWorkflows map[string]webapiTrigger
}

var _ capabilities.TriggerCapability = (*triggerConnectorHandler)(nil)
var _ services.Service = &triggerConnectorHandler{}

func NewTrigger(config string, registry core.CapabilitiesRegistry, connector connector.GatewayConnector, lggr logger.Logger) (*triggerConnectorHandler, error) {
if connector == nil {
return nil, errors.New("missing connector")
}
handler := &triggerConnectorHandler{
Validator: capabilities.NewValidator[Config, Input, capabilities.TriggerResponse](capabilities.ValidatorArgs{Info: webapiTriggerInfo}),
connector: connector,
registeredWorkflows: map[string]webapiTrigger{},
lggr: lggr.Named("WorkflowConnectorHandler"),
}

return handler, nil
}

// processTrigger iterates over each topic, checking against senders and rateLimits, then starting event processing and responding
func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID string, body *api.MessageBody, sender ethCommon.Address, payload webapicapabilities.TriggerRequestPayload) error {
// Pass on the payload with the expectation that it's in an acceptable format for the executor
wrappedPayload, err := values.WrapMap(payload)
if err != nil {
return fmt.Errorf("error wrapping payload %s", err)
}
topics := payload.Topics

// empty topics is error for V1
if len(topics) == 0 {
return fmt.Errorf("empty Workflow Topics")
}

// workflows that have matched topics
matchedWorkflows := 0
// workflows that have matched topic and passed all checks
fullyMatchedWorkflows := 0
for _, trigger := range h.registeredWorkflows {
for _, topic := range topics {
Copy link
Contributor

@justinkaseman justinkaseman Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump on #14580 (comment)

With the current code when multiple workflows with different allowed senders use the same topics they will each be blocked. This seems very off usability-wise. If that's the intention, maybe we should guard against this in register workflow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about that. Imagine workflow A with topic T and AllowedSender S, workflow B with Topic T and AllowedSender S'. There will be 2 triggers and they will have different allowedSenders, so I think the behavior codified is what we want.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imagine two workflows for different customers:

Workflow W1: for customer C1

- allowed senders: 0x1
- topics: "forex"

Workflow W2: for customer C2

- allowed senders: 0x2 
- topics: "forex"

User 0x2 sends in a request R1:

sender: 0x2
topic: "forex"

User 0x2 wants to trigger W2
Line 94 starts iterating every workflow, so starts on W1
Line 95 starts with the first topic on W1: "forex"
Line 96 passes because the topic of R1 is "forex"
Line 99 errors because the allowed sender of W1 is 0x1
User 0x2 has to supply a topic, but when they supply the topic "forex" this will fail.

Same ends up happening for W1 and user 0x1. The first iteration works as expected, but then the second matches on W2, sees that 0x1 is not an allowed sender, and errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I see. I think we need a temporary error and keep proceeding.

Copy link
Contributor

@justinkaseman justinkaseman Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bumping #14580 (comment)
I still don't like the time complexity here from iterating over every workflow and its topics on a node.

I'd prefer to see a unit test showing load testing for benchmarking, but won't block the PR over it. Could we have a ticket tracking optimizing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to do a ticket. I'd love to learn how we can do load testing in unit tests.

if trigger.allowedTopics[topic] {
matchedWorkflows++
if !trigger.allowedSenders[sender.String()] {
err = fmt.Errorf("unauthorized Sender %s, messageID %s", sender.String(), body.MessageId)
h.lggr.Debugw(err.Error())
continue
}
if !trigger.rateLimiter.Allow(body.Sender) {
err = fmt.Errorf("request rate-limited for sender %s, messageID %s", sender.String(), body.MessageId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that with the current implementation this can drop some feedback to the user.

Scenario is: a request is sent that matches the topic on multiple workflows W1 and W2.
Then W1 doesn't start a run because of rate limiting, and W2 does.
The user would not know from the Gateway's response what happened, they just get a "success".
Even though they intended to start two runs with the single API request.

This is mitigated once observability is added, since the user could see in the dashboard what happened, but it would be the most complete to give feedback in the API response as well.

I'm not going to block this PR over it, since I think it's a decent change to how the Gateway responds, but we should have a ticket tracking to handle this. Can just be a note in the observability ticket to revisit and decide if needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this is not ideal. As mentioned before, behavior under partial failure is a big part of the problem with boxcarring. I'm happy to add a ticket for this.

continue
Copy link
Contributor

@jinhoonbang jinhoonbang Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean to log err here?

}
fullyMatchedWorkflows++
TriggerEventID := body.Sender + payload.TriggerEventID
tr := capabilities.TriggerResponse{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we want these to be unique per workflow run
What is the impact if the user supplies the same TriggerEventID every time?

cc: @cedric-cordenier

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the workflow engine will just refuse to process subsequent events

We can't really avoid that in this case: the ID needs to be deterministic in order to be correlatable so if a user misuses the ID and repeats the same one multiple times that's on them 🤷

Event: capabilities.TriggerEvent{
TriggerType: TriggerType,
ID: TriggerEventID,
Outputs: wrappedPayload,
},
}
select {
case <-ctx.Done():
return nil
case trigger.ch <- tr:
// Sending n topics that match a workflow with n allowedTopics, can only be triggered once.
break
}
}
}
}
if matchedWorkflows == 0 {
return fmt.Errorf("no Matching Workflow Topics")
}

if fullyMatchedWorkflows > 0 {
return nil
}
return err
}

func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
// TODO: Validate Signature
body := &msg.Body
sender := ethCommon.HexToAddress(body.Sender)
var payload webapicapabilities.TriggerRequestPayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
h.lggr.Errorw("error decoding payload", "err", err)
err = h.sendResponse(ctx, gatewayID, body, webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s decoding payload", err.Error()).Error()})
if err != nil {
h.lggr.Errorw("error sending response", "err", err)
}
return
}

switch body.Method {
case webapicapabilities.MethodWebAPITrigger:
resp := h.processTrigger(ctx, gatewayID, body, sender, payload)
var response webapicapabilities.TriggerResponsePayload
if resp == nil {
response = webapicapabilities.TriggerResponsePayload{Status: "ACCEPTED"}
} else {
response = webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: resp.Error()}
h.lggr.Errorw("Error processing trigger", "gatewayID", gatewayID, "body", body, "response", resp)
}
err = h.sendResponse(ctx, gatewayID, body, response)
if err != nil {
h.lggr.Errorw("Error sending response", "body", body, "response", response, "err", err)
}
return

default:
h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method)
err = h.sendResponse(ctx, gatewayID, body, webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("unsupported method %s", body.Method).Error()})
if err != nil {
h.lggr.Errorw("error sending response", "err", err)
}
}
}

func (h *triggerConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
cfg := req.Config
if cfg == nil {
return nil, errors.New("config is required to register a web api trigger")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to log + return the error here; it'll get logged upstream if the caller wants to (in prod, it will get logged by the workflow engine)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with the other errors in the function


reqConfig, err := h.ValidateConfig(cfg)
if err != nil {
return nil, err
}

if len(reqConfig.AllowedSenders) == 0 {
return nil, errors.New("allowedSenders must have at least 1 entry")
}

h.mu.Lock()
defer h.mu.Unlock()
_, errBool := h.registeredWorkflows[req.TriggerID]
if errBool {
return nil, fmt.Errorf("triggerId %s already registered", req.TriggerID)
}

rateLimiter, err := common.NewRateLimiter(reqConfig.RateLimiter)
if err != nil {
return nil, err
}

allowedSendersMap := map[string]bool{}
for _, k := range reqConfig.AllowedSenders {
allowedSendersMap[k] = true
}

allowedTopicsMap := map[string]bool{}
for _, k := range reqConfig.AllowedTopics {
allowedTopicsMap[k] = true
}

ch := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize)

h.registeredWorkflows[req.TriggerID] = webapiTrigger{
allowedTopics: allowedTopicsMap,
allowedSenders: allowedSendersMap,
ch: ch,
config: *reqConfig,
rateLimiter: rateLimiter,
}

return ch, nil
}

func (h *triggerConnectorHandler) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
h.mu.Lock()
defer h.mu.Unlock()
workflow, ok := h.registeredWorkflows[req.TriggerID]
if !ok {
return fmt.Errorf("triggerId %s not registered", req.TriggerID)
}

close(workflow.ch)
delete(h.registeredWorkflows, req.TriggerID)
return nil
}

func (h *triggerConnectorHandler) Start(ctx context.Context) error {
return h.StartOnce("GatewayConnectorServiceWrapper", func() error {
return h.connector.AddHandler([]string{"web_trigger"}, h)
})
}
func (h *triggerConnectorHandler) Close() error {
return h.StopOnce("GatewayConnectorServiceWrapper", func() error {
return nil
})
}

func (h *triggerConnectorHandler) HealthReport() map[string]error {
return map[string]error{h.Name(): h.Healthy()}
}

func (h *triggerConnectorHandler) Name() string {
return "WebAPITrigger"
}

func (h *triggerConnectorHandler) sendResponse(ctx context.Context, gatewayID string, requestBody *api.MessageBody, payload any) error {
payloadJSON, err := json.Marshal(payload)
if err != nil {
h.lggr.Errorw("error marshalling payload", "err", err)
payloadJSON, _ = json.Marshal(webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s marshalling payload", err.Error()).Error()})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you just return the err here, maybe also logging it to make it clear this is unexpected and shouldn't happen?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, best practice to handle all errors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then no message gets back to the gateway when we can send one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note I'm specifically talking about handling the error on line 254; if that returns an error what payload are you going to send?


msg := &api.Message{
Body: api.MessageBody{
MessageId: requestBody.MessageId,
DonId: requestBody.DonId,
Method: requestBody.Method,
Receiver: requestBody.Sender,
Payload: payloadJSON,
},
}

return h.connector.SendToGateway(ctx, gatewayID, msg)
}
Loading
Loading