ProducerDemoWithCallBack.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class ProducerDemoWithCallBack {
private static final Logger log = LoggerFactory.getLogger(ProducerDemoWithCallBack.class);
public static void main(String[] args) {
//Connect to Properties
Properties properties = new Properties();
//Connect to Localhost
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
//Set Producer Properties
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
//Create Producer by passing Properties above
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int j=0; j<10; j++) {
//Create Producer Record
for (int i = 0; i < 30; i++) {
String keyVal = "Key_"+i;
// create a Producer Record
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("TopicFromJava", keyVal, "hello world " + i);
// send data
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
// executes every time a record successfully sent or an exception is thrown
if (e == null) {
// the record was successfully sent
log.info("Received new metadata " +
"Topic: " + metadata.topic() + " " +
"Key : " + keyVal + " " +
"Partition: " + metadata.partition() + " " +
"Offset: " + metadata.offset() + " " +
"Timestamp: " + metadata.timestamp() + "\n");
} else {
log.error("Error while producing", e);
}
}
});
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//Tell Producer to send all data
producer.flush();
producer.close();
}
}
ProducerDemoWithCallBack.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerDemoWithShutdown {
public static final Logger log = LoggerFactory.getLogger(KafkaConsumerDemoWithShutdown.class);
public static void main(String[] args) {
String strGrpID = "TopicFromJavaGrp";
String strTopic = "TopicFromJava";
//Consumer Properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", strGrpID);
properties.setProperty("auto.offset.reset", "earliest");
//Create Consumer with above properties
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
final Thread mainThread = Thread.currentThread();
//Shutdown hook should be started before calling Subscribe and polling
Runtime.getRuntime().addShutdownHook(new Thread(){
public void run(){
log.info("Detected Shutdown Hook, Lets Exit consumer Wakeup..");
//Telling to call wakeup incase exit is pressed
consumer.wakeup();
try{
mainThread.join();
}catch(InterruptedException e){
e.printStackTrace();
}
}
});
try {
consumer.subscribe(Arrays.asList(strTopic));
//Until the loop runs, keep looking for topics from producer
while(true) {
log.info("polling... . . ");
ConsumerRecords<String, String> arrRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : arrRecords) {
log.info("Key:{}, Value:{}, Partition:{}, Offset:{}", record.key(), record.value(), record.partition(), record.offset());
}
}
}catch(WakeupException e){
//Control comes here incase of exit is called during execution
log.info("Polling Interuppted.... . . .");
}catch (Exception e){
log.error(e.getMessage());
}finally {
consumer.close();
log.info("Consumer is gracefully shutdown...");
}
}
}




