Skip to content

Commit

Permalink
Rename CounterCore to Counter
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Mar 21, 2024
1 parent ad2b613 commit 10352e0
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 21 deletions.
13 changes: 5 additions & 8 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@ use crate::Container;

/// A wrapper which updates shared `produced` based on the number of records pushed.
#[derive(Debug)]
pub struct CounterCore<T, D, P: Push<Bundle<T, D>>> {
pub struct Counter<T, D, P: Push<Bundle<T, D>>> {
pushee: P,
produced: Rc<RefCell<ChangeBatch<T>>>,
phantom: PhantomData<D>,
}

/// A counter specialized to vector.
pub type Counter<T, D, P> = CounterCore<T, Vec<D>, P>;

impl<T: Timestamp, D: Container, P> Push<Bundle<T, D>> for CounterCore<T, D, P> where P: Push<Bundle<T, D>> {
impl<T: Timestamp, D: Container, P> Push<Bundle<T, D>> for Counter<T, D, P> where P: Push<Bundle<T, D>> {
#[inline]
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
if let Some(message) = message {
Expand All @@ -34,10 +31,10 @@ impl<T: Timestamp, D: Container, P> Push<Bundle<T, D>> for CounterCore<T, D, P>
}
}

impl<T, D, P: Push<Bundle<T, D>>> CounterCore<T, D, P> where T : Ord+Clone+'static {
impl<T, D, P: Push<Bundle<T, D>>> Counter<T, D, P> where T : Ord+Clone+'static {
/// Allocates a new `Counter` from a pushee and shared counts.
pub fn new(pushee: P) -> CounterCore<T, D, P> {
CounterCore {
pub fn new(pushee: P) -> Counter<T, D, P> {
Counter {
pushee,
produced: Rc::new(RefCell::new(ChangeBatch::new())),
phantom: PhantomData,
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use self::tee::{Tee, TeeHelper};
pub use self::exchange::Exchange;
pub use self::counter::{Counter, CounterCore};
pub use self::counter::Counter;

pub mod tee;
pub mod exchange;
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
//! than that in which the stream was captured.

use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pushers::CounterCore as PushCounter;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
use crate::progress::Timestamp;
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::progress::{Source, Target};
use crate::order::Product;
use crate::{Container, Data};
use crate::communication::Push;
use crate::dataflow::channels::pushers::{CounterCore, Tee};
use crate::dataflow::channels::pushers::{Counter, Tee};
use crate::dataflow::channels::{Bundle, Message};

use crate::worker::AsWorker;
Expand Down Expand Up @@ -90,7 +90,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T

let (targets, registrar) = Tee::<T, C>::new();
let ingress = IngressNub {
targets: CounterCore::new(targets),
targets: Counter::new(targets),
phantom: PhantomData,
activator: scope.activator_for(&scope.addr()),
active: false,
Expand Down Expand Up @@ -161,7 +161,7 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave


struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container> {
targets: CounterCore<TInner, TData, Tee<TInner, TData>>,
targets: Counter<TInner, TData, Tee<TInner, TData>>,
phantom: ::std::marker::PhantomData<TOuter>,
activator: crate::scheduling::Activator,
active: bool,
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::progress::frontier::{Antichain, MutableAntichain};
use crate::Container;
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pushers::Tee;
use crate::dataflow::channels::pushers::CounterCore as PushCounter;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use crate::dataflow::channels::pact::ParallelizationContract;
use crate::dataflow::channels::pullers::Counter as PullCounter;
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::progress::frontier::MutableAntichain;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::channels::pushers::CounterCore as PushCounter;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
use crate::dataflow::channels::Bundle;
use crate::communication::{Push, Pull, message::RefOrMut};
Expand Down
8 changes: 4 additions & 4 deletions timely/src/dataflow/operators/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::progress::Source;
use crate::{Container, Data};
use crate::communication::Push;
use crate::dataflow::{Stream, ScopeParent, Scope, StreamCore};
use crate::dataflow::channels::pushers::{Tee, CounterCore};
use crate::dataflow::channels::pushers::{Tee, Counter};
use crate::dataflow::channels::Message;


Expand Down Expand Up @@ -178,7 +178,7 @@ impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {

fn input_from_core<D: Container>(&mut self, handle: &mut HandleCore<<G as ScopeParent>::Timestamp, D>) -> StreamCore<G, D> {
let (output, registrar) = Tee::<<G as ScopeParent>::Timestamp, D>::new();
let counter = CounterCore::new(output);
let counter = Counter::new(output);
let produced = counter.produced().clone();

let index = self.allocate_operator_index();
Expand Down Expand Up @@ -249,7 +249,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
pub struct HandleCore<T: Timestamp, C: Container> {
activate: Vec<Activator>,
progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
pushers: Vec<CounterCore<T, C, Tee<T, C>>>,
pushers: Vec<Counter<T, C, Tee<T, C>>>,
buffer1: C,
buffer2: C,
now_at: T,
Expand Down Expand Up @@ -332,7 +332,7 @@ impl<T: Timestamp, D: Container> HandleCore<T, D> {

fn register(
&mut self,
pusher: CounterCore<T, D, Tee<T, D>>,
pusher: Counter<T, D, Tee<T, D>>,
progress: Rc<RefCell<ChangeBatch<T>>>,
) {
// flush current contents, so new registrant does not see existing data.
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::cell::RefCell;

use crate::progress::Timestamp;
use crate::progress::frontier::{AntichainRef, MutableAntichain};
use crate::dataflow::channels::pushers::CounterCore as PushCounter;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::channels::pullers::Counter as PullCounter;
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::progress::Source;
use crate::progress::ChangeBatch;

use crate::Data;
use crate::dataflow::channels::pushers::{CounterCore as PushCounter, Tee};
use crate::dataflow::channels::pushers::{Counter as PushCounter, Tee};
use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSessionCore};

use crate::dataflow::operators::{ActivateCapability, Capability};
Expand Down

0 comments on commit 10352e0

Please sign in to comment.