Sunday, January 4, 2009

CountDownLatch for Terracotta

Since Java 5, Java has shipped a built-in concurrency library in java.util.concurrent, with classes such as CountDownLatch, CyclicBarrier, FutureTask, ExecutorService, LinkedBlockingQueue, ConcurrentHashMap and ReentrantReadWriteLock, which greatly simplifies writing multi-threaded applications. As the number of cores grows, applications need to be multi-threaded to make use of them. Terracotta goes a step further and makes it simple to run such an application across more than one JVM, effectively giving you more threads at the cost of a slight performance degradation but with near-linear scalability. Terracotta supports some important data structures of java.util.concurrent out of the box, mainly LinkedBlockingQueue, ExecutorService, CyclicBarrier, FutureTask and, of course, Locks.


Below I am presenting one more addition to this library: CountDownLatch. A CountDownLatch is used to coordinate between threads. You pass the number of threads to the constructor, and each thread calls the countDown() method when it has finished its work. When you want to be notified that all threads have finished, you call the await() method, which blocks until every party has called countDown(). If you want to write such code in a Terracotta-enabled application today, you have to use CyclicBarrier, where each thread calls await(). But this causes finished threads to block unnecessarily on the barrier. With CountDownLatch a thread can "count down" and exit, so only the master or coordinator thread needs to block.
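To make the difference concrete, here is a minimal single-JVM sketch that uses the standard java.util.concurrent.CountDownLatch rather than a Terracotta-clustered latch. The class name LatchDemo and the method runWorkers are made up for illustration. Each worker counts down and exits right away; only the coordinator blocks in await().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative and not from the original post.
final class LatchDemo {

    // Starts `workers` threads, waits for all of them, and returns how many finished.
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stand-in for real work
                done.countDown();           // signal completion; never blocks
            }).start();
        }

        try {
            done.await(); // only the coordinator blocks here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Workers finished: " + runWorkers(4));
    }
}
```

With a CyclicBarrier, each worker would instead call await() on the barrier and stay blocked until the last party arrived.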

The logic is very simple. The latch is initialised with the number of parties. The countDown() method decrements the counter and, when it reaches zero, notifies all waiting threads. The await() method calls wait() on the latch object until the count has reached zero.

Below is the source code. You need to include the class MyCountdownLatch in the instrumented-classes section and define a write lock for the await() and countDown() methods. You can download the source code here.


Main Class



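public class MyCountdownLatch {

    // Number of countDown() calls still outstanding.
    private int count;

    public MyCountdownLatch(int count) {
        this.count = count;
    }

    // Decrements the counter (never below zero) and wakes all waiters at zero.
    public synchronized void countDown() {
        if (count > 0) {
            count--;
            if (count == 0) {
                notifyAll();
            }
        }
    }

    // Re-arms the latch with a new count.
    public synchronized void reset(int count) {
        this.count = count;
        if (this.count <= 0) {
            notifyAll(); // nothing left to wait for, so release any waiters
        }
    }

    // Blocks until the counter reaches zero; returns at once if it already has.
    // The loop re-checks the condition after every wake-up.
    public synchronized void await() throws InterruptedException {
        while (count > 0) {
            wait();
        }
    }
}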
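As an aside, the same counting logic can be written with an explicit lock and condition instead of synchronized, wait() and notifyAll(). The sketch below is plain Java that I have not run under Terracotta; the class name LockBasedLatch and the getCount() helper are hypothetical and not part of the download.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical alternative to MyCountdownLatch; single-JVM sketch only.
final class LockBasedLatch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition reachedZero = lock.newCondition();
    private int count;

    LockBasedLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.count = count;
    }

    // Decrements the counter (never below zero) and signals waiters at zero.
    void countDown() {
        lock.lock();
        try {
            if (count > 0 && --count == 0) {
                reachedZero.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocks until the counter reaches zero.
    void await() throws InterruptedException {
        lock.lock();
        try {
            while (count > 0) { // re-check after every wake-up
                reachedZero.await();
            }
        } finally {
            lock.unlock();
        }
    }

    int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockBasedLatch latch = new LockBasedLatch(2);
        new Thread(latch::countDown).start();
        new Thread(latch::countDown).start();
        latch.await();
        System.out.println("count = " + latch.getCount()); // prints "count = 0"
    }
}
```

The structure mirrors the synchronized version: one guarded counter, a wake-up when it hits zero, and a loop around the wait.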


Test Class TestCountDownLatch


import java.util.Random;

public class TestCountDownLatch {

    public static int N = 10;

    // Both fields are declared as roots in tc-config.xml.
    public static MyCountdownLatch startSignal = null;
    public static MyCountdownLatch doneSignal = null;

    private static Object lock = new Object();

    public static void main(String[] args) {

        Runnable runs[] = new Runnable[N];

        // Create the latches only if they have not been set yet.
        if (startSignal == null) {
            synchronized (lock) {
                startSignal = new MyCountdownLatch(1);
                doneSignal = new MyCountdownLatch(N);
            }
        }

        for (int i = 0; i < N; i++) {
            runs[i] = new Worker(startSignal, doneSignal);
            new Thread(runs[i]).start();
        }

        // Release the workers, then block until all of them have counted down.
        startSignal.countDown();
        try {
            doneSignal.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("All threads finished ...");
    }

    public static class Worker implements Runnable {
        MyCountdownLatch startSignal = null;
        MyCountdownLatch doneSignal = null;

        public Worker(MyCountdownLatch startSignal, MyCountdownLatch doneSignal) {
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }

        public void run() {
            System.out.println("Waiting for start signal...");
            try {
                startSignal.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            doWork();
            // Count down and exit; the worker never blocks on doneSignal.
            doneSignal.countDown();
        }

        // Simulates work by sleeping for up to two seconds.
        public void doWork() {
            System.out.println("Starting to work now");
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Completed work");
        }
    }
}


Config file tc-config.xml



<?xml version="1.0" encoding="UTF-8"?>
<tc:tc-config xsi:schemaLocation="http://www.terracotta.org/schema/terracotta-4.xsd" xmlns:tc="http://www.terracotta.org/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <servers>
    <server host="localhost" name="tc-srv01" bind="0.0.0.0">
      <data>%(user.home)/terracotta/server-data1</data>
      <logs>%(user.home)/terracotta/server-logs1</logs>
      <dso-port>9510</dso-port>
      <jmx-port>9520</jmx-port>
      <l2-group-port>9530</l2-group-port>
      <dso>
        <garbage-collection>
          <enabled>true</enabled>
          <interval>300</interval>
          <verbose>true</verbose>
        </garbage-collection>
      </dso>
    </server>
  </servers>
  <clients>
    <logs>%(user.home)/terracotta/client-logs1/</logs>
    <statistics>%(user.home)/terracotta/server-statistics-%D</statistics>
  </clients>
  <application>
    <dso>
      <instrumented-classes>
        <include>
          <class-expression>*..*</class-expression>
        </include>
      </instrumented-classes>
      <roots>
        <root>
          <field-name>TestCountDownLatch.startSignal</field-name>
        </root>
        <root>
          <field-name>TestCountDownLatch.doneSignal</field-name>
        </root>
      </roots>
      <locks>
        <autolock>
          <method-expression>* MyCountdownLatch.countDown(..)</method-expression>
          <lock-level>write</lock-level>
        </autolock>
        <autolock>
          <method-expression>* MyCountdownLatch.await(..)</method-expression>
          <lock-level>write</lock-level>
        </autolock>
      </locks>
    </dso>
  </application>
</tc:tc-config>

1 comment:

msangel said...

Saving *.class files as the sources is not a good idea.

Actually, I have a Java decompiler, but I am not sure that the annotations (if there were any) were restored correctly.