Tuesday, May 05, 2009

Fork/Join Concurrency with Scala Actors

Have you ever wondered why there are no special frameworks to address concurrency in a Java based application? Considering Java's rich (NIH) ecosystem, I do wonder why I have to write same old state management code while introducing even a small amount of concurrency in Java application.

The reason why I think it is almost impossible to consider concurrency as an aspect in arbitrary application is because of JVM's native support for shared memory concurrency. As a result every developer is forced to think in terms of threaded shared state with guarded blocks. If you have read or written non-trivial piece of code using shared memory concurrency primitives (Mutex, Semaphore etc.) you probably know that the resultant code is hard to visualize and test.

I have been reading about Scala's Actor library and its share-nothing message passing abstraction built over existing concurrency model of JVM. While it doesn't try to solve the fundamental problem, it provides an alternative to address concurrency in your application from a different perspective which is testable and easier to understand.

In actor model, an Actor is a forkable task which runs independently, something like a serializable+immutable object with its private data and behavior. Each actor can send and receive (or react to) messages asynchronously, very similar to object oriented programming with objects responding to messages, but in a concurrent way. This abstraction can seamlessly be applied to a given application of divide and conquer nature and can be made concurrent with minimal efforts as compared to adapting to Java's concurrency primitives.

To explain my point further take a look at classically trivial Producer/Consumer example in Java.

public class Consumer extends Thread {
private final Buffer buffer;
public Consumer(Buffer buffer) {
super("Consumer");
this.buffer = buffer;
}
@Override
public void run() {
while (true){
System.out.println(buffer.next());
}
}
}
public class Producer extends Thread {
private final Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
Random random = new Random(System.nanoTime());
while (true) {
String num = Integer.toString(random.nextInt());
System.out.println(getName() + "=putting: " + num);
buffer.add(num + ": " + getName());
try {
sleep(400);
} catch (InterruptedException e) {
}
}
}
}
public class Buffer {
private String string;
private boolean ready = false;
public synchronized String next() {
if (ready != true)
try {
wait();
} catch (InterruptedException e) {
}
ready = false;
return string;
}

public synchronized void add(String string) {
while(ready == true)
try {
wait();
} catch (InterruptedException e) {
}
this.string = string;
notifyAll();
ready = true;
}

}
public class Test {
public static void main(String[] args) throws Throwable {
Buffer buffer = new Buffer();
new Consumer(buffer).start();
Producer producer = new Producer(buffer);
producer.start();
producer.join();
}
}


Take a look at Buffer class, we have used some concurrency primitives there since that's the place where state is being manipulated. We didn't declare variable ready as volatile since primitive assignments are guaranteed to be atomic (except long and double), Even a simple problem like this involves fair bit of understanding of the underlying threading model. There's no doubt this complexity will extrapolate in non-trivial applications e.g. multi-phase concurrent incremental compiler, SEDA based server etc.

Now take a look at the equivalent Producer/Consumer example in Scala.

import actors._
import actors.Actor._
import util.Random

case class SimpleMessage(num: Long)

class Producer(c: Consumer) extends Actor{
val random = new Random(System nanoTime)
def act = {
loop{
val num = produce
println("Sending: " + num )
c ! SimpleMessage(num) // asynchronous message passing
}
}
def produce(): Long = {
Thread sleep 400
return random.nextLong
}
}
class Consumer() extends Actor{
def act = {
loop{
receive{ //blocks here
case SimpleMessage(num) => println("Received: " + num);
}
}
}
}
object PCTest {
def main(args : Array[String]) : Unit = {
var c = new Consumer()
var p = new Producer(c)
c.start;p.start
}
}

Even if we don't compare the amount of code, the Scala code above is much more clear in terms of its functionality. In Scala, Actors can be mapped to a single native thread with 'receive' (similar to Thread#wait()) or we can replace 'receive' with 'react' which is event based invocation but doesn't cost a blocked thread. The code within 'react' is executed by any non-blocked thread from a pre-created thread-pool. Just a single change and your application is scalable!

The Java example code above can be equally trivialized with the util.concurrent BlockingQueue, but the important point to take away is, writing shared memory concurrency code is inherently difficult and error-prone. With JDK1.7 we will get similar fork/join abstraction in Java itself (JSR166y), which will add new alternative to how we design and write concurrent applications.

Scala borrowed Actors from Erlang and similar libraries exist for Java as well. If you are curious about interesting details on Actor based OO concurrency implementation in Java, take a look at some of the thoughts Sebastian is sharing with his ConcurrentObjects library.

1 comment:

Alex said...

Thank you for interesting post Nirav. I have a question. Java model can be easily modified to support some fixed side buffer and also variable number of producers and consumers. My understanding of Actor framework is quite limited could you please show how could you solve this slightly extended problem with actors

Regards, Alex