Wednesday, March 11, 2015

Why and How to use Phaser in Java?

Before we learn about Phaser, it's advisable to understand CyclicBarrier and CountDownLatch in Java. A CountDownLatch is not reusable, whereas a CyclicBarrier is reusable but not very flexible. In both of them the number of registered parties (threads) is fixed when the object is created, whereas in a Phaser it can vary.

A Phaser offers the best of both worlds, so we can say:

Phaser = CountDownLatch + CyclicBarrier.
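The following minimal sketch makes the latch half of that equation concrete. It uses a Phaser the way one would use a CountDownLatch. Each worker calls arrive() instead of countDown(), and the waiting thread calls awaitAdvance(0) instead of await(). The class name PhaserAsLatch is made up for illustration. Unlike a CountDownLatch, the same phaser could then be reused for another round.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a Phaser standing in for a CountDownLatch.
class PhaserAsLatch {
    // Mimics: new CountDownLatch(workers); workers call countDown(); main calls await().
    static int runLatch(int workers) {
        Phaser latch = new Phaser(workers);      // one registered party per worker
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet();      // the worker's "job"
                latch.arrive();                  // like countDown(): records arrival, never blocks
            }).start();
        }
        // Like await(): returns once all workers have arrived for phase 0.
        // Pass 0 explicitly. If the phase had already advanced, getPhase() would
        // return 1 and awaitAdvance(1) would wait for a round that never comes.
        latch.awaitAdvance(0);
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Workers finished: " + runLatch(5));
    }
}
```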




How do parties register?
In a Phaser, tasks may be registered at any time (for example with register or bulkRegister) and can optionally be deregistered upon any arrival. As with a CyclicBarrier, tasks may wait repeatedly: the method arriveAndAwaitAdvance has an effect similar to the await method of CyclicBarrier. Each generation of a phaser is represented by a phase number, which starts at zero, advances up to Integer.MAX_VALUE and then wraps around to zero.
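The following small single-threaded sketch shows dynamic registration. It uses the Phaser methods register(), bulkRegister(int), arrive() and arriveAndDeregister(). The class DynamicRegistration is hypothetical. Parties join and leave while the phaser is in use, and the phase number advances once every party still registered has arrived.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class: parties joining and leaving a live phaser.
class DynamicRegistration {
    static String trace() {
        Phaser phaser = new Phaser();       // no parties registered yet, phase 0
        StringBuilder log = new StringBuilder();
        phaser.register();                  // one party joins
        phaser.bulkRegister(2);             // two more join at once
        log.append("parties=").append(phaser.getRegisteredParties())
           .append(" phase=").append(phaser.getPhase());
        phaser.arrive();                    // first arrival, does not block
        phaser.arriveAndDeregister();       // second arrival; this party also leaves
        log.append(" | parties=").append(phaser.getRegisteredParties())
           .append(" phase=").append(phaser.getPhase());
        phaser.arrive();                    // last outstanding party: phase advances
        log.append(" | parties=").append(phaser.getRegisteredParties())
           .append(" phase=").append(phaser.getPhase());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

Here trace() returns parties=3 phase=0 | parties=2 phase=0 | parties=2 phase=1. The deregistered party still counted as an arrival for phase 0. From phase 1 onward, only two arrivals are needed.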

How does synchronization work?
The methods arrive and arriveAndDeregister record an arrival and return the arrival phase number (the phase to which this arrival applies) without blocking. When the final party (thread) for a particular phase arrives, an optional action (the onAdvance method) may be performed and the phase is advanced (incremented).

The method awaitAdvance(int phase) takes an arrival phase number and returns when the phaser advances to (or is already at) a different phase.
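The following sketch ties the last two paragraphs together. arrive() returns the arrival phase immediately. An overridden onAdvance plays the role of the optional action run by the final arriving party. awaitAdvance(phase) blocks until that phase is over. The class ArriveThenAwait is hypothetical.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class: non-blocking arrival, optional action, then waiting.
class ArriveThenAwait {
    static String run() {
        StringBuffer log = new StringBuffer();        // synchronized, safe to share between threads
        Phaser phaser = new Phaser(2) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // The optional action: executed once by the last party to arrive.
                log.append("advance from ").append(phase).append("; ");
                return false;                         // keep the phaser alive
            }
        };
        int arrivalPhase = phaser.arrive();           // returns 0 at once, without blocking
        new Thread(phaser::arrive).start();           // second and final party for phase 0
        int nextPhase = phaser.awaitAdvance(arrivalPhase); // blocks until phase 0 completes
        log.append("arrived at ").append(arrivalPhase).append(", now at ").append(nextPhase);
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The result is advance from 0; arrived at 0, now at 1. The phaser guarantees that the onAdvance action completes before waiters are released.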

How can a phaser terminate?
A phaser terminates when its onAdvance method returns true. The default implementation returns true when all parties have deregistered, so that the number of registered parties drops to zero. We can check whether a phaser has terminated by calling isTerminated on the phaser instance.
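Here is a sketch of custom termination. It assumes we want a phaser that stops after a fixed number of phases; the class BoundedPhaser and the maxPhases parameter are hypothetical. Overriding onAdvance to return true ends the phaser, and isTerminated then reports it.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class: a phaser that terminates after maxPhases phases.
class BoundedPhaser {
    static String run(int maxPhases) {
        Phaser phaser = new Phaser(1) {              // a single registered party
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Returning true terminates the phaser. Keep the default rule
                // (no parties left) and also stop after maxPhases completed phases.
                return phase + 1 >= maxPhases || registeredParties == 0;
            }
        };
        int rounds = 0;
        while (!phaser.isTerminated()) {
            phaser.arriveAndAwaitAdvance();          // sole party, so this advances at once
            rounds++;
        }
        return "rounds=" + rounds + " terminated=" + phaser.isTerminated()
                + " phaseNegative=" + (phaser.getPhase() < 0);
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

run(3) returns rounds=3 terminated=true phaseNegative=true. Once a phaser is terminated, getPhase() reports a negative value and the synchronization methods return immediately.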



What do we mean by tiering in a phaser?
A phaser may be tiered, that is, constructed in a tree structure, to reduce contention. If a phaser has a huge number of parties, they may suffer heavy contention costs. In that case we can create groups of sub-phasers that share a common parent. This can greatly increase throughput, at the cost of some per-operation overhead.
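Here is a minimal sketch of tiering, assuming two groups of two threads. The class TieredPhasers and the group sizes are illustrative. The constructor Phaser(Phaser parent, int parties) creates a child that registers itself with the parent. The root therefore sees one party per child, and it advances once both children have completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical demo class: two child phasers sharing one root.
class TieredPhasers {
    static String run() throws InterruptedException {
        Phaser root = new Phaser();                 // parent with no parties of its own
        Phaser left = new Phaser(root, 2);          // child with 2 parties; registers itself with root
        Phaser right = new Phaser(root, 2);         // second child, also registered with root
        List<Thread> threads = new ArrayList<>();
        for (Phaser child : new Phaser[] {left, right}) {
            for (int i = 0; i < 2; i++) {
                // Each thread contends only on its own child phaser.
                Thread t = new Thread(child::arriveAndAwaitAdvance);
                threads.add(t);
                t.start();
            }
        }
        for (Thread t : threads) {
            t.join();                               // all return only after the root advances
        }
        return "rootParties=" + root.getRegisteredParties()
                + " rootPhase=" + root.getPhase()
                + " leftPhase=" + left.getPhase()
                + " rightPhase=" + right.getPhase();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

run() returns rootParties=2 rootPhase=1 leftPhase=1 rightPhase=1. The root counts children rather than threads, and each child reports the root's phase.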

A Phaser is a synchronizer for a batch of threads (parties). Each party registers with the phaser and then uses it to block until every party in the batch has arrived. At that point, all blocked threads resume execution.
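The barrier behavior described above can be sketched as follows; the class PhaserAsBarrier and its parameters are hypothetical. Every thread calls arriveAndAwaitAdvance() at the end of each round, much like CyclicBarrier.await(), and the same phaser is reused for every round.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical demo class: a Phaser used as a reusable barrier.
class PhaserAsBarrier {
    // Returns true if no thread started round r+1 before all threads finished round r.
    static boolean run(int parties, int rounds) throws InterruptedException {
        Phaser barrier = new Phaser(parties);          // like new CyclicBarrier(parties)
        AtomicIntegerArray finished = new AtomicIntegerArray(rounds);
        AtomicBoolean inStep = new AtomicBoolean(true);
        Thread[] threads = new Thread[parties];
        for (int p = 0; p < parties; p++) {
            threads[p] = new Thread(() -> {
                for (int r = 0; r < rounds; r++) {
                    if (r > 0 && finished.get(r - 1) != parties) {
                        inStep.set(false);             // would mean a thread ran ahead
                    }
                    finished.incrementAndGet(r);       // this thread's work for round r
                    barrier.arriveAndAwaitAdvance();   // like CyclicBarrier.await(), reused every round
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return inStep.get() && barrier.getPhase() == rounds;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 3));
    }
}
```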

CyclicBarrier vs CountDownLatch vs Phaser


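| | CountDownLatch | CyclicBarrier | Phaser |
|---|---|---|---|
| Number of parties | Fixed | Fixed | Dynamic |
| Reusability | Non-cyclic, hence not reusable | Cyclic, hence reusable | Cyclic, hence reusable |
| Arriving vs. waiting | countDown() arrives without waiting; await() waits | await() both arrives and waits; there is no way to arrive without waiting | arrive() and arriveAndDeregister() arrive without waiting; arriveAndAwaitAdvance() and awaitAdvance(int) wait |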

Example
Let us consider an example of a buffer into which multiple writers write values and which is periodically cleaned by a single cleaner task. While the cleaner is cleaning the buffer, all writers must wait; once cleaning is over, they are all free to write. In the implementations below, a writer simply blocks until the next cleanup has finished. Let's first define an interface:
public interface IBuffer {
    public void write(String value);
    public void cleanUp();
    public int size();
}



I am not in favor of relying on low-level constructs like wait and notify, but let's first see an ugly implementation that uses them:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class UglyBuffer implements IBuffer {
    private volatile long lastFlushTime = System.currentTimeMillis();
    private final Object flushMonitor = new Object();
    private final Queue<String> bufferMemory = new ConcurrentLinkedQueue<>();

    public void write(String value){
        long entryTime = System.currentTimeMillis();
        synchronized (flushMonitor) {
            // Block until a cleanup has finished after this write was requested.
            while(lastFlushTime <= entryTime) {
                try {
                    flushMonitor.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag for the caller
                    return;                             // give up this write
                }
            }
        }
        // Cleanup is over, so all waiting writes can go now.
        bufferMemory.add(value);
        System.out.println("Value added: " + value);
    }

    public void cleanUp() {
        // Only one cleaner drains all items from bufferMemory.
        synchronized (flushMonitor) {
            while(!bufferMemory.isEmpty()){
                String item = bufferMemory.remove();
                //System.out.println("Removed: " + item);
            }
            System.out.println("Clean up over");
            lastFlushTime = System.currentTimeMillis();
            flushMonitor.notifyAll(); // wake every waiting writer
        }
    }

    public int size() {
        return bufferMemory.size();
    }
}



Now this implementation can be tested as below:
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class BufferMainApp {
    public static void main(String[] args) {
        System.out.println("Starting");
        IBuffer buffer = new UglyBuffer();
        Set<BufferWriter> writers = initiateRandomWriters(buffer);
        BufferCleaner cleaner = initiateSingleCleaner(buffer);

        try {
            Thread.sleep(60000);    // let writers and cleaner run for 60s
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        stopAllWorkers(writers);
        cleaner.stopCleanup();
    }

    private static void stopAllWorkers(Set<BufferWriter> writers) {
        for (BufferWriter writer : writers) {
            writer.stopWriting();
        }
    }

    private static BufferCleaner initiateSingleCleaner(IBuffer buffer) {
        BufferCleaner cleaner = new BufferCleaner(buffer, "Cleaner");
        cleaner.start();
        return cleaner;
    }

    private static Set<BufferWriter> initiateRandomWriters(IBuffer buffer) {
        Set<BufferWriter> writers = new HashSet<>();
        Random random = new Random();
        int writersCount = 4 + random.nextInt(8);   // between 4 and 11 writers, not known in advance
        for (int i = 0; i < writersCount; i++) {
            BufferWriter writer = new BufferWriter(buffer, "Writer" + i, "Value" + i);
            writers.add(writer);
            writer.start();
        }
        System.out.println("Total writers are: " + writers.size());
        return writers;
    }
}
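The test program uses BufferWriter and BufferCleaner, which the post does not show. The sketch below is one possible version, inferred only from how the program calls them. Both extend Thread. The pause between writes and the one-second cleanup interval are assumptions. The IBuffer interface is repeated so that the block compiles on its own.

```java
// Repeated from above so this block compiles on its own.
interface IBuffer {
    void write(String value);
    void cleanUp();
    int size();
}

// Hypothetical: the post uses BufferWriter without showing it.
class BufferWriter extends Thread {
    private static final long PAUSE_MS = 100;       // assumed pause between writes
    private final IBuffer buffer;
    private final String value;
    private volatile boolean running = true;

    BufferWriter(IBuffer buffer, String name, String value) {
        super(name);
        this.buffer = buffer;
        this.value = value;
        setDaemon(true); // a writer still blocked in write() cannot keep the JVM alive
    }

    @Override
    public void run() {
        while (running) {
            buffer.write(value);                    // may block until the next cleanup
            try {
                Thread.sleep(PAUSE_MS);
            } catch (InterruptedException e) {
                break;                              // stopWriting() interrupts us
            }
        }
    }

    public void stopWriting() {
        running = false;
        interrupt();                                // wake the thread if it is sleeping or waiting
    }
}

// Hypothetical: the post uses BufferCleaner without showing it.
class BufferCleaner extends Thread {
    private static final long INTERVAL_MS = 1000;   // assumed cleanup period
    private final IBuffer buffer;
    private volatile boolean running = true;

    BufferCleaner(IBuffer buffer, String name) {
        super(name);
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(INTERVAL_MS);
            } catch (InterruptedException e) {
                break;                              // stopCleanup() interrupts us
            }
            buffer.cleanUp();
        }
        buffer.cleanUp();                           // final cleanup releases writers still waiting
    }

    public void stopCleanup() {
        running = false;
        interrupt();
    }
}
```

The final cleanUp() call releases writers that are still waiting when the cleaner stops. The writers are daemon threads, so one stuck in a non-interruptible wait cannot keep the JVM alive.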



This may not be the best way to test it, but it will serve the purpose. We just want to run a random number of writer threads (we don't know the thread count in advance). While the cleaner is running all writers must wait, and once cleanup is over they are all free to go. We can improve upon the buffer implementation by using the CountDownLatch and Lock constructs:
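import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class NeatBuffer implements IBuffer {
    // volatile: writers must always see the latch most recently installed by the cleaner.
    private volatile CountDownLatch flushCDL = new CountDownLatch(1);
    private final Lock flushLock = new ReentrantLock();
    private final Queue<String> bufferMemory = new ConcurrentLinkedQueue<>();

    public void write(String value){
        try {
            flushCDL.await();   // wait for the current round's cleanup to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // Writes are green to go now.
        bufferMemory.add(value);
        System.out.println("Value added: " + value);
    }

    public void cleanUp() {
        // Only one cleaner drains all items from bufferMemory.
        flushLock.lock();
        try {
            while(!bufferMemory.isEmpty()){
                String item = bufferMemory.remove();
                //System.out.println("Removed: " + item);
            }
            // A CountDownLatch is not reusable: install a fresh one for the next
            // round first, then release everyone waiting on the old one.
            CountDownLatch released = flushCDL;
            flushCDL = new CountDownLatch(1);
            released.countDown();
        } finally {
            flushLock.unlock();
        }
        System.out.println("Clean up over");
    }

    public int size() {
        return bufferMemory.size();
    }
}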

This implementation makes all writers wait by calling the latch's await method. When cleanup is over, the latch is counted down to zero, which signals the writers to go ahead. A new latch is then instantiated for the next round, because a CountDownLatch is not cyclic and cannot be reused. Lock implementations such as ReentrantLock are often preferred over synchronized blocks. They are built on CAS (Compare And Swap) operations and offer extras like tryLock, timed lock attempts and fairness. On modern JVMs, however, the raw performance difference is usually small. We can check this implementation with the same test code; only the instantiation changes:
IBuffer buffer = new NeatBuffer();



Now we can observe the pattern: a batch of threads waits for the cleaner to finish, and the process repeats itself. This is an ideal candidate for a phaser, where each repetition of the process is a new phase. As mentioned earlier, the phase number can go up to Integer.MAX_VALUE and then wraps around to zero. Let us look at the final implementation, which is pretty clean:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;

public class BetterBuffer implements IBuffer {
    // The cleaner is the only registered party; writers merely wait on phases.
    private final Phaser phaser = new Phaser(1);
    private final Queue<String> bufferMemory = new ConcurrentLinkedQueue<>();

    public void write(String value){
        int phase = phaser.getPhase();
        phaser.awaitAdvance(phase);   // block until the cleaner finishes this phase
        // Writes are green to go now.
        bufferMemory.add(value);
        System.out.println("Value added: " + value);
    }

    public void cleanUp() {
        while(!bufferMemory.isEmpty()){
            String item = bufferMemory.remove();
            //System.out.println("Removed: " + item);
        }
        System.out.println("Clean up over");
        phaser.arrive();   // the only party arrives: the phase advances and writers are released
    }

    public int size() {
        return bufferMemory.size();
    }
}

To test this class, only the instantiation in the test code needs to change:
IBuffer buffer = new BetterBuffer();

The main motivation for this post is to help identify the pattern where a phaser can be useful. A phaser combines the features of CountDownLatch and CyclicBarrier and can be quite handy.
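The following minimal sketch shows the phaser-based gate in isolation. It repeats the two calls from BetterBuffer.write() in several threads, and the main thread makes the arrive() call from cleanUp(). The class PhaserGateDemo is hypothetical. The extra CountDownLatch exists only to make the demo deterministic: it ensures every writer has read phase 0 before the cleaner arrives.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: writers gated by a Phaser until the cleaner arrives.
class PhaserGateDemo {
    static String run(int writerCount) throws InterruptedException {
        Phaser gate = new Phaser(1);                 // as in BetterBuffer: only the cleaner is registered
        CountDownLatch ready = new CountDownLatch(writerCount); // demo helper only
        AtomicInteger written = new AtomicInteger();
        Thread[] writers = new Thread[writerCount];
        for (int i = 0; i < writerCount; i++) {
            writers[i] = new Thread(() -> {
                int phase = gate.getPhase();         // same two steps as BetterBuffer.write()
                ready.countDown();
                gate.awaitAdvance(phase);            // blocks until the cleaner arrives
                written.incrementAndGet();
            });
            writers[i].start();
        }
        ready.await();                               // every writer has now read phase 0
        int beforeCleanup = written.get();           // nobody can pass before arrive()
        gate.arrive();                               // what BetterBuffer.cleanUp() does after draining
        for (Thread w : writers) {
            w.join();
        }
        return "before=" + beforeCleanup + " after=" + written.get() + " phase=" + gate.getPhase();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3));
    }
}
```

run(3) returns before=0 after=3 phase=1. A writer that reads the phase after arrive() simply waits for the next cleanup, which is the same behavior BetterBuffer has.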
