{"id":5482,"date":"2025-03-10T14:32:04","date_gmt":"2025-03-10T14:32:04","guid":{"rendered":"https:\/\/codethataint.com\/blog\/?p=5482"},"modified":"2025-03-10T15:30:04","modified_gmt":"2025-03-10T15:30:04","slug":"producer-consumer-implementation-using-blockingqueue","status":"publish","type":"post","link":"https:\/\/codethataint.com\/blog\/producer-consumer-implementation-using-blockingqueue\/","title":{"rendered":"Producer Consumer Implementation using BlockingQueue"},"content":{"rendered":"<p><strong class=\"ctaHeader3\">What is BlockingQueue?<\/strong><\/p>\n<ol>\n<li> BlockingQueue is a Interface which has 4 Implementations &#8211; LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue<\/li>\n<li>Thread Safe: BlockingQueue implementations are thread-safe, with all methods being atomic.<\/li>\n<li>Blocking Operation: Has blocking behavior if the queue is full (for producers) or empty (for consumers).<\/li>\n<li>No Null Elements: Attempts to insert a null will result in a NullPointerException.<\/li>\n<\/ol>\n<p><strong>Two Types of BlockingQueue<\/strong><\/p>\n<ul>\n<li>Bounded BlockingQueue: Fixed capacity, blocking producers when full.<\/li>\n<li>Unbounded BlockingQueue: Expands as needed (e.g., backed by a LinkedList), though subject to memory constraints.<\/li>\n<\/ul>\n<p><strong class=\"ctaHeader3\">Simple Producer Consumer Implementation using BlockingQueue?<\/strong><\/p>\n<ol>\n<li>We have a queBuffer which take max of 10 printing task at a time<\/li>\n<li>The printing Task are added from PrintProducer whereas it is polled at PrintConsumer end<\/li>\n<li>When you start the thread for producer you should use start() method rather than run() as run executes by taking control of main thread whereas start() spawns two new thread which makes producer and consumer run at same time in two different threads.<\/li>\n<\/ol>\n<p><strong>PrintProducer.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\nimport java.util.concurrent.BlockingQueue;\r\nimport java.util.concurrent.ThreadLocalRandom;\r\n\r\npublic class PrintProducer extends Thread {\r\n    private BlockingQueue queBuffer;\r\n\r\n    public PrintProducer(BlockingQueue queBuffer) {\r\n        this.queBuffer = queBuffer;\r\n    }\r\n\r\n    @Override\r\n    public void run() {\r\n        while(true){\r\n            try {\r\n                Integer randomNo = ThreadLocalRandom.current().nextInt(100);\r\n                queBuffer.put(randomNo);\r\n                System.out.println(&quot;Added Task No &quot; + String.valueOf(randomNo));\r\n                Thread.sleep(500);\r\n            } catch (InterruptedException e) {\r\n                throw new RuntimeException(e);\r\n            }\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p><strong>PrintConsumer.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\nimport java.util.concurrent.BlockingQueue;\r\n\r\npublic class PrintConsumer extends Thread{\r\n    private BlockingQueue queBuffer;\r\n\r\n    public PrintConsumer(BlockingQueue queBuffer) {\r\n        this.queBuffer = queBuffer;\r\n    }\r\n\r\n    @Override\r\n    public void run() {\r\n        while(true){\r\n            try {\r\n                System.out.println(&quot;Polled Task No &quot; + queBuffer.take());\r\n                Thread.sleep(1500);\r\n            } catch (InterruptedException e) {\r\n\r\n            }\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p><strong>ProcessPrints.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\nimport java.util.concurrent.ArrayBlockingQueue;\r\nimport java.util.concurrent.BlockingQueue;\r\n\r\npublic class ProcessPrints {\r\n    static BlockingQueue queBuffer = new ArrayBlockingQueue(10);\r\n\r\n    public static void main(String&#x5B;] args) {\r\n        PrintProducer objPrintProducer = new PrintProducer(queBuffer);\r\n        PrintConsumer objPrintConsumer = new PrintConsumer(queBuffer);\r\n\r\n        objPrintProducer.start();\r\n        objPrintConsumer.start();\r\n    }\r\n}\r\n<\/pre>\n<p><strong>Output<\/strong><\/p>\n<pre>\r\nPolled Task No 61\r\nAdded Task No 61\r\nAdded Task No 33\r\nAdded Task No 0\r\nPolled Task No 33\r\nAdded Task No 29\r\nAdded Task No 93\r\nAdded Task No 20\r\nPolled Task No 0\r\nAdded Task No 24\r\nAdded Task No 2\r\nAdded Task No 31\r\n.\r\n.\r\n.\r\n.\r\n<\/pre>\n<p>The above code can be implemented as below as Thread takes Runnable as argument with run() method definition in lambda expression<\/p>\n<p><strong>ProcessPrints.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\npublic class ProcessPrints {\r\n    static BlockingQueue queBuffer = new ArrayBlockingQueue(10);\r\n\r\n    public static void main(String&#x5B;] args) {\r\n        \/\/Producer Implementation\r\n        new Thread(()-&gt;{\r\n            while(true){\r\n                try {\r\n                    Integer randomNo = ThreadLocalRandom.current().nextInt(100);\r\n                    queBuffer.put(randomNo);\r\n                    System.out.println(&quot;Added Task No &quot; + String.valueOf(randomNo));\r\n                    Thread.sleep(500);\r\n                } catch (InterruptedException e) {\r\n                    throw new RuntimeException(e);\r\n                }\r\n            }\r\n        }).start();\r\n\r\n        \/\/Consumer Implementation\r\n        new Thread(()-&gt;{\r\n            while(true){\r\n                try {\r\n                    System.out.println(&quot;Polled Task No &quot; + queBuffer.poll());\r\n                    Thread.sleep(1500);\r\n                } catch (InterruptedException e) {\r\n\r\n                }\r\n            }\r\n        }).start();\r\n    }\r\n}\r\n<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>What is BlockingQueue? BlockingQueue is a Interface which has 4 Implementations &#8211; LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue Thread Safe: BlockingQueue implementations are thread-safe, with all methods being atomic. Blocking Operation: Has blocking behavior if the queue is full (for producers) or empty (for consumers). No Null Elements: Attempts to insert a null will result in a&hellip; <a href=\"https:\/\/codethataint.com\/blog\/producer-consumer-implementation-using-blockingqueue\/\">Continue reading <span class=\"meta-nav\">&rarr;<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[141],"tags":[],"class_list":["post-5482","post","type-post","status-publish","format-standard","hentry","category-threads"],"_links":{"self":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts\/5482","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/comments?post=5482"}],"version-history":[{"count":3,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts\/5482\/revisions"}],"predecessor-version":[{"id":5485,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts\/5482\/revisions\/5485"}],"wp:attachment":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/media?parent=5482"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/categories?post=5482"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/tags?post=5482"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}