Skip to content

Commit

Permalink
feat(swordfish): Progress Bar (#3571)
Browse files Browse the repository at this point in the history
Uses the https://docs.rs/indicatif/latest/indicatif/index.html library
to make a progress bar.

Tracks rows received + rows emitted for each operator. The counting is
attached to the input / output receivers for each operator, so there
shouldn't be much contention.

By default, the progress bar is printed to stderr. It is also cleared
upon finish. Tested piping it out to stderr, e.g. `python script.py 2 >
errors.log`, and the log was empty.


https://github.com/user-attachments/assets/c494bc2e-afe0-4bfe-9f87-96d1b0dbc577

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
3 people authored Dec 19, 2024
1 parent 8f8e210 commit e706caa
Show file tree
Hide file tree
Showing 11 changed files with 553 additions and 44 deletions.
61 changes: 58 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 65 additions & 29 deletions daft/runners/progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,48 @@
from daft.execution.execution_step import PartitionTask


class ProgressBar:
def __init__(self, use_ray_tqdm: bool, show_tasks_bar: bool = False, disable: bool = False) -> None:
self.show_tasks_bar = show_tasks_bar
self._maxinterval = 5.0

# Choose the appropriate tqdm module depending on whether we need to use Ray's tqdm
self.use_ray_tqdm = use_ray_tqdm
if use_ray_tqdm:
from ray.experimental.tqdm_ray import tqdm
else:
from tqdm.auto import tqdm as _tqdm
def get_tqdm(use_ray_tqdm: bool) -> Any:
# Choose the appropriate tqdm module depending on whether we need to use Ray's tqdm
if use_ray_tqdm:
from ray.experimental.tqdm_ray import tqdm
else:
from tqdm.auto import tqdm as _tqdm

try:
import sys
try:
import sys

from IPython import get_ipython
from IPython import get_ipython

ipython = get_ipython()
ipython = get_ipython()

# write to sys.stdout if in jupyter notebook
# source: https://github.com/tqdm/tqdm/blob/74722959a8626fd2057be03e14dcf899c25a3fd5/tqdm/autonotebook.py#L14
if ipython is not None and "IPKernelApp" in ipython.config:
# write to sys.stdout if in jupyter notebook
# source: https://github.com/tqdm/tqdm/blob/74722959a8626fd2057be03e14dcf899c25a3fd5/tqdm/autonotebook.py#L14
if ipython is not None and "IPKernelApp" in ipython.config:

class tqdm(_tqdm): # type: ignore[no-redef]
def __init__(self, *args, **kwargs):
kwargs = kwargs.copy()
if "file" not in kwargs:
kwargs["file"] = sys.stdout # avoid the red block in IPython
class tqdm(_tqdm): # type: ignore[no-redef]
def __init__(self, *args, **kwargs):
kwargs = kwargs.copy()
if "file" not in kwargs:
kwargs["file"] = sys.stdout # avoid the red block in IPython

super().__init__(*args, **kwargs)
else:
tqdm = _tqdm
except ImportError:
super().__init__(*args, **kwargs)
else:
tqdm = _tqdm
except ImportError:
tqdm = _tqdm

return tqdm

self.tqdm_mod = tqdm

self.pbars: dict[int, tqdm] = dict()
class ProgressBar:
def __init__(self, use_ray_tqdm: bool, show_tasks_bar: bool = False, disable: bool = False) -> None:
self.show_tasks_bar = show_tasks_bar
self._maxinterval = 5.0

self.use_ray_tqdm = use_ray_tqdm
self.tqdm_mod = get_tqdm(use_ray_tqdm)

self.pbars: dict[int, Any] = dict()
self.disable = (
disable
or not bool(int(os.environ.get("RAY_TQDM", "1")))
Expand Down Expand Up @@ -102,3 +106,35 @@ def close(self) -> None:
p.clear()
p.close()
del p


# Progress Bar for local execution, should only be used in the native executor
class SwordfishProgressBar:
def __init__(self) -> None:
self._maxinterval = 5.0
self.tqdm_mod = get_tqdm(False)
self.pbars: dict[int, Any] = dict()

def make_new_bar(self, bar_format: str, initial_message: str) -> int:
pbar_id = len(self.pbars)
self.pbars[pbar_id] = self.tqdm_mod(
bar_format=bar_format,
desc=initial_message,
position=pbar_id,
leave=False,
mininterval=1.0,
maxinterval=self._maxinterval,
)
return pbar_id

def update_bar(self, pbar_id: int, message: str) -> None:
self.pbars[pbar_id].set_description_str(message)

def close_bar(self, pbar_id: int) -> None:
self.pbars[pbar_id].close()
del self.pbars[pbar_id]

def close(self) -> None:
for p in self.pbars.values():
p.close()
del p
1 change: 1 addition & 0 deletions src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ daft-table = {path = "../daft-table", default-features = false}
daft-writers = {path = "../daft-writers", default-features = false}
futures = {workspace = true}
indexmap = {workspace = true}
indicatif = "0.17.9"
lazy_static = {workspace = true}
log = {workspace = true}
loole = "0.4.0"
Expand Down
12 changes: 11 additions & 1 deletion src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
},
dispatcher::{DispatchSpawner, RoundRobinDispatcher, UnorderedDispatcher},
pipeline::PipelineNode,
progress_bar::ProgressBarColor,
runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
ExecutionRuntimeContext, OperatorOutput, PipelineExecutionSnafu, NUM_CPUS,
};
Expand Down Expand Up @@ -201,19 +202,27 @@ impl PipelineNode for IntermediateNode {
runtime_handle: &mut ExecutionRuntimeContext,
) -> crate::Result<Receiver<Arc<MicroPartition>>> {
let mut child_result_receivers = Vec::with_capacity(self.children.len());
let progress_bar = runtime_handle.make_progress_bar(
self.name(),
ProgressBarColor::Magenta,
true,
self.runtime_stats.clone(),
);
for child in &self.children {
let child_result_receiver = child.start(maintain_order, runtime_handle)?;
child_result_receivers.push(CountingReceiver::new(
child_result_receiver,
self.runtime_stats.clone(),
progress_bar.clone(),
));
}
let op = self.intermediate_op.clone();
let num_workers = op.max_concurrency().context(PipelineExecutionSnafu {
node_name: self.name(),
})?;
let (destination_sender, destination_receiver) = create_channel(1);
let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone());
let counting_sender =
CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar);

let dispatch_spawner = self
.intermediate_op
Expand Down Expand Up @@ -246,6 +255,7 @@ impl PipelineNode for IntermediateNode {
);
Ok(destination_receiver)
}

fn as_tree_display(&self) -> &dyn TreeDisplay {
self
}
Expand Down
40 changes: 39 additions & 1 deletion src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod channel;
mod dispatcher;
mod intermediate_ops;
mod pipeline;
mod progress_bar;
mod run;
mod runtime_stats;
mod sinks;
Expand All @@ -15,13 +16,16 @@ mod state_bridge;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use common_error::{DaftError, DaftResult};
use common_runtime::RuntimeTask;
use lazy_static::lazy_static;
use progress_bar::{OperatorProgressBar, ProgressBarColor, ProgressBarManager};
pub use run::{run_local, ExecutionEngineResult, NativeExecutor};
use runtime_stats::RuntimeStatsContext;
use snafu::{futures::TryFutureExt, ResultExt, Snafu};

lazy_static! {
Expand Down Expand Up @@ -116,14 +120,19 @@ impl RuntimeHandle {
pub struct ExecutionRuntimeContext {
worker_set: TaskSet<crate::Result<()>>,
default_morsel_size: usize,
progress_bar_manager: Option<Box<dyn ProgressBarManager>>,
}

impl ExecutionRuntimeContext {
#[must_use]
pub fn new(default_morsel_size: usize) -> Self {
pub fn new(
default_morsel_size: usize,
progress_bar_manager: Option<Box<dyn ProgressBarManager>>,
) -> Self {
Self {
worker_set: TaskSet::new(),
default_morsel_size,
progress_bar_manager,
}
}
pub fn spawn(
Expand All @@ -149,11 +158,40 @@ impl ExecutionRuntimeContext {
self.default_morsel_size
}

pub fn make_progress_bar(
&self,
prefix: &str,
color: ProgressBarColor,
show_received: bool,
runtime_stats: Arc<RuntimeStatsContext>,
) -> Option<Arc<OperatorProgressBar>> {
if let Some(ref pb_manager) = self.progress_bar_manager {
let pb = pb_manager
.make_new_bar(color, prefix, show_received)
.unwrap();
Some(Arc::new(OperatorProgressBar::new(
pb,
runtime_stats,
show_received,
)))
} else {
None
}
}

pub(crate) fn handle(&self) -> RuntimeHandle {
RuntimeHandle(tokio::runtime::Handle::current())
}
}

impl Drop for ExecutionRuntimeContext {
fn drop(&mut self) {
if let Some(pbm) = self.progress_bar_manager.take() {
let _ = pbm.close_all();
}
}
}

#[cfg(feature = "python")]
use pyo3::prelude::*;

Expand Down
Loading

0 comments on commit e706caa

Please sign in to comment.