Java Concurrency - wait(), notify(), and notifyAll()

How to Work With wait(), notify() and notifyAll() in Java?

Multithreading in Java is a fairly complex topic. It requires close attention when writing application code in which multiple threads access one or more shared resources at the same time.

Java 5 introduced utilities such as BlockingQueue and Executors, which take away some of the complexity by providing easy-to-use APIs. Programmers using them can feel a lot more confident than programmers who handle synchronization directly through wait() and notify() calls.

I also recommend using these newer APIs rather than writing the synchronization yourself. Still, we are often required to work with the low-level methods for various reasons, e.g. maintaining legacy code. A good knowledge of these methods will help you when such a situation arises.
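For comparison, here is a minimal sketch of what the newer APIs look like in use. The class name HighLevelApiDemo, the method sumThroughQueue and the numbers are made up for illustration; only ArrayBlockingQueue, ExecutorService, Executors and Future come from the JDK. Note that there is no explicit wait() or notify() anywhere: the queue does the blocking for us.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original post.
class HighLevelApiDemo {

    // Sums 1..count by handing the numbers from a producer task to a consumer task.
    static int sumThroughQueue(int count) {
        // Bounded queue: put() blocks while it is full, take() blocks while it is empty.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Producer. Returning null makes this a Callable, so put() may throw.
            pool.submit(() -> {
                for (int i = 1; i <= count; i++) {
                    queue.put(i);
                }
                return null;
            });
            // Consumer: takes exactly count items and adds them up.
            Future<Integer> sum = pool.submit(() -> {
                int total = 0;
                for (int i = 0; i < count; i++) {
                    total += queue.take();
                }
                return total;
            });
            return sum.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumThroughQueue(10)); // prints 55
    }
}
```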

In this post I discuss some concepts around the wait(), notify() and notifyAll() methods.

What are wait(), notify() and notifyAll() methods?

Before moving into the concepts, let's note down a few very basic definitions for these methods.

The Object class in Java has three final methods that allow threads to communicate about the locked status of a resource. These are:


  • wait() :

    It tells the calling thread to give up the lock and go to sleep until some other thread enters the same monitor and calls notify() or notifyAll().

    The wait() method releases the lock prior to waiting and re-acquires the lock prior to returning from the wait() method.

    The wait() method is actually tightly integrated with the synchronization lock, using a feature not available directly from the synchronization mechanism. In other words, it is not possible for us to implement the wait() method purely in Java: it is a native method.

    General syntax for calling wait() method is like this:

    synchronized (lockObject) {
        while (!condition) {
            lockObject.wait();
        }

        // take the action here
    }

  • notify() :

    It wakes up a single thread that called wait() on the same object. If several threads are waiting, the choice of which one to wake is arbitrary.

    It should be noted that calling notify() does not actually give up a lock on a resource. It tells a waiting thread that that thread can wake up. However, the lock is not actually given up until the notifier’s synchronized block has completed.

    So, if a notifier calls notify() on a resource but still needs to perform 10 seconds of work on the resource within its synchronized block, the thread that had been waiting will have to wait at least another 10 seconds for the notifier to release the lock on the object, even though notify() has already been called.

    General syntax for calling notify() method is like this:

    synchronized (lockObject) {
        // establish the condition

        lockObject.notify();

        // any additional code if needed
    }

  • notifyAll() :

    It wakes up all the threads that called wait() on the same object. The awakened threads then compete for the lock in the usual way. A higher-priority thread will often run first, but the order is not guaranteed. Everything else is the same as for the notify() method above.

    General syntax for calling notifyAll() method is like this:

    synchronized (lockObject) {
        // establish the condition

        lockObject.notifyAll();
    }
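To see the three snippets above working together, here is a small runnable sketch. The class name WaitNotifyDemo, the ready flag and the events list are made up for illustration. One thread waits in a guarded loop, the other establishes the condition and calls notifyAll(). The recorded order also illustrates the point made under notify(): the waiter cannot continue until the notifier has left its synchronized block.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original post.
class WaitNotifyDemo {
    private final Object lock = new Object();
    private final List<String> events = new ArrayList<>(); // guarded by lock
    private boolean ready = false;                          // the condition, guarded by lock

    List<String> run() {
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                // Always wait in a loop: re-check the condition after every wake-up.
                while (!ready) {
                    try {
                        lock.wait(); // releases the lock while waiting
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                events.add("waiter resumed");
            }
        });
        waiter.start();

        synchronized (lock) {
            ready = true;      // establish the condition
            lock.notifyAll();  // wake the waiter, but the lock is still held here
            events.add("notifier still holds the lock");
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(new WaitNotifyDemo().run());
        // prints [notifier still holds the lock, waiter resumed]
    }
}
```

The order is the same whichever thread reaches the synchronized block first: if the notifier gets there first, the waiter finds ready already set and never calls wait() at all, which is exactly why the condition is checked in a loop rather than assumed.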

Practice: Counting Semaphore

public class MyBoundedSemaphore {
    private int signal;   // number of permits currently taken
    private int capacity; // upper bound on the number of permits

    public MyBoundedSemaphore(int cap) {
        signal = 0;
        capacity = cap;
    }

    /*
     * Notice how the acquire() method blocks if the number of signals is equal to the upper bound.
     *
     * Not until a thread has called release() will the thread calling acquire() be allowed to
     * deliver its signal, if the semaphore has reached its upper signal limit.
     *
     * notifyAll() is only called when the signal count is at one of the bounds (0 or capacity).
     * If it is not equal to either bound when acquire() or release() is called,
     * there can be no threads waiting to either acquire or release.
     */
    public synchronized void acquire() throws InterruptedException {
        while (signal == capacity) {
            wait();
        }

        if (signal == 0) {
            notifyAll(); // wake threads blocked in release()
        }

        signal++;
    }

    public synchronized void release() throws InterruptedException {
        while (signal == 0) {
            wait();
        }

        if (signal == capacity) {
            notifyAll(); // wake threads blocked in acquire()
        }

        signal--;
    }
}

For more details about Semaphore, please read https://phoenixjiangnan.github.io/2016/04/03/java/concurrency/Java-Concurrency-Semaphore/
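As a usage sketch, the semaphore can limit how many threads are inside a section of code at once. The demo below is my own illustration, not part of the original class: BoundedSemaphoreDemo and maxConcurrent are hypothetical names, and the semaphore is repeated in compact form only so that the example compiles on its own.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original post.
class BoundedSemaphoreDemo {

    // Compact copy of the semaphore above so the sketch is self-contained.
    static class MyBoundedSemaphore {
        private int signal = 0;
        private final int capacity;

        MyBoundedSemaphore(int cap) { capacity = cap; }

        synchronized void acquire() throws InterruptedException {
            while (signal == capacity) wait();
            if (signal == 0) notifyAll();
            signal++;
        }

        synchronized void release() throws InterruptedException {
            while (signal == 0) wait();
            if (signal == capacity) notifyAll();
            signal--;
        }
    }

    // Sends `workers` threads through a section guarded by `permits` permits and
    // returns the highest number of threads observed inside the section at once.
    static int maxConcurrent(int workers, int permits) {
        MyBoundedSemaphore semaphore = new MyBoundedSemaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                    try {
                        int now = inside.incrementAndGet();
                        max.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // pretend to do some work
                        inside.decrementAndGet();
                    } finally {
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        // The exact value depends on scheduling, but it never exceeds the 3 permits.
        System.out.println(maxConcurrent(8, 3));
    }
}
```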

Practice: Blocking Queue

import java.util.LinkedList;
import java.util.List;

public class MyBlockingQueue<T> {
    private List<T> queue;
    private int limit = 10;

    public MyBlockingQueue(int limit) {
        queue = new LinkedList<>();
        this.limit = limit;
    }

    /*
     * Notice how notifyAll() is only called from enqueue() and dequeue() if the queue size is equal
     * to the size bounds (0 or limit).
     *
     * If the queue size is not equal to either bound when enqueue() or dequeue() is called,
     * there can be no threads waiting to either enqueue or dequeue items.
     */
    public synchronized void enqueue(T t) throws InterruptedException {
        while (queue.size() == limit) {
            wait(); // queue is full: wait for a dequeue()
        }

        if (queue.size() == 0) {
            notifyAll(); // queue was empty: wake threads blocked in dequeue()
        }

        queue.add(t);
    }

    public synchronized T dequeue() throws InterruptedException {
        while (queue.size() == 0) {
            wait(); // queue is empty: wait for an enqueue()
        }

        if (queue.size() == limit) {
            notifyAll(); // queue was full: wake threads blocked in enqueue()
        }

        return queue.remove(0);
    }
}
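Here is a small producer/consumer sketch showing how the queue might be used. BlockingQueueDemo and transfer are names I made up for the example, and the queue class is repeated in compact form only so the sketch compiles on its own. With a limit of 2 and five items, the producer blocks in enqueue() whenever it runs ahead of the consumer, yet the items still arrive in FIFO order.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical demo class, not part of the original post.
class BlockingQueueDemo {

    // Compact copy of the queue above so the sketch is self-contained.
    static class MyBlockingQueue<T> {
        private final List<T> queue = new LinkedList<>();
        private final int limit;

        MyBlockingQueue(int limit) { this.limit = limit; }

        synchronized void enqueue(T t) throws InterruptedException {
            while (queue.size() == limit) wait();
            if (queue.isEmpty()) notifyAll();
            queue.add(t);
        }

        synchronized T dequeue() throws InterruptedException {
            while (queue.isEmpty()) wait();
            if (queue.size() == limit) notifyAll();
            return queue.remove(0);
        }
    }

    // Passes 1..count from a producer thread to a consumer thread through the queue.
    static List<Integer> transfer(int count, int limit) {
        MyBlockingQueue<Integer> queue = new MyBlockingQueue<>(limit);
        List<Integer> received = new ArrayList<>(); // only touched by the consumer until join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.enqueue(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.dequeue()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5, 2)); // prints [1, 2, 3, 4, 5]
    }
}
```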

Practice: Thread Pool

import java.util.ArrayList;
import java.util.List;

public class MyThreadPool {
    private MyBlockingQueue<Runnable> myqueue;
    private List<MyThread> threads;
    private boolean isStopped;

    public MyThreadPool(int threadCount, int maxNoOfTasks) {
        myqueue = new MyBlockingQueue<>(maxNoOfTasks);

        threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threads.add(new MyThread(myqueue));
        }

        for (MyThread thread : threads) {
            thread.start();
        }
    }

    // Note: the pool's monitor is held while enqueue() blocks on a full queue,
    // so a concurrent stop() has to wait until there is room in the queue again.
    public synchronized void execute(Runnable task) throws InterruptedException {
        if (isStopped) {
            throw new IllegalStateException("ThreadPool is stopped");
        }

        myqueue.enqueue(task);
    }

    public synchronized void stop() {
        isStopped = true;
        for (MyThread pt : threads) {
            pt.doStop();
        }
    }

    private static class MyThread extends Thread {
        private MyBlockingQueue<Runnable> taskQueue;
        private boolean isStopped;

        public MyThread(MyBlockingQueue<Runnable> blockingQueue) {
            taskQueue = blockingQueue;
            isStopped = false;
        }

        @Override
        public void run() {
            // Read the flag through the synchronized getter so that the value
            // written by doStop() is guaranteed to be visible to this thread.
            while (!isStopped()) {
                try {
                    Runnable r = taskQueue.dequeue();
                    r.run();
                } catch (Exception e) {
                    // log; an InterruptedException from doStop() ends up here,
                    // and the loop condition then stops the thread
                }
            }
        }

        public synchronized void doStop() {
            isStopped = true;
            this.interrupt(); // break the thread out of a blocking dequeue()
        }

        public synchronized boolean isStopped() {
            return isStopped;
        }
    }
}
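To round this off, here is a sketch of how the pool might be driven. ThreadPoolDemo, Worker and runTasks are names I made up, the pool and queue are repeated in a trimmed-down form only so the example compiles on its own, and the CountDownLatch is my addition: the pool above has no way to wait for submitted tasks, and stop() interrupts the workers without draining the queue, so the caller has to wait for completion before stopping.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original post.
class ThreadPoolDemo {

    // Compact copies of the queue and pool from this post.
    static class MyBlockingQueue<T> {
        private final List<T> queue = new LinkedList<>();
        private final int limit;
        MyBlockingQueue(int limit) { this.limit = limit; }
        synchronized void enqueue(T t) throws InterruptedException {
            while (queue.size() == limit) wait();
            if (queue.isEmpty()) notifyAll();
            queue.add(t);
        }
        synchronized T dequeue() throws InterruptedException {
            while (queue.isEmpty()) wait();
            if (queue.size() == limit) notifyAll();
            return queue.remove(0);
        }
    }

    static class Worker extends Thread {
        private final MyBlockingQueue<Runnable> tasks;
        private volatile boolean stopped; // volatile: written by stop(), read by run()
        Worker(MyBlockingQueue<Runnable> tasks) { this.tasks = tasks; }
        @Override public void run() {
            while (!stopped) {
                try {
                    tasks.dequeue().run();
                } catch (Exception e) {
                    // interrupted by doStop() or a task failed; the loop re-checks the flag
                }
            }
        }
        void doStop() { stopped = true; interrupt(); }
    }

    static class MyThreadPool {
        private final MyBlockingQueue<Runnable> tasks;
        private final List<Worker> workers = new ArrayList<>();
        private boolean stopped;
        MyThreadPool(int threadCount, int maxNoOfTasks) {
            tasks = new MyBlockingQueue<>(maxNoOfTasks);
            for (int i = 0; i < threadCount; i++) workers.add(new Worker(tasks));
            for (Worker w : workers) w.start();
        }
        synchronized void execute(Runnable task) throws InterruptedException {
            if (stopped) throw new IllegalStateException("ThreadPool is stopped");
            tasks.enqueue(task);
        }
        synchronized void stop() {
            stopped = true;
            for (Worker w : workers) w.doStop();
        }
    }

    // Submits taskCount small tasks, waits for all of them, then stops the pool.
    static int runTasks(int taskCount) {
        MyThreadPool pool = new MyThreadPool(3, 5);
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    completed.incrementAndGet();
                    done.countDown();
                });
            }
            done.await(); // wait until every task has run before stopping
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.stop();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(20)); // prints 20
    }
}
```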
