The Injector: A new Executor for Java

By | April 24, 2015

Today I’d like to introduce a new executor, developed for Apache Cassandra, that has now been released under its own project called the injector. This approach is designed for systems processing many small messages with more application threads than physical cores. If you can easily have fewer threads than cores, feel free to stop reading and go look at the LMAX Disruptor.

Most modern networked applications follow the SEDA architecture, with networking separated from the work of serving requests. This isolation is important: we want to ensure that any lengthy, especially synchronous, work (e.g. I/O) doesn’t stop us interacting with the outside world.

Where’s the problem?

How much does it cost to separate these two stages?

It’s generally known that message passing between threads can be costly. The LMAX Disruptor is one tool designed to mitigate some of these costs, primarily those of cache coherence, but on most systems these are overshadowed by interactions with the OS scheduler to suspend and resume threads. Used as originally designed, the Disruptor avoids any interaction with the scheduler, but for most systems this configuration is not practical: if you have more threads than cores, priority inversion becomes a real problem. If you employ its blocking strategy to avoid this issue, it behaves as other executors do: it relies on the scheduler.

These scheduler interactions can all be characterised by the following statement: while there are threads waiting for work (which in a healthy state is likely) the producer must signal a waiting thread through a system call.

On a modern Linux system and kernel, a good rule of thumb is that this costs on the order of 5-10μs.
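To get a feel for this cost, here is a rough sketch (not a rigorous benchmark, and the class name is mine): a `SynchronousQueue` hand-off parks the consumer, so each `put()` must wake it through the scheduler, approximating the signalling cost of a thread-pool hand-off. Absolute numbers will vary widely with OS, kernel and hardware.

```java
import java.util.concurrent.SynchronousQueue;

public class HandoffCost {
    // Measures the average time a producer spends handing one item to a
    // (usually) parked consumer, dominated by the wakeup signalling cost.
    static double measureAvgMicros(int iterations) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++)
                    queue.take(); // parks until the producer hands an item over
            } catch (InterruptedException ignored) { }
        });
        consumer.start();
        long start = System.nanoTime();
        try {
            for (int i = 0; i < iterations; i++)
                queue.put(i); // must signal (wake) the consumer each time
            consumer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return (System.nanoTime() - start) / (double) iterations / 1000.0;
    }

    public static void main(String[] args) {
        System.out.printf("avg hand-off: %.2f us%n", measureAvgMicros(100_000));
    }
}
```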

5μs, what’s the big deal?

When operating over very small messages, 5μs can be more work than serving an average request. To give us a data point to work from, here are the results of internal latency measurements made by Cassandra – the amount of time spent servicing a request, excluding message transfer costs, networking, etc. – on a 2-core laptop against a very simple cassandra-stress workload:

./bin/nodetool cfhistograms keyspace1 standard1
Percentile  SSTables  Write Latency  Read Latency
                           (micros)      (micros)
50%             1.00          10.00         10.00
75%             1.00          12.00         10.00
95%             1.00          24.00         14.00
98%             1.00          42.00         20.00
99%             1.00          60.00         35.00

So, 98% of read messages entail <= 20μs of work, 75% <= 10μs. If each message incurs a 5μs signalling cost, that’s around 40% of our time spent signalling.

On a many-core machine, this situation is much worse than it first appears. If it takes as long to transfer the work as it does to perform it, our producer can never reach escape velocity: any work we transfer is finished before we can transfer the next item to another core, so we are limited to one message per 5μs, regardless of available capacity or how much work is waiting to be scheduled.

Even if the work takes significantly longer, say ~50μs, a 5μs hand-off means a 10% overhead, and we can still only occupy a maximum of 10 threads, despite working flat out shipping work from the wire onto the executor. If those threads are all CPU bound, that’s 10 cores, a fraction of a modern server; if some of that time is spent waiting on an SSD, it could be a tiny fraction.
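The arithmetic above can be made explicit. A small sketch, using the article’s illustrative figures rather than any measured values:

```java
// Back-of-envelope arithmetic for the hand-off ceiling described above,
// using the article's illustrative figures (5us hand-off, 50us of work).
public class HandoffCeiling {
    // the producer can ship at most one task per hand-off interval
    static long maxTasksPerSecond(long handoffMicros) {
        return 1_000_000 / handoffMicros;
    }

    // steady-state concurrency: tasks in flight = work time / hand-off time
    static long maxBusyWorkers(long workMicros, long handoffMicros) {
        return workMicros / handoffMicros;
    }

    public static void main(String[] args) {
        long handoff = 5, work = 50; // microseconds
        System.out.printf("max throughput: %d/s, busy workers: %d, overhead: %.0f%%%n",
                maxTasksPerSecond(handoff),    // one task per 5us
                maxBusyWorkers(work, handoff), // at most this many occupied workers
                100.0 * handoff / work);       // hand-off as a share of work
    }
}
```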

Well, what’s to be done?

The injector takes a different approach: as far as possible, the worker threads attempt to schedule themselves. The only time a producer schedules a thread is if the pool has been idle for a period, or the work queue is full (in which case it’s stuck waiting for room anyway). In all other situations the worker threads coordinate with each other to match the rate of work arrival. They do this through a combination of fairly simple mechanisms:

1) after taking work from the queue, if there is work remaining and no active workers to service it, they start one

2) after completing work, if no more work is available, the thread enters a spinning state

The spinning state I will label considerate, as opposed to busy. This asks the scheduler to suspend each thread for a period that results in one of them waking up roughly every 10μs if there is an available core (and the OS can keep up).
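The two mechanisms plus the considerate spin can be sketched roughly as follows. This is a deliberately simplified illustration, not the injector’s actual code: the names (`SelfSchedulingPool`, `SPIN_NANOS`, and so on) are invented, and it omits the self-pruning described next as well as the producer-side idle/full signalling.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class SelfSchedulingPool {
    final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger working = new AtomicInteger(); // workers currently running a task
    volatile boolean shutdown;
    static final long SPIN_NANOS = 10_000; // aim for a check roughly every 10us

    void execute(Runnable task) {
        queue.add(task); // note: the producer never signals a thread here
    }

    void startWorker() {
        Thread t = new Thread(this::workerLoop);
        t.setDaemon(true);
        t.start();
    }

    void workerLoop() {
        while (!shutdown) {
            Runnable task = queue.poll();
            if (task == null) {
                // (2) no work available: spin "considerately" -- a short timed
                // sleep rather than a busy loop, so a worker wakes up often
                LockSupport.parkNanos(SPIN_NANOS);
                continue;
            }
            // (1) we took work; if more remains and no other worker is busy,
            // recruit another worker rather than relying on the producer
            if (working.incrementAndGet() == 1 && !queue.isEmpty())
                startWorker();
            try {
                task.run();
            } finally {
                working.decrementAndGet();
            }
        }
    }
}
```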

“How wasteful!” you might scream.

And by itself it might be. The trick here is that we track the time collectively spent descheduled or achieving nothing; if the total exceeds the real elapsed time by a few millis, one of these threads stops. Since these intervals accumulate, the time to stop over-subscribed workers scales sublinearly (as the sum of the harmonic numbers), the upshot being a tight bound on the amount of timed-sleeping and wakeups we can incur. As work continues to arrive, in many cases no expensive system calls are necessary, even within the work pool^1, since it responsively self-prunes its active workers to match the work throughput rate^2.
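The pruning rule can be sketched like so (again a simplification with invented names, not the injector’s actual bookkeeping): each spinning worker adds its unproductive sleep time to a shared counter, and whichever worker observes the collective total exceeding real elapsed time by the tolerance stops itself.

```java
import java.util.concurrent.atomic.AtomicLong;

class SpinPruning {
    static final long TOLERANCE_NANOS = 2_000_000; // "a few millis" of slack
    final AtomicLong wastedNanos = new AtomicLong(); // collective unproductive time
    final long startNanos = System.nanoTime();

    /** Called by a worker after each unproductive timed sleep; returns true
     *  if this worker should stop spinning and exit its loop. */
    boolean recordWaste(long sleptNanos) {
        long wasted = wastedNanos.addAndGet(sleptNanos);
        long elapsed = System.nanoTime() - startNanos;
        if (wasted > elapsed + TOLERANCE_NANOS) {
            // the pool is collectively over-subscribed: this worker stops,
            // and the counter is reset so the remaining workers keep spinning
            wastedNanos.addAndGet(-wasted);
            return true;
        }
        return false;
    }
}
```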

Okay. Sounds contrived. Does it actually work?

Included with the source code is a suite of JMH benchmarks with a variety of knobs to tune^3 that compare performance against the common executor implementations. These have been run with a range of parameters^4, and the results aggregated for different message payloads (as measured in cpu time)^5. The test hardware was a dedicated 32-vcore EC2 machine (c3.8xlarge) running Ubuntu Server 14.04. The tests were constrained to those scenarios where a benefit is likely to be seen, but the results are quite pronounced: performance is universally better than a plain ThreadPoolExecutor for these scenarios, and with the right workloads we see as much as 6x throughput, with 5x being the median improvement on this system for messages costing around 10μs of cpu time to process. That is nothing to scoff at^6.

[Figure: aggregate throughput comparison vs. the JDK ThreadPoolExecutor]

With graphs of the individual tests for easy comparison…

[Figure: full set of individual JDK comparison graphs]

And a briefer comparison against the other major executor implementations, showing it beating them all handily, except the default disruptor configuration when threads=cores…

[Figure: comparison against the other major executor implementations]

Obviously, there are lies, damned lies, and microbenchmarks. But this work started out life in Cassandra, a real world use case, in which we see a 20% improvement for in-memory workloads. We expect the yield to improve over time as other costs are reduced.

Is this for me?

This honestly fits a narrow niche, but one that is gaining in popularity. If your messages take > 100μs to process, or your worker threads are consistently saturated, the standard ThreadPoolExecutor is likely perfectly adequate for your needs. If, on the other hand, you’re able to engineer your system to operate with one application thread per physical core, you are probably better off looking at an approach like the LMAX Disruptor. However, if you fall into the gap between these two scenarios, or are seeing a significant portion of time spent in futex calls and need a drop-in ExecutorService to take the edge off, the injector may well be worth a look.

^1 Timed-sleeping is a comparatively cheap operation, and we are only woken up when a busy worker blocks or a scheduler quantum elapses. The latter happens infrequently compared to the timescales we’re targeting, whereas if a worker blocks we either want to be woken up because there’s work to do, or there isn’t and we had nothing we could achieve anyway. Ultimately the result is that these costs are generally only incurred at times when we have spare change to pay them with.
^2 The reality is that any work arrival rate, even under synthetic benchmarks, is choppy – it bunches up and arrives all at once, or not at all for brief periods. So whilst the executor is optimal at a steady throughput rate, the result is only that the costly operations happen less frequently, not that they are eliminated. However, since the producer almost never incurs the cost, the injector can more easily ensure cores are not starved, by sharing the burden of these costs when they cannot be avoided altogether.
^3 Size of the work queue; number of outstanding tasks at once; number of threads; length of each operation; amount of time spent suspended by an operation, and the chance of doing so (to simulate disk access)
^4 Worker threads in {64, 128, 256, 512}; outstanding task count (as a multiple of thread count) in {0.5, 1, 2, 4}; time spent asleep (and chance thereof) in micros in {0, 1% × {10, 100, 1000}, 10% × {10, 100, 1000}}
^5 The cross-product of all variables was run, and for each cpu time a performance ratio was computed versus ThreadPoolExecutor for each other variable. The set of ratios formed the box plot data.
^6 These benchmarks are sensitive to OS and system characteristics. For instance, whilst I have not explored it thoroughly, on Windows the effect is much more variable: the upside is as significant, but there are also workloads for which there is a significant downside.

6 thoughts on “The Injector: A new Executor for Java”

  1. Magnus

    Hi,
    just a little hint: In your Text you have a “LMAX Disruptor^7” but no corresponding footnote.
    Nice read!

  2. JR

    It would be interesting to see bare-metal benchmarks where hypervisor overhead doesn’t come into the mix since the overhead imposed by running under an HV specifically exacerbates the underlying syscall latency issues you’re optimizing around.

    1. belliottsmith Post author

Agreed that it would be great to run these benchmarks against baremetal hardware, although finding some going spare can be tricky. I can say that running baremetal on two different generations of Intel CPU laptops, on Ubuntu 12.04 and 14.04, demonstrates the stated improvement to C* as a whole. Old-generation CPUs running baremetal on a 2.6 kernel do not exhibit any improvement to C*, but CPUs one or two generations back running a modern kernel (on server hardware) do. The discussion on CASSANDRA-4718 is a lengthy read but covers at least three baremetal platforms that demonstrate these benefits, although looking back over the discussion the dialogue is not at all clear even to me (and I was an active participant).

I will see if I can find some baremetal hardware to run these exact benchmarks against for a more direct comparison. In the meantime, there was a brief exchange with Andriy Plokhotnyuk, who ran his own benchmarks that favour the akka-fjp, which in the injector’s home territory (i.e. threads > cores, non-forking workloads) lost convincingly. These are my plots of his results.

It’s worth noting as an aside that the default clock configuration on AWS may artificially hamper the injector (which I did not reconfigure – these were run against a base AWS Ubuntu image), as clock reads there are disproportionately expensive without reconfiguration (or running baremetal), a cost not incurred by the other executors.

    1. belliottsmith Post author

      The discussion on CASSANDRA-4718 demonstrated it to be equal to or superior to FJP, for these workloads, on every tested configuration. Andriy Plokhotnyuk also ran independent tests corroborating its superiority particularly when threads > cores, for non-forking workloads (my plot of his results).

      The final graph in the post also compares against a number of executors, including FJP, and shows it to outperform in every case for the JMH benchmarks I provided, although I need to replot that so that the x-axis can be more clearly interpreted (the two cases where disruptor wins are where threads=cores; the low throughput runs are where the Runnables perform costly work. The runs are a subset of those performed for the more comprehensive comparison against TPE).

