ProducerDemoWithCallBack.java

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;

public class ProducerDemoWithCallBack {
    private static final Logger log = LoggerFactory.getLogger(ProducerDemoWithCallBack.class);

    public static void main(String[] args) {
        //Connect to Properties
        Properties properties = new Properties();

        //Connect to Localhost
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

        //Set Producer Properties
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        //Create Producer by passing Properties above
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int j=0; j<10; j++) {
            //Create Producer Record
            for (int i = 0; i < 30; i++) {

                String keyVal = "Key_"+i;

                // create a Producer Record
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("TopicFromJava", keyVal, "hello world " + i);

                // send data
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // executes every time a record successfully sent or an exception is thrown
                        if (e == null) {
                            // the record was successfully sent
                            log.info("Received new metadata " +
                                    "Topic: " + metadata.topic() + " " +
                                    "Key : " + keyVal + " " +
                                    "Partition: " + metadata.partition() + " " +
                                    "Offset: " + metadata.offset() + " " +
                                    "Timestamp: " + metadata.timestamp() + "\n");
                        } else {
                            log.error("Error while producing", e);
                        }
                    }
                });
            }

            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        //Tell Producer to send all data
        producer.flush();
        producer.close();
    }
}

ProducerDemoWithCallBack.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemoWithShutdown {
    public static final Logger log = LoggerFactory.getLogger(KafkaConsumerDemoWithShutdown.class);


    public static void main(String[] args) {
        String strGrpID = "TopicFromJavaGrp";
        String strTopic = "TopicFromJava";

        //Consumer Properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        properties.setProperty("group.id", strGrpID);
        properties.setProperty("auto.offset.reset", "earliest");

        //Create Consumer with above properties
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        
        final Thread mainThread = Thread.currentThread();

        //Shutdown hook should be started before calling Subscribe and polling
        Runtime.getRuntime().addShutdownHook(new Thread(){
            public void run(){
                log.info("Detected Shutdown Hook, Lets Exit consumer Wakeup..");

                //Telling to call wakeup incase exit is pressed 
                consumer.wakeup();

                try{
                    mainThread.join();
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        });

        try {
            consumer.subscribe(Arrays.asList(strTopic));

            //Until the loop runs, keep looking for topics from producer
            while(true) {
                log.info("polling... . . ");

                ConsumerRecords<String, String> arrRecords = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : arrRecords) {
                    log.info("Key:{}, Value:{}, Partition:{}, Offset:{}", record.key(), record.value(), record.partition(), record.offset());
                }
            }
        }catch(WakeupException e){
            //Control comes here incase of exit is called during execution
            log.info("Polling Interuppted.... . . .");
        }catch (Exception e){
            log.error(e.getMessage());
        }finally {
            consumer.close();
            log.info("Consumer is gracefully shutdown...");
        }
    }
}
  1. Is it right to use @SpringBootTest (or) @ContextConfiguration in Unit Test?
    Starting spring for each test is a very expensive operation. It’s not a unit test anymore rather integration test. @SpringBootTest goes further and tries to mimic the processes added by Spring Boot framework for creating the context: Decides what to scan based on package structures, loads external configurations from predefined locations optionally runs autoconfiguration starters and so on and so forth. @SpringBootTest loads the necessary beans and inject into each other.
    When using @ContextConfiguration you present way to filter what exactly should be run, what beans to load and to inject into each other.
  2. How to Optimize the Unit test by selectively loading the resource?
    You can selectively load the resource during unit test

    • To test your Respository : use @DataJpaTest annotation for test slice.
    • To test your Service layer : use JUnit and Mockito. Here you will mock your Repository
    • To test your Controller layer : use @WebMvcTest annotation for test slice or use JUnit and Mockito. Here you will mock your Service in both cases
    • To test a Component, such as a third party library wrapper or load some specific beans: use @ExtendWith(SpringExtension.class) and @ContextConfiguration/@Import or @SpringJUnitWebConfig which is the combinaison of the both.
    • To do an integration test : use @SpringBootTest
  3. When to use @SpringBootTest and @ExtendWith(SpringExtension.class)
    Use @ExtendWith(SpringExtension.class) without loading the entire application context that is included with @SpringBootTest. As you say it’s a lighter weight approach if you just want to mock beans with @MockBean. @SpringBootTest does in fact include @ExtendWith(SpringExtension.class)

    Use @SpringBootTest when,

    • You need to test the application as a whole, including multiple layers like the web layer, service layer, and repository layer.
    • You require an embedded server to run the test, such as when testing web controllers.
    • You want to verify the integration of all components in a fully loaded application context.

    Use SpringRunner/@ExtendWith(SpringExtension.class) when,

    • You are writing unit tests or lightweight integration tests that only need certain features of Spring, such as dependency injection or transaction management, without loading the full application context.
    • You want to test specific layers (e.g., service layer) in isolation without the overhead of starting the entire Spring application.

BaseUnitTest.java

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.util.concurrent.TimeUnit;

public class BaseUnitTest {
    long startTime;
    long endTime;

    @BeforeEach
    public void recordStartTime() {
        startTime = System.currentTimeMillis();
    }

    @AfterEach
    public void recordEndAndExecutionTime() {
        endTime = System.currentTimeMillis();
        System.out.println("Last testcase exection time in millisecond : " + 
                              TimeUnit.NANOSECONDS.toMicros((endTime - startTime)) + " Seconds");
    }
}

FirstTest.java

import org.junit.jupiter.api.Test;

class FirstTest extends BaseUnitTest{
    @Test
    void oneSecondTest() throws InterruptedException {
        System.out.println("oneSecondTest() name  =>  " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

    @Test
    void twoSecondTest() throws InterruptedException {
        System.out.println("twoSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(2000);
    }

    @Test
    void threeSecondTest() throws InterruptedException {
        System.out.println("threeSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(3000);
    }
}

SecondTest.java

import org.junit.jupiter.api.Test;

public class SecondTest extends BaseUnitTest{
    @Test
    void oneSecondTest() throws InterruptedException {
        System.out.println("oneSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

    @Test
    void twoSecondTest() throws InterruptedException {
        System.out.println("twoSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(2000);
    }

    @Test
    void threeSecondTest() throws InterruptedException {
        System.out.println("threeSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(3000);
    }
}

ThirdTest.java

import org.junit.jupiter.api.Test;

public class ThirdTest extends BaseUnitTest{
    @Test
    void oneSecondTest() throws InterruptedException {
        System.out.println("oneSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

    @Test
    void twoSecondTest() throws InterruptedException {
        System.out.println("twoSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(2000);
    }

    @Test
    void threeSecondTest() throws InterruptedException {
        System.out.println("threeSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(3000);
    }
}

Output

Last testcase exection time in millisecond : 0 Seconds
Last testcase exection time in millisecond : 2 Seconds
Last testcase exection time in millisecond : 2 Seconds
Last testcase exection time in millisecond : 2 Seconds
Last testcase exection time in millisecond : 3 Seconds
Last testcase exection time in millisecond : 3 Seconds
Last testcase exection time in millisecond : 3 Seconds
threeSecondTest() name => ForkJoinPool-1-worker-10
twoSecondTest() name => ForkJoinPool-1-worker-7
oneSecondTest() name  =>  ForkJoinPool-1-worker-1
twoSecondTest() name => ForkJoinPool-1-worker-6
oneSecondTest() name => ForkJoinPool-1-worker-3
threeSecondTest() name => ForkJoinPool-1-worker-5
oneSecondTest() name => ForkJoinPool-1-worker-4
twoSecondTest() name => ForkJoinPool-1-worker-8
threeSecondTest() name => ForkJoinPool-1-worker-9

junit-platform.properties

# Enable parallelism
junit.jupiter.execution.parallel.enabled = true

# Enable parallelism for both test methods and classes
junit.jupiter.execution.parallel.mode.default =  concurrent

In Maven Command

>>mvn test -Djunit.jupiter.execution.parallel.enabled=true  -Djunit.jupiter.execution.parallel.mode.default=concurrent

Four possibilities in junit-platform.properties config

//Only One class and One method in that class would be executed - Sequential Execution
junit.jupiter.execution.parallel.mode.default =  same_thread
junit.jupiter.execution.parallel.mode.classes.default = same_thread

//More than one class would run in parallel but only one method in each class would be executed 
//Test method within one class run sequentially but more than one class run in parallel
junit.jupiter.execution.parallel.mode.default =  same_thread
junit.jupiter.execution.parallel.mode.classes.default = concurrent

//Only One class and Multiple method in that class would be executed
junit.jupiter.execution.parallel.mode.default =  concurrent
junit.jupiter.execution.parallel.mode.classes.default = same_thread

//Multiple classes and Multiple method in  those classes would be executed
junit.jupiter.execution.parallel.mode.default =  concurrent
junit.jupiter.execution.parallel.mode.classes.default = concurrent