ProducerDemoWithCallBack.java
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; public class ProducerDemoWithCallBack { private static final Logger log = LoggerFactory.getLogger(ProducerDemoWithCallBack. class ); public static void main(String[] args) { //Connect to Properties Properties properties = new Properties(); //Connect to Localhost properties.setProperty( "bootstrap.servers" , "127.0.0.1:9092" ); //Set Producer Properties properties.setProperty( "key.serializer" , StringSerializer. class .getName()); properties.setProperty( "value.serializer" , StringSerializer. class .getName()); //Create Producer by passing Properties above KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for ( int j= 0 ; j< 10 ; j++) { //Create Producer Record for ( int i = 0 ; i < 30 ; i++) { String keyVal = "Key_" +i; // create a Producer Record ProducerRecord<String, String> producerRecord = new ProducerRecord<>( "TopicFromJava" , keyVal, "hello world " + i); // send data producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { // executes every time a record successfully sent or an exception is thrown if (e == null ) { // the record was successfully sent log.info( "Received new metadata " + "Topic: " + metadata.topic() + " " + "Key : " + keyVal + " " + "Partition: " + metadata.partition() + " " + "Offset: " + metadata.offset() + " " + "Timestamp: " + metadata.timestamp() + "\n" ); } else { log.error( "Error while producing" , e); } } }); } try { Thread.sleep( 300 ); } catch (InterruptedException e) { throw new RuntimeException(e); } } //Tell Producer to send all data producer.flush(); producer.close(); } } |
ProducerDemoWithCallBack.java
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerDemoWithShutdown { public static final Logger log = LoggerFactory.getLogger(KafkaConsumerDemoWithShutdown. class ); public static void main(String[] args) { String strGrpID = "TopicFromJavaGrp" ; String strTopic = "TopicFromJava" ; //Consumer Properties Properties properties = new Properties(); properties.setProperty( "bootstrap.servers" , "127.0.0.1:9092" ); properties.setProperty( "key.deserializer" , StringDeserializer. class .getName()); properties.setProperty( "value.deserializer" , StringDeserializer. class .getName()); properties.setProperty( "group.id" , strGrpID); properties.setProperty( "auto.offset.reset" , "earliest" ); //Create Consumer with above properties KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); final Thread mainThread = Thread.currentThread(); //Shutdown hook should be started before calling Subscribe and polling Runtime.getRuntime().addShutdownHook( new Thread(){ public void run(){ log.info( "Detected Shutdown Hook, Lets Exit consumer Wakeup.." ); //Telling to call wakeup incase exit is pressed consumer.wakeup(); try { mainThread.join(); } catch (InterruptedException e){ e.printStackTrace(); } } }); try { consumer.subscribe(Arrays.asList(strTopic)); //Until the loop runs, keep looking for topics from producer while ( true ) { log.info( "polling... . . " ); ConsumerRecords<String, String> arrRecords = consumer.poll(Duration.ofMillis( 1000 )); for (ConsumerRecord<String, String> record : arrRecords) { log.info( "Key:{}, Value:{}, Partition:{}, Offset:{}" , record.key(), record.value(), record.partition(), record.offset()); } } } catch (WakeupException e){ //Control comes here incase of exit is called during execution log.info( "Polling Interuppted.... . . ." ); } catch (Exception e){ log.error(e.getMessage()); } finally { consumer.close(); log.info( "Consumer is gracefully shutdown..." ); } } } |