Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: python workflows #892

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
66 changes: 66 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 13 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ members = [
"src/typegate/standalone",
"src/typegraph/core",
"src/xtask",
"src/substantial"
"src/substantial",
]

exclude = [
Expand Down Expand Up @@ -69,19 +69,19 @@ sha2 = "0.10.8"
seahash = "4.1.0"

# patterns
anyhow = "1.0.89" # FIXME: replace anyhow with eyre
anyhow = "1.0.89" # FIXME: replace anyhow with eyre
color-eyre = "0.6.3"
eyre = "0.6.12" # NOTE: keep in sync with verison used by color-eyre
eyre = "0.6.12" # NOTE: keep in sync with verison used by color-eyre
thiserror = "1.0.64"
indoc = "2.0.5"
unindent = "0.2.3"
itertools = "0.13.0"
lazy_static = "1.5.0" # FIXME: replace with Lazy Cell
lazy_static = "1.5.0" # FIXME: replace with Lazy Cell
crossbeam-channel = "0.5.13"
enum_dispatch = "0.3.13"
tap = "1.0.1"
derive_more = { version = "1", features = ["from"] }
cached = "0.53.1" # FIXME: replace usage with a Lazy Cell + dashmap
cached = "0.53.1" # FIXME: replace usage with a Lazy Cell + dashmap
garde = "0.20"
paste = "1.0.15"

Expand Down Expand Up @@ -116,7 +116,7 @@ indexmap = { version = "2.6.0", features = ["serde"] }
semver = "1.0.23"
dashmap = "6.1.0"
connection-string = "0.2.0"
chrono = { version = "0.4.38", features = ["serde"] }
chrono = { version = "0.4.38", features = ["serde"] }
tera = { version = "1.20", default-features = false }
ordered-float = "4.3.0"
graphql-parser = "0.4.0"
Expand Down Expand Up @@ -150,7 +150,7 @@ tracing-unwrap = { version = "1.0.1", features = ["log-location"] }
tracing-appender = "0.2.3"

# async
futures = "=0.3.30" # pinned due to bug with .31 with zeromq (deno)
futures = "=0.3.30" # pinned due to bug with .31 with zeromq (deno)
futures-concurrency = "7.6"
futures-lite = "2.3"
tokio = { version = "1", features = ["parking_lot"] }
Expand All @@ -164,7 +164,9 @@ temporal-sdk-core-protos = { git = "https://github.com/temporalio/sdk-core", rev
# prisma
query-core = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" }
query-connector = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" }
request-handlers = { git = "https://github.com/metatypedev/prisma-engines", features = ["all"], branch = "fix/version-compat" }
request-handlers = { git = "https://github.com/metatypedev/prisma-engines", features = [
"all",
], branch = "fix/version-compat" }
datamodel-renderer = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" }
user-facing-errors = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" }
query-engine-metrics = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" }
Expand All @@ -190,6 +192,9 @@ protobuf = "3.6.0"
protobuf-json-mapping = "3.6.0"
proto-parser = { git = "https://github.com/metatypedev/proto-parser", branch = "main" }

# python
pyo3 = { version = "0.22.5" }

# test
assert_cmd = "2.0.16"
pretty_assertions = "1.4.1"
Expand Down
211 changes: 208 additions & 3 deletions src/substantial/src/converters.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use serde_json::Value;
use std::{collections::HashMap, fmt};

use anyhow::{bail, Context, Ok, Result};
use chrono::{DateTime, Utc};
use anyhow::{bail, Context, Result};
use chrono::{DateTime, Duration, Utc};

use protobuf::{
well_known_types::{
Expand Down Expand Up @@ -78,13 +79,126 @@ pub struct Operation {
/// Each operation is produced from the workflow execution
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Run {
pub id: u32,
pub run_id: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you specify in a comment the difference between id and run_id?

pub operations: Vec<Operation>,
}

#[derive(Debug)]
pub enum Interupt {
Sleep,
SaveRetry,
WaitReceiveEvent,
WaitHandleEvent,
WaitEnsureValue,
}

impl Interupt {
const PREFIX: &'static str = "SUBSTANTIAL_INTERRUPT_";
}

impl fmt::Display for Interupt {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let variant = match self {
Self::Sleep => "SLEEP",
Self::SaveRetry => "SAVE_RETRY",
Self::WaitReceiveEvent => "WAIT_RECEIVE_EVENT",
Self::WaitHandleEvent => "WAIT_HANDLE_EVENT",
Self::WaitEnsureValue => "WAIT_ENSURE_VALUE",
};
write!(f, "{}{:?}", Self::PREFIX, variant)
}
}

impl std::error::Error for Interupt {}

pub enum Strategy {
Linear,
}

pub struct Retry {
pub strategy: Option<String>,
pub min_backoff_ms: i32,
pub max_backoff_ms: i32,
pub max_retries: i32,
}

pub struct RetryStrategy {
Comment on lines +113 to +125
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move into another file maybe? converters.rs should only contain conversion code imo.

min_backoff_ms: Option<i32>,
max_backoff_ms: Option<i32>,
max_retries: i32,
}

impl RetryStrategy {
pub fn new(
max_retries: i32,
min_backoff_ms: Option<i32>,
max_backoff_ms: Option<i32>,
) -> anyhow::Result<Self> {
if max_retries < 1 {
anyhow::bail!("maxRetries < 1");
}

let mut min_ms = min_backoff_ms;
let mut max_ms = max_backoff_ms;

match (min_ms, max_ms) {
(Some(low), Some(high)) => {
if low >= high {
anyhow::bail!("minBackoffMs >= maxBackoffMs");
}
if low < 0 {
anyhow::bail!("minBackoffMs < 0");
}
}
(Some(low), None) => {
max_ms = Some(low + 10);
}
(None, Some(high)) => {
min_ms = Some(0.max(high - 10));
}
(None, None) => {}
}

Ok(Self {
min_backoff_ms: min_ms,
max_backoff_ms: max_ms,
max_retries,
})
}

pub fn eval(&self, strategy: Strategy, retries_left: i32) -> anyhow::Result<i32> {
match strategy {
Strategy::Linear => self.linear(retries_left),
// Add more strategy matches here
}
}

fn linear(&self, retries_left: i32) -> anyhow::Result<i32> {
if retries_left <= 0 {
anyhow::bail!("retries left <= 0");
}

let dt = self.max_backoff_ms.unwrap_or(0) - self.min_backoff_ms.unwrap_or(0);
Ok(((self.max_retries - retries_left) * dt) / self.max_retries)
}
}

pub struct Save {
pub timeout_ms: Option<i32>,
pub retry: Option<Retry>,
}

#[derive(Serialize)]
pub struct SaveOutput {
pub payload: Option<Value>,
pub current_retry_count: Option<i32>,
}

impl Run {
pub fn new(run_id: String) -> Self {
Self {
id: 0,
run_id,
operations: vec![],
}
Expand Down Expand Up @@ -125,6 +239,97 @@ impl Run {
pub fn reset(&mut self) {
self.operations = vec![];
}

pub fn next_id(&mut self) -> u32 {
self.id += 1;
self.id
}

Comment on lines +242 to +246
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest creating a context struct that owns a run and move that into another file, just like the way deno_context.ts is designed.

pub fn append_op(&mut self, op: OperationEvent) {
let has_stopped = self
.operations
.iter()
.any(|op| matches!(op.event, OperationEvent::Stop { .. }));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reversing the iterator here can add some performance ;)

if !has_stopped {
self.operations.push(Operation {
at: Utc::now(),
event: op,
});
}
}

pub fn save(&mut self) -> Result<SaveOutput> {
let next_id = self.next_id();
let mut current_retry_count: i32 = 1;

for Operation { event, .. } in self.operations.iter() {
if let OperationEvent::Save { id, value } = event {
if *id == next_id {
if let SavedValue::Resolved { payload } = value {
return Ok(SaveOutput {
payload: Some(payload.clone()),
current_retry_count: None,
});
} else if let SavedValue::Retry {
counter,
wait_until,
} = value
{
let now = Utc::now();
if wait_until > &now {
bail!(Interupt::SaveRetry);
} else {
current_retry_count = *counter;
}
}
}
}
}

Ok(SaveOutput {
payload: None,
current_retry_count: Some(current_retry_count),
})
}

pub fn sleep(&mut self, duration_ms: i32) -> Result<()> {
let next_id = self.next_id();
for Operation { event, .. } in self.operations.iter() {
if let OperationEvent::Sleep { id, end, .. } = event {
if next_id == *id {
if end <= &Utc::now() {
return Ok(());
} else {
bail!(Interupt::Sleep);
}
}
}
}

let start = Utc::now();

let end = start + Duration::milliseconds(start.timestamp() + duration_ms as i64);

self.operations.push(Operation {
at: start,
event: OperationEvent::Sleep {
id: next_id,
start,
end,
},
});
bail!(Interupt::Sleep);
}

pub fn append_event(&mut self, event_name: String, payload: Value) {
self.operations.push(Operation {
at: Utc::now(),
event: OperationEvent::Send {
event_name,
value: payload,
},
});
}
}

impl TryFrom<Event> for Operation {
Expand Down
Loading
Loading