Query2 in timely dataflow

Last week, we established that timely-dataflow rocks. We showed that it lets us crunch data with an order of magnitude better cost-efficiency than Redshift or Spark on EC2.

Timely is great, but it can be a bit intimidating. It’s lower-level than Spark, taking us back a bit to the era of manual Hadoop map/reduce. So this week, we will take the time to translate our good old friend Query 2, step by step, into its timely-dataflow implementation.


From pseudo-SQL to execution plan

First, we need to do the query planner job by hand. Some people like SQL syntax — you guessed it, I’m not one of them — but translating it to something more computer friendly is not simple.

  SELECT SUBSTR(sourceIP, 1, 8), SUM(adRevenue)
    FROM uservisits
GROUP BY SUBSTR(sourceIP, 1, 8)

What’s not to like? Eye-hurting capitalization convention, moronic SUBSTR repetition, complete lack of composability, absurd reading order.

Big, big sigh.

Anyway. What would it look like in Spark, or in fluent collection style? (This is pseudo code)

    uservisits
        .map(visit => (visit.sourceIP.substring(0,8),visit.adRevenue))
        .reduceByKey((a,b) => a+b)
        .count()

This tries less hard to be readable as natural language, but it is a very nice step in the direction of actually doing something.
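For readers who think more easily in rust than in pseudo-Spark, here is a minimal single-threaded sketch of the same map-then-reduce-by-key logic using only the standard library. The `Visit` struct and the `revenue_by_prefix` function are hypothetical names made up for this illustration, not part of the Query2 code; the count of prefixes is simply the size of the resulting map.

```rust
use std::collections::HashMap;

/// Hypothetical record type: only the two columns Query 2 reads.
struct Visit {
    source_ip: String,
    ad_revenue: f32,
}

/// Sum ad revenue per 8-character source IP prefix.
fn revenue_by_prefix(visits: &[Visit]) -> HashMap<String, f32> {
    let mut totals = HashMap::new();
    for visit in visits {
        // SUBSTR(sourceIP, 1, 8): keep at most the first eight characters.
        let prefix: String = visit.source_ip.chars().take(8).collect();
        // GROUP BY + SUM: fold the revenue into the running total for this prefix.
        *totals.entry(prefix).or_insert(0.0) += visit.ad_revenue;
    }
    totals
}

fn main() {
    let visits = vec![
        Visit { source_ip: "192.168.1.10".to_string(), ad_revenue: 1.5 },
        Visit { source_ip: "192.168.1.20".to_string(), ad_revenue: 2.0 },
        Visit { source_ip: "10.0.0.1".to_string(), ad_revenue: 0.25 },
    ];
    let totals = revenue_by_prefix(&visits);
    // The figure displayed in the experiments is the number of distinct prefixes.
    println!("{} prefixes", totals.len());
}
```

Everything here runs in one thread on one machine, which is exactly the part that stops being true once workers enter the picture.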

To paraphrase it:

In all the experiments we have done so far, I have computed the actual revenue per prefix, even if I never displayed more than the count of prefixes.

Now, if we were running in Shark, we would be more or less done at this point, and Shark would do the rest for us. But timely dataflow, even with differential-dataflow, does not go this extra mile for us.


First, we need to decide how to distribute this to several workers. In everything that follows, a worker is some code that shares no application data with its counterparts. From an isolation point of view, a worker could be a process, as they were in good old Hadoop. In timely, they can be multiple threads running in different processes, running on different servers. But the isolation invariant stands: application logic and data stay contained in one worker. There are no shared-across-worker memory structures visible to the application developer.

A very efficient way to think about an execution plan is, obviously, to think in map and reduce terms.

So basically, when processing data in map/reduce, you pick and configure a few reducers, shove most of your application logic in the mapper, and let the framework do its magic. Which is more or less what we have just done with the pseudo-Spark code above.

It has an input, a map, a first reducer ‘reduceByKey’, that will perform a reduction of numbers by summing them, and a second reducer in the form of the final count.

That is exactly the path I followed to write the timely implementation of Query2. From SQL to stream (at least in my head) to translation to timely.

I happened to follow a different mental path for the non-timely implementation. If you remember the previous posts, with fewer constraints to start with (a blank page), I managed to produce a slower implementation than the one constrained by timely… even though it had only one reducer step.

Distributing Reducers is more complicated than Mappers because we must deal with their “unit of work” and make sure it falls into one single worker.

All in all, some data will flow over the network twice, once for each reducer. First, each record travels to the worker whose reduceByKey is responsible for the matching shard of the data; later, each worker contributes a single number, its count of keys, to an arbitrary worker which sums them. Timely dataflow calls these worker-to-worker communications “Exchange”.
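To make the two exchanges concrete, here is a rough single-process simulation of the plan, under loud assumptions: the "workers" are just entries in a vector, keys are plain integers so that ownership is an obvious `key % peers`, and `PlanRun` and `run_plan` are hypothetical names invented for this sketch. It also counts how many items cross each exchange.

```rust
use std::collections::HashMap;

/// Result of simulating the plan: the answer plus how many items crossed each exchange.
struct PlanRun {
    distinct_keys: usize,
    first_exchange_items: usize,
    second_exchange_items: usize,
}

/// Simulate the two-reducer plan on `peers` isolated workers (`peers` must be > 0).
/// Keys are plain integers here so that "owner = key % peers" stays obvious.
fn run_plan(records: &[(u64, f32)], peers: usize) -> PlanRun {
    // One private map per worker: nothing is shared between them.
    let mut shards: Vec<HashMap<u64, f32>> = vec![HashMap::new(); peers];
    // Exchange 1 + reducer 1: every record is routed to the worker owning its key
    // (some may happen to stay where they were read), which sums the values per key.
    for &(key, value) in records {
        let owner = (key % peers as u64) as usize;
        *shards[owner].entry(key).or_insert(0.0) += value;
    }
    // Exchange 2 + reducer 2: each worker holding data sends one number, its key
    // count, to a single worker which sums them.
    let partial_counts: Vec<usize> = shards
        .iter()
        .map(|shard| shard.len())
        .filter(|&n| n > 0)
        .collect();
    PlanRun {
        distinct_keys: partial_counts.iter().sum(),
        first_exchange_items: records.len(),
        second_exchange_items: partial_counts.len(),
    }
}

fn main() {
    let records = [(1, 1.0), (2, 2.0), (1, 3.0), (5, 4.0), (8, 0.5)];
    let run = run_plan(&records, 4);
    println!(
        "{} keys, {} items in exchange 1, {} in exchange 2",
        run.distinct_keys, run.first_exchange_items, run.second_exchange_items
    );
}
```

The asymmetry is the point: the first exchange carries on the order of one item per input record, the second at most one number per worker.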

Implementing the two-reducer plan in timely

The full code is here.

It is not the executable I used for the EC2 tests. I used query2.rs, which allows me to tweak and instrument more things. I copy-pasta-ed the relevant bits into query2_timely to make something easier to explain. It has the same performance characteristics.

It’s a single main() function, but the first (and only) thing it does is yield control to timely’s configuration manager. Yeah, Inversion-Of-Control, rust-style.

fn main() {
    timely::execute_from_args(std::env::args(), move |root| {
        let peers = root.peers();
        let index = root.index();

        root.scoped::<u64, _, _>(move |builder| {

This allows timely to parse the arguments from the command line, and start as many workers as necessary. Each worker is given an index, and knows the number of its peers. We put these useful constants somewhere convenient as we will need them in places where borrowing root will not be an option (rust being rust, it is picky about that).

The really interesting stuff happens in scoped(). This is where we will plug everything together.

First we have lines 23 to 33 devoted to reading the input files. There is not much in there that is timely-specific, but let’s have a look at them, as some bits are actually relevant.

let files = dazone::files::files_for_format("5nodes", "uservisits", "buren-snz");
let files = files.enumerate().filter_map(move |(i, f)| {
    if i % peers == index {
        Some(f)
    } else {
        None
    }
});
let uservisits = files.flat_map(|file| {
    PartialDeserializer::new(file, Compressor::get("snz"), &[0, 3])
});

First we get an iterator on all the files from the right data directory. This is just stuff I made for these Query2 experiments. The next few lines are more relevant: each worker will load some files in the distributed system. We want every file to be read once and only once, and we want the load spread evenly across the workers. This is where the index and peers count we put aside before come in handy.
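The "once and only once" property of this round-robin split is easy to check in isolation. The sketch below uses made-up file names and a hypothetical `files_for_worker` function; it only illustrates the `i % peers == index` rule, not the actual dazone file handling.

```rust
/// Return the files a given worker should read: every `peers`-th file, starting at `index`.
fn files_for_worker(files: &[&str], index: usize, peers: usize) -> Vec<String> {
    files
        .iter()
        .enumerate()
        .filter_map(|(i, f)| if i % peers == index { Some(f.to_string()) } else { None })
        .collect()
}

fn main() {
    // Made-up file names, for illustration only.
    let files = ["f0", "f1", "f2", "f3", "f4"];
    let peers = 2;
    for index in 0..peers {
        println!("worker {} reads {:?}", index, files_for_worker(&files, index, peers));
    }
}
```

With five files and two workers, worker 0 ends up with three files and worker 1 with two: the spread is as even as the file count allows, and no coordination between workers is needed to get there.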

Once the files are filtered, we can load the actual content. PartialDeserializer and Compressor are part of the Query2 experiment code. The last three lines produce a standard rust Iterator over (sourceIP, adRevenue) pairs.

Next comes the SUBSTR, in the form of a map:

let uservisits = uservisits.map(|visit: (String, f32)| {
    (Bytes8::prefix(&*visit.0), visit.1)
});

Here again, nothing comes from timely. Bytes8 is a structure around an array of eight bytes that I have written for Query2. As a matter of fact, maps are so easy to deal with that I have not even bothered putting this one in timely formalism. I could have done it, but what’s the point really? Iterators are so easy…
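The real Bytes8 lives in the Query2 code and is not reproduced here. As a rough idea of what such a fixed-size key can look like, here is a hypothetical `Prefix8` stand-in. It assumes the IP addresses are ASCII (so bytes and characters coincide) and zero-pads inputs shorter than eight bytes; both choices are assumptions of this sketch.

```rust
/// Hypothetical stand-in for the post's `Bytes8`: a fixed-size, copyable key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Prefix8([u8; 8]);

impl Prefix8 {
    /// Copy up to eight leading bytes of `s`, zero-padding shorter inputs.
    fn prefix(s: &str) -> Prefix8 {
        let mut bytes = [0u8; 8];
        let n = s.len().min(8);
        bytes[..n].copy_from_slice(&s.as_bytes()[..n]);
        Prefix8(bytes)
    }
}

fn main() {
    let a = Prefix8::prefix("192.168.1.10");
    let b = Prefix8::prefix("192.168.7.7");
    // Both addresses share the same first eight bytes, so they are the same key.
    println!("{:?} == {:?}: {}", a, b, a == b);
}
```

A key like this derives Hash and Eq, so it can go straight into a HashMap, and it is Copy, so moving it around does not involve any heap allocation.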

A few more lines of preliminary:

let stream = uservisits.to_stream(builder);

let mut hashmap = ::std::collections::HashMap::new();
let mut sum = 0usize;

to_stream actually comes from timely! It takes a standard rust Iterator and transforms it into a timely Stream. From a 30,000-foot point of view, think of it as a mere adaptor.

The two other lines are standard rust, but they define the state of the worker: the HashMap stores the on-going reduced state of the reduceByKey step, the sum is the on-going figure for the final count().

And now for the main course, the two reducers in all their splendor.

let group = stream.unary_notify(
    Exchange::new(move |x: &(Bytes8, f32)| {
                        ::dazone::hash(&(x.0)) as u64
    move |input, output, notif| {
        input.for_each(|time, chunk| {
            for (k, v) in chunk.drain(..) {
                update_hashmap(&mut hashmap, &|&a, &b| a + b, k, v);
        notif.for_each(|time, _| {
            if hashmap.len() > 0 {

The first stage implements the reduceByKey, and actually half of the count too. Obviously, we are deep in timely realm this time.

We plug a unary_notify operator on our uservisits stream, and we will keep the resulting stream in group for our next step.

unary_notify is “unary” in that it defines an operator that has one input (the uservisits key/value tuples) and one output. It is “_notify” in that, as we are a reducer and not a map, we will need some access to the shared computation progress in order to know when we have received everything that belongs to our shard of data. Access to this global state is why “timely” is called “timely”: each record in the stream is tagged with a “time”. Time is actually a complex thing in timely dataflow. Think about the blackboard scene in Back to the Future II, combined with the Interstellar library and the Twelve Monkeys mad scientists (or mad engineers, sometimes it’s hard to tell the difference).

In our example, it’s quite simple. There is just one “epoch” we are interested in for the reduceByKey: we need to be notified when all the workers in the system are done with the input files.

The Exchange is what will decide to which worker a record should be sent: we are returning a hash of the key from the pair. Timely will make sure all records with the same hash go to the same worker.
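The routing idea can be sketched with the standard library alone. In the snippet below, `route_key` plays the role of the closure given to Exchange (the post uses `::dazone::hash`; `DefaultHasher` is swapped in here as a stand-in), and `target_worker` assumes that the destination is the hash modulo the number of workers. That modulo rule is an assumption of this sketch about what the framework does with the returned u64.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Turn any hashable key into the u64 that an exchange-style router would use.
fn route_key<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// Assumed routing rule: the target worker is the hash modulo the worker count.
/// `peers` must be greater than zero.
fn target_worker<K: Hash>(key: &K, peers: u64) -> u64 {
    route_key(key) % peers
}

fn main() {
    let peers = 4;
    for key in ["192.168.", "10.0.0.1", "192.168."].iter() {
        // The same key always lands on the same worker, whoever computes the route.
        println!("{} -> worker {}", key, target_worker(key, peers));
    }
}
```

What matters is that the function is deterministic and only looks at the key: every worker computes the same destination for a given prefix without talking to anyone.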

Next come a debug and audit name for our operator, and a vector of timestamps at which we know we will want to be notified. We could use this to register the required notification, except we don’t know what to_stream() uses as a timestamp. Fortunately, there are other ways to register notifications.

Finally, the last argument of unary_notify is the logic that the operator will run every time it gets a chance. It takes the form of a closure which allows the logic to make use of the input, output and notification bus, and, by capture, the various variables the worker has declared as its state: more specifically, here, the hashmap variable.

Inside this closure, we code how to react to the presence of an input batch or a notification. In the case of an input batch, we get our chance to register on the notification bus that we need to know about the end of the epoch matching the incoming data. We are calling notify_at repeatedly. It is a bit off-putting, but the cost is amortized by the framework. It is preferable to guessing whatever to_stream() does, and it’s what the timely guys recommend anyway.

Moving on to the “payload” logic, we update the state hashmap. update_hashmap works by inserting a new key, or by updating the value of an existing key using the addition function we provide.
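The helper itself is not shown in this post. Judging only from the call site, an insert-or-merge function along these lines would do the job; treat this as a hypothetical reconstruction, with a signature guessed from the arguments, and not as the actual Query2 code.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical equivalent of the post's `update_hashmap` helper:
/// insert `value` under `key`, or merge it with the existing value using `merge`.
fn update_hashmap<K, V, F>(map: &mut HashMap<K, V>, merge: &F, key: K, value: V)
where
    K: Eq + Hash,
    F: Fn(&V, &V) -> V,
{
    match map.get_mut(&key) {
        // Key already present: replace the stored value with the merged one.
        Some(existing) => *existing = merge(existing, &value),
        // First time we see this key: store the value as is.
        None => {
            map.insert(key, value);
        }
    }
}

fn main() {
    let add = |a: &f32, b: &f32| a + b;
    let mut totals: HashMap<&str, f32> = HashMap::new();
    update_hashmap(&mut totals, &add, "192.168.", 1.5);
    update_hashmap(&mut totals, &add, "192.168.", 2.0);
    update_hashmap(&mut totals, &add, "10.0.0.1", 0.25);
    println!("{} keys", totals.len());
}
```

Passing the merge function as a parameter keeps the helper reusable for reductions other than a sum, such as a max or a count.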

The second bit of logic is what to do when the notification occurs: we will send in the outgoing stream the sole count of entries in our map. This is where we are doing half of the count. A strict implementation of reduceByKey would shove the whole HashMap in the pipe, but we know better.

This is an instance of a Map being, again, the good guy. Remember the pseudo-spark implementation above? It featured a reduceByKey(...).count(). So the stream between these two logical operators is a stream of disjoint HashMaps. Count, on the other hand, could be implemented as follows (again, pseudo-code).
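Stripped of the timely machinery, the shape of this first reducer is "accumulate on input, emit on notification". Here is a small sketch of that shape; `GroupByPrefix`, `on_input` and `on_notify` are hypothetical names for this illustration and do not exist in timely.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the first reducer: private state plus two callbacks.
struct GroupByPrefix {
    totals: HashMap<String, f32>,
}

impl GroupByPrefix {
    fn new() -> Self {
        GroupByPrefix { totals: HashMap::new() }
    }

    /// Input callback: fold the batch into the state, emit nothing yet.
    fn on_input(&mut self, batch: Vec<(String, f32)>) {
        for (prefix, revenue) in batch {
            *self.totals.entry(prefix).or_insert(0.0) += revenue;
        }
    }

    /// Notification callback: the epoch is complete, so the key count is final.
    /// A worker that received nothing stays silent, like the `hashmap.len() > 0` guard.
    fn on_notify(&self) -> Option<usize> {
        if self.totals.is_empty() {
            None
        } else {
            Some(self.totals.len())
        }
    }
}

fn main() {
    let mut op = GroupByPrefix::new();
    op.on_input(vec![("192.168.".to_string(), 1.5), ("10.0.0.1".to_string(), 0.25)]);
    op.on_input(vec![("192.168.".to_string(), 2.0)]);
    println!("{:?}", op.on_notify());
}
```

Emitting anything from `on_input` would be wrong here, since a later batch can still bring a new key; only the notification guarantees the count is final.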

    .map( h:HashMap => h.count() )
    .map( n => (0, n) )
    .reduceByKey((a,b) => a+b)

The first map takes the size of the HashMap (.len() in rust), the second makes up an absurd constant key to bring all the partial counts to the same worker, which will sum them to get the result.

So what I have done here is to move the first of the count’s maps “up”, as close as possible to the reduceByKey above, avoiding making a stream of HashMaps and having to explain that to rust.

Back to the code, all that remains is to move everybody to a single worker (let’s pick worker 0) and sum the incoming numbers.

let _: Stream<_, ()> = group.unary_notify(Exchange::new(|_| 0u64),
      move |input, _, notif| {
          input.for_each(|time, data| {
              for x in data.drain(..) {
                  sum += x;
          notif.for_each(|_, _| {
              if index == 0 {
                  println!("result: {}", sum);

Exchange is now a constant function, the rest is similar: the input chunks contain numbers that are added to the worker-scoped sum variable. We register the notification in the input handler, as we know spamming it is tolerable. Once we get notified, we output the count on the console.

And that’s it.

This executable is ready for the distributed mode, but the distribution itself is not covered: something or someone needs to copy the executable to each of the cluster nodes, provide them with the list of their peers and start them. If you want to run it a second time, you need to go to each node and start the process again… For the EC2 tests, I used ansible scripts.


Of course, we are far from the elegant four lines of Spark: we had to do many things by hand, including translating the logical query to a distributable executable plan. We produced a full screen of code with a good quantity of boilerplate, but we have a standalone executable that will crunch the data with an efficiency way higher than that of Spark or Redshift.

Building an equivalent of Spark or Redshift on top of timely is a daunting task. But there are intermediary objectives that could make sense: helpers and ready-to-use operators to make writing data processing tasks easier, or daemons that would migrate executables to all the nodes, start them and manage the peer list, for instance.

Not sure what the next step is yet.

Rust and BigData series:

  1. Rust, BigData and my laptop
  2. The rust is in there
  3. Let’s optimize
  4. Hashes to hashes
  5. Embrace the glow cloud
  6. Query2 in timely dataflow
  7. Query2 in differential dataflow