Web API Trigger capability and handler #14580

Open
wants to merge 16 commits into
base: develop

DavidOrchard
Contributor

@DavidOrchard DavidOrchard commented Sep 26, 2024

  • Implement the Web API trigger capability, which accepts inbound Web API JSON-RPC requests (CAPPL-22 and CAPPL-24)
  • Implement a script to send a trigger (CAPPL-23)
  • Implement the gateway connector and tests (CAPPL-21)

Design doc
https://docs.google.com/document/d/1mCTAo-ix-P923eUlh4SloZfBN9PCvgf90oHWbmykjsc/edit#heading=h.4e7ng0tekbkc

@DavidOrchard DavidOrchard requested review from a team as code owners September 26, 2024 19:11
@DavidOrchard DavidOrchard requested review from vyzaldysanchez and removed request for a team September 26, 2024 19:11
@DavidOrchard DavidOrchard force-pushed the feature/CAPPL-22-web-api-trigger-connector-handler-2 branch 3 times, most recently from 28afc62 to 4814a6c Compare September 26, 2024 19:45
@DavidOrchard DavidOrchard force-pushed the feature/CAPPL-22-web-api-trigger-connector-handler-2 branch from 4814a6c to 2e825fe Compare September 26, 2024 23:23
@@ -0,0 +1,299 @@
package webapi
Contributor

@justinkaseman justinkaseman Sep 26, 2024

Let's isolate this in its own trigger package, so that if we later want a webapi package the two don't bleed together.
That makes the path webapi/trigger.

type TriggerConfig struct {
AllowedSenders []string `toml:"allowedSenders"`
AllowedTopics []string `toml:"allowedTopics"`
// RateLimiter common.RateLimiterConfig `toml:"rateLimiter"`
Contributor

Stray commented-out line?

Contributor Author

I wanted to show what it can't be.

Contributor

I don't follow. What do you mean by "what it can't be"?

Contributor Author

It had been unacceptable, but with the float fixes it now works. Good callout.

Contributor

@justinkaseman justinkaseman Sep 27, 2024

Ohh I see, there's a TODO here. That's what it should be

}

type webapiTrigger struct {
allowedSendersMap map[string]bool
Contributor

nit:

Suggested change
- allowedSendersMap map[string]bool
+ allowedSenders map[string]bool


type webapiTrigger struct {
allowedSendersMap map[string]bool
allowedTopicsMap map[string]bool
Contributor

nit:

Suggested change
- allowedTopicsMap map[string]bool
+ allowedTopics map[string]bool


// Iterate over each topic, checking against senders and rateLimits, then starting event processing and responding
func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID string, body *api.MessageBody, sender ethCommon.Address, payload webapicapabilities.TriggerRequestPayload) {
// Pass on the payload with the expectation that that is acceptable format for the executor
Contributor

@justinkaseman justinkaseman Sep 26, 2024

"that that" typo

Suggested change
- // Pass on the payload with the expectation that that is acceptable format for the executor
+ // Pass on the payload with the expectation that it's in an acceptable format for the executor

Contributor Author

I actually meant "that that", but I can live with the amendment.

// Iterate over each topic, checking against senders and rateLimits, then starting event processing and responding
func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID string, body *api.MessageBody, sender ethCommon.Address, payload webapicapabilities.TriggerRequestPayload) {
// Pass on the payload with the expectation that that is acceptable format for the executor
wrappedPayload, _ := values.WrapMap(payload)
Contributor

Generally better to handle the error than to ignore it
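
For illustration, a minimal sketch of returning the wrapping error to the caller instead of discarding it. `wrapPayload` and `process` are hypothetical names, and `wrapPayload` only stands in for `values.WrapMap` (it uses `encoding/json` so that the sketch can fail in a reproducible way); none of this is taken from the PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wrapPayload is a hypothetical stand-in for values.WrapMap: a conversion
// step that can fail.
func wrapPayload(payload any) ([]byte, error) {
	return json.Marshal(payload)
}

// process returns the wrapping error to the caller instead of discarding it.
func process(payload any) ([]byte, error) {
	wrapped, err := wrapPayload(payload)
	if err != nil {
		return nil, fmt.Errorf("wrapping payload: %w", err)
	}
	return wrapped, nil
}

func main() {
	// A channel cannot be marshalled to JSON, so this exercises the error path.
	if _, err := process(make(chan int)); err != nil {
		fmt.Println("error:", err)
	}
}
```

Wrapping with `%w` keeps the original error available to `errors.Is` and `errors.As` further up the call chain.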

Comment on lines 56 to 57
mu sync.Mutex
registeredWorkflows map[string]webapiTrigger
Contributor

I've removed these from my changes because workflow guarantees only registered workflows are executed.

Contributor

@justinkaseman justinkaseman Sep 27, 2024

Triggers will be different from the target, because on registering a workflow it makes a new channel, then on trigger messages it sends a message down the channel. Plus there is other state to persist by trigger.
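
A minimal sketch of the per-trigger state described here, under the assumption that each registration owns one channel. The names `registry`, `event`, `register`, and `dispatch` are hypothetical and are not taken from the PR.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for the trigger event sent to a workflow.
type event struct{ ID string }

// registry holds one channel per registered workflow (illustrative only).
type registry struct {
	mu        sync.Mutex
	workflows map[string]chan event
}

func newRegistry() *registry {
	return &registry{workflows: make(map[string]chan event)}
}

// register creates a new buffered channel for the workflow and returns its
// receive side.
func (r *registry) register(workflowID string) (<-chan event, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.workflows[workflowID]; ok {
		return nil, errors.New("workflow already registered: " + workflowID)
	}
	ch := make(chan event, 1)
	r.workflows[workflowID] = ch
	return ch, nil
}

// dispatch sends an event down the channel of a registered workflow without
// blocking when the buffer is full.
func (r *registry) dispatch(workflowID string, ev event) error {
	r.mu.Lock()
	ch, ok := r.workflows[workflowID]
	r.mu.Unlock()
	if !ok {
		return errors.New("workflow not registered: " + workflowID)
	}
	select {
	case ch <- ev:
		return nil
	default:
		return errors.New("channel full for workflow: " + workflowID)
	}
}

func main() {
	r := newRegistry()
	ch, _ := r.register("wf-1")
	if err := r.dispatch("wf-1", event{ID: "ev-1"}); err != nil {
		fmt.Println("dispatch failed:", err)
		return
	}
	fmt.Println("received:", (<-ch).ID)
}
```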

return handler, nil
}

// Iterate over each topic, checking against senders and rateLimits, then starting event processing and responding
Contributor

The Go doc convention is to start comments with the function name.

Suggested change
- // Iterate over each topic, checking against senders and rateLimits, then starting event processing and responding
+ // processTrigger iterates over each topic, checking against senders and rateLimits, then starting event processing and responding

AllowedSenders []string `toml:"allowedSenders"`
AllowedTopics []string `toml:"allowedTopics"`
// RateLimiter common.RateLimiterConfig `toml:"rateLimiter"`
RateLimiter *values.Map `toml:"rateLimiter"`
Contributor

I saw that float support was merged into the values library. Does this still need to be *values.Map, or can we just use common.RateLimiterConfig here and have ValidateConfig unwrap to the struct directly?

Contributor Author

Yes, that now works! Thx for the callout.

hasMatchedAWorkflow = true

if !trigger.allowedSendersMap[sender.String()] {
h.lggr.Errorw("Unauthorized Sender")
Contributor

Suggested change
- h.lggr.Errorw("Unauthorized Sender")
+ h.lggr.Errorw("Unauthorized Sender", "sender", sender.String(), "messageID", body.MessageId)

Contributor

+1, include as much info as available for debugging

return
}
if !trigger.rateLimiter.Allow(body.Sender) {
h.lggr.Errorw("request rate-limited")
Contributor

Suggested change
- h.lggr.Errorw("request rate-limited")
+ h.lggr.Errorw("request rate-limited", "sender", sender.String(), "messageID", body.MessageId)

// Iterate over each topic, checking against senders and rateLimits, then starting event processing and responding
func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID string, body *api.MessageBody, sender ethCommon.Address, payload webapicapabilities.TriggerRequestPayload) {
// Pass on the payload with the expectation that that is acceptable format for the executor
wrappedPayload, _ := values.WrapMap(payload)
Contributor

Log an error if it happens?

Comment on lines 119 to 122
trigger.ch <- tr
response = webapicapabilities.TriggerResponsePayload{Status: "ACCEPTED"}
// Sending n topics that match a workflow with n allowedTopics, can only be triggered once.
break
Contributor

do not block if context is cancelled?

Suggested change
- trigger.ch <- tr
- response = webapicapabilities.TriggerResponsePayload{Status: "ACCEPTED"}
- // Sending n topics that match a workflow with n allowedTopics, can only be triggered once.
- break
+ select {
+ case <-ctx.Done():
+ 	return
+ case trigger.ch <- tr:
+ 	response = webapicapabilities.TriggerResponsePayload{Status: "ACCEPTED"}
+ 	// Sending n topics that match a workflow with n allowedTopics, can only be triggered once.
+ 	break
+ }

Contributor Author

I tried this, and it broke the last few tests. It seems that the channel is getting closed partway through the test. Is the context really short lived?

Contributor Author

I found a solution. From https://purpleidea.com/blog/2023/02/24/deadline-context-test-cancellation-in-golang/ I found that I can do this: ctx, _ = context.WithDeadline(ctx, time.Now().Add(10*time.Second))

h.lggr.Errorw("No Matching Workflow Topics")
response = webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: "No Matching Workflow Topics"}
}
_ = h.sendResponse(ctx, gatewayID, body, response)
Contributor

log error?

err := json.Unmarshal(body.Payload, &payload)
if err != nil {
d.lggr.Errorw("error decoding payload", "err", err)
callbackCh <- handlers.UserCallbackPayload{Msg: msg, ErrCode: api.UserMessageParseError, ErrMsg: fmt.Sprintf("error decoding payload %s", err.Error())}
Contributor

Do we need to write to callbackCh here? Returning an err from this function seems sufficient to send an HTTP error back to the client. callbackCh seems to be needed only to handle messages from the node in HandleNodeMessage.
Similar questions apply to the other error scenarios in this function.

Contributor Author

I want the HTTP connection to carry the full error message, as that's the only way the gateway can respond. Good catch though: I want to return no error here and send the UserResponse so that the ErrMsg is used.
The gateway code does:

// send to the handler
	responseCh := make(chan handlers.UserCallbackPayload, 1)
	err = handler.HandleUserMessage(ctx, msg, responseCh)
	if err != nil {
		return newError(g.codec, msg.Body.MessageId, api.HandlerError, err.Error())
	}
	// await response
	var response handlers.UserCallbackPayload
	select {
	case <-ctx.Done():
		return newError(g.codec, msg.Body.MessageId, api.RequestTimeoutError, "handler timeout")
	case response = <-responseCh:
		break
	}
	if response.ErrCode != api.NoError {
		return newError(g.codec, msg.Body.MessageId, response.ErrCode, response.ErrMsg)
	}
	// encode
	rawResponse, err = g.codec.EncodeResponse(response.Msg)
	if err != nil {
		return newError(g.codec, msg.Body.MessageId, api.NodeReponseEncodingError, "")
	}
	promRequest.WithLabelValues(api.NoError.String()).Inc()
	return rawResponse, api.ToHttpErrorCode(api.NoError)

AllowedTopics []string `toml:"allowedTopics"`
// RateLimiter common.RateLimiterConfig `toml:"rateLimiter"`
RateLimiter *values.Map `toml:"rateLimiter"`
RequiredParams []string `toml:"requiredParams"`
Contributor

@justinkaseman justinkaseman Sep 27, 2024

Need a TODO to enforce these.

Contributor Author

Added a comment on it.

func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID string, body *api.MessageBody, sender ethCommon.Address, payload webapicapabilities.TriggerRequestPayload) {
// Pass on the payload with the expectation that that is acceptable format for the executor
wrappedPayload, _ := values.WrapMap(payload)
hasMatchedAWorkflow := false
Contributor

Random idea: what about instead keeping a count of matched workflows. May be an interesting metric to emit.

hasMatchedAWorkflow := false
var response webapicapabilities.TriggerResponsePayload

for _, trigger := range h.registeredWorkflows {
Contributor

It's very computationally heavy to iterate O(n) through every registered workflow of a node.
Something feels off here.

Contributor Author

Maybe a map of trigger topic to trigger: grab all the matches, and then dedup the registered triggers to ensure only 1 invoke? What troubles me, though, is the potential for an n:m relationship between topics and triggers. Imagine trigger A says it's interested in topics foo and bar, and trigger B says it's interested in bar and bat. There'd need to be a map of foo, bar, and bat, with bar pointing to both A and B. If a message said it's for topics foo, bar, bat and/or has an empty topic, we'd need to get just 1 of A & B.

Contributor

@justinkaseman justinkaseman Sep 27, 2024

I'm thinking more so about keying off of the sender.

Roughly sketched -- two data structures:

    registeredWorkflows map[string]webapiTrigger // as you have it now
    senderToTriggerIds  map[string][]string      // map[address][]triggerId
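
The two structures above could be combined roughly as follows. This is a sketch under stated assumptions: `trigger`, `index`, `register`, and `match` are hypothetical names, and the trimmed-down `trigger` type only stands in for `webapiTrigger`.

```go
package main

import "fmt"

// trigger is a hypothetical, trimmed-down stand-in for webapiTrigger.
type trigger struct {
	allowedTopics map[string]bool
}

// index keeps both structures from the sketch above.
type index struct {
	registeredWorkflows map[string]trigger  // triggerID -> trigger
	senderToTriggerIDs  map[string][]string // sender address -> triggerIDs
}

// register records the trigger and indexes it under each allowed sender.
func (ix *index) register(id string, senders, topics []string) {
	t := trigger{allowedTopics: make(map[string]bool)}
	for _, tp := range topics {
		t.allowedTopics[tp] = true
	}
	ix.registeredWorkflows[id] = t
	for _, s := range senders {
		ix.senderToTriggerIDs[s] = append(ix.senderToTriggerIDs[s], id)
	}
}

// match returns the IDs of triggers that allow this sender and topic,
// touching only the sender's own triggers rather than every workflow.
func (ix *index) match(sender, topic string) []string {
	var out []string
	for _, id := range ix.senderToTriggerIDs[sender] {
		if ix.registeredWorkflows[id].allowedTopics[topic] {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	ix := &index{
		registeredWorkflows: map[string]trigger{},
		senderToTriggerIDs:  map[string][]string{},
	}
	ix.register("A", []string{"0x1"}, []string{"foo", "bar"})
	ix.register("B", []string{"0x2"}, []string{"bar", "bat"})
	fmt.Println(ix.match("0x2", "bar"))
}
```

With this layout the cost of a request depends on how many triggers a sender is allowed on, not on the total number of registered workflows.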

// }
// }

// from Web API Trigger Doc
Contributor

}

// https://gateway-us-1.chain.link/web-trigger
// {
Contributor

Noisy to have a test fixture commented out here.

Contributor Author

I personally find sample json really helpful.

topics := payload.Topics
// empty topics means all topics
if len(topics) == 0 {
topics = []string{}
Contributor

If len(topics) == 0, then topics is already empty, so assigning []string{} changes nothing.

if !trigger.allowedSenders[sender.String()] {
h.lggr.Errorw("Unauthorized Sender", "sender", sender.String(), "messageID", body.MessageId)
h.sendResponse(ctx, gatewayID, body, webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: "Unauthorized Sender"})
return
Contributor

@justinkaseman justinkaseman Sep 27, 2024

As an illustration of what I was saying about iterating over every workflow seeming off:

I send a request with no topics, as sender 0x1:

  {
    jsonrpc: "2.0",
    id: "...",
    method: "web-trigger",
    params: {
      signature: "...",
      body: {
        don_id: "workflow_123",
        payload: {
          trigger_id: "[email protected]",
          trigger_event_id: "action_1234567890",
          timestamp: 1234567890,
          topics: [],
          params: {}
        }
      }
    }
  }

On the first iteration, because there are no topics, it fills itself on line 93 with the topics of the first trigger.
It passes the if check on line 99 because it inherited all of the topics.
But then on line 102, if the first trigger does not have address 0x1 as an allowed sender, it will fail.

So no one will be able to use empty topics, and there are conflicts when multiple users use the same topic.

Put another way: topics are scoped too high, and they conflict.

Contributor Author

I think empty topics should be removed for this and other complexities. I think a consumer should explicitly know which triggers to invoke.

if !trigger.rateLimiter.Allow(body.Sender) {
h.lggr.Errorw("request rate-limited", "sender", sender.String(), "messageID", body.MessageId)
h.sendResponse(ctx, gatewayID, body, webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: "request rate-limited"})
return
Contributor

@justinkaseman justinkaseman Sep 27, 2024

If I send a request with a topic that is meant to kick off multiple workflows, then it will exit out at the first rate limit that happens. This could lead to inconsistent results.

Is this behavior what we want? Would it be better to continue on being rate limited? All or nothing?
Maybe the response to the Gateway should be an array of statuses?

Contributor Author

@DavidOrchard DavidOrchard Sep 27, 2024

This is exactly the point I brought up earlier about the issue around boxcarring requests, which is partial failures and status reports. What is the status if some are successful and some aren't for example. There's no easy solution imo. Which is why it's rarely done.

Contributor

I worry about partial success / partial failures. IMO all or nothing seems to be a simple and acceptable approach here. In the first iteration of the loop, validate rate limits and allowed senders and collect all triggers that need to be triggered. If any of the validation fails, none of the workflows are triggered. If all validation passes, then start all matching triggers.

One option is to create a ticket for this and come back to it. That would make reviewing easier.
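
The all-or-nothing approach described above can be sketched as two passes: validate and collect first, then fire. Everything here is hypothetical (`trigger`, `fireAll`, and the `allow` callback that models a rate-limiter decision); it is not the PR's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// trigger is a hypothetical stand-in; allow models the rate limiter decision.
type trigger struct {
	id             string
	allowedSenders map[string]bool
	allowedTopics  map[string]bool
	allow          func(sender string) bool
	fired          int
}

// fireAll validates every matching trigger first and fires them only if
// all validations pass, so a request never partially succeeds.
func fireAll(triggers []*trigger, sender, topic string) error {
	var matched []*trigger
	// Phase 1: validate and collect, without starting any workflow.
	// Note: a real rate limiter would consume budget here even if a later
	// check fails; this sketch ignores that.
	for _, t := range triggers {
		if !t.allowedTopics[topic] {
			continue
		}
		if !t.allowedSenders[sender] {
			return errors.New("unauthorized sender for trigger " + t.id)
		}
		if !t.allow(sender) {
			return errors.New("rate limited for trigger " + t.id)
		}
		matched = append(matched, t)
	}
	if len(matched) == 0 {
		return errors.New("no matching workflow topics")
	}
	// Phase 2: every check passed, so start all matching triggers.
	for _, t := range matched {
		t.fired++
	}
	return nil
}

func main() {
	yes := func(string) bool { return true }
	no := func(string) bool { return false }
	a := &trigger{id: "A", allowedSenders: map[string]bool{"0x1": true}, allowedTopics: map[string]bool{"t": true}, allow: yes}
	b := &trigger{id: "B", allowedSenders: map[string]bool{"0x1": true}, allowedTopics: map[string]bool{"t": true}, allow: no}
	fmt.Println(fireAll([]*trigger{a, b}, "0x1", "t"), a.fired, b.fired)
}
```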

Contributor Author

Same worry. I decided to skip the ability to invoke multiple topics. I spoke with Ryan and he's ok with that for V1. Longer term, the reality is that boxcarring is pretty hard which is why no one really does it. Because the server can't know what the client wants for boxcarred requests, the client will have to tell it, like "FailFast" or even per condition: {validation: "FailFast", senders: "FailFast", topics: "FailSlow", rateLimit: "FailSlow"}

Contributor

Skip the ability to invoke multiple topics? Or skip the ability to invoke multiple workflows?

@@ -15,3 +19,52 @@ type TargetResponsePayload struct {
Headers map[string]string `json:"headers,omitempty"` // HTTP headers
Body []byte `json:"body,omitempty"` // HTTP response body
}

const TriggerType = "[email protected]"
Contributor

This should live within the web-trigger itself, since the only place it is used is there and in tests.

return
}

TriggerEventID := body.Sender + payload.TriggerEventID
Contributor

I believe we want these to be unique per workflow run
What is the impact if the user supplies the same TriggerEventID every time?

cc: @cedric-cordenier

Contributor

I think the workflow engine will just refuse to process subsequent events

We can't really avoid that in this case: the ID needs to be deterministic in order to be correlatable so if a user misuses the ID and repeats the same one multiple times that's on them 🤷

d.mu.Unlock()
body := msg.Body
var payload TriggerRequestPayload
err := json.Unmarshal(body.Payload, &payload)
Contributor

This fills empty fields with default values. Since we don't do any validation on the values, what would go wrong if an empty request is sent?

Probably worth a test
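
To illustrate the concern: `json.Unmarshal` accepts `{}` without error and leaves every field at its zero value, so an explicit validation step is needed to reject it. The sketch below assumes the JSON field names from the sample request in this review; the `triggerRequest` type, `validate`, and `decode` are hypothetical and not the PR's code.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// triggerRequest mirrors the JSON fields shown in this review; the struct
// name and validate method are assumptions for illustration.
type triggerRequest struct {
	TriggerID      string   `json:"trigger_id"`
	TriggerEventID string   `json:"trigger_event_id"`
	Timestamp      int64    `json:"timestamp"`
	Topics         []string `json:"topics"`
}

// validate rejects requests whose required fields were left at zero values.
func (r triggerRequest) validate() error {
	switch {
	case r.TriggerID == "":
		return errors.New("trigger_id is required")
	case r.TriggerEventID == "":
		return errors.New("trigger_event_id is required")
	case r.Timestamp == 0:
		return errors.New("timestamp is required")
	}
	return nil
}

// decode unmarshals the raw payload and then validates it.
func decode(raw []byte) (triggerRequest, error) {
	var req triggerRequest
	if err := json.Unmarshal(raw, &req); err != nil {
		return req, fmt.Errorf("decoding payload: %w", err)
	}
	return req, req.validate()
}

func main() {
	_, err := decode([]byte(`{}`)) // unmarshals cleanly, but every field is zero
	fmt.Println(err)
}
```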

Contributor Author

Good idea. I've also added a test for an invalid payload.

return nil
}

if time.Now().Unix()-int64(d.config.MaxAllowedMessageAgeSec) > payload.Timestamp {
Contributor

The user supplies their own timestamp. Does this pose any risk?

I think this check is to protect the user, but may be worth thinking about.

Contributor Author

From a risk perspective, I don't think there is significant risk. At the end of the day, they supply the timestamp, and I assume they would timestamp the current time. So the worst case is that we process old messages that somehow have taken a long time to get through their system.

Or are you worried about parsing the field, injecting bad values? I believe we'd always get back an int64 so no panics possible.

It does make me wonder if we should use this timestamp as part of the trigger_event_id tho.

@DavidOrchard DavidOrchard force-pushed the feature/CAPPL-22-web-api-trigger-connector-handler-2 branch from f9e54e7 to 94cf373 Compare September 27, 2024 19:07
@DavidOrchard DavidOrchard force-pushed the feature/CAPPL-22-web-api-trigger-connector-handler-2 branch from 94cf373 to 1dd461b Compare September 27, 2024 19:14
@DavidOrchard DavidOrchard force-pushed the feature/CAPPL-22-web-api-trigger-connector-handler-2 branch from b1f0719 to 799b899 Compare September 27, 2024 19:38
AllowedSenders []string `toml:"allowedSenders"`
AllowedTopics []string `toml:"allowedTopics"`
RateLimiter common.RateLimiterConfig `toml:"rateLimiter"`
// RequiredParams is advisory to the consumer, it is not enforced.
Contributor

Can you explain this a bit more? Who is the consumer in this case?

Contributor

Consumer would be whoever is sending the web API call to kick off the trigger.

@DavidOrchard It may be clearer to just change this comment line to something like "TODO: unused config, not enforced in requests"

payloadJSON, err := json.Marshal(payload)
if err != nil {
h.lggr.Errorw("error marshalling payload", "err", err)
payloadJSON, _ = json.Marshal(webapicapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s marshalling payload", err.Error()).Error()})
Contributor

Could you just return the err here, maybe also logging it to make it clear this is unexpected and shouldn't happen?

Contributor

+1, best practice to handle all errors

Contributor Author

But then no message gets back to the gateway when we can send one.

Contributor

Note I'm specifically talking about handling the error on line 254; if that returns an error what payload are you going to send?

cfg := req.Config
if cfg == nil {
h.lggr.Errorw("config is required to register a web api trigger")
return nil, errors.New("config is required to register a web api trigger")
Contributor

No need to log + return the error here; it'll get logged upstream if the caller wants to (in prod, it will get logged by the workflow engine)

Contributor

Same with the other errors in the function

}

rateLimiter, err := common.NewRateLimiter(reqConfig.RateLimiter)

Contributor

Nit: stray spacing line

}

// processTrigger iterates over each topic, checking against senders and rateLimits, then starting event processing and responding
func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID string, body *api.MessageBody, sender ethCommon.Address, payload webapicapabilities.TriggerRequestPayload) {
Contributor

@DavidOrchard Some feedback on code organisation here: a lot of the code below is about handling errors and calling sendResponse with the resultant error.

You might find it easier to put the core logic in an inner function that returns an error and gets called from processTrigger. processTrigger could then check the error and call sendResponse once, for all errors returned.
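
A rough sketch of that organisation, with the core logic returning an error and a single call site producing the response. The `handler` and `response` types and the `handleMessage` name are hypothetical stand-ins so the example can run without a gateway; appending to `sent` takes the place of sendResponse.

```go
package main

import (
	"errors"
	"fmt"
)

// response is a hypothetical stand-in for TriggerResponsePayload.
type response struct {
	Status       string
	ErrorMessage string
}

// handler collects responses so the sketch can run without a gateway.
type handler struct {
	allowed map[string]bool
	sent    []response
}

// processTrigger holds the core logic and only returns errors.
func (h *handler) processTrigger(sender string) error {
	if !h.allowed[sender] {
		return errors.New("unauthorized sender")
	}
	return nil
}

// handleMessage is the single place that turns the outcome into a response.
func (h *handler) handleMessage(sender string) {
	resp := response{Status: "ACCEPTED"}
	if err := h.processTrigger(sender); err != nil {
		resp = response{Status: "ERROR", ErrorMessage: err.Error()}
	}
	h.sent = append(h.sent, resp) // stands in for sendResponse
}

func main() {
	h := &handler{allowed: map[string]bool{"0x1": true}}
	h.handleMessage("0x1")
	h.handleMessage("0x2")
	fmt.Printf("%+v\n", h.sent)
}
```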

Contributor Author

Good idea. I now have processTrigger return an error, and handleGatewayMessage is the place that calls sendResponse.

return
}

TriggerEventID := body.Sender + payload.TriggerEventID + strconv.FormatInt(time.Now().Unix(), 10)
Contributor

@DavidOrchard This is problematic: the TriggerEventID generated needs to be consistent across all nodes in the workflowDON, otherwise it won't be possible to correlate requests during the consensus phase.

If we can rely on the sender being the same it's fine to use body.Sender + payload.TriggerEventID, otherwise just using TriggerEventID is fine

Contributor

@justinkaseman justinkaseman Sep 30, 2024

@DavidOrchard last week we talked about having this in the Gateway, so it is generated once with the Gateway's current time, then told to each node. Let's do that here to have it consistent across nodes.

@cedric-cordenier re: just using body.Sender + payload.TriggerEventID one thing I'm worried about is leaving it up to the user to ensure changing the TriggerEventID on every request, rather than enforcing it ourselves. #14580 (comment)

@@ -82,7 +82,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
return nil, errors.New("gateway connector is required for web API Trigger capability")
}
connector := d.gatewayConnectorWrapper.GetGatewayConnector()
- triggerSrvc, err := webapi.NewTrigger(spec.StandardCapabilitiesSpec.Config, d.registry, connector, log)
+ triggerSrvc, err := webapitrigger.NewTrigger(spec.StandardCapabilitiesSpec.Config, d.registry, connector, log)
Contributor

No need to rename the import here, it reads as webapi.NewTrigger unmodified which is clear about what it is.

Contributor Author

It's actually coming in as trigger but ok.


matchedWorkflows := 0
var response webapicapabilities.TriggerResponsePayload
for _, trigger := range h.registeredWorkflows {
Contributor

@justinkaseman justinkaseman Sep 30, 2024

Bump on #14580 (comment)

With the current code when multiple workflows with different allowed senders use the same topics they will each be blocked. This seems very off usability-wise. If that's the intention, maybe we should guard against this in register workflow?

Contributor Author

I'm not sure about that. Imagine workflow A with topic T and AllowedSender S, workflow B with Topic T and AllowedSender S'. There will be 2 triggers and they will have different allowedSenders, so I think the behavior codified is what we want.

Contributor

Imagine two workflows for different customers:

Workflow W1: for customer C1

- allowed senders: 0x1
- topics: "forex"

Workflow W2: for customer C2

- allowed senders: 0x2 
- topics: "forex"

User 0x2 sends in a request R1:

sender: 0x2
topic: "forex"

User 0x2 wants to trigger W2
Line 94 starts iterating every workflow, so starts on W1
Line 95 starts with the first topic on W1: "forex"
Line 96 passes because the topic of R1 is "forex"
Line 99 errors because the allowed sender of W1 is 0x1
User 0x2 has to supply a topic, but when they supply the topic "forex" this will fail.

Same ends up happening for W1 and user 0x1. The first iteration works as expected, but then the second matches on W2, sees that 0x1 is not an allowed sender, and errors.

Contributor Author

Yes, I see. I think we need a temporary error and keep proceeding.
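
One way to read "keep proceeding" is to skip workflows whose allowed sender does not match rather than failing the whole request, and to report an error only if nothing matched. A minimal sketch using the W1/W2 scenario from this thread; the `workflow` type and `matchWorkflows` function are hypothetical.

```go
package main

import "fmt"

// workflow is a hypothetical stand-in for a registered trigger.
type workflow struct {
	id            string
	allowedSender string
	topic         string
}

// matchWorkflows skips workflows whose allowed sender differs instead of
// failing the whole request, and returns the IDs that would be triggered.
func matchWorkflows(workflows []workflow, sender, topic string) []string {
	var matched []string
	for _, w := range workflows {
		if w.topic != topic {
			continue
		}
		if w.allowedSender != sender {
			continue // not this sender's workflow; keep looking
		}
		matched = append(matched, w.id)
	}
	return matched
}

func main() {
	workflows := []workflow{
		{id: "W1", allowedSender: "0x1", topic: "forex"},
		{id: "W2", allowedSender: "0x2", topic: "forex"},
	}
	fmt.Println(matchWorkflows(workflows, "0x2", "forex"))
}
```

A caller would then respond with an "Unauthorized Sender" or "No Matching Workflow Topics" error only when the returned slice is empty.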

// trigger_id - ID of the trigger corresponding to the capability ID
// trigger_event_id - uniquely identifies generated event (scoped to trigger_id and sender)
// timestamp - timestamp of the event (unix time), needs to be within certain freshness to be processed
// topics - [OPTIONAL] list of topics (strings) to be started by this event (affects all topics if empty)
Contributor

Contributor Author

I say lower down in an amendments section that it must be specified and a single topic. But I can see how that's missed so I edited the field descriptions as you suggest.

4 participants