Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/workflows/sdk: add WorkflowSpecFactory.BeginSerial/BeginAsync #821

Draft
wants to merge 2 commits into
base: workflowchart
Choose a base branch
from

Conversation

jmank88
Copy link
Collaborator

@jmank88 jmank88 commented Oct 1, 2024

Depends on #790

This PR extends WorkflowSpecFactory to include BeginSerial() and BeginAsync() methods for toggling between two builder modes.

Here is an example from a test which reads from two APIs and one chain, then writes on chain:

flowchart
	
	trigger-chain-event[\"<b>trigger-chain-event</b><br>trigger<br><i>(chain_reader[at]1.0.0)</i>"/]
	
	compute-bar["<b>compute-bar</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
	get-bar --> compute-bar
				
	compute-foo["<b>compute-foo</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
	get-foo --> compute-foo
				
	get-bar["<b>get-bar</b><br>action<br><i>(http[at]1.0.0)</i>"]
	trigger-chain-event --> get-bar
				
	get-foo["<b>get-foo</b><br>action<br><i>(http[at]1.0.0)</i>"]
	trigger-chain-event --> get-foo
				
	read-token-price["<b>read-token-price</b><br>action<br><i>(chain_reader[at]1.0.0)</i>"]
	trigger-chain-event --> read-token-price
				
	data-feeds-report[["<b>data-feeds-report</b><br>consensus<br><i>(offchain_reporting[at]1.0.0)</i>"]]
	compute-bar -- Value --> data-feeds-report 
				compute-foo -- Value --> data-feeds-report 
				read-token-price -- Value --> data-feeds-report 
				
	unnamed7[/"target<br><i>(write_ethereum-testnet-sepolia[at]1.0.0)</i>"\]
	data-feeds-report --> unnamed7
				
Loading

Here is the same workflow with (dotted) edges added in order to serialize the overall DAG:

flowchart

    trigger-chain-event[\"<b>trigger-chain-event</b><br>trigger<br><i>(chain_reader[at]1.0.0)</i>"/]

    compute-bar["<b>compute-bar</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
    get-bar --> compute-bar

    compute-foo["<b>compute-foo</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
    get-foo --> compute-foo

    get-bar["<b>get-bar</b><br>action<br><i>(http[at]1.0.0)</i>"]
    trigger-chain-event --> get-bar

    get-foo["<b>get-foo</b><br>action<br><i>(http[at]1.0.0)</i>"]
    trigger-chain-event --> get-foo

    read-token-price["<b>read-token-price</b><br>action<br><i>(chain_reader[at]1.0.0)</i>"]
    trigger-chain-event --> read-token-price

    data-feeds-report[["<b>data-feeds-report</b><br>consensus<br><i>(offchain_reporting[at]1.0.0)</i>"]]
    compute-bar -- Value --> data-feeds-report
    compute-foo -- Value --> data-feeds-report
    read-token-price -- Value --> data-feeds-report

    unnamed7[/"target<br><i>(write_ethereum-testnet-sepolia[at]1.0.0)</i>"\]
    data-feeds-report --> unnamed7

    compute-foo -..-> get-bar
    compute-bar -..-> read-token-price
Loading

TODO

  • make Condition functional

Copy link
Collaborator

@nolag nolag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, if you can add the invisible dependency to in this function then the workflow engine will respect it. I don't think this can be merged without it.

w.serialMode = true
}

func (w *WorkflowSpecFactory) BeginAsync() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: serial and parallel pair better, or concurrent. All the steps are actually async under the hood.

Copy link
Collaborator Author

@jmank88 jmank88 Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would EndSerial() make sense?
Edit: Or just SetSerial(bool) in place of both?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking forward about how to generalize this a bit might help too. Right now we have one global mode, but what if we want to build sub-graphs within one workflow, and control the mode of each subgraph as we construct it? I think it will be important for the API to be consistent throughout, and one test would be whether we can write helper funcs that are able to add steps to a workflow or a sub-workflow, without any special logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree that subgraphs are important generalization. subgraphs and phases/checkpoints that we discussed yesterday are similar concepts. i think you can implement phases with an ordered list of subgraphs such that the previous element in the list must complete before starting the given subgraph

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes me think of another error scenario - when a step fails, and so we do not execute the subsequent dependent steps, must we still propagate failure through them? i.e. if we have A --> B -..-> C, where B uses data from A and C runs only if B fails, then what happens if A fails? Do we skip B (and C)? Or do we propagate the failure through B, triggering C? I think users will want to express the latter, in terms of "if any part of the subgraph fails, run this alternate subgraph"

Comment on lines +122 to +126
func NewSerialWorkflowSpecFactory(params NewWorkflowParams) *WorkflowSpecFactory {
f := NewWorkflowSpecFactory(params)
f.BeginSerial()
return f
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it just be a param? We can default it either way right now because we don't have any actual WASM workflows yet.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think a param is necessary because it is trivial to call the method instead. I only added this as a convenience to cater to the "serial only" user experience, but would gladly remove it.

@jmank88 jmank88 force-pushed the workflowchart branch 4 times, most recently from b26ed1a to 7c3e9ff Compare October 7, 2024 23:02
@jmank88 jmank88 force-pushed the workflowchart branch 4 times, most recently from 74d39f7 to 4a33a18 Compare October 16, 2024 02:14
@@ -35,3 +153,61 @@ func (w *WorkflowSpec) Steps() []StepDefinition {
s = append(s, w.Targets...)
return s
}

func (w *WorkflowSpec) FormatChart() (string, error) {
Copy link
Collaborator

@shileiwill shileiwill Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @jmank88 , I am trying to figure out how we provide the visual to users via CLI. From the dev platform CLI perspective,

  1. We get the paths of local files from users and get the wasm binary and config as byte[]
  2. We can get an instance of WorkflowSpec via GetWorkflowSpec
  3. We can call this FormatChart() to get the visual? Is the visual a SVG or image to the local?

Want to hear your thoughts on the flow, and how to integrate. Thanks!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this discussion over to #790, which just introduces the chart without any of this builder stuff.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants