Skip to content

Commit 8f9ad21

Browse files
committed
chore(pegboard): allow configuring base_retry_timeout, actor_start_threshold, actor_stop_threshold, and retry_reset_duration
1 parent f9dd2a8 commit 8f9ad21

File tree

4 files changed

+85
-25
lines changed

4 files changed

+85
-25
lines changed

engine/packages/config/src/config/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod clickhouse;
1111
pub mod db;
1212
pub mod guard;
1313
pub mod logs;
14+
pub mod pegboard;
1415
pub mod pubsub;
1516
pub mod telemetry;
1617
pub mod topology;
@@ -24,6 +25,7 @@ pub use clickhouse::*;
2425
pub use db::Database;
2526
pub use guard::*;
2627
pub use logs::*;
28+
pub use pegboard::*;
2729
pub use pubsub::PubSub;
2830
pub use telemetry::*;
2931
pub use topology::*;
@@ -72,6 +74,9 @@ pub struct Root {
7274
#[serde(default)]
7375
pub api_peer: Option<ApiPeer>,
7476

77+
#[serde(default)]
78+
pub pegboard: Option<Pegboard>,
79+
7580
#[serde(default)]
7681
pub logs: Option<Logs>,
7782

@@ -107,6 +112,7 @@ impl Default for Root {
107112
guard: None,
108113
api_public: None,
109114
api_peer: None,
115+
pegboard: None,
110116
logs: None,
111117
topology: None,
112118
database: None,
@@ -136,6 +142,11 @@ impl Root {
136142
self.api_peer.as_ref().unwrap_or(&DEFAULT)
137143
}
138144

145+
/// Returns the `pegboard` configuration section.
///
/// When the section is absent (`self.pegboard` is `None`), a reference to a lazily
/// initialized `Pegboard::default()` is returned instead, so callers always get a value.
pub fn pegboard(&self) -> &Pegboard {
146+
static DEFAULT: LazyLock<Pegboard> = LazyLock::new(Pegboard::default);
147+
self.pegboard.as_ref().unwrap_or(&DEFAULT)
148+
}
149+
139150
pub fn logs(&self) -> &Logs {
140151
static DEFAULT: LazyLock<Logs> = LazyLock::new(Logs::default);
141152
self.logs.as_ref().unwrap_or(&DEFAULT)
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use schemars::JsonSchema;
2+
use serde::{Deserialize, Serialize};
3+
4+
/// Tunable timing parameters for Pegboard actor scheduling.
///
/// Every field is optional; the accessor methods on this type substitute a built-in
/// default when a field is unset. Unknown keys are rejected during deserialization
/// (`deny_unknown_fields`).
#[derive(Debug, Serialize, Deserialize, Clone, Default, JsonSchema)]
5+
#[serde(deny_unknown_fields)]
6+
pub struct Pegboard {
7+
/// Time to delay an actor from rescheduling after a rescheduling failure.
8+
///
9+
/// Unit is in milliseconds.
10+
///
11+
/// **Experimental**
12+
pub base_retry_timeout: Option<usize>,
13+
/// How long to wait after creating an actor without receiving a starting state before marking the actor as lost.
14+
///
15+
/// Unit is in milliseconds.
16+
///
17+
/// **Experimental**
18+
pub actor_start_threshold: Option<i64>,
19+
/// How long to wait after stopping an actor without receiving a stop state before marking the actor as lost.
20+
///
21+
/// Unit is in milliseconds.
22+
///
23+
/// **Experimental**
24+
pub actor_stop_threshold: Option<i64>,
25+
/// How long an actor goes without retries before its retry count is reset to 0, effectively resetting its
26+
/// backoff to 0.
27+
///
28+
/// Unit is in milliseconds.
29+
///
30+
/// **Experimental**
31+
pub retry_reset_duration: Option<i64>,
32+
}
33+
34+
impl Pegboard {
35+
/// Delay applied before rescheduling after a rescheduling failure, in milliseconds.
///
/// Defaults to 2000 when `base_retry_timeout` is unset.
pub fn base_retry_timeout(&self) -> usize {
36+
self.base_retry_timeout.unwrap_or(2000)
37+
}
38+
39+
/// Start threshold in milliseconds.
///
/// Defaults to 30_000 (30 seconds) when `actor_start_threshold` is unset.
pub fn actor_start_threshold(&self) -> i64 {
40+
self.actor_start_threshold.unwrap_or(30_000)
41+
}
42+
43+
/// Stop threshold in milliseconds.
///
/// Defaults to 30_000 (30 seconds) when `actor_stop_threshold` is unset.
pub fn actor_stop_threshold(&self) -> i64 {
44+
self.actor_stop_threshold.unwrap_or(30_000)
45+
}
46+
47+
/// Retry reset duration in milliseconds.
///
/// Defaults to 10 * 60 * 1000 (10 minutes) when `retry_reset_duration` is unset.
pub fn retry_reset_duration(&self) -> i64 {
48+
self.retry_reset_duration.unwrap_or(10 * 60 * 1000)
49+
}
50+
}

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,6 @@ mod destroy;
1010
mod runtime;
1111
mod setup;
1212

13-
/// Time to delay an actor from rescheduling after a rescheduling failure.
14-
const BASE_RETRY_TIMEOUT_MS: usize = 2000;
15-
/// How long to wait after creating and not receiving a starting state before setting actor as lost.
16-
const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30);
17-
/// How long to wait after stopping and not receiving a stop state before setting actor as lost.
18-
const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30);
19-
/// How long an actor goes without retries before it's retry count is reset to 0, effectively resetting its
20-
/// backoff to 0.
21-
const RETRY_RESET_DURATION_MS: i64 = util::duration::minutes(10);
22-
2313
#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
2414
pub struct Input {
2515
pub actor_id: Id,
@@ -224,7 +214,11 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
224214
runtime::SpawnActorOutput::Allocated {
225215
runner_id,
226216
runner_workflow_id,
227-
} => runtime::LifecycleState::new(runner_id, runner_workflow_id),
217+
} => runtime::LifecycleState::new(
218+
runner_id,
219+
runner_workflow_id,
220+
ctx.config().pegboard().actor_start_threshold(),
221+
),
228222
runtime::SpawnActorOutput::Sleep => {
229223
ctx.activity(runtime::SetSleepingInput {
230224
actor_id: input.actor_id,
@@ -308,8 +302,10 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
308302
}) => match intent {
309303
protocol::ActorIntent::ActorIntentSleep => {
310304
if !state.sleeping {
311-
state.gc_timeout_ts =
312-
Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS);
305+
state.gc_timeout_ts = Some(
306+
util::timestamp::now()
307+
+ ctx.config().pegboard().actor_stop_threshold(),
308+
);
313309
state.sleeping = true;
314310

315311
ctx.activity(runtime::SetSleepingInput {
@@ -330,8 +326,10 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
330326
}
331327
}
332328
protocol::ActorIntent::ActorIntentStop => {
333-
state.gc_timeout_ts =
334-
Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS);
329+
state.gc_timeout_ts = Some(
330+
util::timestamp::now()
331+
+ ctx.config().pegboard().actor_stop_threshold(),
332+
);
335333

336334
ctx.activity(runtime::SetNotConnectableInput {
337335
actor_id: input.actor_id,

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@ use universaldb::utils::{FormalKey, IsolationLevel::*};
1515

1616
use crate::{keys, metrics, workflows::runner::RUNNER_ELIGIBLE_THRESHOLD_MS};
1717

18-
use super::{
19-
ACTOR_START_THRESHOLD_MS, Allocate, BASE_RETRY_TIMEOUT_MS, Destroy, Input, PendingAllocation,
20-
RETRY_RESET_DURATION_MS, State, destroy,
21-
};
18+
use super::{Allocate, Destroy, Input, PendingAllocation, State, destroy};
2219

2320
#[derive(Deserialize, Serialize)]
2421
pub struct LifecycleState {
@@ -42,7 +39,7 @@ pub struct LifecycleState {
4239
}
4340

4441
impl LifecycleState {
45-
pub fn new(runner_id: Id, runner_workflow_id: Id) -> Self {
42+
pub fn new(runner_id: Id, runner_workflow_id: Id, actor_start_threshold: i64) -> Self {
4643
LifecycleState {
4744
generation: 0,
4845
runner_id: Some(runner_id),
@@ -51,7 +48,7 @@ impl LifecycleState {
5148
will_wake: false,
5249
wake_for_alarm: false,
5350
alarm_ts: None,
54-
gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS),
51+
gc_timeout_ts: Some(util::timestamp::now() + actor_start_threshold),
5552
reschedule_state: RescheduleState::default(),
5653
}
5754
}
@@ -622,7 +619,7 @@ pub async fn reschedule_actor(
622619
let mut backoff = util::backoff::Backoff::new_at(
623620
8,
624621
None,
625-
BASE_RETRY_TIMEOUT_MS,
622+
ctx.config().pegboard().base_retry_timeout(),
626623
500,
627624
state.reschedule_state.retry_count,
628625
);
@@ -675,7 +672,8 @@ pub async fn reschedule_actor(
675672
state.runner_workflow_id = Some(*runner_workflow_id);
676673

677674
// Reset gc timeout once allocated
678-
state.gc_timeout_ts = Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS);
675+
state.gc_timeout_ts =
676+
Some(util::timestamp::now() + ctx.config().pegboard().actor_start_threshold());
679677
}
680678

681679
Ok(spawn_res)
@@ -727,8 +725,11 @@ struct CompareRetryInput {
727725
async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> Result<(i64, bool)> {
728726
let now = util::timestamp::now();
729727

730-
// If the last retry ts is more than RETRY_RESET_DURATION_MS ago, reset retry count
731-
Ok((now, input.last_retry_ts < now - RETRY_RESET_DURATION_MS))
728+
// If the last retry ts is more than retry_reset_duration ago, reset retry count
729+
Ok((
730+
now,
731+
input.last_retry_ts < now - ctx.config().pegboard().retry_reset_duration(),
732+
))
732733
}
733734

734735
#[derive(Debug, Serialize, Deserialize, Hash)]

0 commit comments

Comments
 (0)