Tuesday, February 24, 2015

Lock, ReentrantLock, ReentrantReadWriteLock and StampedLock in Java

The Lock implementations in Java are more flexible than using synchronized methods and statements. They support quite different properties and multiple Condition objects. The Lock interface has the following structure:

public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}

When using any of the Lock implementation the lock is released in finally block. The general usage is:

Lock l = ...;
     l.lock();
     try {
         // access the resource protected by this lock
     } finally {
         l.unlock();
     }

The implementation classes for Lock provide additional functionality over synchronized methods and statements e.g. non-blocking or optimistic attempt to acquire a lock using tryLock(), attempt to acquire the lock that can be interrupted (lockInterruptibly()), and attempt to acquire a lock that can timeout (tryLock(long, TimeUnit)). A Lock class can also provide behavior quite different from implicit monitor lock, such as guaranteed ordering, non-reentrant usage, or deadlock detection.

Another interface related to locking is ReadWriteLock which has the following structure:
public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

As per the Java Doc:
ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.
It provided more flexibility as it exploits the fact that only a single thread can modify whereas multiple threads can read at same time. The actual performance depends upon frequency of read-writes, duration of each operation, and the contention for the data i.e. no of threads that will try to read-write the data at same time.

New Request is for?Other thread is already readingOther thread is already writing
ReadGood to go.Stop and wait.
WriteStop and wait.Stop and wait.

The implementation classes for these interfaces are given below:
Lock: ReentrantLockReentrantReadWriteLock.ReadLock, ReentrantReadWriteLock.WriteLock
ReadWriteLock: ReentrantReadWriteLock




Although the basic operation of a read-write lock is straight-forward, there are many policy decisions that an implementation must make:
  • When both readers and writers are waiting whether to grant read or write lock?
  • Whether readers that request the read lock while a reader is active and a writer is waiting, are granted the read lock.
  • Whether the locks are reentrant: can a thread with the write lock reacquire it? Can it acquire a read lock while holding the write lock? Is the read lock itself reentrant?
  • Can the write lock be downgraded to a read lock without allowing an intervening writer? Can a read lock be upgraded to a write lock?
ReentrantLock
If a thread invokes lock() and does not own the lock then it will successfully acquire the lock and method will return.The method will return immediately if the current thread already owns the lock. This can be checked using methods isHeldByCurrentThread(), and getHoldCount(). In the following example we have two methods that will be invoked by two different threads, but each method will increment the count.

public class Incrementer {
    int count = 0;
    Lock lock = new ReentrantLock(); // Non-fair lock.

    public void incrementByFirstThread() {
        lock.lock();
        try {
            increment();
        } finally {
            lock.unlock();  // In any case lock must be released.
        }
    }

    public void incrementBySecondThread() {
        lock.lock();
        try{
            increment();
        } finally {
            lock.unlock();
        }
    }

    private void increment() {
        for(int i=0; i<1_000; i++) {
            count++;
        }
    }

    public void finished() {
        System.out.println("Finished with value: " + count);
    }
}

This is called by main method as shown below:
public static void main(String[] args) {
        Incrementer incrementer = new Incrementer();

        Thread thread1 = new Thread(() -> {
           incrementer.incrementByFirstThread();
        });

        Thread thread2 = new Thread(() -> {
            incrementer.incrementBySecondThread();
        });

        thread1.start(); thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        incrementer.finished();
    }

Can you guess the output? If you have noticed there is one method in Lock interface newCondition() which returns a Condition object. Conditions provide a means for one thread to suspend execution until notified by another thread that some state condition may now be true. Lets take another example:

public class IncrementerWithCondition {
    int count = 0;
    Lock lock = new ReentrantLock(); // Non-fair lock.
    Condition condition = lock.newCondition();

    public void incrementByFirstThread() throws InterruptedException {
        lock.lock();
        System.out.println("Waiting for condition in incrementByFirstThread....");
        condition.await();

        try{
            increment();
        } finally {
            lock.unlock();
        }

        lock.unlock();
    }

    public void incrementBySecondThread() {
        lock.lock();
        System.out.println("Waiting for return key ....");

        Scanner scanner = new Scanner(System.in);
        scanner.nextLine();
        scanner.close();
        System.out.println("Hurray got return key notifying condition now..");
        condition.signal();

        try{
            increment();
        } finally {
            lock.unlock();
        }
    }

    private void increment() {
        for(int i=0; i<1_000; i++) {
            count++;
        }
    }

    public void finished() {
        System.out.println("Finished with value: " + count);
    }
}

Here in the method incrementBySecondThread lock is acquired and then it waits for the return key to be pressed. Once it is done it signals the condition and incrementByFirstThread resumes.

ReentrantReadWriteLock
The example for this will have Reader, Writer and a Dictionary class in which multiple readers can read but only one can write.

public class Reader extends Thread{
    private final Dictionary dictionary;
    private boolean keepRunning = true;

    public Reader(Dictionary dictionary, String name) {
        this.dictionary = dictionary;
        this.setName(name);
    }

    @Override
    public void run() {
        while (keepRunning) {
            String [] keys = dictionary.getKeys();
            for (String key : keys) {
                //reading from dictionary with READ LOCK
                String value = dictionary.get(key);

                //make what ever you want with the value.
                System.out.println(key + " : " + value);
            }

            //update every seconds
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void stopReader() {
        this.keepRunning = false;
        this.interrupt();
    }
}




The writer class is:
public class Writer extends Thread{
    private boolean keepRunning = true;
    private final Dictionary dictionary;

    public Writer(Dictionary dictionary, String threadName) {
        this.dictionary = dictionary;
        this.setName(threadName);
    }

    @Override
    public void run() {
        while (keepRunning){
            String[] keys = dictionary.getKeys();
            for(String key : keys) {
                String newValue = getNewValueFromBackEnd(key);
                dictionary.set(key,newValue);
            }

            //Update every 5 seconds
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void stopWriter() {
        this.keepRunning = false;
        this.interrupt();
    }

    private String getNewValueFromBackEnd(String key) {
        return "NEW-VALUE";
    }
}

The Dictionary class is:
public class Dictionary {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Map<String,String> dictionary = new HashMap<>();

    public void set(String key, String value) {
        readWriteLock.writeLock().lock();
        try {
            dictionary.put(key, value);
        } finally {
             readWriteLock.writeLock().unlock();
        }
    }

    public String get(String key) {
        readWriteLock.readLock().lock();
        try{
            return dictionary.get(key);
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

    public String[] getKeys() {
        readWriteLock.readLock().lock();
        try {
            return (String[]) dictionary.keySet().toArray();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
    public static void main(String[] args) {
        Dictionary dictionary = new Dictionary();
        dictionary.set("java",  "object oriented");
        dictionary.set("linux", "rules");
        Writer writer  = new Writer(dictionary, "Writer");
        Reader reader1 = new Reader(dictionary ,"ReaderOne");
        Reader reader2 = new Reader(dictionary ,"ReaderTwo");
        Reader reader3 = new Reader(dictionary ,"ReaderThree");
        Reader reader4 = new Reader(dictionary ,"ReaderFour");
        writer.start();
        reader1.start();
        reader2.start();
        reader3.start();
        reader4.start();
    }
}

There was some problem with read-write lock in JDK 5 where multiple readers executing concurrently in critical section can lock all writer threads. In JDK 6 this problem was resolved but then readers seem to be starved. You can read more about this here.

StampedLock
Another related class is StampedLock which has three modes for controlling read/write access. The state of a StampedLock consists of a version and mode. Lock acquisition methods return a stamp that represents and controls access with respect to a lock state. The "try" versions may return the special value zero to represent failure to acquire access. The modes are:

  1. Writing. Method writeLock() possibly blocks waiting for exclusive access, returning a stamp that can be used in method unlockWrite(long) to release the lock. Untimed and timed versions of tryWriteLock are also provided. When the lock is held in write mode, no read locks may be obtained, and all optimistic read validations will fail.
  2. Reading. Method readLock() possibly blocks waiting for non-exclusive access, returning a stamp that can be used in method unlockRead(long) to release the lock. Untimed and timed versions of tryReadLock are also provided.
  3. Optimistic Reading. Method tryOptimisticRead() returns a non-zero stamp only if the lock is not currently held in write mode. Method validate(long) returns true if the lock has not been acquired in write mode since obtaining a given stamp. This mode can be thought of as an extremely weak version of a read-lock, that can be broken by a writer at any time. The use of optimistic mode for short read-only code segments often reduces contention and improves throughput. However, its use is inherently fragile. Optimistic read sections should only read fields and hold them in local variables for later use after validation. 
The following example demonstrates usage of StampedLock:
public class BankAccount {
    private final StampedLock stampedLock = new StampedLock();
    private int balance = 1000;

    public void deposit(int amount) {
        long stamp = stampedLock.writeLock();
        try {
            balance += amount;
        } finally {
            stampedLock.unlockWrite(stamp);
        }
    }

    public int getBalance() {
        long stamp = stampedLock.tryOptimisticRead();


        try {
            return balance;
        } finally {
            stampedLock.unlockRead(stamp);
        }
    }
}

The usage of optimistic locking using tryLock is slightly complicated. Consider this simplistic class of ComplexNumber:

public class ComplexNumber {
    private final double real;
    private final double imaginary;

    public ComplexNumber(double real, double imaginary) {
        this.real = real;
        this.imaginary = imaginary;
    }

    public double abs() { return Math.hypot(real,imaginary);}
    public double phase() { return Math.atan2(real, imaginary);} // between PI and -PI

    public ComplexNumber add(ComplexNumber c) {
        double real = this.real + c.getReal();
        double imaginary = this.imaginary - c.getImaginary();
        return new ComplexNumber(real,imaginary);
    }

    public double getReal() {
        return real;
    }

    public double getImaginary() {
        return imaginary;
    }
}



The method abs() needs to get hold of both real and imaginary parts to compute itself. We may need to lock it which we can do using optimistic locking as:

public double abs() {
        long stamp = stampedLock.tryOptimisticRead();   // No locking just optimistic read.
        double currentReal = real, currentImaginary = imaginary;
        if(!stampedLock.validate(stamp)) {
            stamp = stampedLock.readLock();
            try{
                currentReal = real; currentImaginary = imaginary;
            } finally {
                stampedLock.unlockRead(stamp);
            }
        }
        return Math.hypot(real,imaginary);
    }

In the above method after trying for optimistic read lock (using tryOptimisticRead) we need to validate the stamp to ensure everything is OK. If the stamp does not validate it means a write has acquired the lock in between and the values we have may be stale. So we again acquire the read lock but this time it is pessimistic and after getting it we assign the values and then compute the method.

This was a short introduction and I hope it was useful.

Monday, February 23, 2015

CountDownLatch in Java

CountDownLatch is a synchronizer that allows one or more threads to wait until a set of operations being performed in other threads completes. A CDL is initialized with a given count which specifies the number of parties involved. Whenever the method countDown() is called it decrements the value of count by one.

Generally a CDL is used when one or more threads are supposed to wait for a number of threads (parties) to finish. In that case waiting threads make use of await methods
which block until the count reaches zero, on other hand each party decrements the count whenever it finishes. When count is zero all waiting threads are released
and any subsequent invocations of await return immediately.



Unlike CyclicBarrier it is non-cyclic. If we need a version that resets the count then we should consider using a CyclicBarrier. A useful property of a CountDownLatch is that it doesn't require that threads calling countDown wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past an await until all threads could pass.

public class LatchBasicMain {

    static class Processor implements Runnable {
        CountDownLatch latch;

        public Processor(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            System.out.println("Started some work..");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            latch.countDown();
            System.out.println("Finished.");
        }
    }

    public static void main(String[] args) {
        int threadsCount = 5;
        CountDownLatch latch = new CountDownLatch(threadsCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadsCount);
        for(int i=0; i<threadsCount; i++) {
            executorService.submit(new Processor(latch));
        }

        // Main will wait for all other threads to finish.
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Main Finished");
    }
}

I hope this post was informative.