Saturday, April 18, 2015

Java Concurrency Problem: A Producer which produces N work items and waits for them to be over.

“With great power often comes great confusion.” ― Dan Allen, Seam in Action

Many times we encounter concurrency problems in Java where existing constructs need to be tweaked. Recently I wrote about one such problem where we wanted to accumulate results of multiple threads and these threads could write or update values randomly.

This time I am going to write about a problem where one producer produces N items (of course N is not known in advance) and then needs to wait for all N items to be processed. Before we move further, I would like to stress again that what I present here is an oversimplified version of the actual problem. I am not going to write complete code, only those pieces that make sense in the context of this post.

Using ExecutorCompletionService
If you are not familiar with it, have a look at this post, which explains how ExecutorCompletionService differs from ExecutorService. My worker just does some work and that's all.
public class WorkerThread implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);  // do some work (sleep is a static method)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

At first it seems that we can use this to submit jobs, collect a list of futures, and later get the results out of them. While retrieving the results using take(), the producer will obviously block.
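import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkProducerUsingECS implements Runnable {
    // Typed as Integer because each job is submitted with an Integer result
    private final CompletionService<Integer> service;
    private final List<Future<Integer>> listOfFutures;

    public WorkProducerUsingECS() {
        this.listOfFutures = new ArrayList<>();
        this.service = new ExecutorCompletionService<>(Executors.newFixedThreadPool(5));
    }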

    @Override
    public void run() {
        produceRandomWorkers();
    }

    private void produceRandomWorkers() {
        Random random = new Random();
        int numberOfWorkers = random.nextInt(20) + 1;
        System.out.println("Workers count: " + numberOfWorkers);
        for (int i=0; i<numberOfWorkers; i++){
             listOfFutures.add(service.submit(new WorkerThread(),1));
        }
    }

    private void getResultAfterWorkIsOver() throws InterruptedException, ExecutionException {
        for(int i=0; i<listOfFutures.size(); i++) {
            Integer result = (Integer) service.take().get();
            System.out.println("Result: "  + result);
        }
    }
}

This can be called using the following code:
public static void main(String[] args) {
   WorkProducerUsingECS producer =  new WorkProducerUsingECS();
   Thread thread = new Thread(producer);
   thread.start();
}

Now the problem is: once all the workers are done, how do we signal the producer so that it can call the getResultAfterWorkIsOver method?
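One thing worth keeping in mind here is that take() itself blocks until some submitted task has completed, so calling it once per submitted task only returns after every task is done. The following is a minimal sketch of that behaviour, not the code from the actual problem; the class EcsDrainSketch and the method submitAndDrain are names made up for this illustration, and the 100 ms sleep stands in for real work.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class, not part of the original example.
class EcsDrainSketch {

    // Submits `count` trivial tasks, then blocks until all of them have finished.
    static int submitAndDrain(int count) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 0; i < count; i++) {
                final int id = i;
                service.submit(() -> {
                    Thread.sleep(100); // stand-in for real work
                    return id;
                });
            }
            int completed = 0;
            for (int i = 0; i < count; i++) {
                service.take().get(); // blocks until the next task has finished
                completed++;
            }
            return completed;
        } finally {
            pool.shutdown(); // let the JVM exit once the tasks are done
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Completed: " + submitAndDrain(7));
    }
}
```

The catch is that the producer has to know how many tasks it submitted, which is exactly the count the rest of this post is trying to deal with.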

Using CountDownLatch (CDL)
Using a latch seems a good option, but the problem is that we don't know the number of worker threads in advance. If we did, we could simply create a CDL and have the producer thread wait (using the await method) until all work was complete.
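For comparison, this is roughly what the simple case would look like if the count were known before the workers start. It is only a sketch under that assumption; KnownCountLatchSketch and runAndAwait are hypothetical names, and the sleep stands in for real work.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration class: the worker count is known up front here.
class KnownCountLatchSketch {

    // Starts `workers` threads and blocks until each one has counted down once.
    static long runAndAwait(int workers) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(100); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // always signal, even if interrupted
                }
            }).start();
        }
        latch.await();           // producer blocks here until the count reaches zero
        return latch.getCount(); // 0 once await() has returned
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Remaining count: " + runAndAwait(4));
    }
}
```

Since a CountDownLatch takes its count in the constructor and cannot be increased afterwards, this form does not work when workers are still being created.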

Let's give it a try. We will create a wrapper class WorkerTask that takes a Runnable (to be executed) and an atomic reference to a CDL as constructor parameters. An AtomicReference can be updated atomically.
public class WorkerTask implements Runnable {
    private final Runnable runnable;
    private final AtomicReference<CountDownLatch> latchAtomicReference;

    public WorkerTask(Runnable runnable, AtomicReference<CountDownLatch> latchAtomicReference) {
        this.runnable = runnable;
        this.latchAtomicReference = latchAtomicReference;
    }

    @Override
    public void run(){
        runnable.run();
        while (latchAtomicReference.get() == null) {
            try {
                Thread.sleep(1000L);  // latch not set yet, check again later
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        latchAtomicReference.get().countDown();
    }
}

If a worker finishes before the CDL has been set, it sleeps for some time and then checks again whether it is set. Once it is set, the worker invokes countDown on it. The AtomicReference is used because it can be updated atomically and its value is safely visible across threads, so it will not create unintended problems in multi-threaded code.

Another thing to note is that WorkerTask calls run() directly on the Runnable passed to its constructor instead of handing it to a new Thread and calling start(), because I do not want to spawn another thread. You can read more here and here. This can be used with the producer as:
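import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

public class WorkProducerUsingCDL implements Runnable {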
    private final AtomicReference<CountDownLatch> latchAtomicReference;

    public WorkProducerUsingCDL() {
        latchAtomicReference = new AtomicReference<>();
    }

    @Override
    public void run() {
        produceRandomWorkers();
    }

    private void produceRandomWorkers() {
        Random random = new Random();
        int numberOfWorkers = random.nextInt(20) + 1;
        System.out.println("Workers count: " + numberOfWorkers);
        for (int i=0; i<numberOfWorkers; i++){
            try {
                createWorkerTask().start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Add some delay to simulate some processing
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //By now all workers have been added. Some of them may be over and some may be processing.
        latchAtomicReference.set(new CountDownLatch(numberOfWorkers));

        // Now producer will wait for latch to be over.
        try {
            latchAtomicReference.get().await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Now all workers are definitely over.
        try {
            processAfterWorkIsOver();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    private Thread createWorkerTask() {
        WorkerTask workerTask = new WorkerTask(new WorkerThread(), latchAtomicReference);
        Thread thread = new Thread(workerTask);
        return  thread;
    }

    private void processAfterWorkIsOver() throws InterruptedException, ExecutionException {
        System.out.println("Work is over by all workers.");
    }

}

And we can verify this code as:
public static void main(String[] args) {
    WorkProducerUsingCDL producerUsingCDL = new WorkProducerUsingCDL();
    Thread thread = new Thread(producerUsingCDL);
    thread.start();
}

One thing to observe in this code is that the number of threads is not known up front, and some of the threads may already be close to completion by the time we create the CDL and set it in the AtomicReference. I have deliberately not used ExecutorService in this example because submitting to it returns a Future, and invoking get on that Future blocks the producer. The producer is blocked here as well, but by the await method called on the CDL held in the AtomicReference.

This code should work, but in my opinion it is not a neat version. One more point that has not been discussed yet: the process is cyclic. Every time a cycle starts, the producer produces an unknown number of threads and waits for them to complete. It seems we could also make use of CyclicBarrier to solve this problem. Think for a moment before going further. Can we solve it using CyclicBarrier?

Using Phaser
As we do not have a flexible CDL, there is one construct that fits the problem, and that is Phaser. To learn why and how to use Phaser, check this post. We need to pass the Phaser reference to the worker as:
public class WorkerThread implements Runnable {
    private final Phaser phaser;

    public WorkerThread(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(5000);  // do some work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Work is over.");
        phaser.arrive();
    }
}
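To make the arrive() call above a little more concrete, here is a minimal single-threaded sketch of how registration, arrival, and the phase number interact on a Phaser. The class PhaserBasicsSketch and the method snapshot are made-up names for this illustration only.

```java
import java.util.concurrent.Phaser;

// Hypothetical illustration class, not part of the original example.
class PhaserBasicsSketch {

    // Returns {phase before arrivals, unarrived after first arrival,
    //          phase after last arrival, registered parties at the end}.
    static int[] snapshot() {
        Phaser phaser = new Phaser();  // no parties registered yet
        phaser.register();             // party 1
        phaser.register();             // party 2
        int phaseBefore = phaser.getPhase();

        phaser.arrive();               // first arrival, phase does not advance yet
        int unarrived = phaser.getUnarrivedParties();

        phaser.arrive();               // last arrival, phase advances
        int phaseAfter = phaser.getPhase();

        // arrive() does not deregister, so both parties are still registered
        return new int[] {phaseBefore, unarrived, phaseAfter, phaser.getRegisteredParties()};
    }

    public static void main(String[] args) {
        int[] s = snapshot();
        System.out.println("Phase before: " + s[0]);             // 0
        System.out.println("Unarrived after first: " + s[1]);    // 1
        System.out.println("Phase after: " + s[2]);              // 1
        System.out.println("Parties still registered: " + s[3]); // 2
    }
}
```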

Once a worker's work is over, it calls the arrive method on the phaser to signal that this particular worker is done and has arrived at the current phase. The Phaser is a mix of CyclicBarrier and CountDownLatch: when one phase is over it starts a new one, and the phase number can go up to Integer.MAX_VALUE. Once it reaches that maximum it wraps around to zero and starts again. The producer can be written as:
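import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class WorkProducerUsingPhaser implements Runnable {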
    private final Phaser phaser;
    private final ExecutorService executorService;

    public WorkProducerUsingPhaser() {
        phaser = new Phaser();
        executorService = Executors.newFixedThreadPool(5);
    }

    @Override
    public void run() {
        produceRandomWorkers();
    }

    private void produceRandomWorkers() {
        Random random = new Random();
        int numberOfWorkers = random.nextInt(20) + 1;
        System.out.println("Workers count: " + numberOfWorkers);

        phaser.register();

        for (int i=0; i<numberOfWorkers; i++){
            phaser.register();
            executorService.submit(getWorker());
        }

        phaser.arriveAndAwaitAdvance();

        // Now all workers are definitely over.
        try {
            processAfterWorkIsOver();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    // The executor supplies the threads, so the worker is submitted as a plain Runnable
    private Runnable getWorker() {
        return new WorkerThread(phaser);
    }

    private void processAfterWorkIsOver() throws InterruptedException, ExecutionException {
        System.out.println("Work is over by all workers.");
    }
}

Here I have used a thread pool of 5 threads (any suitable number will do) and submitted all workers to the ExecutorService. The producer registers itself and every worker with the Phaser and then waits by calling arriveAndAwaitAdvance. Each worker calls arrive when it is done, and when the last worker arrives the producer is released and can move ahead. A new phase also starts at that point. Note that arrive leaves the worker registered, so if the producer runs another cycle on the same Phaser the workers should call arriveAndDeregister instead; otherwise the next phase would also wait for workers that have already finished. This solution seems cleaner and fits the problem better.
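Since the process is cyclic, here is a rough sketch of how several cycles could share one Phaser. It is an illustration under my own assumptions rather than the code from the actual problem: the class CyclicPhaserSketch and the method runCycles are made-up names, the worker counts are passed in instead of being random, and workers call arriveAndDeregister so that finished workers do not count towards later phases.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

// Hypothetical illustration class, not part of the original example.
class CyclicPhaserSketch {

    // Runs one cycle per array entry; each cycle submits that many workers and
    // waits for them. Returns the phase number observed after the final cycle.
    static int runCycles(int[] workersPerCycle) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        Phaser phaser = new Phaser(1); // the producer is the one permanent party
        try {
            for (int workers : workersPerCycle) {
                for (int i = 0; i < workers; i++) {
                    phaser.register(); // one registration per worker in this cycle
                    pool.submit(() -> {
                        try {
                            Thread.sleep(50); // stand-in for real work
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            phaser.arriveAndDeregister(); // arrive and drop out of later phases
                        }
                    });
                }
                phaser.arriveAndAwaitAdvance(); // wait for every worker of this cycle
                System.out.println("Cycle done, parties left: " + phaser.getRegisteredParties());
            }
            return phaser.getPhase();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Final phase: " + runCycles(new int[] {3, 7, 2}));
    }
}
```

After each cycle only the producer remains registered, so the next cycle starts from a clean count and the phase number advances by one per cycle.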

That is all and I hope you liked it. Please drop your feedback in comments.
