Thursday, March 5, 2015

CyclicBarrier and its internal implementation in Java

CyclicBarrier is a synchronizer that allows a set of threads to all wait for each other to reach a common barrier point. Like CountDownLatch it also involves a fixed size party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the all the threads have arrived at barrier and it differentiates it from CountDownLatch. We can also specify a method that is executed once per barrier point when all threads have arrived at barrier, and no thread is yet released.

public static void main(String[] args) {
        int threadCount = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        //Executor is replacement for common thread idiom: (new Thread(r)).start() to e.execute(r)

        for (int i=0; i<threadCount; i++) {
            executorService.execute(() ->  {
                try {
                    System.out.println("Waiting at barrier");
                    barrier.await();
                    System.out.println("Working Now..");
                    Thread.sleep(5000);
                    System.out.println("Work is over..");
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }


In the above example five parties are involved. Each of the parties enters into try block and then waits for barrier. Once they all reach at barrier, barrier is broken and all parties (threads) are released to do some work (represented by sleep). Again the barrier is reset and used again. Each of the parties when finished with the work again waits at the barrier. Once all reach at the barrier it is open again and then they continue. The point to note here is how barrier can be reused.

As mentioned we can execute a method once barrier condition is met. In the above example we can specify it as shown below:
CyclicBarrier barrier = new CyclicBarrier(threadCount,() -> {
            System.out.println("Barrier is met.");
        });

CountDownLatch also does the same thing. What is the difference in two?
We can reuse it but CountDownLatch can not be reused. We have reset() method in CyclicBarrier to reset it, so it is useful for events that get repeated whereas CountDownLatch is more suitable for one time activities e.g. loading data from various portals at application start up. Also when we call reset() then threads which have not yet reached the barrier will terminate with BrokenBarrierException.

How CyclicBarrier works internally?
Every time we use the barrier an instance of Generation class is used to represent it. The generation changes when the barrier is tripped or reset. There can be many instances (generations) associated with threads using barrier but only one of them will be active at a time whereas remaining ones will be either broken or tripped. The barrierCommand represents the method to be executed when all parties arrive at barrier, trip represents the condition object to wait on until tripped, lock is what will be used to get barrier status and other activities.
 public class CyclicBarrier {
    private static class Generation {
        boolean broken = false;
    }
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();
    // Rest of the code.
 }



As I said we can reset the barrier which will reset the barrier to be used again. This method actually breaks the current barrier and creates a new generation (new use of CyclicBarrier).
 public void reset() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
           breakBarrier();   // break the current generation
           nextGeneration(); // start a new generation
     } finally {
           lock.unlock();
     }
}

As we can expect the method breakBarrier() should be breaking the barrier (setting generation to broken) and waking(signalling)all the threads:
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

And the method nextGeneration() should be instantiating a new Generation instance, updating the status on barrier trip and waking up everyone:
private void nextGeneration() {
   // signal completion of last generation
   trip.signalAll();
   // set up next generation
   count = parties;
   generation = new Generation();
}



As we know we have two variants of method await: one takes nothing other takes time to wait and when times out TimeoutException is thrown. The thread which calls await method waits until all parties have invoked await on this barrier or the specified time elapses. If current party (thread) is not the last to arrive then it is parked until one of the following happens:
  • The last thread arrives; or
  • Some other thread interrupts the current thread; or
  • Some other thread interrupts one of the other waiting threads; or
  • Some other thread times out while waiting for barrier; or
  • Some other thread invokes reset() on this barrier.
In case of timed version one more condition is "the specified time elapses". Both of these method internally makes use of private method doWait and return an int  which is arrival index of current thread.
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
}

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
 
The method doWait will handle the following cases:
  • If generation is broken throw exception
  • If current thread is interrupted break barrier and throw InterruptedException
  • Decrement the number of parties as one more thread has reached the barrier and 
    • If it is last thread to arrive (index is zero) then execute the method (runnableCommand) supplied at creation time.
    • But if something wrong happens break the barrier (final clause).
  • Loop until tripped or broken or interrupted or timed out.
The method itself explains these cases:
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }



The method isBroken() is pretty simple and self explanatory. It checks the broken property of generation:
public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

The methods getParties() and getNumberWaiting() are also self-explanatory. That's it for now and hope you enjoyed it.

2 comments:

Amritha said...

I have seen various examples about how to use CyclicBarrier but this one goes to next step and explains its internals. Indeed a nice post.

Anonymous said...

You just cannot understand difference between cyclic barrier and countdown latch just be reading theory, you need to see the difference and best way is to write code. here is one more example of how you can use CyclicBarrier in Java.