Skip to content

Commit c3f9b4a

Browse files
authored
refactor: Add workflow spans and every cli logging (#603)
# Description This PR implements the following changes: - [x] Add initial workflow and task execution spans - [x] Add workflow initialize, start, and end logs - [x] Add computed and replayed receipt logs - [x] Add custom logging format and filter for EveryCLI - [x] Minor re-wording of existing logs ## Link to issue Implements spans needed in #457 ## Type of change - [x] New feature (non-breaking change that adds functionality) - [x] Refactor (non-breaking change that updates existing functionality) ## Test plan (required) Run Homestar with `EVERY_CLI` set to `true` to see the simplified logs: ``` EVERY_CLI=true cargo run -- start ``` Check that `RUST_LOG` works when `EVERY_CLI` is `false` or not set: ``` EVERY_CLI=false RUST_LOG=info cargo run -- start ```
1 parent 0f093f0 commit c3f9b4a

File tree

7 files changed

+144
-32
lines changed

7 files changed

+144
-32
lines changed

homestar-runtime/src/logger.rs

+60-19
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use tracing_subscriber::{layer::SubscriberExt as _, prelude::*, EnvFilter};
77

88
const LOG_FILE: &str = "homestar.log";
99
const DIRECTIVE_EXPECT: &str = "Invalid tracing directive";
10+
// Sets simplified logging filter and format for Every CLI
11+
const EVERY_CLI: &str = "EVERY_CLI";
1012

1113
/// Logger interface.
1214
#[derive(Debug)]
@@ -43,31 +45,70 @@ fn init(
4345
guard: WorkerGuard,
4446
#[allow(unused_variables)] settings: &settings::Monitoring,
4547
) -> WorkerGuard {
48+
// RUST_LOG ignored when EVERY_CLI is true
49+
let every_cli: bool = std::env::var(EVERY_CLI).is_ok_and(|val| val == "true");
50+
4651
// TODO: Add support for customizing logger(s) / specialzed formatters.
47-
let format_layer = tracing_logfmt::builder()
48-
.with_level(true)
49-
.with_target(true)
50-
.with_span_name(true)
51-
.with_span_path(true)
52-
.with_location(true)
53-
.with_module_path(true)
54-
.layer()
55-
.with_writer(writer);
52+
let format_layer = if every_cli {
53+
tracing_logfmt::builder()
54+
.with_level(true)
55+
.with_target(false)
56+
.with_span_name(false)
57+
.with_span_path(false)
58+
.with_location(false)
59+
.with_module_path(false)
60+
.layer()
61+
.with_writer(writer)
62+
} else {
63+
tracing_logfmt::builder()
64+
.with_level(true)
65+
.with_target(true)
66+
.with_span_name(true)
67+
.with_span_path(true)
68+
.with_location(true)
69+
.with_module_path(true)
70+
.layer()
71+
.with_writer(writer)
72+
};
5673

57-
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
58-
EnvFilter::new("info")
59-
.add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT))
60-
.add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT))
74+
let filter = if every_cli {
75+
EnvFilter::new("off")
76+
.add_directive(
77+
"homestar_runtime::runner[run_worker]=info"
78+
.parse()
79+
.expect(DIRECTIVE_EXPECT),
80+
)
81+
.add_directive(
82+
"homestar_runtime::worker[run]=info"
83+
.parse()
84+
.expect(DIRECTIVE_EXPECT),
85+
)
86+
.add_directive(
87+
"homestar_runtime::worker[spawn_workflow_tasks]=info"
88+
.parse()
89+
.expect(DIRECTIVE_EXPECT),
90+
)
6191
.add_directive(
62-
"libp2p_gossipsub::behaviour=info"
92+
"homestar_wasm[wasi_log]=trace"
6393
.parse()
6494
.expect(DIRECTIVE_EXPECT),
6595
)
66-
.add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT))
67-
.add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT))
68-
.add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT))
69-
.add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT))
70-
});
96+
} else {
97+
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
98+
EnvFilter::new("info")
99+
.add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT))
100+
.add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT))
101+
.add_directive(
102+
"libp2p_gossipsub::behaviour=info"
103+
.parse()
104+
.expect(DIRECTIVE_EXPECT),
105+
)
106+
.add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT))
107+
.add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT))
108+
.add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT))
109+
.add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT))
110+
})
111+
};
71112

72113
#[cfg(all(
73114
feature = "console",

homestar-runtime/src/runner.rs

+21-4
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use tokio::{
4040
time,
4141
};
4242
use tokio_util::time::{delay_queue, DelayQueue};
43-
use tracing::{debug, error, info, warn};
43+
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
4444

4545
mod error;
4646
pub(crate) mod file;
@@ -702,6 +702,7 @@ impl Runner {
702702
}
703703
}
704704

705+
#[instrument(skip_all)]
705706
async fn run_worker<S: Into<FastStr>>(
706707
&self,
707708
workflow: Workflow<'static, Arg>,
@@ -767,9 +768,11 @@ impl Runner {
767768
async move { Fetch::get_resources(rscs, workflow_settings).await }.boxed()
768769
};
769770

770-
let handle = self
771-
.runtime
772-
.spawn(worker.run(self.running_tasks(), fetch_fn));
771+
let handle = self.runtime.spawn(
772+
worker
773+
.run(self.running_tasks(), fetch_fn)
774+
.instrument(info_span!("run").or_current()),
775+
);
773776

774777
// Add Cid to expirations timing wheel
775778
let delay_key = self
@@ -790,6 +793,20 @@ impl Runner {
790793
.collect();
791794
let replayed_receipt_info = find_receipt_info_by_pointers(&receipt_pointers, db)?;
792795

796+
// Log replayed receipts if any
797+
if !replayed_receipt_info.is_empty() {
798+
info!(
799+
subject = "workflow.receipts",
800+
category = "workflow",
801+
receipt_cids = replayed_receipt_info
802+
.iter()
803+
.map(|info| info.0.to_string())
804+
.collect::<Vec<String>>()
805+
.join(","),
806+
"replaying receipts",
807+
);
808+
};
809+
793810
Ok(WorkflowData {
794811
info: initial_info,
795812
name: workflow_name,

homestar-runtime/src/tasks/wasm.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use homestar_wasm::{
88
io::{Arg, Output},
99
wasmtime::{world::Env, Error as WasmRuntimeError, State, World},
1010
};
11+
use tracing::Instrument;
1112

1213
#[allow(dead_code)]
1314
#[allow(missing_debug_implementations)]
@@ -32,7 +33,7 @@ impl WasmContext {
3233
args: Args<Arg>,
3334
) -> Result<Output, WasmRuntimeError> {
3435
let env = World::instantiate_with_current_env(bytes, fun_name, &mut self.env).await?;
35-
env.execute(args).await
36+
env.execute(args).in_current_span().await
3637
}
3738
}
3839

homestar-runtime/src/worker.rs

+52-6
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use indexmap::IndexMap;
3636
use libipld::{Cid, Ipld};
3737
use std::{collections::BTreeMap, sync::Arc};
3838
use tokio::task::JoinSet;
39-
use tracing::{debug, error, info};
39+
use tracing::{debug, debug_span, error, info, info_span, instrument, Instrument};
4040

4141
mod poller;
4242
mod resolver;
@@ -157,6 +157,7 @@ where
157157
/// [Instruction]: homestar_invocation::task::Instruction
158158
/// [Swarm]: crate::network::swarm
159159
/// [LinkMap]: homestar_workflow::LinkMap
160+
#[instrument(skip_all)]
160161
pub(crate) async fn run<F>(self, running_tasks: Arc<RunningTaskSet>, fetch_fn: F) -> Result<()>
161162
where
162163
F: FnOnce(FnvHashSet<Resource>) -> BoxFuture<'a, Result<IndexMap<Resource, Vec<u8>>>>,
@@ -169,6 +170,15 @@ where
169170
.await
170171
{
171172
Ok(ctx) => {
173+
let workflow_cid = self.workflow_info.cid.to_string();
174+
175+
info!(
176+
subject = "worker.init_workflow",
177+
category = "worker.run",
178+
workflow_cid,
179+
"initializing workflow"
180+
);
181+
172182
let promises_to_resolve = ctx.scheduler.promises_to_resolve.clone();
173183
let resolver = DHTResolver::new(
174184
promises_to_resolve,
@@ -181,7 +191,7 @@ where
181191
info!(
182192
subject = "worker.resolve_receipts",
183193
category = "worker.run",
184-
workflow_cid = self.workflow_info.cid.to_string(),
194+
workflow_cid,
185195
"resolving receipts in the background"
186196
);
187197
poller::poll(
@@ -196,12 +206,26 @@ where
196206
// Set the workflow status to running.
197207
let conn = &mut self.db.conn()?;
198208
if ctx.scheduler.run_length() > 0 {
209+
info!(
210+
subject = "worker.start_workflow",
211+
category = "worker.run",
212+
workflow_cid,
213+
"starting workflow"
214+
);
215+
199216
Db::set_workflow_status(
200217
self.workflow_info.cid,
201218
workflow::Status::Running,
202219
conn,
203220
)?;
204221
} else {
222+
info!(
223+
subject = "worker.start_workflow",
224+
category = "worker.run",
225+
workflow_cid,
226+
"replaying workflow"
227+
);
228+
205229
Db::set_workflow_status(
206230
self.workflow_info.cid,
207231
workflow::Status::Completed,
@@ -223,6 +247,7 @@ where
223247
}
224248

225249
#[allow(unused_mut)]
250+
#[instrument(skip_all)]
226251
async fn run_queue(
227252
mut self,
228253
mut scheduler: TaskScheduler<'a>,
@@ -321,17 +346,19 @@ where
321346
category = "worker.run",
322347
workflow_cid = workflow_cid.to_string(),
323348
cid = cid.to_string(),
324-
"attempting to resolve cid in workflow"
349+
"attempting to resolve workflow args by cid"
325350
);
326351

327352
cid.resolve(linkmap.clone(), resources.clone(), db.clone())
328353
.boxed()
329354
});
330355

331356
let handle = task_set.spawn(async move {
332-
match resolved.await {
357+
match resolved.await {
333358
Ok(inst_result) => {
334-
match wasm_ctx.run(wasm, &fun, inst_result).await {
359+
match wasm_ctx.run(wasm, &fun, inst_result).instrument({
360+
debug_span!("wasm_run").or_current()
361+
}).await {
335362
Ok(output) => Ok((
336363
output,
337364
instruction_ptr,
@@ -352,7 +379,11 @@ where
352379
})
353380
}
354381
}
355-
});
382+
}
383+
.instrument({
384+
info_span!("spawn_workflow_tasks").or_current()
385+
}));
386+
356387
handles.push(handle);
357388
}
358389
None => error!(
@@ -428,6 +459,13 @@ where
428459
"committed to database"
429460
);
430461

462+
info!(
463+
subject = "worker.receipt",
464+
category = "worker.run",
465+
receipt_cid = stored_receipt.cid().to_string(),
466+
"computed receipt"
467+
);
468+
431469
let _ = self
432470
.event_sender
433471
.send_async(Event::CapturedReceipt(Captured::with(
@@ -442,6 +480,14 @@ where
442480
// Set the workflow status to `completed`
443481
let conn = &mut self.db.conn()?;
444482
Db::set_workflow_status(self.workflow_info.cid, workflow::Status::Completed, conn)?;
483+
484+
info!(
485+
subject = "worker.end_workflow",
486+
category = "worker.run",
487+
workflow_cid = self.workflow_info.cid.to_string(),
488+
"workflow completed"
489+
);
490+
445491
Ok(())
446492
}
447493
}

homestar-runtime/src/worker/resolver.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use tokio::{
2323
sync::RwLock,
2424
time::{timeout_at, Instant},
2525
};
26-
use tracing::debug;
26+
use tracing::{debug, instrument};
2727

2828
pub(crate) trait Resolver {
2929
async fn resolve(
@@ -35,6 +35,7 @@ pub(crate) trait Resolver {
3535
}
3636

3737
impl Resolver for Cid {
38+
#[instrument(level = "debug", name = "cid_resolve", skip_all)]
3839
async fn resolve(
3940
self,
4041
linkmap: Arc<RwLock<LinkMap<task::Result<Arg>>>>,

homestar-wasm/src/wasmtime/host/helpers.rs

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::wasmtime::{
66
};
77
use async_trait::async_trait;
88
use std::time::Instant;
9+
use tracing::instrument;
910

1011
#[async_trait]
1112
impl helpers::Host for State {
@@ -30,6 +31,7 @@ impl helpers::Host for State {
3031
#[async_trait]
3132
impl wasi::logging::logging::Host for State {
3233
/// Log a message, formatted by the runtime subscriber.
34+
#[instrument(name = "wasi_log", skip_all)]
3335
async fn log(
3436
&mut self,
3537
level: wasi::logging::logging::Level,

homestar-wasm/src/wasmtime/world.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use homestar_invocation::{
2020
task::instruction::{Args, Input},
2121
};
2222
use std::{iter, time::Instant};
23+
use tracing::{instrument, Instrument};
2324
use wasmtime::{
2425
component::{self, Component, Func, Instance, Linker},
2526
Config, Engine, Store,
@@ -145,6 +146,7 @@ impl<T> Env<T> {
145146
/// Types must conform to [Wit] IDL types when Wasm was compiled/generated.
146147
///
147148
/// [Wit]: <https://github.com/WebAssembly/component-model/blob/main/design/mvp/WIT.md>
149+
#[instrument(skip_all)]
148150
pub async fn execute(&mut self, args: Args<Arg>) -> Result<Output, Error>
149151
where
150152
T: Send,
@@ -196,13 +198,15 @@ impl<T> Env<T> {
196198
.ok_or(Error::WasmInstantiation)?
197199
.func()
198200
.call_async(&mut self.store, &params, &mut results_alloc)
201+
.in_current_span()
199202
.await?;
200203

201204
self.bindings
202205
.as_mut()
203206
.ok_or(Error::WasmInstantiation)?
204207
.func()
205208
.post_return_async(&mut self.store)
209+
.in_current_span()
206210
.await?;
207211

208212
let results = match &results_alloc[..] {
@@ -415,7 +419,7 @@ fn component_from_bytes(bytes: &[u8], engine: Engine) -> Result<Component, Error
415419
if is_component(chunk) {
416420
Component::from_binary(&engine, bytes).map_err(Error::IntoWasmComponent)
417421
} else {
418-
tracing::info!("Converting Wasm binary into a Wasm component");
422+
tracing::info!("converting Wasm binary into a Wasm component");
419423

420424
let component = ComponentEncoder::default()
421425
.module(bytes)?

0 commit comments

Comments
 (0)