{"id":5452,"date":"2025-02-17T15:39:21","date_gmt":"2025-02-17T15:39:21","guid":{"rendered":"https:\/\/codethataint.com\/blog\/?p=5452"},"modified":"2025-02-17T15:39:21","modified_gmt":"2025-02-17T15:39:21","slug":"simple-producer-and-consumer-in-kafka","status":"publish","type":"post","link":"https:\/\/codethataint.com\/blog\/simple-producer-and-consumer-in-kafka\/","title":{"rendered":"Simple Producer and Consumer in Kafka"},"content":{"rendered":"<p><strong>ProducerDemoWithCallBack.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\nimport org.apache.kafka.clients.producer.*;\r\nimport org.apache.kafka.common.serialization.StringSerializer;\r\nimport org.slf4j.Logger;\r\nimport org.slf4j.LoggerFactory;\r\nimport java.util.Properties;\r\n\r\npublic class ProducerDemoWithCallBack {\r\n    private static final Logger log = LoggerFactory.getLogger(ProducerDemoWithCallBack.class);\r\n\r\n    public static void main(String&#x5B;] args) {\r\n        \/\/Connect to Properties\r\n        Properties properties = new Properties();\r\n\r\n        \/\/Connect to Localhost\r\n        properties.setProperty(&quot;bootstrap.servers&quot;, &quot;127.0.0.1:9092&quot;);\r\n\r\n        \/\/Set Producer Properties\r\n        properties.setProperty(&quot;key.serializer&quot;, StringSerializer.class.getName());\r\n        properties.setProperty(&quot;value.serializer&quot;, StringSerializer.class.getName());\r\n\r\n        \/\/Create Producer by passing Properties above\r\n        KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(properties);\r\n\r\n        for (int j=0; j&lt;10; j++) {\r\n            \/\/Create Producer Record\r\n            for (int i = 0; i &lt; 30; i++) {\r\n\r\n                String keyVal = &quot;Key_&quot;+i;\r\n\r\n                \/\/ create a Producer Record\r\n                ProducerRecord&lt;String, String&gt; producerRecord =\r\n                        new 
ProducerRecord&lt;&gt;(&quot;TopicFromJava&quot;, keyVal, &quot;hello world &quot; + i);\r\n\r\n                \/\/ send data\r\n                producer.send(producerRecord, new Callback() {\r\n                    @Override\r\n                    public void onCompletion(RecordMetadata metadata, Exception e) {\r\n                        \/\/ executes every time a record successfully sent or an exception is thrown\r\n                        if (e == null) {\r\n                            \/\/ the record was successfully sent\r\n                            log.info(&quot;Received new metadata &quot; +\r\n                                    &quot;Topic: &quot; + metadata.topic() + &quot; &quot; +\r\n                                    &quot;Key : &quot; + keyVal + &quot; &quot; +\r\n                                    &quot;Partition: &quot; + metadata.partition() + &quot; &quot; +\r\n                                    &quot;Offset: &quot; + metadata.offset() + &quot; &quot; +\r\n                                    &quot;Timestamp: &quot; + metadata.timestamp() + &quot;\\n&quot;);\r\n                        } else {\r\n                            log.error(&quot;Error while producing&quot;, e);\r\n                        }\r\n                    }\r\n                });\r\n            }\r\n\r\n            try {\r\n                Thread.sleep(300);\r\n            } catch (InterruptedException e) {\r\n                throw new RuntimeException(e);\r\n            }\r\n        }\r\n\r\n        \/\/Tell Producer to send all data\r\n        producer.flush();\r\n        producer.close();\r\n    }\r\n}\r\n<\/pre>\n<p><strong>KafkaConsumerDemoWithShutdown.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\nimport org.apache.kafka.clients.consumer.ConsumerRecord;\r\nimport org.apache.kafka.clients.consumer.ConsumerRecords;\r\nimport org.apache.kafka.clients.consumer.KafkaConsumer;\r\nimport 
org.apache.kafka.common.errors.WakeupException;\r\nimport org.apache.kafka.common.serialization.StringDeserializer;\r\nimport org.slf4j.Logger;\r\nimport org.slf4j.LoggerFactory;\r\nimport java.time.Duration;\r\nimport java.util.Arrays;\r\nimport java.util.Properties;\r\n\r\npublic class KafkaConsumerDemoWithShutdown {\r\n    public static final Logger log = LoggerFactory.getLogger(KafkaConsumerDemoWithShutdown.class);\r\n\r\n\r\n    public static void main(String&#x5B;] args) {\r\n        String strGrpID = &quot;TopicFromJavaGrp&quot;;\r\n        String strTopic = &quot;TopicFromJava&quot;;\r\n\r\n        \/\/Consumer Properties\r\n        Properties properties = new Properties();\r\n        properties.setProperty(&quot;bootstrap.servers&quot;, &quot;127.0.0.1:9092&quot;);\r\n        properties.setProperty(&quot;key.deserializer&quot;, StringDeserializer.class.getName());\r\n        properties.setProperty(&quot;value.deserializer&quot;, StringDeserializer.class.getName());\r\n        properties.setProperty(&quot;group.id&quot;, strGrpID);\r\n        properties.setProperty(&quot;auto.offset.reset&quot;, &quot;earliest&quot;);\r\n\r\n        \/\/Create Consumer with above properties\r\n        KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(properties);\r\n        \r\n        final Thread mainThread = Thread.currentThread();\r\n\r\n        \/\/Shutdown hook should be started before calling Subscribe and polling\r\n        Runtime.getRuntime().addShutdownHook(new Thread(){\r\n            public void run(){\r\n                log.info(&quot;Detected Shutdown Hook, Lets Exit consumer Wakeup..&quot;);\r\n\r\n                \/\/Telling to call wakeup incase exit is pressed \r\n                consumer.wakeup();\r\n\r\n                try{\r\n                    mainThread.join();\r\n                }catch(InterruptedException e){\r\n                    e.printStackTrace();\r\n                }\r\n            }\r\n        });\r\n\r\n        try 
{\r\n            consumer.subscribe(Arrays.asList(strTopic));\r\n\r\n            \/\/Until the loop runs, keep looking for topics from producer\r\n            while(true) {\r\n                log.info(&quot;polling... . . &quot;);\r\n\r\n                ConsumerRecords&lt;String, String&gt; arrRecords = consumer.poll(Duration.ofMillis(1000));\r\n\r\n                for (ConsumerRecord&lt;String, String&gt; record : arrRecords) {\r\n                    log.info(&quot;Key:{}, Value:{}, Partition:{}, Offset:{}&quot;, record.key(), record.value(), record.partition(), record.offset());\r\n                }\r\n            }\r\n        }catch(WakeupException e){\r\n            \/\/Control comes here incase of exit is called during execution\r\n            log.info(&quot;Polling Interuppted.... . . .&quot;);\r\n        }catch (Exception e){\r\n            log.error(e.getMessage());\r\n        }finally {\r\n            consumer.close();\r\n            log.info(&quot;Consumer is gracefully shutdown...&quot;);\r\n        }\r\n    }\r\n}\r\n<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>ProducerDemoWithCallBack.java import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; public class ProducerDemoWithCallBack { private static final Logger log = LoggerFactory.getLogger(ProducerDemoWithCallBack.class); public static void main(String&#x5B;] args) { \/\/Connect to Properties Properties properties = new Properties(); \/\/Connect to Localhost properties.setProperty(&quot;bootstrap.servers&quot;, &quot;127.0.0.1:9092&quot;); \/\/Set Producer Properties properties.setProperty(&quot;key.serializer&quot;, StringSerializer.class.getName()); properties.setProperty(&quot;value.serializer&quot;, StringSerializer.class.getName()); \/\/Create Producer by passing Properties above KafkaProducer&lt;String,&hellip; <a 
href=\"https:\/\/codethataint.com\/blog\/simple-producer-and-consumer-in-kafka\/\">Continue reading <span class=\"meta-nav\">&rarr;<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[365],"tags":[],"class_list":["post-5452","post","type-post","status-publish","format-standard","hentry","category-kafka"],"_links":{"self":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts\/5452","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/comments?post=5452"}],"version-history":[{"count":1,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts\/5452\/revisions"}],"predecessor-version":[{"id":5453,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts\/5452\/revisions\/5453"}],"wp:attachment":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/media?parent=5452"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/categories?post=5452"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/tags?post=5452"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}