Sunday, March 29, 2015

How CAS (Compare And Swap) in Java works?

Before we dig into CAS (Compare And Swap) strategy and how is it used by atomic constructs like AtomicInteger, first consider this code:
public class MyApp
{
    private volatile int count = 0;
    public void upateVisitors() 
    {
       ++count; //increment the visitors count
    }
}

This sample code is tracking the count of visitors to the application. Is there anything wrong with this code? What will happen if multiple threads try to update count? Actually the problem is simply marking count as volatile does not guarantee atomicity and ++count is not an atomic operations. To read more check this.

Can we solve this problem if we mark the method itself synchronized as shown below:
public class MyApp
{
    private int count = 0;
    public synchronized void upateVisitors() 
    {
       ++count; //increment the visitors count
    }
}

Will this work? If yes then what changes have we made actually?
Does this code guarantee atomicity? Yes.
Does this code guarantee visibility? Yes.

Then what is the problem?
It makes use of locking and that introduces lot of delay and overhead. Check this article. This is very expensive way of making things work.

To overcome these problems atomic constructs were introduced. If we make use of an AtomicInteger to track the count it will work.
public class MyApp
{
    private AtomicInteger count = new AtomicInteger(0);
    public void upateVisitors() 
    {
       count.incrementAndGet(); //increment the visitors count
    }
}

The classes that support atomic operations e.g. AtomicInteger, AtomicLong etc. makes use of CAS. CAS does not make use of locking rather it is very optimistic in nature. It follows these steps:
  • Compare the value of the primitive to the value we have got in hand.
  • If the values do not match it means some thread in between has changed the value. Else it will go ahead and swap the value with new value.

Check the following code in AtomicLong class:
public final long incrementAndGet() {
    for (;;) {
        long current = get();
        long next = current + 1;
        if (compareAndSet(current, next))
          return next;
    }
}

In JDK 8 the above code has been changed to a single intrinsic:
public final long incrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}

What advantage this single intrinsic have?
Actually this single line is JVM intrinsic which is translated by JIT into an optimized instruction sequence. In case of x86 architecture it is just a single CPU instruction LOCK XADD which might yield better performance than classic load CAS loop.

Now think about the possibility when we have high contention and a number of threads want to update the same atomic variable. In that case there is a possibility that locking will outperform the atomic variables but in realistic contention levels atomic variables outperform lock. There is one more construct introduced in Java 8, LongAdder. As per the documentation:
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
So LongAdder is not always a replacement for AtomicLong. We need to consider the following aspects:

  • When no contention is present AtomicLong performs better.
  • LongAdder will allocate Cells (a final class declared in abstract class Striped64) to avoid contention which consumes memory. So in case we have a tight memory budget we should prefer AtomicLong.

That's all folks. Hope you enjoyed it.

Tuesday, March 24, 2015

Is multi-threading really worth it?

"Premature optimization is the root of all evil" - Donald E. Knuth

This article is in no way against the usage of multithreading if we really need it. This is just an attempt to explain the cost multithreading comes with as there ain't no such thing as a free lunch. I have seen many examples where multithreading is used in real life projects in name of making things faster. We should always benchmark these things to conclude whether concurrent code has really made our code faster or not. It is quite possible that sometimes it may introduce a lot of delay and its better to avoid then. There is always a context switch overhead and a thread needs more resources to run.

I will explain it with one very simple example I am running on my machine with Intel Core i5 processor with 4 GB of DDR3 Ram. The example is regarding a very simple counter class having only variable count as:
public class Counter {
    long count;
    public Counter(long count) {
        this.count = count;
    }
    void  incrementByOne() { count++; }
    long getCount() { return count; }
}

This class will be used to increment the counter to 1 billion (1,000,000,000) as shown below:
public static void main(String[] args) {
    Counter counter = new Counter(0);
    long startTime = System.currentTimeMillis();
    for (long index = 0; index<1000000000L; index++) {
        counter.incrementByOne();
    }
    long endTime = System.currentTimeMillis();
    System.out.println("Time taken: " + (endTime - startTime) + " ms");
    System.out.println("Value is: " + counter.getCount());
}

There is only one thread which is going to increment the counter to one billion. The time taken on average is 2450 ms. I have also printed the value of count to ensure that count is incremented to one billion successfully and nothing went wrong during it.Next thing I am going to try is to mark the count variable in Counter class volatile and will again run the same code and note the time taken.
public class Counter {
    volatile long count;
    public Counter(long count) {
        this.count = count;
    }
    void  incrementByOne() { count++; }
    long getCount() { return count; }
}

This takes on average time of 8680 ms, so time is increased by a factor of 3.54. We also have atomic classes to use to increment a counter. Lets write another counter class making use of AtomicLong as:
public class AtomicCounter {
    private AtomicLong count;

    public AtomicCounter() {
        count = new AtomicLong(0);
    }
    void  incrementByOne() { count.incrementAndGet(); }
    long getCount() { return count.get(); }
}

We will now use this class for single thread only to check the time taken.
public static void main(String[] args) {
        AtomicCounter counter = new AtomicCounter();
        long startTime = System.currentTimeMillis();
        for (long index = 0; index<1000000000L; index++) {
            counter.incrementByOne();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Time taken: " + (endTime - startTime) + " ms");
        System.out.println("Value is: " + counter.getCount());
    }




The time taken on average while using atomic long is 8720 so time is increased by a factor of 3.55. Next example we will try is to use Lock to implement the similar counter. We will use the same Counter class but we will make use of Lock while increasing the counter as:
public static void main(String[] args) {
        Counter counter = new Counter(0);
        Lock lock = new ReentrantLock();
        long startTime = System.currentTimeMillis();
        for (long index = 0; index < 1000000000L; index++) {
            lock.lock();
            try {
                counter.incrementByOne();
            } finally {
                lock.unlock();
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Time taken: " + (endTime - startTime) + " ms");
        System.out.println("Value is: " + counter.getCount());
    }

This time it takes an average time of 28340 which means time is increased by a whopping multiple of 11.5 and the fun part is we are still using single thread. Lets now move to two threads sharing the same instance of AtomicCounter as:
public static void main(String[] args) {
        AtomicCounter counter = new AtomicCounter();

        Thread thread1 = new Thread(() -> {
            for (long index = 0; index < 500000000L; index++) {
                counter.incrementByOne();
            }
        });
        Thread thread2 = new Thread(() -> {
            for (long index = 0; index < 500000000L; index++) {
                counter.incrementByOne();
            }
        });
        long startTime = System.currentTimeMillis();

        thread1.start();
        thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Time taken: " + (endTime - startTime) + " ms");
        System.out.println("Value is: " + counter.getCount());
    }

When we were having single thread time taken was 8720 and when we test it for two threads it took an average time of 20687 almost 2.3 times more time. Now what time will be taken when using two threads with locks.
public static void main(String[] args) {

        Counter counter = new Counter(0);
        Lock lock = new ReentrantLock();

        Thread thread1 = new Thread(() -> {
            for (long index = 0; index < 500000000L; index++) {
                lock.lock();
                try {
                    counter.incrementByOne();
                } finally {
                    lock.unlock();
                }
            }
        });

        Thread thread2 = new Thread(() -> {
            for (long index = 0; index < 500000000L; index++) {
                lock.lock();
                try {
                    counter.incrementByOne();
                } finally {
                    lock.unlock();
                }
            }
        });

        long startTime = System.currentTimeMillis();

        thread1.start();
        thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Time taken: " + (endTime - startTime) + " ms");
        System.out.println("Value is: " + counter.getCount());
    }

Here is the summary of the result:

Threads Time Taken (in milliseconds)
Single Thread2450
Single Thread using volatile8680 (3.54 times)
Single Thread using AtomicLong8720 (3.55 times)
Single Thread using Lock28340 (11.5 times)
Two Threads using AtomicLong20687
Two threads using Lock90245

So moral of the story is that concurrency comes with its own cost and we need to use it when we really need it. That's all for now. Enjoy!!

Sunday, March 22, 2015

Abstract class vs Interfaces in Java

After introduction of default methods in Java 8, even interfaces can have methods with implementation. The major motivation for introduction of default methods is to add new methods (with body) to the existing interfaces without breaking the existing implementation of those interfaces. Read this post.

An abstract class is a class that is marked abstract and it may or may not have abstract methods. This cannot be instantiated but it can be subclassed. Consider an example of a class BinarySearchTree which has methods addNode/removeNode etc. We know AVLTree and RedBlackTree are also binary search trees but they support rotation. We can create an abstract class RotatableBinarySearchTree which extends BinarySearchTree and also introduce two methods rotateLeft and rotateRight. Now the trees AVLTree and RedBlackTree can extend this abstract class RotatableBinarySearchTree.

Abstract class vs Interface
There are following similarities or differences between the two:

Abstract classInterface
Cannot be instantiated.Same for interfaces.
Can contain mix of declared and implemented methods.Same for interfaces.
We can declare fields that are not static and final.All fields are by default public, static and final.
We can declare concrete methods which are public, protected or private.All methods (declared or defined) are by default public.
We can only extend one (abstract or not)A class can implement any number of interfaces.

Now the question is when to use abstract class and when to use interface. If you are in doubt always prefer interface but here are the guidelines that can help.

Prefer abstract class if:

  • We want to share code among many closely related classes. An example if of AVLTree and RedBlackTree I mentioned before.
  • If we have a set of classes that share common methods or fields or they require access modifier other than public, in that case abstract class can be a good fit.
  • If we want to declare non-static or non-final fields. This enables us to define methods that can access and modify the object's state to which they belong.

Prefer interface if: In my opinion use interface in all other scenarios. For example:

  • If we expect unrelated classes to implement our interface e.g. Serializable is implemented by many unrelated classes.
  • If we want to specify only the behavior of a particular data type and not concerned about its behavior e.g. Tree interface which has methods add, remove, height, size etc. which can be implemented by various tree classes.
  • If we want to use multiple inheritance.

We know that if a class implements an interface it must implement all its methods but in case of an abstract class if subclass implements all the abstract methods of super class it is fine else the subclass itself must be declared abstract. Suppose we got a situation where a class wants to implement an interface but does not want to implement all its methods. For example:
public interface Demo{  
    void methodA();
    void methodB();
    void methodC();
    void methodD();
}

public class DemoClass implements Demo{
 public void methodA(){
         System.out.println("in methodA");
        }
 public void methodB(){
         System.out.println("in methodB");
        }
}

This will lead to compilation problem as all the methods of interface Demo are not implemented. In such a situation we can create an abstract class which implements the interface and provides implementation body for other two methods.
public interface Demo{  
 void methodA();
 void methodB();
 void methodC();
 void methodD();
}
abstract class AbstractDemo implements Demo {
 public void methodC(){
      System.out.println("in methodC");
 }
 public void methodD(){
      System.out.println("in methodD");
 }
}
public class DemoClass exends AbstractDemo{
 public void methodA(){
         System.out.println("in methodA");
    }
 public void methodB(){
         System.out.println("in methodB");
    }
}

An abstract class can have static fields and methods and we can use them with a class reference as AbstractClass.staticMethod() as we do with any regular class. That's all folks. Hope you liked it :).

Friday, March 20, 2015

Volatile: Atomicity, Visibility and Ordering

Disclaimer: I don't claim to be an expert in this field but I will try to explain the concepts in my capability. If you feel something is wrong or can be improved please drop an comment. I will be more than happy to accept and include your suggestions as it will help all of us. Thanks.

You may be wondering why do we need one more article for exploring volatile? Well reason is: because it is probably the most confused and not-well-understood construct in concurrency. I have seen so many blogs explaining about volatile but still most of them feel incomplete or too difficult to understand. I will start with some of the most important aspects in concurrency:

Atomicity
Atomicity is all about indivisible operations i.e. they will either happen completely or will not happen at all. The best example of atomic operation in Java is assignment of a value to a variable.

Visibility
Visibility is about one aspect: whether the changes (or effects) made by one thread to a shared variable will be visible to other threads or not?

Ordering
Ordering is all about whether the order of instructions in source code can be altered by the compiler in name of optimization or not. There is a possibility that the actions in one thread can occur out of order with respect to another.

Now consider certain examples to check out these aspects.
public class MyApp
{
    private int count = 0;
    public void upateVisitors() 
    {
       ++count; //increment the visitors count
    }
}
Hint: read-modify-write


The sample code has a method that tries to update the number of visitors to an application (web page). The problem with this code is the instruction ++count is not atomic. It is composed of three separate instructions:

temp = count;   (read)
temp = temp + 1;   (modify)
count = temp;  (write)

So this instruction can be pre-empted by another thread while one thread is executing this. So this is not atomic instruction. Suppose value is 10. Consider the following sequence of execution:

Thread OneThread Two
temp = 10;
temp = 10 + 1= 11
temp = 10 ( as count is still 10)
temp = temp + 1 = 11
count = 11 (temp is 11)
count = 11 (temp is 11)

Here we should observe one thing: with some unlucky timing each thread read the same value (10) and add one to it and then each set the counter to 11. So an increment got lost along the way. And this possibility when the actual output depends on the thread interleaving is known as race condition. So which aspects of concurrency are missing here? What about lack of atomicity? Consider one more example of creating singleton (bad way of course):
public Singleton getInstance()
{
   if(_instance == null)
   { 
      _instance = new Singleton();
   }
}
Hint: check-then-act

Now again there is a possibility that two threads may notice that instance is null and then both can enter into if block. This will lead to creation of two instances. Here again the problem is the if block is not atomic and also changes in instance are not visible to other threads. So this section which must not be executed by more than on thread at same time is known as Critical Section. So we defnitely need to control access to critical section and for that we can use synchronized block and methods.

Atomicity Again
To ensure atomicity we generally make use of locking to ensure mutual exclusion. Consider the following example of a bank account using synchronized methods:
class BankAccount {
 private int accountBalance;
 synchronized int getAccountBalance() {
    return accountBalance;  
 }
 synchronized void setAccountBalance(int b) throws IllegalStateException {
    accountBalance = b;
    if (accountBalance < 0) {
     throw new IllegalStateException("Sorry but account has negative Balance");
    }
 }
 void depositMoney(int amount) {
    int balance = getAccountBalance();
    setAccountBalance(balance + amount);
 }
 void withdrawMoney(int amount) {
    int balance = getAccountBalance();
    setAccountBalance(balance - amount);
 }
}

All the access to the shared variable balance are guarded by locks so no problem of data race. Is there anything wrong with this class? Well yes there is. Suppose one thread calls depositMoney(50) and another thread calls withdrawMoney(50) and there is an initial balance of 100. Ideally there should be a balance of 100. But that is not guaranteed:
  • The method depositMoney sees a value of 100 for balance.
  • Then withdrawMoney method sees a value of 100 for balance and withdraws 50 and leaves a balance of 50.
  • Finally the method depositMoney uses the balance it saw previously to calculate a new balance of 150. Note one thing that the change in balance is not visible to it.
So again due to lack of atomicity an update is lost. If both of the methods are declared synchronized the lock will be ensured for entire duration of method and changes will take place atomically.

Visibility Again
If action of one thread is visible to another thread then the result of all its actions are observed by other threads as well. Consider the following example:
public class LooperThread extends Thread
{
    private boolean isDone = false;
    public void run() 
    {
       while( !isDone ) {
          doSomeWork();
       }
    }
    public void stopWork() {
       isDone = true;
    }
}

What is missing here? Assume an instance of LooperThread is running while main thread calls stopWork() to stop it. There is no synchronization between the two threads. The compiler may detect that no writes are performed to isDone in the first thread and may decide to read isDone only once and then BOOM!! Some JVMs may do this and may transform it into infinite loop. The problem is clearly lack of visibility.

Ordering Again
Ordering is all about the order in which things are seen to occur. Consider the following example:

Initialization (Both not volatile)Thread One (not synchronized)Thread Two (not synchronized)
int value = 0value = 1if( result == true)
boolean result = falseresult = true;sysout(“value = “ + value)

In the above scenario is it possible for thread two to print value = 0?  Well Yes it is possible. It is possible that result=true may come before value=1 in compiler reordering. Also value=1 might not become visible in thread two and then thread two will load value=0. What can be done to fix it? If we make value volatile will this fix the problem?

CPU Architecture (multi level of RAMs)
CPUs generally have multiple cores now a days and threads will be running on different cores. Also there are different level of cache memories as demonstrated in the diagram below:

When a volatile is written by any thread in a particular core that value needs to be updated for all other cores as well because each core has its own cache that will have stale value of the variable. The messages are passed to all cores to update the values. The local caches of all cores are not flushed as such because it is done by cache coherence protocol and it is hardware specific.

Volatile (a younger sibling of synchronized block)
As per Java documentation if a variable is declared volatile then Java Memory Model (after JDK 5) ensures that all threads see a consistent value for the variable. Volatile is a cousin of synchronization in a way that reading a volatile is like entering into synchronized block and writing a volatile variable is like exiting from it. When a volatile is written the value is written to main memory and not to local processor cache and all the other caches of other cores are informed of this change by message passing. Volatile is not atomic.
Volatile provides ordering and visibility and does not provide mutual exclusion or atomicity guarantee. Locking guarantees atomicity, visibility and ordering. So volatile is not a substitute for synchronization.
Volatile read and writes
Volatile provided ordering guarantee and it means the generated instructions by compiler cannot have action results in different order other than the order defined by instructions of actual source code. Though the order of generated instructions can be different from original order of source code but resultant effect has to be in same order. We need to also observe the following point regarding read and write from Java Doc:
when a thread reads a volatile variable, it sees not just the latest change to the volatile, but also the side effects of the code that led up the change.

We need to understand the following points regarding the read and writes of a volatile:

  • When one thread writes to a volatile variable, and another thread sees that write, the first thread is telling the second about all of the contents of memory up until it performed the write to that volatile variable.
  • Here, Thread 2 sees the contents of all the threads as seen by them before they are going to write to ready. So the contents of memory of all those threads would be visible to Thread 2, after it reads the value true for ready.
Image is taken from Jeremy's blog.

Can we declare a final volatile volatile?
What do you think? If a variable is final we cannot change its value and volatile is all about making sure changes to a share variable are visible to other threads. So it is not allowed and will result in compilation error.

Why do we declare long/double volatile in concurrent programming?
By default reading/writing long or double variables is not atomic in nature. Non-volatile long or double is treated as two separate writes: one to each 32-bit half. It can result in a situation where a thread sees the first 32 bits of a 64 bit value from one write and the second sees 32 bits from another write. Write and read of volatile long and double are always atomic.

Now consider the first example again where we have marked the counter volatile:

public class MyApp
{
    private volatile int count = 0;
    public void upateVisitors() 
    {
       ++count; //increment the visitors count
    }
}

Before coming to any conclusion the point to remember is "Volatile guarantee visibility and ordering only". So what do you think? Will this example fail at all?

Volatile vs Atomic classes
If we make the count variable atomic will this work? Yes it will work and its better to make use of Atomic classes for incrementing or decrementing a primitive. AtomicInteger actually makes use of volatile and CAS (Compare And Sweep) for thread-safe implementation of integer counter. Also check this post to read about LongAdder.

So I finally close this topic. I hope you enjoyed the post.

Wednesday, March 18, 2015

Java Concurrency Problem: Accumulator to accumulate results of multiple threads.

I recently encountered one very interesting problem related to Java Concurrency. I will try to explain the problem in brief. There can be many threads executing in parallel and each will be computing some result but the output will be one of the two possible values: A and B (say). Now we want to keep these two values along with the number of threads that produced this value in a map. For example suppose there were 1000 threads and after computations 400 threads produced value A and 600 produced value B then the map should have: {A,600} and {B,400}.


The solution was refined in various iterations. Lets start with the very first solution which I will name as AccumulatorOne.

public class AccumulatorOne {
 
 private static final String[] NAMES = { "A", "B" };
 private static final int NB_THREADS = 1_000;
 private final Map<String, Integer> countsMap = new HashMap<>();
 private static final Lock readWriteLock = new ReentrantLock(true);

 public void testIt() {
  ExecutorService executor = Executors.newFixedThreadPool(NB_THREADS);
  for (int i = 0; i < NB_THREADS; i++) {
   Runnable task = new WorkerThread();
   executor.submit(task);
  }
  executor.shutdown();
  try {
   executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
  }
  System.out.println(countsMap);
 }

 private void accumulate(String name) {
  readWriteLock.lock();
  try {
   Integer cnt = countsMap.get(name);
   if (cnt == null) {
    countsMap.put(name, 1);
   } else {
    countsMap.put(name, cnt + 1);
   }
  } finally {
   readWriteLock.unlock();
  }
 }

 private class WorkerThread implements Runnable {
  @Override
  public void run() {
   accumulate(NAMES[ThreadLocalRandom.current().nextInt(0, NAMES.length)]);
  }
 }
}

This can be called/tested using the following class:

public class AccumulatorMainApp {
 public static void main(String[] args) {
  AccumulatorOne accumulator = new AccumulatorOne();
  accumulator.testIt();
 }
}

Now if we observe the problem carefully we notice few things:
  • We probably do not need any lock as we can make use of ConcurrentHashMap.
  • We can use AtomicInteger to keep track of count for values A and B.
  • We can also use method computeIfAbsent of ConcurrentHashMap

The second version is AccumulatorTwo as:

public class AccumulatorTwo {

 private static final String[] NAMES = { "A", "B" };
 private static final int NB_THREADS = 1000;
 private final Map<String, AtomicInteger> countsMap = new ConcurrentHashMap<>();

 public void testIt() {
  ExecutorService executor = Executors.newFixedThreadPool(NB_THREADS);
  for (int i = 0; i < NB_THREADS; i++) {
   Runnable task = new WorkerThread();
   executor.submit(task);
  }
  executor.shutdown();
  try {
   executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
  }
  System.out.println(countsMap);
 }

 private void accumulate(String name) {
  countsMap.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet();
 }

 private class WorkerThread implements Runnable {
  @Override
  public void run() {
   accumulate(NAMES[ThreadLocalRandom.current().nextInt(0, NAMES.length)]);
  }
 }
}

The code seems pretty neat. Can we improve it further? Yes. It seems that the LongAdder is the better candidate in case of high contention as mentioned in Java Doc:
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
Then we get third and final version of it as:

public class AccumulatorThree {
 private static final String[] NAMES = { "A", "B" };
 private static final int NB_THREADS = 1000;
 private final Map<String, LongAdder> countsMap = new ConcurrentHashMap<>();

 public void testIt() {
  ExecutorService executor = Executors.newFixedThreadPool(NB_THREADS);
  for (int i = 0; i < NB_THREADS; i++) {
   Runnable task = new WorkerThread();
   executor.submit(task);
  }
  executor.shutdown();
  try {
   executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
  }
  System.out.println(countsMap);
 }

 private void accumulate(String name) {
  countsMap.computeIfAbsent(name, k -> new LongAdder()).increment();
 }

 private class WorkerThread implements Runnable {
  @Override
  public void run() {
   accumulate(NAMES[ThreadLocalRandom.current().nextInt(0, NAMES.length)]);
  }
 }
}

And it can be used as shown below:
public class AccumulatorMainApp {
 public static void main(String[] args) {
  AccumulatorThree accumulatorThree = new AccumulatorThree();
  accumulatorThree.testIt();
 }
}

Let me know your comments/feedback. I hope you enjoyed this post.

Thursday, March 12, 2015

Computing execution time using CountDownLatch and CyclicBarrier

A CountDownLatch is synchronizer that allows one or more threads to wait until a set of operations being performed in other threads completes. A CyclicBarrier is a synchronizer that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

I first encountered timer utility to compute time using CountdownLatch in amazing book  "Concurrency in Practice" by Brian Goetz.
public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
        throws InterruptedException {

        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {

                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) { }
                }
            };
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end-start;
    }
}




Using the similar idea, I have written a utility class which can compute time taken when a code is making use of CyclicBarrier or CountDownLatch. The utility class has two methods: elapsedTimeUsingCountDownLatch and elapsedTimeUsingCyclicBarrier as shown below:
public class ConcurrentExecutionActionTimer {
 /***
  * This method captures the time taken by all worker threads to execute. The executor that is passed to the time must allow for the creation of   at least as many threads as the
  * given concurrency level or the test will never complete. This is known as thread starvation.
  * @param executor to execute the action
  * @param concurrency level representing the number of actions to be executed concurrently
  * @param action runnable representing the action.
  * @return time taken
  * @throws InterruptedException
  */
 public static long elapsedTimeUsingCountDownLatch(Executor executor, int concurrency, final Runnable action) throws InterruptedException
 {
  final CountDownLatch ready = new CountDownLatch(concurrency);
  final CountDownLatch start = new CountDownLatch(1);
  final CountDownLatch done = new CountDownLatch(concurrency);
  
  for(int i=0; i<concurrency; i++ ){
   executor.execute(new Runnable() {
    
    @Override
    public void run() {
     ready.countDown(); //Tell timer we are ready.
     
     try {
      start.await(); //Wait till peers are ready.
      action.run();
     } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
     } finally {
      done.countDown(); //Tell timer we are done.
     }
    }
   });
  }
  
  ready.await();  //Wait for all workers to be ready
  long startNanoTime = System.nanoTime();
  start.countDown();  //And here they go!!
  done.await();   // Wait for all workers to finish.
  return System.nanoTime() - startNanoTime;
  
 }
 
public static long elapsedTimeUsingCyclicBarrier(Executor executor, int concurrency, final Runnable action) throws InterruptedException, BrokenBarrierException
 {
  final Runnable barrierAction = new Runnable() {
   @Override
   public void run() {
    System.out.println("Condition of barrier is met.");
   }
  };
  
  final CyclicBarrier barrier = new CyclicBarrier(concurrency + 1, barrierAction);
  
  for(int i=0; i<concurrency; i++ ){
   executor.execute(new Runnable() {
    @Override
    public void run() {
     try {
      System.out.println("Waiting at barrier.");
      barrier.await();
      action.run();
      //Cyclic barrier gets reset automatically. Again wait for them to finish.
      barrier.await();
     } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
     } catch (BrokenBarrierException e) {
      e.printStackTrace();
     } 
    }
   });
  }
  barrier.await();
  long startNanoTime = System.nanoTime();
  barrier.await();
  return System.nanoTime() - startNanoTime;
 }




Now we can use the above utility class as follows:
class Worker implements Runnable {
 @Override
 public void run() {
  System.out.println("Doing work.");
  for(int i=0; i<20; i++) {
   try {
    Thread.sleep(500);
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
  System.out.println("Finished.");
 }
}

public class TimerExample {
 public static void main(String[] args) {
  //Executor is replacement for common thread idiom: (new Thread(r)).start() to e.execute(r)
  ExecutorService executor = Executors.newFixedThreadPool(10);
  Worker action = new Worker();
  int concurrency = 5;
  try {
   //long elapsedTime = ConcurrentExecutionActionTimer.elapsedTimeUsingCountDownLatch(executor, concurrency, action);
   long elapsedTime = ConcurrentExecutionActionTimer.elapsedTimeUsingCyclicBarrier(executor, concurrency, action);
   double seconds = (double)elapsedTime / 1000000000.0;
   System.out.println("Time Taken approximately: " + seconds + "seconds.");
  } catch (InterruptedException | BrokenBarrierException e) {
   e.printStackTrace();
  }
 }
}

That is it. I hope you enjoyed this post.

Wednesday, March 11, 2015

Why and How to use Phaser in Java?

Before we learn about Phaser its advisable to understand CyclicBarrier and CountdownLatch in Java. As we know a CountdownLatch is not reusable whereas a CyclicBarrier is reusable but not very flexible. In both of them number of registered parties (threads) cannot vary whereas they can vary in Phaser.

A Phaser is best of both worlds so we can say:

Phaser = CountdownLatch + CyclicBarrier.




How parties register?
In a Phaser tasks may be registered at anytime and can optionally be deregistered upon any arrival. Like a CyclicBarrier tasks may be repeatedly awaited and method arriveAndAwaitAdvance has similar effect as await method  of CyclicBarrier. Each generation of a phaser is represented by a phase number which starts from zero to Integer.MAX_VALUE and then again is wrapped to zero.

How synchronization works?
The methods arrive and arriveAndDeregister record arrival and return the arrival phase number (the phase number to which this arrival is applicable) without blocking. When the final party (thread) for a particular phase arrives an action (optional action) may be performed and phase is advanced (incremented).

The method awaitAdvance(int phaseNumber) takes arrival phase number and it returns when the phaser advances to (or already is at) a different phase.

How can a phaser terminate?
A phaser terminates when the method onAdvance returns true. In default implementation of this method it returns true when all the partied have deregistered and number of registered parties becomes zero. We can check whether a phaser has terminated or not by calling method isTerminated on phaser instance.



What do we mean by tiering in a phaser?
A phaser may be tiered that is constructed in tree structures to reduce contention. If number of parties in a phaser is huge they may suffer from heavy contention costs, in that case we can create group of subphasers which share a common parent. This will increase the output but may cost a little overhead.

A Phaser is a synchronizer that can be used to synchronize a batch of threads (parties) where each party can register in the batch with phaser and then use the phaser to have them blocked until every thread in the batch has arrived (notified) the phaser and at that point any blocked thread will resume execution.

CyclicBarrier vs CountdownLatch vs Phaser


CountdownLatchCyclicBarrierPhaser
Fixed number of partiesFixed number of partiesDynamic number of parties
Non-cyclic in nature hence not reusablecyclic in nature hence reusableReusable
Can be advanced using countDown (advance) and await (must wait)Cannot be advancedCan be advanced using relevant methods.

Example
Lets us consider an example of a buffer which can be operated upon by multiple writers to write some value in this memory and it is timely cleaned by a single cleaner task. Now while cleaner is cleaning the memory locations in the buffer all writers must wait whereas they are all free to write once cleaning is over. Lets first define an interface:
public interface IBuffer {
    public void write(String value);
    public void cleanUp();
    public int size();
}



I am not in favor of making much use of low level constructs like wait and notify, so lets first see an ugly implementation:
public class UglyBuffer implements IBuffer {
    private volatile long lastFlushTime = System.currentTimeMillis();
    private final Object flushMonitor = new Object();
    private final Queue<String> bufferMemory = new ConcurrentLinkedQueue<>();

    public void write(String value){
        long entryTime = System.currentTimeMillis();
        synchronized (flushMonitor) {
            while(lastFlushTime <= entryTime) {
                try {
                    flushMonitor.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // Seems like all writes can go now.
        bufferMemory.add(value);
        System.out.println("Value added: " + value);
    }

    public void cleanUp() {
        // Only one reader will be there that will read all items from bufferMemory.
        synchronized (flushMonitor) {
            while(!bufferMemory.isEmpty()){
                String item = bufferMemory.remove();
                //System.out.println("Removed: " + item);
            }
            System.out.println("Clean up over");
            lastFlushTime = System.currentTimeMillis();
            flushMonitor.notifyAll();
        }
    }

    public int size() {
        return bufferMemory.size();
    }
}



Now this implementation can be tested as below:
public class BufferMainApp {
    public static void main(String[] args) {
        System.out.println("Starting");
        IBuffer buffer = new UglyBuffer();
        Set<Thread> writers = initiateRandomWriters(buffer);
        BufferCleaner cleaner = initiateSingleCleaner(buffer);

        try {
            Thread.currentThread().sleep(60000);    //sleep for 60s
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        stopAllWorkers(writers);
        cleaner.stopCleanup();
    }

    private static void stopAllWorkers(Set<Thread> writers) {
        for (Thread writer : writers) {
            ((BufferWriter)writer).stopWriting();
        }
    }

    private static BufferCleaner initiateSingleCleaner(IBuffer buffer) {
        BufferCleaner cleaner = new BufferCleaner(buffer,"Cleaner");
        cleaner.start();
        return cleaner;
    }

    private static Set<Thread> initiateRandomWriters(IBuffer buffer) {
        Set<Thread> writers = new HashSet<>();
        Random random = new Random();
        BufferWriter writer;
        int writersCount = random.nextInt(12);
        if(writersCount == 0) writersCount =+ 4;     // At least 4 threads.
        for (int i = 0; i < writersCount; i++) {
            writer = new BufferWriter(buffer,"Writer" + i, "Value" + i);
            writers.add(writer);
            writer.start();
        }
        System.out.println("Total writers are: " + writers.size());
        return writers;
    }
}



This may not be the best way to test it but it will server the purpose. We just want to run some random number of threads (we dont know the thread count in advance) and while cleaner is running all writers must wait, once cleanup is over they are all free to go. We can improve upon the buffer implementation by making use of CountdownLatch and Lock constructs as:
public class NeatBuffer implements IBuffer {
    private CountDownLatch flushCDL = new CountDownLatch(1);
    private final Lock flushLock = new ReentrantLock();
    private final Queue<String> bufferMemory = new ConcurrentLinkedQueue<>();

    public void write(String value){
        try {
            flushCDL.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Clean up is over. Reset the latch.
        flushLock.lock();
        flushCDL = new CountDownLatch(1);
        flushLock.unlock();
        // Writes are green to go now.
        bufferMemory.add(value);
        System.out.println("Value added: " + value);
    }

    public void cleanUp() {
        // Only one reader will be there that will read all items from bufferMemory.
        flushLock.lock();
        while(!bufferMemory.isEmpty()){
            String item = bufferMemory.remove();
            //System.out.println("Removed: " + item);
        }
        flushCDL.countDown();
        flushLock.unlock();
        System.out.println("Clean up over");
    }

    public int size() {
        return bufferMemory.size();
    }
}

This implementation makes use of CountdownLatch which makes all the writers to wait by calling await method, but once cleanup is over it is instantiated again to be reused (by default CountdownLatch is not cyclic) and the countdown is dropped to zero which signals the writes to go ahead. Also it is always recommended to use Lock implementations rather than synchronization block as they perform better because they make use of CAS (Compare And Sweep) technique. We can check this implementation using the same code only the instantiation will be little different:
IBuffer buffer = new NeatBuffer();



Now we can observe the pattern there is a batch of threads which waits for cleaner to finish and the process repeats itself. This is an ideal candidate for a phaser where each time the process is repeated a new phase will be created. As we know this phase number can reach max to Integer.MAX_VALUE and then again will wrap to zero. Let us check out this final implementation which is pretty clean.
public class BetterBuffer implements IBuffer {
    private final Phaser phaser = new Phaser(1); // One party to arrive.
    private final Queue<String> bufferMemory = new ConcurrentLinkedQueue<>();

    public void write(String value){
        int phase = phaser.getPhase();
        phaser.awaitAdvance(phase);
        // Writes are green to go now.
        bufferMemory.add(value);
        System.out.println("Value added: " + value);
    }

    public void cleanUp() {
        while(!bufferMemory.isEmpty()){
            String item = bufferMemory.remove();
            //System.out.println("Removed: " + item);
        }
        System.out.println("Clean up over");
        phaser.arrive();
    }

    public int size() {
        return bufferMemory.size();
    }
}

And I hope we know the changes required to test this class as well. The major motivation to write this post is to help in identifying the pattern where phaser can be useful. This is a mix of CountdownLatch and CyclicBarrier and can be quite handy sometimes.

Thursday, March 5, 2015

CyclicBarrier and its internal implementation in Java

CyclicBarrier is a synchronizer that allows a set of threads to all wait for each other to reach a common barrier point. Like CountDownLatch it also involves a fixed size party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the all the threads have arrived at barrier and it differentiates it from CountDownLatch. We can also specify a method that is executed once per barrier point when all threads have arrived at barrier, and no thread is yet released.

public static void main(String[] args) {
        int threadCount = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        //Executor is replacement for common thread idiom: (new Thread(r)).start() to e.execute(r)

        for (int i=0; i<threadCount; i++) {
            executorService.execute(() ->  {
                try {
                    System.out.println("Waiting at barrier");
                    barrier.await();
                    System.out.println("Working Now..");
                    Thread.sleep(5000);
                    System.out.println("Work is over..");
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }


In the above example five parties are involved. Each of the parties enters into try block and then waits for barrier. Once they all reach at barrier, barrier is broken and all parties (threads) are released to do some work (represented by sleep). Again the barrier is reset and used again. Each of the parties when finished with the work again waits at the barrier. Once all reach at the barrier it is open again and then they continue. The point to note here is how barrier can be reused.

As mentioned we can execute a method once barrier condition is met. In the above example we can specify it as shown below:
CyclicBarrier barrier = new CyclicBarrier(threadCount,() -> {
            System.out.println("Barrier is met.");
        });

CountDownLatch also does the same thing. What is the difference in two?
We can reuse it but CountDownLatch can not be reused. We have reset() method in CyclicBarrier to reset it, so it is useful for events that get repeated whereas CountDownLatch is more suitable for one time activities e.g. loading data from various portals at application start up. Also when we call reset() then threads which have not yet reached the barrier will terminate with BrokenBarrierException.

How CyclicBarrier works internally?
Every time we use the barrier an instance of Generation class is used to represent it. The generation changes when the barrier is tripped or reset. There can be many instances (generations) associated with threads using barrier but only one of them will be active at a time whereas remaining ones will be either broken or tripped. The barrierCommand represents the method to be executed when all parties arrive at barrier, trip represents the condition object to wait on until tripped, lock is what will be used to get barrier status and other activities.
 public class CyclicBarrier {
    private static class Generation {
        boolean broken = false;
    }
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();
    // Rest of the code.
 }



As I said we can reset the barrier which will reset the barrier to be used again. This method actually breaks the current barrier and creates a new generation (new use of CyclicBarrier).
 public void reset() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
           breakBarrier();   // break the current generation
           nextGeneration(); // start a new generation
     } finally {
           lock.unlock();
     }
}

As we can expect the method breakBarrier() should be breaking the barrier (setting generation to broken) and waking(signalling)all the threads:
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

And the method nextGeneration() should be instantiating a new Generation instance, updating the status on barrier trip and waking up everyone:
private void nextGeneration() {
   // signal completion of last generation
   trip.signalAll();
   // set up next generation
   count = parties;
   generation = new Generation();
}



As we know we have two variants of method await: one takes nothing other takes time to wait and when times out TimeoutException is thrown. The thread which calls await method waits until all parties have invoked await on this barrier or the specified time elapses. If current party (thread) is not the last to arrive then it is parked until one of the following happens:
  • The last thread arrives; or
  • Some other thread interrupts the current thread; or
  • Some other thread interrupts one of the other waiting threads; or
  • Some other thread times out while waiting for barrier; or
  • Some other thread invokes reset() on this barrier.
In case of timed version one more condition is "the specified time elapses". Both of these method internally makes use of private method doWait and return an int  which is arrival index of current thread.
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
}

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
 
The method doWait will handle the following cases:
  • If generation is broken throw exception
  • If current thread is interrupted break barrier and throw InterruptedException
  • Decrement the number of parties as one more thread has reached the barrier and 
    • If it is last thread to arrive (index is zero) then execute the method (runnableCommand) supplied at creation time.
    • But if something wrong happens break the barrier (final clause).
  • Loop until tripped or broken or interrupted or timed out.
The method itself explains these cases:
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }



The method isBroken() is pretty simple and self explanatory. It checks the broken property of generation:
public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

The methods getParties() and getNumberWaiting() are also self-explanatory. That's it for now and hope you enjoyed it.

Sunday, March 1, 2015

ExecutorService vs ExecutorCompletionService in Java

You can also check my post on dzone here.

Suppose we have list of four tasks: Task A, Task B, Task C and Task D which perform some complex computation and result into an integer value. These tasks may take random time depending upon various parameters. We can submit these tasks to executor as:
ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));

Then we can iterate over the list to get the computed result of each future:
for (Future future:futures) {
    Integer result = future.get();
    // rest of the code here.
}



Now the similar functionality can also be achieved using ExecutorCompletionService as:
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );
Then again we can submit the tasks and get the result like:
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));
 
for (int i=0; i<futures.size(); i++) {
    Integer result = executorCompletionService.take().get();
    // Some processing here
}

Now what is the difference between the two?

Suppose task B finished first followed by task C. But task A was still going on. In that case when using ExecutorService the for loop would be waiting for the result of task A to be available. So in case of ExecutorService tasks will be processed in the same order in which they were submitted.



But in later case the tasks will be processed in order the result becomes available, the order tasks are completed. One interesting example is where we want to download some file which can be downloaded from various mirrors. In that case we can quickly get the response from the server which is located closest to us. In that case we can get the first available result and discard the others. Consider the following example from Java Doc:

void solve(Executor e, Collection<Callable<Result>> solvers) 
      throws InterruptedException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        int n = solvers.size();
        List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
        Result result = null;
        try {
            for (Callable<Result> s : solvers)
                futures.add(ecs.submit(s));
            for (int i = 0; i < n; ++i) {
                try {
                    Result r = ecs.take().get();
                    if (r != null) {
                        result = r;
                        break;
                    }
                } catch(ExecutionException ignore) {}
            }
        }
        finally {
            for (Future<Result> f : futures)
                f.cancel(true);
        }

        if (result != null)
            use(result);
    }

In the above example the moment we get the result we break out of the loop and cancel out all the other futures. One important thing to note is that the implementation of ExecutorCompletionService contains a queue of results. We need to remember the number of tasks we have added to the service and then should use take or poll to drain the queue otherwise a memory leak will occur. Some people use the Future returned by submit to process results and this is NOT correct usage. There is one interesting solution provided by Dr. Heinz M, Kabutz here.

Peeking into ExecutorCompletionService 
When we look inside the code for this we observe that it makes use of Executor, AbstractExecutorService (default implementations of ExecutorService execution methods) and BlockingQueue (actual instance is of class LinkedBlockingQueue<Future<V>>).

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    // Remaining code..
}



Another important thing to observe is class QueueingFuture which extends FutureTask and in the method done() the result is pushed to queue.

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

For the curios ones, class FutureTask is base implementation of Future interface with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. And constructor takes RunnableFuture as parameter which is again an interface which extends Future interface and adds only one method run as:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

That is all for now. Enjoy!!