Tuesday, February 10, 2015

A simple rate limiter using Java's DelayQueue

It's rare for me to develop at this level of the stack, but recently we had to manage a bit of workload with limited resources, which led to a simplified, lightweight rate limiter built on Java's concurrency additions.

Guava's rate limiter is pretty good, but I didn't want to include a (fat) dependency on Guava just for one class, so I wrote a variant based on Java's DelayQueue.
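For comparison, the Guava version is essentially a one-liner to use; here's a minimal sketch (assuming Guava is on the classpath; the class name is just for illustration):

import com.google.common.util.concurrent.RateLimiter;

public class GuavaThrottle {
  public static void main(String[] args) {
   // 100 permits per second; acquire() blocks until a permit is available.
   RateLimiter limiter = RateLimiter.create(100.0);
   for (int i = 0; i < 5; i++) {
    limiter.acquire();
    System.out.println("permit at " + System.nanoTime());
   }
  }
}

And here's my DelayQueue-based variant: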

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

class DelayedEntry implements Delayed {
  long expireAt;
  TimeUnit unit;
  DelayedEntry(long delay, TimeUnit tu) {
   unit = tu;
   setDelay(delay);
  }
  void setDelay(long delay) {
   this.expireAt = System.nanoTime() + unit.toNanos(delay);
  }
  // Never called in practice: the queue holds at most one element,
  // so the backing priority queue never needs to compare entries.
  @Override
  public int compareTo(Delayed other) {
   throw new IllegalStateException("Expected single element queue");
  }
  @Override
  public long getDelay(TimeUnit u) {
   return u.convert(expireAt - System.nanoTime(), NANOSECONDS);
  }
}

import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Math.max;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

class RateLimiter {
 final DelayQueue<DelayedEntry> queue;
 final DelayedEntry token;
 final TimeUnit rateUnit; // unit in which the rate is expressed
 final AtomicInteger rate;
 RateLimiter(int rateLimit) {
  queue = new DelayQueue<>();
  rateUnit = SECONDS; // rateLimit is permits per second
  rate = new AtomicInteger(rateLimit);
  token = new DelayedEntry(0, NANOSECONDS);
 }
 boolean acquire(int permits) throws InterruptedException {
  // How long the token must sit in the queue for this many permits.
  long targetDelay = rateUnit.toNanos(permits) / max(1, rate.get());
  // Drain the single token, blocking until its delay has expired.
  // Note: simplified - concurrent callers can race between isEmpty() and take().
  DelayedEntry nextToken = token;
  while (!queue.isEmpty()) {
   nextToken = queue.take();
  }
  // Re-arm the token and put it back for the next caller.
  nextToken.setDelay(targetDelay);
  return queue.offer(token);
 }
}
This isn't a mathematically precise rate limiter: it won't smooth bursts into a uniform distribution, and it doesn't cope gracefully with rapidly changing rate limits. However, for use cases with predictable timings it works pretty well, with minimal and bounded resource usage.
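Putting the limiter to work is then a one-liner per unit of work; a quick sketch (hasWork() and sendRequest() are hypothetical placeholders):

 RateLimiter limiter = new RateLimiter(100); // ~100 permits per second
 while (hasWork()) {
  limiter.acquire(1); // blocks until the next permit is due
  sendRequest();
 }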

One thing that shouldn't go unnoticed is that utility implementations like this make it easy to instrument collection interfaces such as Iterable and Iterator to decorate an existing codebase. Here's an example (assuming a RateLimiter field named limiter is in scope):
 public <T> Iterator<T> decorate(final Iterator<T> delegate) {
  return new Iterator<T>() {
   public boolean hasNext() {
    return delegate.hasNext();
   }
   public T next() {
    try {
     limiter.acquire(1); // pace each element through the limiter
    } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new IllegalStateException(e);
    }
    return delegate.next();
   }
   public void remove() {
    delegate.remove();
   }
  };
 }
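Decorating an existing iterator then paces whoever consumes it; a quick sketch (Row, fetchRows() and process() are hypothetical placeholders):

 Iterator<Row> rows = decorate(fetchRows());
 while (rows.hasNext()) {
  process(rows.next()); // each next() is throttled by the limiter
 }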

I would love to hear readers' opinions about this approach.

Tuesday, April 29, 2014

JDK 8, Lucene and SSD based Xeon Servers

Recently, I've been working on an app that involves near-real-time ingestion and indexing of the full Twitter firehose on cutting-edge physical hardware. It mostly involves Lucene and home-grown secret-sauce analytical algorithms in pure Java.

As an enthusiastic Java developer, I decided to switch this app's runtime to the newly released JDK 8, because why not?

This worked great with Linux on a multitude of hardware (a 32-bit AMD/K2-based CPU in my laptop, a Raspberry Pi/ARM, a 64-bit Intel i7 desktop, and a two-generations-old 64-bit 2U Xeon server - all with spinning rust), but when I deployed the same app to our latest Xeon servers with SSD-based storage, I started seeing problems. The first problem showed up during the index-optimization phase, even with an index size (~12G) that is insignificant for SSD storage: the machine becomes unresponsive at ~70% system utilization, with no more than 2% I/O utilization. The second problem was zero hits (i.e. no search results) from the near-real-time index reader; this was unique to the SSD-based server, because copying the same index to another machine made it work just fine.

Despite the many JDK 8 bugs already reported by Lucene committers, it looks like some hidden JIT-compiler bugs are still lurking in corner cases when indices get very large, even on the latest JDK 8.

The workaround that has worked (great) for me so far is to switch back to the latest JDK 7.

When your app is deployed on SSD-based storage infrastructure, the bottleneck of a search app shifts from I/O to CPU, and you start to see this type of issue. Even after a couple of days of googling, I haven't found anyone on the internet reporting this problem, so I'll conclude that it may be specific to my use-case - but hopefully someone will find switching back to JDK 7 useful.

Monday, April 14, 2014

MySQL Insert performance and random primary key values

I've been putting up with MySQL for almost a year now, with its bizarre performance characteristics and its lack of choice in index data structures. If it weren't for the volume and velocity of data we're storing in it, I would have proposed ditching it by now.

We use MySQL purely as a stable, high-performance key-value store, so many readers might not share these frustrations. But I've been bitten by the poor insert performance of InnoDB tables with semi-random primary-key values more than once now, and hopefully this post will help someone in a similar situation.

MySQL with InnoDB struggles with random primary-key values, probably because it supports only B-tree indices for the primary key, and page splits in a B-tree can be very I/O intensive. Since InnoDB clusters table data on the primary key, random key values turn what could be sequential appends into writes to random pages. I've only experienced this with numeric columns, but it may well be true for other data types.

The only solution that has worked satisfactorily for me so far is to add an AUTO_INCREMENT primary key and index the real key column as UNIQUE. This wastes storage space, but it has offered the best insert performance by far. AUTO_INCREMENT combined with an upsert (ON DUPLICATE KEY UPDATE) is probably the only reliable, performant way to use MySQL as a super-fast ingestion engine.
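To make that concrete, here's a minimal JDBC sketch of the layout; the table and column names (kv, k, v), the JDBC URL, and the credentials are all made up for the illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class UpsertExample {
    // Assumed (hypothetical) schema:
    //   CREATE TABLE kv (
    //     id BIGINT AUTO_INCREMENT PRIMARY KEY,  -- monotonic clustered key
    //     k  BIGINT NOT NULL,                    -- the real (random) key
    //     v  BLOB,
    //     UNIQUE KEY uk_k (k)
    //   ) ENGINE=InnoDB;
    public static void main(String[] args) throws SQLException {
        try (Connection c = DriverManager.getConnection(
                 "jdbc:mysql://localhost/test", "user", "pass");
             PreparedStatement ps = c.prepareStatement(
                 "INSERT INTO kv (k, v) VALUES (?, ?) " +
                 "ON DUPLICATE KEY UPDATE v = VALUES(v)")) {
            ps.setLong(1, 42L);                   // random key value
            ps.setBytes(2, new byte[] {1, 2, 3}); // payload
            ps.executeUpdate();                   // insert-or-update in one round trip
        }
    }
}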

Monday, December 16, 2013

Interesting use-case for SynchronousQueue

While working on a very unusual system, where 1) producers can at times be significantly faster than consumers (by more than a factor of two) and 2) producers have a low-latency processing budget for real-time data, I was contemplating a data structure that is efficient and performant and can model this situation elegantly. After researching probable candidates, I came across Exchanger and SynchronousQueue in Java's java.util.concurrent library.

If I were looking at SynchronousQueue without the above context, I would have wondered why anyone would need a queue that's not really a queue but more of a pointer swap between appropriate threads. The use-case I'm dealing with ("event bursts"), however, is probably the perfect fit: it keeps producers from overwhelming consumers by modeling the problem as a "hand-off" rather than as typical buffered queuing. The central idea behind this data structure is to keep the queue idioms, without an actual queue, in a very efficient manner, with the added property that message production is rate-limited by the consumers' speed of processing. Behind the scenes, it uses a dual-stack/dual-queue algorithm (depending on the ordering-fairness preference) to transfer a reference between threads.

SynchronousQueue is more of a thread queue than a data queue: it maintains a stack/queue of waiting threads (i.e. "consumers"), not a queue of the data itself. You can probably approximate the same functionality with a BlockingQueue of size 1, or with an explicit object lock and explicit wait/notify on a datum reference, like the example below:

//Example code, probably riddled with concurrency bugs 
//(I've only tested it on my laptop :)) 
public class MyNaiveSyncQueue {
    private final Object LOCK = new Object();
    private Object data; // guarded by LOCK; synchronization makes volatile unnecessary
    public void put(Object o) throws InterruptedException {
        synchronized (LOCK) {
            while (data != null) { // while, not if: guards against spurious wake-ups
                LOCK.wait();
            }
            data = o;
            LOCK.notifyAll(); // notifyAll: a bare notify may wake another producer
        }
    }
    public Object take() throws InterruptedException {
        synchronized (LOCK) {
            while (data == null) {
                LOCK.wait();
            }
            Object o = data;
            data = null;
            LOCK.notifyAll();
            return o;
        }
    }
}

There are several problems with the solution above:
  • Heavy locking and memory-fence overhead for individual queue operations: this will scale terribly with the number of producers/consumers, especially on server-class SMP hardware.
  • Constant context switching: each successful queue operation involves syscall(s) for a context switch, which may drag in the kernel scheduler and everything that comes with it (cache flushes, register reloads, etc.).
  • Overhead of fair processing: the JVM typically keeps an object monitor's wait set in a fixed (FIFO-ish) order, and there is a certain overhead in finding the first enqueued thread to schedule as a consumer. This may or may not be behavior the programmer cares about.
SynchronousQueue addresses all of these limitations: it offers a choice of trade-off in scheduler-ordering fairness, and it replaces expensive locking with hardware-level CAS (wherever available). It also does a fair bit of spinning before a kernel-level timed wait kicks in, which keeps context switches from becoming the hot spot of message processing.
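To make the hand-off concrete, here's a minimal producer/consumer sketch (class and message names are illustrative):

import java.util.concurrent.SynchronousQueue;

public class HandOffExample {
    public static void main(String[] args) throws InterruptedException {
        // true = fair (FIFO) ordering of waiting threads, the dual-queue variant;
        // false (the default) uses the dual-stack variant.
        final SynchronousQueue<String> q = new SynchronousQueue<String>(true);

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        String msg = q.take(); // blocks until a producer hands one off
                        System.out.println("got " + msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        for (int i = 0; i < 10; i++) {
            q.put("event-" + i); // blocks until the consumer is ready: production is paced
        }
    }
}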

So far this has been working great for the system I'm dealing with, which processes a couple of hundred messages per millisecond at peak bursts, but I realize it might not be appropriate (or even worth it) for non-real-time or non-bursty producers.