From f3759786997ee57b4e231cb01adad274b9415b44 Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Tue, 13 Apr 2021 16:48:15 -0500 Subject: [PATCH] Config options --- Cargo.toml | 2 +- .../src/program/config.rs | 174 ++++++++++ .../differential_datalog/src/program/mod.rs | 110 +++---- .../src/program/worker.rs | 298 ++++++++++-------- .../src/render/arrange_by.rs | 9 +- .../differential_datalog/src/render/mod.rs | 13 +- rust/template/src/api/mod.rs | 7 +- src/Language/DifferentialDatalog/Compile.hs | 1 + 8 files changed, 415 insertions(+), 199 deletions(-) create mode 100644 rust/template/differential_datalog/src/program/config.rs diff --git a/Cargo.toml b/Cargo.toml index 3a4ae884b..90a871434 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "rust/template/ovsdb", "rust/template/cmd_parser", "rust/template/ddlog_derive", - "rust/template/distributed_datalog", + #"rust/template/distributed_datalog", "rust/template/differential_datalog", "rust/template/differential_datalog_test", ] diff --git a/rust/template/differential_datalog/src/program/config.rs b/rust/template/differential_datalog/src/program/config.rs new file mode 100644 index 000000000..6be13821c --- /dev/null +++ b/rust/template/differential_datalog/src/program/config.rs @@ -0,0 +1,174 @@ +//! Configuration for DDlog programs + +use crate::{ + profile::Profile, + program::{worker::ProfilingData, Program, PROF_MSG_BUF_SIZE}, +}; +use differential_dataflow::Config as DDFlowConfig; +use std::{ + env, + sync::{atomic::AtomicBool, Arc, Mutex}, + thread::{self, JoinHandle}, +}; +use timely::Config as TimelyConfig; + +/// The configuration for a DDlog program +#[derive(Debug, Clone, Copy)] +pub struct Config { + /// The number of timely worker threads to run + pub num_timely_workers: usize, + /// Whether extra regions should be added to the dataflow + /// + /// These extra regions *significantly* help with the readability + /// of the generated dataflows at the cost of a minor performance + /// penalty. 
Best used with [`ProfilingKind::TimelyProfiling`] + /// in order to see the extra regions in the profiled dataflow + pub enable_debug_regions: bool, + /// The kind of profiling to enable + pub profiling_kind: ProfilingKind, + /// An amount of arrangement effort to spend each scheduling quantum + /// + /// See [`differential_dataflow::Config`] + pub differential_idle_merge_effort: Option, +} + +impl Config { + /// Create a new [`Config`] with the default settings + pub fn new() -> Self { + Self { + num_timely_workers: 1, + enable_debug_regions: false, + profiling_kind: ProfilingKind::default(), + differential_idle_merge_effort: None, + } + } + + pub(super) fn timely_config(&self) -> Result { + let mut config = TimelyConfig::process(self.num_timely_workers); + + // Allow configuring the merge behavior of ddflow + let idle_merge_effort = if self.differential_idle_merge_effort.is_some() { + self.differential_idle_merge_effort + + // Support for previous users who rely on the `DIFFERENTIAL_EAGER_MERGE` variable + // TODO: Remove the env var and expose this in all user apis + } else if let Ok(value) = env::var("DIFFERENTIAL_EAGER_MERGE") { + if value.is_empty() { + None + } else { + let merge_effort: isize = value.parse().map_err(|_| { + "the `DIFFERENTIAL_EAGER_MERGE` variable must be set to an integer value" + .to_owned() + })?; + + Some(merge_effort) + } + } else { + None + }; + + differential_dataflow::configure(&mut config.worker, &DDFlowConfig { idle_merge_effort }); + + Ok(config) + } +} + +impl Default for Config { + fn default() -> Self { + Self::new() + } +} + +/// The kind of profiling to be enabled for DDlog +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProfilingKind { + /// Disable all profiling + None, + /// Enable self-profiling + /// + /// Note: This spawns an additional thread and can have a + /// performance impact on the target program and also disables + /// general-purpose Timely Dataflow and Differential Dataflow + /// profiling + SelfProfiling, + /// Enable profiling for 
Timely Dataflow + TimelyProfiling { + /// Enable profiling for Differential Dataflow as well as Timely + differential_dataflow: bool, + }, +} + +impl ProfilingKind { + /// Returns `true` if the profiling_kind is [`ProfilingKind::None`] + pub const fn is_none(&self) -> bool { + matches!(self, Self::None) + } + + /// Returns `true` if the profiling_kind is [`ProfilingKind::SelfProfiling`] + pub const fn is_self_profiling(&self) -> bool { + matches!(self, Self::SelfProfiling) + } + + /// Returns `true` if the profiling_kind is [`ProfilingKind::TimelyProfiling`] + pub const fn is_timely_profiling(&self) -> bool { + matches!(self, Self::TimelyProfiling { .. }) + } +} + +impl Default for ProfilingKind { + fn default() -> Self { + Self::None + } +} + +#[derive(Debug)] +pub(super) struct SelfProfilingRig { + pub(super) profile: Option>>, + pub(super) profile_thread: Option>, + pub(super) profiling_data: Option, + pub(super) profile_cpu: Option>, + pub(super) profile_timely: Option>, +} + +impl SelfProfilingRig { + /// Create a new self profiling rig + /// + /// Note: Spawns a worker thread to process profiling messages + pub(super) fn new(config: &Config) -> Self { + if config.profiling_kind.is_self_profiling() { + let (profile_send, profile_recv) = crossbeam_channel::bounded(PROF_MSG_BUF_SIZE); + + // Profiling data structure + let profile = Arc::new(Mutex::new(Profile::new())); + + let (profile_cpu, profile_timely) = ( + Arc::new(AtomicBool::new(false)), + Arc::new(AtomicBool::new(false)), + ); + + // Thread to collect profiling data. 
+ let cloned_profile = profile.clone(); + let profile_thread = + thread::spawn(move || Program::prof_thread_func(profile_recv, cloned_profile)); + + let profiling_data = + ProfilingData::new(profile_cpu.clone(), profile_timely.clone(), profile_send); + + Self { + profile: Some(profile), + profile_thread: Some(profile_thread), + profiling_data: Some(profiling_data), + profile_cpu: Some(profile_cpu), + profile_timely: Some(profile_timely), + } + } else { + Self { + profile: None, + profile_thread: None, + profiling_data: None, + profile_cpu: None, + profile_timely: None, + } + } + } +} diff --git a/rust/template/differential_datalog/src/program/mod.rs b/rust/template/differential_datalog/src/program/mod.rs index 7f9da1089..7fb2fef5d 100644 --- a/rust/template/differential_datalog/src/program/mod.rs +++ b/rust/template/differential_datalog/src/program/mod.rs @@ -13,6 +13,7 @@ // TODO: single input relation pub mod arrange; +pub mod config; mod timestamp; mod update; mod worker; @@ -25,11 +26,15 @@ use crate::{ ddval::*, profile::*, record::Mutator, - render::arrange_by::{ArrangeBy, ArrangementKind}, + render::{ + arrange_by::{ArrangeBy, ArrangementKind}, + RenderContext, + }, }; use arrange::{ antijoin_arranged, Arrangement as DataflowArrangement, ArrangementFlavor, Arrangements, }; +use config::{Config, SelfProfilingRig}; use crossbeam_channel::{Receiver, Sender}; use fnv::{FnvHashMap, FnvHashSet}; use std::{ @@ -37,7 +42,6 @@ use std::{ borrow::Cow, cmp, collections::{hash_map, BTreeSet}, - env, fmt::{self, Debug, Formatter}, iter::{self, Cycle, Skip}, ops::Range, @@ -45,10 +49,10 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, }, - thread::{self, JoinHandle}, + thread::JoinHandle, }; use timestamp::ToTupleTS; -use worker::{DDlogWorker, ProfilingData}; +use worker::DDlogWorker; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::arrangement::Arranged; @@ -58,20 +62,17 @@ use 
differential_dataflow::trace::implementations::ord::OrdKeySpine as DefaultKe use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace; use differential_dataflow::trace::wrappers::enter::TraceEnter; use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; -use differential_dataflow::{Collection, Config as DDFlowConfig}; +use differential_dataflow::Collection; use dogsdogsdogs::{ altneu::AltNeu, calculus::{Differentiate, Integrate}, operators::lookup_map, }; +use timely::communication::{initialize::WorkerGuards, Allocator}; use timely::dataflow::scopes::*; use timely::order::TotalOrder; use timely::progress::{timestamp::Refines, PathSummary, Timestamp}; use timely::worker::Worker; -use timely::{ - communication::{initialize::WorkerGuards, Allocator}, - execute::Config as TimelyConfig, -}; type ValTrace = DefaultValTrace; type KeyTrace = DefaultKeyTrace; @@ -776,6 +777,7 @@ impl Arrangement { fn build_arrangement_root( &self, + render_context: &RenderContext, collection: &Collection, ) -> DataflowArrangement, TKeyAgent> where @@ -804,11 +806,12 @@ impl Arrangement { kind, target_relation: self.name().into(), } - .render_root(collection) + .render_root(render_context, collection) } fn build_arrangement( &self, + render_context: &RenderContext, collection: &Collection, ) -> DataflowArrangement, TKeyAgent> where @@ -836,7 +839,7 @@ impl Arrangement { kind, target_relation: self.name().into(), } - .render(collection) + .render(render_context, collection) } } @@ -873,13 +876,13 @@ pub struct RunningProgram { need_to_flush: bool, timestamp: TS, /// CPU profiling enabled (can be expensive). - profile_cpu: Arc, + profile_cpu: Option>, /// Consume timely_events and output them to CSV file. Can be expensive. - profile_timely: Arc, + profile_timely: Option>, /// Profiling thread. prof_thread_handle: Option>, /// Profiling statistics. 
- pub profile: Arc>, + pub profile: Option>>, worker_round_robbin: Skip>>, } @@ -988,70 +991,49 @@ enum Reply { } impl Program { - /// Instantiate the program with `nworkers` timely threads. + /// Instantiate the program with `number_workers` timely threads. pub fn run(&self, number_workers: usize) -> Result { + let config = Config { + num_timely_workers: number_workers, + ..Default::default() + }; + + self.run_with_config(config) + } + + /// Initialize the program with the given configuration + pub fn run_with_config(&self, config: Config) -> Result { // Setup channels to communicate with the dataflow. // We use async channels to avoid deadlocks when workers are parked in // `step_or_park`. This has the downside of introducing an unbounded buffer // that is only guaranteed to be fully flushed when the transaction commits. - let (request_send, request_recv): (Vec<_>, Vec<_>) = (0..number_workers) + let (request_send, request_recv): (Vec<_>, Vec<_>) = (0..config.num_timely_workers) .map(|_| crossbeam_channel::unbounded::()) .unzip(); let request_recv = Arc::from(request_recv); // Channels for responses from worker threads. - let (reply_send, reply_recv): (Vec<_>, Vec<_>) = (0..number_workers) + let (reply_send, reply_recv): (Vec<_>, Vec<_>) = (0..config.num_timely_workers) .map(|_| crossbeam_channel::unbounded::()) .unzip(); let reply_send = Arc::from(reply_send); - let (prof_send, prof_recv) = crossbeam_channel::bounded(PROF_MSG_BUF_SIZE); - - // Profile data structure - let profile = Arc::new(Mutex::new(Profile::new())); - let (profile_cpu, profile_timely) = ( - Arc::new(AtomicBool::new(false)), - Arc::new(AtomicBool::new(false)), - ); - - // Thread to collect profiling data. 
- let cloned_profile = profile.clone(); - let prof_thread = thread::spawn(move || Self::prof_thread_func(prof_recv, cloned_profile)); + let profiling_rig = SelfProfilingRig::new(&config); // Clone the program so that it can be moved into the timely computation let program = Arc::new(self.clone()); - let profiling = ProfilingData::new(profile_cpu.clone(), profile_timely.clone(), prof_send); - - let mut config = TimelyConfig::process(number_workers); - - // Allow configuring the merge behavior of ddflow - // FIXME: Expose the merge behavior to all apis and deprecate the env var - if let Ok(value) = env::var("DIFFERENTIAL_EAGER_MERGE") { - let idle_merge_effort = if value.is_empty() { - None - } else { - let merge_effort: isize = value.parse().map_err(|_| { - "the `DIFFERENTIAL_EAGER_MERGE` variable must be set to an integer value" - .to_owned() - })?; - - Some(merge_effort) - }; - - differential_dataflow::configure( - &mut config.worker, - &DDFlowConfig { idle_merge_effort }, - ); - } + let timely_config = config.timely_config()?; + let (worker_config, profiling_data) = (config, profiling_rig.profiling_data.clone()); // Start up timely computation. 
let worker_guards = timely::execute( - config, + timely_config, move |worker: &mut Worker| -> Result<_, String> { let worker = DDlogWorker::new( worker, + worker_config, program.clone(), - profiling.clone(), + profiling_data.clone(), Arc::clone(&request_recv), Arc::clone(&reply_send), ); @@ -1116,11 +1098,11 @@ impl Program { transaction_in_progress: false, need_to_flush: false, timestamp: 1, - profile_cpu, - profile_timely, - prof_thread_handle: Some(prof_thread), - profile, - worker_round_robbin: (0..number_workers).cycle().skip(0), + profile_cpu: profiling_rig.profile_cpu, + profile_timely: profiling_rig.profile_timely, + prof_thread_handle: profiling_rig.profile_thread, + profile: profiling_rig.profile, + worker_round_robbin: (0..config.num_timely_workers).cycle().skip(0), }; // Wait for the initial transaction to complete. running_program.await_flush_ack()?; @@ -1977,11 +1959,17 @@ impl RunningProgram { /// `enable = true` - enables forwarding. This can be expensive in large dataflows. /// `enable = false` - disables forwarding. pub fn enable_cpu_profiling(&self, enable: bool) { - self.profile_cpu.store(enable, Ordering::SeqCst); + if let Some(profile_cpu) = self.profile_cpu.as_ref() { + profile_cpu.store(enable, Ordering::SeqCst); + } + // TODO: Log warning if self profiling is disabled } pub fn enable_timely_profiling(&self, enable: bool) { - self.profile_timely.store(enable, Ordering::SeqCst); + if let Some(profile_timely) = self.profile_timely.as_ref() { + profile_timely.store(enable, Ordering::SeqCst); + } + // TODO: Log warning if self profiling is disabled } /// Terminate program, killing all worker threads. 
diff --git a/rust/template/differential_datalog/src/program/worker.rs b/rust/template/differential_datalog/src/program/worker.rs index e8099b9ce..b3af3249a 100644 --- a/rust/template/differential_datalog/src/program/worker.rs +++ b/rust/template/differential_datalog/src/program/worker.rs @@ -7,8 +7,10 @@ use crate::{ }, program::{ arrange::{Arrangement, Arrangements}, + config::{Config, ProfilingKind}, ArrId, Dep, Msg, ProgNode, Program, Reply, Update, TS, }, + render::RenderContext, variable::Variable, }; use crossbeam_channel::{Receiver, Sender}; @@ -31,6 +33,7 @@ use fnv::{FnvBuildHasher, FnvHashMap}; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, mem, + net::TcpStream, ops::Deref, rc::Rc, sync::{ @@ -76,11 +79,13 @@ type DelayedVarMap = FnvHashMap< pub struct DDlogWorker<'a> { /// The timely worker instance worker: &'a mut Worker, + /// The DDlog program's configuration settings + config: Config, /// The program this worker is executing program: Arc, /// Information on which metrics are enabled and a /// channel for sending profiling data - profiling: ProfilingData, + profiling: Option, /// The current worker's receiver for receiving messages request_receiver: Receiver, /// The current worker's sender for sending messages @@ -92,8 +97,9 @@ impl<'a> DDlogWorker<'a> { #[allow(clippy::too_many_arguments)] pub(super) fn new( worker: &'a mut Worker, + config: Config, program: Arc, - profiling: ProfilingData, + profiling: Option, request_receivers: Arc<[Receiver]>, reply_senders: Arc<[Sender]>, ) -> Self { @@ -101,6 +107,7 @@ impl<'a> DDlogWorker<'a> { Self { worker, + config, program, profiling, request_receiver: request_receivers[worker_index].clone(), @@ -387,66 +394,89 @@ impl<'a> DDlogWorker<'a> { } /// Initialize timely and differential profiling logging hooks - fn init_profiling(&self) { - let profiling = self.profiling.clone(); - self.worker - .log_register() - .insert::("timely", move |_time, data| { - let profile_cpu = profiling.is_cpu_enabled(); - 
let profile_timely = profiling.is_timely_enabled(); - - // Filter out events we don't care about to avoid the overhead of sending - // the event around just to drop it eventually. - let filtered: Vec<((Duration, usize, TimelyEvent), Option)> = data - .drain(..) - .filter(|event| { - match event.2 { - // Always send Operates events as they're used for always-on memory profiling. - TimelyEvent::Operates(_) => true, - - // Send scheduling events if profiling is enabled - TimelyEvent::Schedule(_) => profile_cpu || profile_timely, - - // Send timely events if timely profiling is enabled - TimelyEvent::GuardedMessage(_) - | TimelyEvent::Messages(_) - | TimelyEvent::Park(_) - | TimelyEvent::PushProgress(_) => profile_timely, - - _ => false, - } - }) - .map(|(d, s, e)| match e { - // Only Operate events care about the context string. - TimelyEvent::Operates(_) => ((d, s, e), Some(get_prof_context())), - _ => ((d, s, e), None), - }) - .collect(); + fn init_profiling(&mut self) { + if let Some(profiling) = self.profiling.clone() { + let timely_profiling = profiling.clone(); + self.worker + .log_register() + .insert::("timely", move |_time, data| { + let profile_cpu = timely_profiling.is_cpu_enabled(); + let profile_timely = timely_profiling.is_timely_enabled(); + + // Filter out events we don't care about to avoid the overhead of sending + // the event around just to drop it eventually. + let filtered: Vec<((Duration, usize, TimelyEvent), Option)> = data + .drain(..) + .filter(|event| { + match event.2 { + // Always send Operates events as they're used for always-on memory profiling. 
+ TimelyEvent::Operates(_) => true, + + // Send scheduling events if profiling is enabled + TimelyEvent::Schedule(_) => profile_cpu || profile_timely, + + // Send timely events if timely profiling is enabled + TimelyEvent::GuardedMessage(_) + | TimelyEvent::Messages(_) + | TimelyEvent::Park(_) + | TimelyEvent::PushProgress(_) => profile_timely, + + _ => false, + } + }) + .map(|(d, s, e)| match e { + // Only Operate events care about the context string. + TimelyEvent::Operates(_) => ((d, s, e), Some(get_prof_context())), + _ => ((d, s, e), None), + }) + .collect(); + + // If there are any profiling events, record them + if !filtered.is_empty() { + timely_profiling.record(ProfMsg::TimelyMessage( + filtered, + profile_cpu, + profile_timely, + )); + } + }); - // If there are any profiling events, record them - if !filtered.is_empty() { - profiling.record(ProfMsg::TimelyMessage( - filtered, - profile_cpu, - profile_timely, - )); - } - }); - - let profiling = self.profiling.clone(); - self.worker.log_register().insert::( - "differential/arrange", - move |_time, data| { - // If there are events, send them through the profiling channel - if !data.is_empty() { - profiling.record(ProfMsg::DifferentialMessage(mem::take(data))); + self.worker.log_register().insert::( + "differential/arrange", + move |_time, data| { + // If there are events, send them through the profiling channel + if !data.is_empty() { + profiling.record(ProfMsg::DifferentialMessage(mem::take(data))); + } + }, + ); + } else if let ProfilingKind::TimelyProfiling { + differential_dataflow, + } = self.config.profiling_kind + { + if differential_dataflow { + if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") { + if !addr.is_empty() { + if let Ok(stream) = TcpStream::connect(&addr) { + differential_dataflow::logging::enable(self.worker, stream); + // TODO: Use tracing to log that logging connected successfully + } else { + panic!("Could not connect to differential log address: {:?}", addr); + } + } } - }, 
- ); + } + + // Timely already has its logging hooks set by default + } else if self.config.profiling_kind.is_none() { + self.worker.log_register().remove("timely"); + self.worker.log_register().remove("differential/arrange"); + } } fn session_dataflow(&mut self, mut probe: ProbeHandle) -> Result { let program = self.program.clone(); + let render_context = RenderContext::new(self.config); self.worker.dataflow::( |outer: &mut Child, TS>| -> Result<_, String> { @@ -459,7 +489,10 @@ impl<'a> DDlogWorker<'a> { program.nodes.len(), FnvBuildHasher::default(), ); - let mut arrangements = FnvHashMap::default(); + let mut arrangements: FnvHashMap< + ArrId, + Arrangement<_, Weight, TValAgent, TKeyAgent>, + > = FnvHashMap::default(); // Create an `Enabled` relation used to enforce the dataflow termination in the // presence of delayed relations. A delayed relation can potentially generate an @@ -507,6 +540,7 @@ impl<'a> DDlogWorker<'a> { rel, outer, &*program, + &render_context, &mut sessions, &mut collections, &mut arrangements, @@ -523,6 +557,7 @@ impl<'a> DDlogWorker<'a> { node_id, outer, &*program, + &render_context, &mut sessions, &mut collections, &mut arrangements, @@ -592,90 +627,83 @@ impl<'a> DDlogWorker<'a> { } } -fn render_relation<'a>( +// TODO: Add back regions for relations +fn render_relation<'a, S>( relation: &Relation, // TODO: Shift to generic representations for ddflow-related structs - scope: &mut Child<'a, Worker, TS>, + scope: &mut S, program: &Program, + render_context: &RenderContext, sessions: &mut FnvHashMap>, - collections: &mut FnvHashMap< - RelId, - Collection, TS>, DDValue, Weight>, - >, - arrangements: &mut FnvHashMap< - ArrId, - Arrangement, TS>, Weight, TValAgent, TKeyAgent>, - >, - delayed_vars: &DelayedVarMap, TS>>, -) { - scope.clone().region_named(relation.name(), |region| { - // Relation may already be in the map if it was created by an `Apply` node - let mut collection = if let Some(collection) = collections.remove(&relation.id) { - 
collection.enter_region(region) - } else { - let (session, collection) = scope.new_collection::(); - sessions.insert(relation.id, session); - - // TODO: Find a way to make the collection within the nested region - collection.enter_region(region) + collections: &mut FnvHashMap>, + arrangements: &mut FnvHashMap, TKeyAgent>>, + delayed_vars: &DelayedVarMap, +) where + S: Scope, +{ + // Relation may already be in the map if it was created by an `Apply` node + let mut collection = if let Some(collection) = collections.remove(&relation.id) { + collection + } else { + let (session, collection) = scope.new_collection::(); + sessions.insert(relation.id, session); + + // TODO: Find a way to make the collection within the nested region + collection + }; + + let entered_arrangements: FnvHashMap<_, ArrangementFlavor<_, TS>> = arrangements + .iter() + .map(|(&arr_id, arr)| (arr_id, ArrangementFlavor::Local(arr.clone()))) + .collect(); + + // apply rules + // TODO: Regions for rules + let rule_collections = relation.rules.iter().map(|rule| { + let get_rule_collection = |relation_id| { + if let Some(collection) = collections.get(&relation_id) { + Some(collection.clone()) + } else { + delayed_vars + .get(&relation_id) + .map(|(_, _, collection)| collection.clone()) + } }; - let entered_arrangements: FnvHashMap<_, ArrangementFlavor<_, TS>> = arrangements - .iter() - .map(|(&arr_id, arr)| (arr_id, ArrangementFlavor::Local(arr.enter_region(region)))) - .collect(); - - // apply rules - // TODO: Regions for rules - let rule_collections = relation.rules.iter().map(|rule| { - let get_rule_collection = |relation_id| { - if let Some(collection) = collections.get(&relation_id) { - Some(collection.enter_region(region)) - } else { - delayed_vars - .get(&relation_id) - .map(|(_, _, collection)| collection.enter_region(region)) - } - }; + program.mk_rule( + rule, + get_rule_collection, + Arrangements { + arrangements: &entered_arrangements, + }, + ) + }); - program.mk_rule( - rule, - 
get_rule_collection, - Arrangements { - arrangements: &entered_arrangements, - }, - ) + if rule_collections.len() > 0 { + collection = with_prof_context(&format!("concatenate rules for {}", relation.name), || { + collection.concatenate(rule_collections) }); + } - if rule_collections.len() > 0 { - collection = - with_prof_context(&format!("concatenate rules for {}", relation.name), || { - collection.concatenate(rule_collections) - }); - } - - // don't distinct input collections, as this is already done by the set_update logic - if !relation.input && relation.distinct { - collection = with_prof_context(&format!("{}.threshold_total", relation.name), || { - collection.threshold_total(|_, c| if *c == 0 { 0 } else { 1 }) - }); - } + // don't distinct input collections, as this is already done by the set_update logic + if !relation.input && relation.distinct { + collection = with_prof_context(&format!("{}.threshold_total", relation.name), || { + collection.threshold_total(|_, c| if *c == 0 { 0 } else { 1 }) + }); + } - // create arrangements - // TODO: Arrangements have their own shebang, region them off too - for (arr_id, arrangement) in relation.arrangements.iter().enumerate() { - with_prof_context(arrangement.name(), || { - arrangements.insert( - (relation.id, arr_id), - arrangement - .build_arrangement_root(&collection) - .leave_region(), - ) - }); - } + // create arrangements + // TODO: Arrangements have their own shebang, region them off too + for (arr_id, arrangement) in relation.arrangements.iter().enumerate() { + with_prof_context(arrangement.name(), || { + arrangements.insert( + (relation.id, arr_id), + arrangement.build_arrangement_root(&render_context, &collection), + ) + }); + } - collections.insert(relation.id, collection.leave_region()); - }); + collections.insert(relation.id, collection); } // TODO: Regions for SCCs @@ -685,6 +713,7 @@ fn render_scc<'a>( // TODO: Shift to generic representations for ddflow-related structs scope: &mut Child<'a, Worker, TS>, 
program: &Program, + render_context: &RenderContext, sessions: &mut FnvHashMap>, collections: &mut FnvHashMap< RelId, @@ -747,7 +776,7 @@ fn render_scc<'a>( with_prof_context(&format!("local {}", arr.name()), || { local_arrangements.insert( (rel.rel.id, i), - arr.build_arrangement(&*vars.get(&rel.rel.id)?), + arr.build_arrangement(render_context, &*vars.get(&rel.rel.id)?), ) }); } @@ -857,7 +886,10 @@ fn render_scc<'a>( format!("no collection found for relation ID {}", rel.rel.id) })?; - Ok(arrangements.insert((rel.rel.id, i), arr.build_arrangement(collection))) + Ok(arrangements.insert( + (rel.rel.id, i), + arr.build_arrangement(render_context, collection), + )) }, )?; } @@ -867,7 +899,7 @@ fn render_scc<'a>( Ok(()) } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct ProfilingData { /// Whether CPU profiling is enabled cpu_enabled: Arc, diff --git a/rust/template/differential_datalog/src/render/arrange_by.rs b/rust/template/differential_datalog/src/render/arrange_by.rs index 2f6169881..fa704c009 100644 --- a/rust/template/differential_datalog/src/render/arrange_by.rs +++ b/rust/template/differential_datalog/src/render/arrange_by.rs @@ -2,7 +2,7 @@ use crate::{ dataflow::{diff_distinct, FilterMap, MapExt}, ddval::DDValue, program::arrange::Arrangement, - render::{Offset, Str, TraceKey, TraceValue}, + render::{Offset, RenderContext, Str, TraceKey, TraceValue}, }; use differential_dataflow::{ difference::Abelian, @@ -52,7 +52,11 @@ type Arranged = Arrangement>, TraceAgent>>; impl<'a> ArrangeBy<'a> { - pub fn render(&self, collection: &Collection) -> Arranged + pub fn render( + &self, + _context: &RenderContext, + collection: &Collection, + ) -> Arranged where S: Scope, S::Timestamp: Lattice, @@ -108,6 +112,7 @@ impl<'a> ArrangeBy<'a> { pub fn render_root( &self, + _context: &RenderContext, collection: &Collection, ) -> Arrangement>, TraceAgent>> where diff --git a/rust/template/differential_datalog/src/render/mod.rs 
b/rust/template/differential_datalog/src/render/mod.rs index 9fb4bdec7..19d2157b5 100644 --- a/rust/template/differential_datalog/src/render/mod.rs +++ b/rust/template/differential_datalog/src/render/mod.rs @@ -1,4 +1,4 @@ -use crate::ddval::DDValue; +use crate::{ddval::DDValue, program::config::Config}; use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine}; use std::borrow::Cow; use timely::dataflow::ScopeParent; @@ -13,3 +13,14 @@ pub type TraceValue = OrdValSpine::Timestamp, R, O>; pub type TraceKey = OrdKeySpine::Timestamp, R, O>; + +#[derive(Debug)] +pub struct RenderContext { + pub config: Config, +} + +impl RenderContext { + pub const fn new(config: Config) -> Self { + Self { config } + } +} diff --git a/rust/template/src/api/mod.rs b/rust/template/src/api/mod.rs index 899deb8a7..7a6d2eeb4 100644 --- a/rust/template/src/api/mod.rs +++ b/rust/template/src/api/mod.rs @@ -244,7 +244,12 @@ impl DDlogProfiling for HDDlog { fn profile(&self) -> Result { self.record_command(|r| r.profile()); let rprog = self.prog.lock().unwrap(); - let profile: String = rprog.profile.lock().unwrap().to_string(); + let profile: String = rprog + .profile + .as_ref() + .map(|profile| profile.lock().unwrap().to_string()) + .unwrap_or_else(String::new); + Ok(profile) } } diff --git a/src/Language/DifferentialDatalog/Compile.hs b/src/Language/DifferentialDatalog/Compile.hs index b5ed191d2..59485daa4 100644 --- a/src/Language/DifferentialDatalog/Compile.hs +++ b/src/Language/DifferentialDatalog/Compile.hs @@ -210,6 +210,7 @@ rustLibFiles = , ("differential_datalog/src/program/arrange.rs" , $(embedFile "rust/template/differential_datalog/src/program/arrange.rs")) , ("differential_datalog/src/program/timestamp.rs" , $(embedFile "rust/template/differential_datalog/src/program/timestamp.rs")) , ("differential_datalog/src/program/worker.rs" , $(embedFile "rust/template/differential_datalog/src/program/worker.rs")) + , ("differential_datalog/src/program/config.rs" , 
$(embedFile "rust/template/differential_datalog/src/program/config.rs")) , ("differential_datalog/src/record/mod.rs" , $(embedFile "rust/template/differential_datalog/src/record/mod.rs")) , ("differential_datalog/src/record/tuples.rs" , $(embedFile "rust/template/differential_datalog/src/record/tuples.rs")) , ("differential_datalog/src/record/arrays.rs" , $(embedFile "rust/template/differential_datalog/src/record/arrays.rs"))