ProducerDemoWithCallBack.java
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; public class ProducerDemoWithCallBack { private static final Logger log = LoggerFactory.getLogger(ProducerDemoWithCallBack.class); public static void main(String[] args) { //Connect to Properties Properties properties = new Properties(); //Connect to Localhost properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); //Set Producer Properties properties.setProperty("key.serializer", StringSerializer.class.getName()); properties.setProperty("value.serializer", StringSerializer.class.getName()); //Create Producer by passing Properties above KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int j=0; j<10; j++) { //Create Producer Record for (int i = 0; i < 30; i++) { String keyVal = "Key_"+i; // create a Producer Record ProducerRecord<String, String> producerRecord = new ProducerRecord<>("TopicFromJava", keyVal, "hello world " + i); // send data producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { // executes every time a record successfully sent or an exception is thrown if (e == null) { // the record was successfully sent log.info("Received new metadata " + "Topic: " + metadata.topic() + " " + "Key : " + keyVal + " " + "Partition: " + metadata.partition() + " " + "Offset: " + metadata.offset() + " " + "Timestamp: " + metadata.timestamp() + "\n"); } else { log.error("Error while producing", e); } } }); } try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); } } //Tell Producer to send all data producer.flush(); producer.close(); } }
ProducerDemoWithCallBack.java
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerDemoWithShutdown { public static final Logger log = LoggerFactory.getLogger(KafkaConsumerDemoWithShutdown.class); public static void main(String[] args) { String strGrpID = "TopicFromJavaGrp"; String strTopic = "TopicFromJava"; //Consumer Properties Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); properties.setProperty("key.deserializer", StringDeserializer.class.getName()); properties.setProperty("value.deserializer", StringDeserializer.class.getName()); properties.setProperty("group.id", strGrpID); properties.setProperty("auto.offset.reset", "earliest"); //Create Consumer with above properties KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); final Thread mainThread = Thread.currentThread(); //Shutdown hook should be started before calling Subscribe and polling Runtime.getRuntime().addShutdownHook(new Thread(){ public void run(){ log.info("Detected Shutdown Hook, Lets Exit consumer Wakeup.."); //Telling to call wakeup incase exit is pressed consumer.wakeup(); try{ mainThread.join(); }catch(InterruptedException e){ e.printStackTrace(); } } }); try { consumer.subscribe(Arrays.asList(strTopic)); //Until the loop runs, keep looking for topics from producer while(true) { log.info("polling... . . "); ConsumerRecords<String, String> arrRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : arrRecords) { log.info("Key:{}, Value:{}, Partition:{}, Offset:{}", record.key(), record.value(), record.partition(), record.offset()); } } }catch(WakeupException e){ //Control comes here incase of exit is called during execution log.info("Polling Interuppted.... . . ."); }catch (Exception e){ log.error(e.getMessage()); }finally { consumer.close(); log.info("Consumer is gracefully shutdown..."); } } }