Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: python workflows #892

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open

feat: python workflows #892

wants to merge 13 commits into from

Conversation

j03-dev
Copy link
Contributor

@j03-dev j03-dev commented Oct 28, 2024

Migration notes

...

  • The change comes with new or modified tests
  • Hard-to-understand functions have explanatory comments
  • End-user documentation is updated to reflect the change

Copy link

linear bot commented Oct 28, 2024

@j03-dev j03-dev changed the title feat(sub):python workflows feat: python workflows Oct 28, 2024
Comment on lines +36 to +70
.then((wfResult: unknown) => {
self.postMessage(
Ok(
Msg(
type,
{
kind: "SUCCESS",
result: wfResult,
run: runCtx!.getRun(),
schedule,
} satisfies WorkflowResult,
),
),
);
})
.catch((wfException: unknown) => {
self.postMessage(
Ok(
Msg(
type,
{
kind: "FAIL",
result: wfException instanceof Error
? wfException.message
: JSON.stringify(wfException),
exception: wfException instanceof Error
? wfException
: undefined,
run: runCtx!.getRun(),
schedule,
} satisfies WorkflowResult,
),
),
);
});
Copy link
Contributor

@michael-0acf4 michael-0acf4 Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should use the same then/catch blocks for both. This is the basis of the interrupt logic, so it must be identical for each languages.

const result = Meta.python.executePython({
python_module_path: modulePath,
python_function_name: functionName,
executing_context: runCtx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious on how you will tackle the context methods. Will you try to unify it for both languages or.. ?

@@ -68,6 +69,9 @@ bytes.workspace = true
protobuf.workspace = true
protobuf-json-mapping.workspace = true

# python
pyo3 = { workspace = true, features = ["extension-module"] }
Copy link
Contributor

@michael-0acf4 michael-0acf4 Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extension-module

nice!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, which python version are you experimenting on this?
Is there a way to lock the Python version directly here?

@michael-0acf4
Copy link
Contributor

Suggestion:
Unifying the context for both languages into pure Rust should now be doable since we can use Meta inside workers (solves for the deno case), the python one will just use the rust implementation directly. The challenge would be on the hostcall.

execution_status: String,
}

fn convert_json_to_python_object(py: Python, json_value: &Value) -> PyResult<PyObject> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't pyo3 already providing such function?

Comment on lines 279 to 280
if wait_until > &now {
bail!(Interupt::Saveretry);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest you to use enums directly. As we are within rust already, we can make clear distinctions between execution errors, interrupts and the returned value.

Comment on lines +113 to +125
impl std::error::Error for Interupt {}

pub enum Strategy {
Linear,
}

pub struct Retry {
pub strategy: Option<String>,
pub min_backoff_ms: i32,
pub max_backoff_ms: i32,
pub max_retries: i32,
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move into another file maybe? converters.rs should only contain conversion code imo.

Comment on lines +242 to +246

pub fn next_id(&mut self) -> u32 {
self.id += 1;
self.id
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest creating a context struct that owns a run and move that into another file, just like the way deno_context.ts is designed.

@@ -78,13 +79,126 @@ pub struct Operation {
/// Each operation is produced from the workflow execution
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Run {
pub id: u32,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you specify in a comment the difference between id and run_id?

pub fn append_op(&mut self, op: OperationEvent) {
let has_stopped = self
.operations
.iter()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reversing the iterator here can add some performance ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants