Home
Timely dataflow is a framework for data-parallel computation.
In this document, we'll learn how to construct timely dataflow computations. By the end, you'll be able to easily write and run computations on your personal computer, and seamlessly scale them up to multiple threads, processes, and computers.
Let's start with a simple example computation.
This example shows off some of the moving parts of timely dataflow. After seeing them for the first time, we'll walk through each of the parts to see what is going on, and get a rough idea for how these computations look.
extern crate timely;

use timely::construction::*;
use timely::construction::operators::*;

fn main() {
    // initializes and runs a timely dataflow computation
    timely::execute(std::env::args(), |root| {

        // create a new input, and inspect its output
        let mut input = root.subcomputation(move |builder| {
            let (input, stream) = builder.new_input();
            stream.inspect(|x| println!("hello {:?}", x));
            input
        });

        // introduce data and watch!
        for round in 0..10 {
            input.give(round);
            input.advance_to(round + 1);
            root.step();
        }

        // seal the input
        input.close();

        // finish off any remaining work
        while root.step() { }
    });
}
Ok. That isn't so bad. There are definitely some things that need explanation, but at least it all fits on one screen. Let's talk through each of the parts.
At the top of the example we see the following:
extern crate timely;
use timely::construction::*;
use timely::construction::operators::*;
The first line is Rust's way of saying that we have an external dependency on the timely crate, which is where all the code for timely dataflow lives. The other two lines explicitly import some types we will need in the program. I'll call them out as we get to them.
A timely dataflow program is still just a Rust program, so it should have a main function somewhere in it. In our example, it doesn't really do anything other than make a call in to timely::execute:
fn main() {
    // initializes and runs a timely dataflow computation
    timely::execute(std::env::args(), |root| {
The call to timely::execute spins up the timely dataflow infrastructure. The function has two parameters: arguments for execution (the number of workers, where they live, etc.) and what each worker should do once it is started up.
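To make the "what each worker should do" idea concrete, here is a minimal sketch using only the standard library. It is not how timely::execute is implemented: the name execute_sketch, the worker index argument, and the returned vector are all assumptions made for illustration. The only point is that the same closure runs once per worker.

```rust
use std::thread;

// Hypothetical stand-in for `timely::execute` (NOT timely's implementation):
// runs the same worker logic once per thread and hands each its index.
fn execute_sketch<T, F>(workers: usize, logic: F) -> Vec<T>
where
    T: Send + 'static,
    F: Fn(usize) -> T + Send + Clone + 'static,
{
    // Spawn one thread per worker, each with its own copy of the logic.
    let handles: Vec<_> = (0..workers)
        .map(|index| {
            let logic = logic.clone();
            thread::spawn(move || logic(index))
        })
        .collect();

    // Wait for every worker and collect results in worker order.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

Calling execute_sketch(3, |index| index * 10) would run the closure on three threads, once for each of the indices 0, 1, and 2.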
We now get in to writing the code each worker should execute. I like to think of this as just an extension of main, where I'm writing as if for a single-threaded computation and not stressing about the whole "deployed across a cluster of machines" part.
The worker logic is passed root as an argument. This argument wraps up some information about the worker and its environment, but for our purposes is the blank canvas on which we are going to define a dataflow computation.
The first thing we do is call subcomputation, which lets us define a group of dataflow operators. In this group we place an input, inspect its output stream, and return a handle to the input back up.
        // create a new input, and inspect its output
        let mut input = root.subcomputation(move |builder| {
            let (input, stream) = builder.new_input();
            stream.inspect(|x| println!("hello {:?}", x));
            input
        });
Importantly, we haven't actually done any data processing yet. We've only described a dataflow computation in which data pushed in to input flow out through stream and in to the inspect logic.
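The split between describing a computation and running it can be sketched with plain closures. This is a toy model, not timely's design: the Pipeline type and its map and push methods are hypothetical names. Building the pipeline only stores the logic, and nothing executes until a record is pushed in.

```rust
// Hypothetical toy "dataflow": a list of stored closures, not timely's types.
struct Pipeline {
    stages: Vec<Box<dyn Fn(u64) -> u64>>,
}

impl Pipeline {
    fn new() -> Self {
        Pipeline { stages: Vec::new() }
    }

    // Describing a stage only stores the closure; nothing runs yet.
    fn map(mut self, logic: impl Fn(u64) -> u64 + 'static) -> Self {
        self.stages.push(Box::new(logic));
        self
    }

    // Only when a record is pushed in does the stored logic execute,
    // one stage after another in the order the stages were described.
    fn push(&self, record: u64) -> u64 {
        self.stages.iter().fold(record, |value, stage| stage(value))
    }
}
```

For example, Pipeline::new().map(|x| x + 1).map(|x| x * 2) does no arithmetic when constructed; the additions and multiplications happen only inside a later call to push.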
With a dataflow graph constructed, we can now start running it.
        // introduce data and watch!
        for round in 0..10 {
            input.give(round);
            input.advance_to(round + 1);
            root.step();
        }
Recall that input was our handle to the input stream. It takes data from outside the dataflow and introduces it in to the dataflow computation. Here we repeatedly use the give method to introduce records, the numbers 0 through 9.
After introducing each number we call advance_to. This call is a signal to the system that we are not going to produce any more data with a timestamp less than or equal to round. This would be very useful if the dataflow we constructed needed to know when a group of records was complete, for example if we wanted to count the number of submitted records. We aren't doing that here, so it is a little hard to justify (we could have left it out, but advance_to is also what flushes internal buffers in input).
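Here is a rough model of the two jobs advance_to does in this description: promising that earlier timestamps are finished, and flushing buffered records. The InputSketch type, its fields, and the is_complete helper are hypothetical and exist only for this sketch; timely's real input handle is more involved.

```rust
// Hypothetical model of an input handle (NOT timely's actual type).
struct InputSketch {
    epoch: u64,               // timestamp attached to newly given records
    buffer: Vec<(u64, u64)>,  // (timestamp, record) pairs held locally
    sent: Vec<(u64, u64)>,    // pairs released in to the "dataflow"
}

impl InputSketch {
    fn new() -> Self {
        InputSketch { epoch: 0, buffer: Vec::new(), sent: Vec::new() }
    }

    // Stamp the record with the current epoch and buffer it.
    fn give(&mut self, record: u64) {
        self.buffer.push((self.epoch, record));
    }

    // Promise that no more records will carry a timestamp below `next`,
    // and flush everything buffered so far.
    fn advance_to(&mut self, next: u64) {
        assert!(next >= self.epoch, "timestamps may only move forward");
        self.sent.append(&mut self.buffer);
        self.epoch = next;
    }

    // A timestamp is complete once the input has advanced past it.
    fn is_complete(&self, time: u64) -> bool {
        time < self.epoch
    }
}
```

In this model, after give(7) followed by advance_to(1), timestamp 0 is complete and the record has left the buffer. An operator that counts records per timestamp could then safely report its count for timestamp 0.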
The call to root.step() tells timely dataflow to do some work. Internally, it will give each operator a chance to do some computation, which means sending records and applying operator logic. For example, this is where the println! in inspect will run. We could skip this step as well, but records will start to back up in the system. In a streaming system it is generally healthiest to keep records moving.
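To see why skipping the step makes records back up, consider this small sketch. StepSketch and its methods are hypothetical names, and a single queue is a big simplification of what a real worker does: records sit in the queue until step hands them to the operator logic.

```rust
use std::collections::VecDeque;

// Hypothetical toy worker, not timely's implementation: records wait in a
// queue until `step` hands them to the operator logic.
struct StepSketch {
    queue: VecDeque<u64>,
    printed: Vec<String>,
}

impl StepSketch {
    fn new() -> Self {
        StepSketch { queue: VecDeque::new(), printed: Vec::new() }
    }

    // Introducing a record only enqueues it; no operator logic runs here.
    fn give(&mut self, record: u64) {
        self.queue.push_back(record);
    }

    // One step: move every queued record through the "inspect" logic.
    fn step(&mut self) {
        while let Some(record) = self.queue.pop_front() {
            self.printed.push(format!("hello {:?}", record));
        }
    }

    // Number of records still waiting to be processed.
    fn backlog(&self) -> usize {
        self.queue.len()
    }
}
```

If give is called many times without a step in between, backlog grows with every record. Calling step after each give, as the example program does, keeps it at zero.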
The last thing we do is shut down the dataflow computation.
        // seal the input
        input.close();

        // finish off any remaining work
        while root.step() { }
    });
}
The call to input.close() indicates that we have no more data to supply through this input. It allows timely dataflow to reach the conclusion that we may be "done". To reach this conclusion, we may need to step the computation several more times (as records may still be in flight in the system). When step() returns false, it means that all inputs are closed, all messages are processed, and no operators have outstanding work.
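The shutdown condition can be sketched the same way. In this toy model (ShutdownSketch and drain are hypothetical names, and delivering one record per step is an assumption made so that several steps are needed), step keeps returning true while the input is open or records are still in flight.

```rust
use std::collections::VecDeque;

// Hypothetical model of the shutdown logic, not timely's implementation.
struct ShutdownSketch {
    input_open: bool,
    in_flight: VecDeque<u64>,
    delivered: Vec<u64>,
}

impl ShutdownSketch {
    fn new(records: Vec<u64>) -> Self {
        ShutdownSketch {
            input_open: true,
            in_flight: records.into_iter().collect(),
            delivered: Vec::new(),
        }
    }

    // Seal the input: no further records will be introduced.
    fn close(&mut self) {
        self.input_open = false;
    }

    // Deliver at most one in-flight record, then report whether work remains.
    fn step(&mut self) -> bool {
        if let Some(record) = self.in_flight.pop_front() {
            self.delivered.push(record);
        }
        self.input_open || !self.in_flight.is_empty()
    }
}

// Step until the worker reports that nothing remains; returns the number
// of calls to `step` that were needed.
fn drain(worker: &mut ShutdownSketch) -> usize {
    let mut calls = 1;
    while worker.step() {
        calls += 1;
    }
    calls
}
```

Note that in this model drain would never return if close were not called first, because an open input always counts as possible future work.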