Monday, December 16, 2013

Interesting use-case for SynchronousQueue

While working on a rather unusual system in which 1) producers can at times be significantly faster than consumers (by more than a factor of two) and 2) producers must handle real-time data with low latency overhead, I went looking for a data structure that is efficient, performant, and models this situation elegantly. After researching likely candidates, I came across Exchanger and SynchronousQueue in Java's java.util.concurrent package.

Had I looked at SynchronousQueue without the above context, I would have wondered why anyone would need a queue that's not really a queue but more like a pointer swap between two threads. But the use case I'm dealing with ("event bursts") is probably the perfect fit: it protects the consumers from overwhelming rates by modeling the problem as a "hand-off" rather than as typical buffered queuing. The central idea behind this data structure is to offer the queue idioms, very efficiently, without actually storing anything, with the added feature that message production is rate limited by the speed at which consumers process messages. Behind the scenes, it uses a dual-stack or dual-queue algorithm (depending on the ordering fairness preference) to transfer a reference between threads.
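A minimal sketch of the hand-off may make this concrete. The class and method names (HandOffDemo, transfer) are made up for illustration; only SynchronousQueue and its put/take methods and fairness constructor come from the JDK. Each put blocks until the consumer's take arrives, so the producer can never run ahead of the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the system described in this post.
class HandOffDemo {

    // Hands `count` integers from one producer thread to the calling
    // (consumer) thread and returns them in the order received.
    static List<Integer> transfer(int count) {
        // `true` selects the fair (FIFO, dual-queue) policy;
        // the no-arg constructor gives the unfair (dual-stack) one.
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks until the consumer takes it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks until a producer arrives
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a single producer the elements arrive in the order they were put. With several producers, the fairness flag decides which waiting thread gets served first.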

SynchronousQueue is more of a thread queue than a data queue: it maintains a stack/queue of waiting threads (e.g. consumers waiting for a producer), not a queue of the data itself. You can probably approximate the same functionality with a BlockingQueue of capacity 1, or with an explicit lock object and explicit wait/notify on a datum reference, as in the example below:
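One caveat about the capacity-1 comparison: a one-slot queue still buffers one element, so the producer can move on before any consumer shows up, whereas SynchronousQueue has no capacity at all. A small sketch of the difference, using the non-blocking offer from the BlockingQueue interface (the class and helper names here are hypothetical):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class for illustration only.
class CapacityContrast {

    // Tries a non-blocking insert while no consumer is waiting.
    static boolean offerWithoutConsumer(BlockingQueue<String> queue) {
        return queue.offer("event");
    }

    public static void main(String[] args) {
        // One-slot buffer: accepted, the element now sits in the queue.
        System.out.println(offerWithoutConsumer(new ArrayBlockingQueue<>(1))); // true
        // Zero capacity: rejected, since nobody is there to receive it.
        System.out.println(offerWithoutConsumer(new SynchronousQueue<>()));    // false
    }
}
```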

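//Example code, for illustration only
//(I've only tested it on my laptop :))
public class MyNaiveSyncQueue {
    private final Object LOCK = new Object();
    private Object data; //only touched while holding LOCK, so no volatile needed
    public void put(Object o) throws InterruptedException{
        if(o == null){
            throw new NullPointerException(); //null means "empty slot"
        }
        synchronized (LOCK) {
            //loop, not if: wait() can wake spuriously, and another
            //producer may have refilled the slot before we got the lock
            while(data != null){
                LOCK.wait();
            }
            data = o;
            //producers and consumers share one wait set, so wake them all
            LOCK.notifyAll();
        }
    }
    public Object take() throws InterruptedException{
        synchronized (LOCK) {
            //same reasoning as in put(): re-check the condition after waking
            while(data == null){
                LOCK.wait();
            }
            Object o = data;
            data = null;
            LOCK.notifyAll();
            return o;
        }
    }
}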

There are several problems with the solution above:
  • Heavy locking and memory fence overhead on every queue operation: this will scale terribly with the number of producers/consumers, especially on server class SMP hardware.
  • Constant context switching: each successful queue operation involves syscall(s) for context switching, which might involve the kernel scheduler and everything that comes with it (cache flushes, register reloads, et al.).
  • Overhead for fair processing: the JVM manages object monitor wait queues in an order of its own choosing (typically FIFO, though the specification does not guarantee it), and there's a certain overhead in seeking the first enqueued thread to schedule as a consumer. This may or may not be the behavior the programmer cares about.

SynchronousQueue addresses all of these limitations: it lets you choose the trade-off in scheduling fairness, and it replaces expensive locking with hardware level CAS (whenever available). It also spins for a fair bit before a kernel level timed wait kicks in, which keeps context switches from becoming the hot spot in message processing.
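To illustrate the two ingredients mentioned above (CAS instead of a lock, and spinning before a timed wait), here is a deliberately simplified sketch. It is not how SynchronousQueue is implemented: SpinThenParkSlot is a hypothetical one-slot mailbox, and the spin count and park interval are arbitrary assumptions. It needs Java 9+ for Thread.onSpinWait.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical single-slot mailbox; NOT the JDK's algorithm.
class SpinThenParkSlot<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    // Lock-free publish: succeeds only if the slot is currently empty.
    boolean tryPut(T item) {
        Objects.requireNonNull(item); // null is reserved for "empty"
        return slot.compareAndSet(null, item);
    }

    // Spins up to `maxSpins` times, then backs off with short timed parks.
    T take(int maxSpins) {
        int spins = 0;
        while (true) {
            T item = slot.getAndSet(null); // atomically claim and clear
            if (item != null) {
                return item;
            }
            if (spins < maxSpins) {
                spins++;
                Thread.onSpinWait();            // busy-wait hint to the CPU
            } else {
                LockSupport.parkNanos(50_000L); // timed park, then retry
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SpinThenParkSlot<String> slot = new SpinThenParkSlot<>();
        Thread consumer = new Thread(() -> System.out.println(slot.take(1_000)));
        consumer.start();
        slot.tryPut("tick"); // slot is empty, so the CAS succeeds
        consumer.join();     // consumer prints "tick"
    }
}
```

If the item shows up while the consumer is still spinning, the hand-over costs no context switch at all; the timed park only kicks in when the wait drags on.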

So far this has been working great for the system I'm dealing with, which processes a couple of hundred messages per millisecond at peak bursts. I realize, though, that it might not be appropriate (or even worth it) for producers that are neither real-time nor bursty.