Hashes to hashes

Okay. So this post was supposed to be about running on a cluster. I promise we will come to that eventually, but this week I got a bit side-tracked. Serendipity happened! We will have to dive into Rust HashMaps characteristics.

Where were we?

We are still working on “Query2”, a simple groupby/sum query over uservisits, and trying to see how fast we can get it to run on today hardware.

  SELECT SUBSTR(sourceIP, 1, X), SUM(adRevenue)
    FROM uservisits

The BigData benchmark provides the following performances for a 8$/hr AWS cluster. Note there is a hardware section than can be unfolded.

Query X group count Hive Shark Redshift
2A 8 2,067,313 730s 83s 25s
2B 10 31,348,913 764s 100s 56s
2C 12 253,890,330 730s 132s 79s

I chose to use my laptop instead (mid-2014 MacBook Pro Retina).

My first attempts for 2A were in the realm of 650-800 seconds, but, we found some nice optimisations last week:

At that point, I felt pretty good. I felt ready to try and go to the cluster.

First of all, I went to try my timely dataflow implementation of Query2 locally, on the laptop. I was expecting it to be performing a bit less efficiently than my hand-crafted implementation. After all, timely introduces a few abstractions and some complexity, so…

Well, timely 2A ran in 60 seconds. That’s 25% better than my best so far.

So what’s happening here? It turns out the “natural” way to implement the groupby in timely is slightly different than by hand.

Hashes to hashes

First let’s see how the hand-made runner works.

The timely dataflow runner does things a bit differently.

In short, timely bucketize the data then hash it, whereas the hand-made runner bucketize and hash separately the data of each input chunk before merging each chunk bucket.

Now remember how dominant HashMap manipulations where showing in the FlameGraphs last week? Well, each bit of data gets inserted twice in a HashMap in the hand-made implementation and only once in the timely implementation…

The rationale behind doing a first stage aggregation at the worker level was to reduce the volume of data to be inserted in the final HashMap buckets, and thus reducing lock contention on the bucket mutexes. It may have been a (bad case of) premature optimisation. Just cast the first stone if you feel like it.

What if we just use regular Vec instead of HashMap for the worker buckets?

Guess what. Yep. 60 seconds. So timely and the hand-made runner are in the same ball park. Phew.

Fixing Query2C

Were are we so far ?

Query X group count Hive Shark Redshift Me so far
2A 8 2,067,313 730s 83s 25s 60s
2B 10 31,348,913 764s 100s 56s 93s
2C 12 253,890,330 730s 132s 79s 255s

So we are doing a bit better than Shark on 2A and 2B, but something unpleasant happens in 2C.

Let’s have a look at some metrics.


Performance and resources usage for 2A

and 2C.

Performance and resources usage for 2C

These graphs show many things, but of particular interest are:

So, at about 140s and 95% of the work done, 2C tries to roughly double its memory space and starts swapping. What is happening is, again, HashMap reallocations, this time affecting the final buckets HashMap, not the intermediary ones — which we have just replaced by Vectors anyway.

You may remember that my initial evaluation of the size of the final result for 2C in the first post was about 4GB. But here, we are actually manipulating 12GB.

We have to accommodate 253M keys and values. Each key is 12 bytes, each value is 4 bytes (a 32bit float). 253M*(12+4)=4GB.

Well, reality is a bit different. First of all there is the capacity management issue. To keep doing its work fast, a HashMap has to waste some space. Rust HashMap needs at least 10% of empty space in it. If it get above this mark, it reallocates itself doubling its size. So depending on when the reallocation happens, we may need to multiply the theoretical data size by about 2.2 to get the real memory usage.

It gets worse. Rust HashMap needs a 64bit integer on each of its pair for housekeeping. So all in all, we need 253 * (12+4+8) * 1.1=6.6GB if we’re lucky with re-allocation timing or twice that if we are not.

And we are not lucky: the reallocation that kills us is triggered at 95% of the progress. Can we do something about that? Yes.

First, I tried to pre-allocate the final 256 bucket maps at the “right” capacity. That did not work: rust HashMap wants its raw capacity to a power of two, so it was picking the same value than the one it can barely holds after the last reallocation… I was putting the process in the desperate state of swapping at the very beginning. That was very counter productive.

But… I can also very easily change the number of partition buckets. Bumping it from 256 to 300 made enough room in the buckets to delay the dramatic reallocation. And we get a 91 seconds run!

Performance and resources usage for 2C

Partition bucket count impact

Have you noticed how this 91s point makes 2C slighly faster than our previous best for 2B?

This was unexpected. So let’s try 2A and 2B with 300 buckets too… 54 and 64! We’re getting better at this, but we don’t know why.

OK, we definitely need more data. It becomes increasingly difficult to get reliable measurements on the laptop as we eliminate fat everywhere, but I have consistent measurement on the more quiet linux boxes. Running times for Query2B on the linux server boxes, with partition from 230 to 320 buckets.

Query2B running time for partition bucket count


So there is some kind of long-term variation, maybe related to the values that could avoid one more turn of HashMap reallocation. But the short term looks pretty interesting :) There is a strong correlation between the highest power-of-two in the factorization of the bucket count and the time: every other fourth value is a local peak, every other eighth value is a higher local peak and so on until the 256 value which is dreadful.

Let’s check out the FlameGraphs. I even managed to get the SVG working this time. First one is 2B with 256 buckets, second one with 257.

Can you spot the difference? Click on the nearly invisible “Search” button top right of the first one and type “Full” in there… See that FullBucket stuff ? Smells like collisions :) There is some kind of resonance happening between the partitioner and the HashMap hasher.

Partitioners and hashers

Bottom line is hash(key)%bucket.len() partition data in a way that will generate an important number of collision in the buckets. With 256 buckets, we are just partitioning the data using the last 8 bits of the hash. I tried to get more entropy (from the 7 other bytes) by adding some salt: What I am doing is hash("noiz"+key)%bucket.len() instead.

As we are talking about hashers, several people have told that I would probably benefit from using the FNV hasher instead of the default SIP hasher. I’ve actually been using FNV for a while even if it was probably a premature optimisation when I made the switch.

But it is true nonetheless that FNV slays a few % of the running time. Both SIP and FNV have the same kind of “resonance” between the partitions and the HashMap themselves. On the linux boxes:

Query2B running time for partition bucket count, sip and fnv

Running times for Query2A on my laptop with different partitioner and hasher tweaks:

bucket size sip fnv salted fnv
256 68s 60s 50s
257 64s 57s 53s


So in this post, I have mostly been tweaking the hash based aggregation and managed to squeeze a few more seconds out of the running time.

Query X group count Hive Shark Redshift Laptop
2A 8 2,067,313 730s 83s 25s 50s
2B 10 31,348,913 764s 100s 56s 64s
2C 12 253,890,330 730s 132s 79s 91s

Next week, I promise, we’ll have fun with timely dataflow and clusters.

Rust and BigData series:

  1. Rust, BigData and my laptop
  2. The rust is in there
  3. Let’s optimize
  4. Hashes to hashes
  5. Embrace the glow cloud
  6. Query2 in timely dataflow
  7. Query2 in differential dataflow