What is BlockingQueue?
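- BlockingQueue is an interface in java.util.concurrent. Commonly used implementations include LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue and SynchronousQueue; others, such as DelayQueue, also exist.
- Thread Safe: BlockingQueue implementations are thread-safe. Individual queuing methods such as put() and take() take effect atomically, but bulk operations (addAll, containsAll, retainAll, removeAll) are not necessarily atomic unless the implementation says so.
- Blocking Operation: put() blocks the producer while the queue is full, and take() blocks the consumer while the queue is empty.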
- No Null Elements: Attempts to insert a null will result in a NullPointerException.
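The properties above can be sketched with the non-blocking and timed variants of the queue methods. This is a minimal illustration, assuming an ArrayBlockingQueue of capacity 1; the class name BlockingQueueBasics and its helper methods are hypothetical and exist only for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the producer/consumer example.
class BlockingQueueBasics {

    // Reports what offer() returns when the queue is already full.
    static boolean offerWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");          // succeeds, the queue is now full
        return queue.offer("second");  // does not block, returns false
    }

    // Reports what a timed poll() returns when the queue stays empty.
    static String pollWhenEmpty() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            // Waits up to 100 ms for an element, then gives up with null.
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Reports whether inserting null is rejected with a NullPointerException.
    static boolean rejectsNull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer on full queue: " + offerWhenFull());
        System.out.println("timed poll on empty queue: " + pollWhenEmpty());
        System.out.println("null rejected: " + rejectsNull());
    }
}
```

Because null is what poll() returns to signal "nothing available", a queue that accepted null elements could not distinguish the two cases, which is one reason null is rejected.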
Two Types of BlockingQueue
- Bounded BlockingQueue: Fixed capacity, blocking producers when full.
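- Unbounded BlockingQueue: Grows as needed (e.g., a LinkedBlockingQueue created without a capacity, which is backed by linked nodes and capped only at Integer.MAX_VALUE), so producers never block, though it is still subject to memory constraints.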
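One way to see the difference between the two types is to ask each queue how much room it has left. The sketch below assumes an ArrayBlockingQueue of capacity 2 as the bounded queue and a LinkedBlockingQueue created without a capacity as the unbounded one; QueueCapacityDemo is a hypothetical name used only here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class comparing bounded and unbounded queues.
class QueueCapacityDemo {

    // Free slots left in a bounded queue of capacity 2 after one insert.
    static int boundedRemaining() {
        BlockingQueue<Integer> bounded = new ArrayBlockingQueue<>(2);
        bounded.offer(1);
        return bounded.remainingCapacity();
    }

    // Free slots reported by an empty queue created without a capacity.
    static int unboundedRemaining() {
        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        return unbounded.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println("bounded remaining: " + boundedRemaining());
        System.out.println("unbounded remaining: " + unboundedRemaining());
    }
}
```

The unbounded queue reports Integer.MAX_VALUE, so in practice put() on it does not block and the real limit is available heap memory.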
Simple Producer-Consumer Implementation using BlockingQueue
- We have a queBuffer which holds a maximum of 10 printing tasks at a time.
- Printing tasks are added by PrintProducer and taken off the queue by PrintConsumer.
- Start each thread with start() rather than run(). Calling run() directly executes the loop on the calling (main) thread, so the producer's endless loop would never let the consumer begin. Each call to start() spawns one new thread, so the producer and consumer run at the same time on two different threads.
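The start() versus run() point can be checked by recording which thread actually executes the task. This is a small sketch under the assumption that the worker thread is given the name "worker"; StartVsRunDemo and its methods are hypothetical names for illustration.

```java
// Hypothetical demo class showing where a task runs with run() vs start().
class StartVsRunDemo {

    // Calls run() directly and returns the name of the thread that ran the task.
    static String threadNameUsingRun() {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = Thread.currentThread().getName(), "worker");
        worker.run(); // plain method call: the task executes on the calling thread
        return seen[0];
    }

    // Calls start() and returns the name of the thread that ran the task.
    static String threadNameUsingStart() {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = Thread.currentThread().getName(), "worker");
        worker.start(); // spawns a new thread named "worker"
        try {
            worker.join(); // wait so the recorded name is visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("run() executed on: " + threadNameUsingRun());
        System.out.println("start() executed on: " + threadNameUsingStart());
    }
}
```

With run() the recorded name is that of the calling thread, while with start() it is "worker", which is why only start() lets the producer and consumer loops make progress side by side.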
PrintProducer.java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class PrintProducer extends Thread {
    private final BlockingQueue<Integer> queBuffer;

    public PrintProducer(BlockingQueue<Integer> queBuffer) {
        this.queBuffer = queBuffer;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Integer randomNo = ThreadLocalRandom.current().nextInt(100);
                // put() blocks while the queue is full
                queBuffer.put(randomNo);
                System.out.println("Added Task No " + randomNo);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
PrintConsumer.java
import java.util.concurrent.BlockingQueue;

public class PrintConsumer extends Thread {
    private final BlockingQueue<Integer> queBuffer;

    public PrintConsumer(BlockingQueue<Integer> queBuffer) {
        this.queBuffer = queBuffer;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // take() blocks while the queue is empty
                System.out.println("Polled Task No " + queBuffer.take());
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
ProcessPrints.java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProcessPrints {
    // Bounded buffer shared by the producer and the consumer
    static BlockingQueue<Integer> queBuffer = new ArrayBlockingQueue<>(10);

    public static void main(String[] args) {
        PrintProducer objPrintProducer = new PrintProducer(queBuffer);
        PrintConsumer objPrintConsumer = new PrintConsumer(queBuffer);

        // start() runs each loop on its own thread
        objPrintProducer.start();
        objPrintConsumer.start();
    }
}
Output
Polled Task No 61
Added Task No 61
Added Task No 33
Added Task No 0
Polled Task No 33
Added Task No 29
Added Task No 93
Added Task No 20
Polled Task No 0
Added Task No 24
Added Task No 2
Added Task No 31
. . . .
The above code can also be written as below, because the Thread constructor accepts a Runnable, so the body of run() can be supplied as a lambda expression.
ProcessPrints.java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class ProcessPrints {
    static BlockingQueue<Integer> queBuffer = new ArrayBlockingQueue<>(10);

    public static void main(String[] args) {
        // Producer implementation
        new Thread(() -> {
            while (true) {
                try {
                    Integer randomNo = ThreadLocalRandom.current().nextInt(100);
                    queBuffer.put(randomNo);
                    System.out.println("Added Task No " + randomNo);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }).start();

        // Consumer implementation
        new Thread(() -> {
            while (true) {
                try {
                    // take() blocks while empty; poll() would return null immediately
                    System.out.println("Polled Task No " + queBuffer.take());
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }).start();
    }
}