//Displays all available Images
>>docker images 

//Displays List of Containers Running 
>>docker ps 

(or)

>>docker container ls

//Start Container by loading latest Image
>>docker run -d nginx:latest

//Stop Container 
>>docker stop CONTAINER_ID(or)CONTAINER_NAME


//Remove Container
>>docker rm CONTAINER_ID

//Display all Containers irrespective of stopped or running
>>docker ps -a

//Get ID of all containers
>>docker ps -aq

//Remove all stopped containers by ID
>>docker rm $(docker ps -aq)

//Remove all containers(stopped and running) by ID
>>docker rm -f $(docker ps -aq)

//Give CUSTOM_NAME to Container
>>docker run --name CONTAINER_CUSTOM_NAME nginx:latest

//Stop Container by CUSTOM_NAME
>>docker stop CONTAINER_CUSTOM_NAME

//Display list of Containers Running by setting Format
>>docker ps --format="CUSTOM_FORMAT"

//Using an exported shell variable for the format
>>export format="CUSTOM_FORMAT"
>>docker ps --format=${format}
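
For example, a possible CUSTOM_FORMAT using Docker's Go-template placeholders (the columns chosen here are just an illustration):

//Show only ID, name and status columns in a table layout
>>docker ps --format "table {{.ID}}\t{{.Names}}\t{{.Status}}"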

//Map Host Port to Docker Container Port
>>docker run -d -p 8080:80 nginx:latest
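
With the mapping above, nginx listening on port 80 inside the container is reachable on port 8080 of the host, so (assuming curl is available on the host) a quick check would be:

//Verify the port mapping from the host
>>curl http://localhost:8080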

Why do we need a custom ThreadPoolExecutor when we can use the ExecutorService framework to create and manage threads?

ExecutorService executor = Executors.newFixedThreadPool(20);

is nothing but

return new ThreadPoolExecutor(20, 20,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

under the hood.

ThreadPoolExecutor is more effective when you need to customize many or all of the parameters below.

ThreadPoolExecutor(int corePoolSize, 
               int maximumPoolSize, 
               long keepAliveTime, 
               TimeUnit unit, 
               BlockingQueue<Runnable> workQueue, 
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler)

How does it work?

  1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn’t, by returning false.
  2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
  3. If we cannot queue the task, then we try to add a new thread. If that fails, we know we are shut down or saturated and so reject the task.

CustomThreadPoolExecutor.java

import java.util.concurrent.*;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                    TimeUnit unit, BlockingQueue<Runnable> workQueue,
                                    ThreadFactory threadFactory,
                                    RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
}

CustomThreadFactory.java

import java.util.concurrent.ThreadFactory;

public class CustomThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread th = new Thread(r);
        th.setPriority(Thread.NORM_PRIORITY);
        th.setDaemon(false);
        return th;
    }
}

CustomRejectHandler.java

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class CustomRejectHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task Rejected: "+ r.toString());
    }
}

Task.java

import java.util.concurrent.ThreadPoolExecutor;

public class Task implements Runnable{

    String taskName;
    ThreadPoolExecutor executor;
    Long timeInMilliSeconds;


    public Task(String taskName, ThreadPoolExecutor executor, Long timeInMilliSeconds) {
        this.taskName = taskName;
        this.executor = executor;
        this.timeInMilliSeconds = timeInMilliSeconds;
    }

    @Override
    public void run() {
        try{
            Thread.sleep(this.timeInMilliSeconds);

            System.out.print("Tasks in Blocking Queue "+ this.executor.getQueue().stream().toList() + ", ");
            System.out.print(this.taskName + " completed by " +  Thread.currentThread().getName() + " after running "+ timeInMilliSeconds +"ms" );
            System.out.println(", Active Threads available "+ executor.getPoolSize());
        }catch (Exception e){
        }
    }

    @Override
    public String toString() {
        return "Task{" +
                "taskName='" + taskName + '\'' +
                '}';
    }
}

BatchProcessor.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BatchProcessor {
    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new CustomThreadPoolExecutor(2,4, 10, TimeUnit.MINUTES,
                                                                    new ArrayBlockingQueue<>(2),
                                                                    new CustomThreadFactory(),
                                                                    new CustomRejectHandler());

        System.out.println("Active Threads  available for processing at start "+  executor.getPoolSize());

        executor.submit( new Task("task1", executor, 2500L)); //Directly dealt by CorePool Thread
        executor.submit( new Task("task2", executor, 500L)); //Directly dealt by CorePool Thread
        Thread.sleep(2000L);
        System.out.println("Slept for 2000 Millisecond");
        executor.submit( new Task("task3", executor, 200L)); //Directly dealt by CorePool Thread
        executor.submit( new Task("task4", executor, 1000L)); //Directly dealt by CorePool Thread
        executor.submit( new Task("task5", executor, 300L)); //Dealt by extra thread within Maximum Pool Size
        executor.submit( new Task("task6",executor, 300L)); //Directly dealt by CorePool Thread

        executor.shutdown();
    }
}

Output

Active Threads  available for processing at start 0
Tasks in Blocking Queue [], task2 completed by Thread-1 after running 500ms, Active Threads available 2
Slept for 2000 Millisecond
Tasks in Blocking Queue Task{'task4','task6'}, task3 completed by Thread-1 after running 200ms, Active Threads available 3
Tasks in Blocking Queue Task{'task6'}, task5 completed by Thread-2 after running 300ms, Active Threads available 3
Tasks in Blocking Queue [], task1 completed by Thread-0 after running 2500ms, Active Threads available 3
Tasks in Blocking Queue [], task6 completed by Thread-2 after running 300ms, Active Threads available 2
Tasks in Blocking Queue [], task4 completed by Thread-1 after running 1000ms, Active Threads available 1

What is BlockingQueue?

  1. BlockingQueue is an interface; its common implementations include LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue and SynchronousQueue
  2. Thread Safe: BlockingQueue implementations are thread-safe, with all methods being atomic.
  3. Blocking Operation: Has blocking behavior if the queue is full (for producers) or empty (for consumers).
  4. No Null Elements: Attempts to insert a null will result in a NullPointerException.

Two Types of BlockingQueue

  • Bounded BlockingQueue: Fixed capacity, blocking producers when full.
  • Unbounded BlockingQueue: Grows as needed (e.g., a LinkedBlockingQueue created with its default capacity of Integer.MAX_VALUE), though still subject to memory constraints.
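
A minimal sketch of the difference (the capacity of 10 is illustrative):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueBoundsDemo {
    public static void main(String[] args) throws InterruptedException {
        //Bounded: put() blocks once 10 elements are waiting in the queue
        BlockingQueue<Integer> bounded = new ArrayBlockingQueue<>(10);

        //Effectively unbounded: default capacity is Integer.MAX_VALUE, limited only by memory
        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();

        bounded.put(1);     //would block if the queue already held 10 elements
        unbounded.put(1);   //practically never blocks
        System.out.println(bounded.take() + " " + unbounded.take());
    }
}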

Simple Producer Consumer Implementation using BlockingQueue?

  1. We have a queBuffer which takes a maximum of 10 printing tasks at a time
  2. Printing tasks are added by PrintProducer and taken off the queue at the PrintConsumer end
  3. When starting the producer and consumer, call start() rather than run(): run() executes on the calling (main) thread, whereas start() spawns a new thread for each, which lets the producer and consumer run at the same time in two different threads.

PrintProducer.java

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class PrintProducer extends Thread {
    private BlockingQueue queBuffer;

    public PrintProducer(BlockingQueue queBuffer) {
        this.queBuffer = queBuffer;
    }

    @Override
    public void run() {
        while(true){
            try {
                Integer randomNo = ThreadLocalRandom.current().nextInt(100);
                queBuffer.put(randomNo);
                System.out.println("Added Task No " + String.valueOf(randomNo));
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

PrintConsumer.java

import java.util.concurrent.BlockingQueue;

public class PrintConsumer extends Thread{
    private BlockingQueue queBuffer;

    public PrintConsumer(BlockingQueue queBuffer) {
        this.queBuffer = queBuffer;
    }

    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Polled Task No " + queBuffer.take());
                Thread.sleep(1500);
            } catch (InterruptedException e) {

            }
        }
    }
}

ProcessPrints.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProcessPrints {
    static BlockingQueue queBuffer = new ArrayBlockingQueue(10);

    public static void main(String[] args) {
        PrintProducer objPrintProducer = new PrintProducer(queBuffer);
        PrintConsumer objPrintConsumer = new PrintConsumer(queBuffer);

        objPrintProducer.start();
        objPrintConsumer.start();
    }
}

Output

Polled Task No 61
Added Task No 61
Added Task No 33
Added Task No 0
Polled Task No 33
Added Task No 29
Added Task No 93
Added Task No 20
Polled Task No 0
Added Task No 24
Added Task No 2
Added Task No 31
.
.
.
.

The above code can also be written as below, since Thread takes a Runnable as an argument and the run() method body can be supplied as a lambda expression.

ProcessPrints.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class ProcessPrints {
    static BlockingQueue queBuffer = new ArrayBlockingQueue(10);

    public static void main(String[] args) {
        //Producer Implementation
        new Thread(()->{
            while(true){
                try {
                    Integer randomNo = ThreadLocalRandom.current().nextInt(100);
                    queBuffer.put(randomNo);
                    System.out.println("Added Task No " + String.valueOf(randomNo));
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        //Consumer Implementation
        new Thread(()->{
            while(true){
                try {
                    System.out.println("Polled Task No " + queBuffer.poll());
                    Thread.sleep(1500);
                } catch (InterruptedException e) {

                }
            }
        }).start();
    }
}

Simple Program to print numbers using threads
NumberPrinter.java

public class NumberPrinter implements Runnable{
    int number;
    public NumberPrinter(int number){
        this.number = number;
    }

    public void run(){
        System.out.println("Printing Number from Thread "+ this.number);
    }
}

Main.java

public class Main {
    public static void main(String[] args) {
        for (int idx=1;idx<=5;idx++){
            Thread objthread = new Thread(new NumberPrinter(idx));
            objthread.start();
        }
    }
}

Output

Printing Number from Thread 5
Printing Number from Thread 1
Printing Number from Thread 4
Printing Number from Thread 3
Printing Number from Thread 2

Simple Program using Executor Service taking Runnable as Argument

ExecutorService is a framework for creating and managing threads. Thread pools can be created with Executors.newFixedThreadPool, newCachedThreadPool and newScheduledThreadPool. The submit() method takes a Runnable or Callable object (both functional interface types) as an argument. The same code above can be rewritten as below.
Main.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService objExecService = Executors.newFixedThreadPool(2);

        //Lambda expression passed as argument, as Runnable is a functional interface
        objExecService.submit(() -> {
            System.out.println(Thread.currentThread().getName());
        });

        objExecService.shutdown();
    }
}

Output

pool-1-thread-1

Same code with Runnable instance passed as argument to submit

  .
  . 
  //Instance of Runnable passed as argument
  HelloThread1 objHT1 = new HelloThread1();
  objExecService.submit(objHT1);
  .
  .

Output

Hello World from Thread Name (pool-1-thread-1) using Runnable 
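
HelloThread1 itself is not shown in these notes; a minimal sketch consistent with the output above could be:

public class HelloThread1 implements Runnable {
    @Override
    public void run() {
        System.out.println("Hello World from Thread Name (" + Thread.currentThread().getName() + ") using Runnable");
    }
}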

Same code with Runnable as Anonymous Class passed as argument

ExecutorService exec = Executors.newFixedThreadPool(2);

//Instance of Runnable passed as Anonymous class
exec.execute(new Runnable() {
  public void run() {
    System.out.println("Hello world");
  }
});

exec.shutdown();

Simple Program using Executor Service taking Callable as Argument

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService objExecService = Executors.newFixedThreadPool(2);
        Future<String> objFuture = objExecService.submit(new HelloThread2());
        System.out.println(objFuture.get());
        objExecService.shutdown();
    }
}

Output

Hello World from Thread Name (pool-1-thread-1) using Callable
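
HelloThread2 is also not shown; a minimal Callable sketch consistent with the output above could be:

import java.util.concurrent.Callable;

public class HelloThread2 implements Callable<String> {
    @Override
    public String call() {
        return "Hello World from Thread Name (" + Thread.currentThread().getName() + ") using Callable";
    }
}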

Using Lambda Expression in submit()

.
.
ExecutorService objExecService = Executors.newFixedThreadPool(2);

Future<String> objFuture = objExecService.submit(() -> {
  Thread.sleep(3000);
  return Thread.currentThread().getName();
});

System.out.println(objFuture.get());
.
.

The above could be rewritten in anonymous class as below

ExecutorService objExecService = Executors.newFixedThreadPool(2);

Future<String> objFuture = objExecService.submit(new Callable<String>() {
 @Override
 public String call() throws Exception {
   Thread.sleep(3000);
   return Thread.currentThread().getName();
 }
});

System.out.println(objFuture.get());
objExecService.shutdown();

Program for Creating Thread Pool and executing Task

ThreadPoolExample.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {
    public static void main(String args[]) {
       ExecutorService service = Executors.newFixedThreadPool(10); //create 10 worker threads in Thread Pool
       for (int i =0; i<100; i++){
           service.submit(new Task(i)); //submit that to be done 
       }
       service.shutdown();
    }  
}

Task.java

final class Task implements Runnable {
    private int taskId;  
    public Task(int id){
        this.taskId = id;
    }
  
    @Override
    public void run() {
        System.out.println("Task ID : " + this.taskId +" performed by " 
                           + Thread.currentThread().getName());
    }  
}

Output

Task ID : 0 performed by pool-1-thread-1
Task ID : 3 performed by pool-1-thread-4
Task ID : 2 performed by pool-1-thread-3
Task ID : 1 performed by pool-1-thread-2
Task ID : 5 performed by pool-1-thread-6
Task ID : 4 performed by pool-1-thread-5

ProducerDemoWithCallBack.java

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;

public class ProducerDemoWithCallBack {
    private static final Logger log = LoggerFactory.getLogger(ProducerDemoWithCallBack.class);

    public static void main(String[] args) {
        //Connect to Properties
        Properties properties = new Properties();

        //Connect to Localhost
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

        //Set Producer Properties
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        //Create Producer by passing Properties above
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int j=0; j<10; j++) {
            //Create Producer Record
            for (int i = 0; i < 30; i++) {

                String keyVal = "Key_"+i;

                // create a Producer Record
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("TopicFromJava", keyVal, "hello world " + i);

                // send data
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // executes every time a record successfully sent or an exception is thrown
                        if (e == null) {
                            // the record was successfully sent
                            log.info("Received new metadata " +
                                    "Topic: " + metadata.topic() + " " +
                                    "Key : " + keyVal + " " +
                                    "Partition: " + metadata.partition() + " " +
                                    "Offset: " + metadata.offset() + " " +
                                    "Timestamp: " + metadata.timestamp() + "\n");
                        } else {
                            log.error("Error while producing", e);
                        }
                    }
                });
            }

            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        //Tell Producer to send all data
        producer.flush();
        producer.close();
    }
}

KafkaConsumerDemoWithShutdown.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemoWithShutdown {
    public static final Logger log = LoggerFactory.getLogger(KafkaConsumerDemoWithShutdown.class);


    public static void main(String[] args) {
        String strGrpID = "TopicFromJavaGrp";
        String strTopic = "TopicFromJava";

        //Consumer Properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        properties.setProperty("group.id", strGrpID);
        properties.setProperty("auto.offset.reset", "earliest");

        //Create Consumer with above properties
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        
        final Thread mainThread = Thread.currentThread();

        //Shutdown hook should be started before calling Subscribe and polling
        Runtime.getRuntime().addShutdownHook(new Thread(){
            public void run(){
                log.info("Detected Shutdown Hook, Lets Exit consumer Wakeup..");

                //Telling to call wakeup incase exit is pressed 
                consumer.wakeup();

                try{
                    mainThread.join();
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        });

        try {
            consumer.subscribe(Arrays.asList(strTopic));

            //Keep polling for new records until shutdown is requested
            while(true) {
                log.info("polling... . . ");

                ConsumerRecords<String, String> arrRecords = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : arrRecords) {
                    log.info("Key:{}, Value:{}, Partition:{}, Offset:{}", record.key(), record.value(), record.partition(), record.offset());
                }
            }
        }catch(WakeupException e){
            //Control comes here incase of exit is called during execution
            log.info("Polling Interuppted.... . . .");
        }catch (Exception e){
            log.error(e.getMessage());
        }finally {
            consumer.close();
            log.info("Consumer is gracefully shutdown...");
        }
    }
}
  1. Is it right to use @SpringBootTest (or) @ContextConfiguration in a unit test?
    Starting Spring for each test is a very expensive operation; such a test is no longer a unit test but an integration test. @SpringBootTest goes further and tries to mimic the processes added by the Spring Boot framework for creating the context: it decides what to scan based on package structure, loads external configuration from predefined locations, optionally runs autoconfiguration starters, and so on. @SpringBootTest loads the necessary beans and injects them into each other.
    When using @ContextConfiguration you have a way to filter exactly what should be run: which beans to load and inject into each other.
  2. How do you optimize unit tests by selectively loading resources?
    You can selectively load resources during unit tests:

    • To test your Repository: use the @DataJpaTest annotation for a test slice (see the sketch after this Q&A).
    • To test your Service layer: use JUnit and Mockito. Here you will mock your Repository.
    • To test your Controller layer: use the @WebMvcTest annotation for a test slice, or use JUnit and Mockito. In both cases you will mock your Service.
    • To test a Component, such as a third-party library wrapper, or to load some specific beans: use @ExtendWith(SpringExtension.class) with @ContextConfiguration/@Import, or @SpringJUnitWebConfig, which is the combination of both.
    • To do an integration test: use @SpringBootTest.
  3. When should you use @SpringBootTest vs @ExtendWith(SpringExtension.class)?
    Use @ExtendWith(SpringExtension.class) when you don't need to load the entire application context that comes with @SpringBootTest. It is a lighter-weight approach if you just want to mock beans with @MockBean. Note that @SpringBootTest does in fact include @ExtendWith(SpringExtension.class).

    Use @SpringBootTest when,

    • You need to test the application as a whole, including multiple layers like the web layer, service layer, and repository layer.
    • You require an embedded server to run the test, such as when testing web controllers.
    • You want to verify the integration of all components in a fully loaded application context.

    Use SpringRunner/@ExtendWith(SpringExtension.class) when,

    • You are writing unit tests or lightweight integration tests that only need certain features of Spring, such as dependency injection or transaction management, without loading the full application context.
    • You want to test specific layers (e.g., service layer) in isolation without the overhead of starting the entire Spring application.
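
As referenced in the Repository bullet above, a minimal test-slice sketch might look like this (UserRepository and User are hypothetical names; @DataJpaTest boots only the JPA slice, typically against an in-memory database):

import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;

@DataJpaTest
class UserRepositoryTest {

    @Autowired
    private UserRepository userRepository;   //hypothetical Spring Data JPA repository

    @Test
    void savesAndFindsUser() {
        User saved = userRepository.save(new User("alice"));   //hypothetical entity
        assertNotNull(userRepository.findById(saved.getId()).orElse(null));
    }
}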

BaseUnitTest.java

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.util.concurrent.TimeUnit;

public class BaseUnitTest {
    long startTime;
    long endTime;

    @BeforeEach
    public void recordStartTime() {
        startTime = System.currentTimeMillis();
    }

    @AfterEach
    public void recordEndAndExecutionTime() {
        endTime = System.currentTimeMillis();
        System.out.println("Last testcase exection time in millisecond : " + 
                              TimeUnit.NANOSECONDS.toMicros((endTime - startTime)) + " Seconds");
    }
}

FirstTest.java

import org.junit.jupiter.api.Test;

class FirstTest extends BaseUnitTest{
    @Test
    void oneSecondTest() throws InterruptedException {
        System.out.println("oneSecondTest() name  =>  " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

    @Test
    void twoSecondTest() throws InterruptedException {
        System.out.println("twoSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(2000);
    }

    @Test
    void threeSecondTest() throws InterruptedException {
        System.out.println("threeSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(3000);
    }
}

SecondTest.java

import org.junit.jupiter.api.Test;

public class SecondTest extends BaseUnitTest{
    @Test
    void oneSecondTest() throws InterruptedException {
        System.out.println("oneSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

    @Test
    void twoSecondTest() throws InterruptedException {
        System.out.println("twoSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(2000);
    }

    @Test
    void threeSecondTest() throws InterruptedException {
        System.out.println("threeSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(3000);
    }
}

ThirdTest.java

import org.junit.jupiter.api.Test;

public class ThirdTest extends BaseUnitTest{
    @Test
    void oneSecondTest() throws InterruptedException {
        System.out.println("oneSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

    @Test
    void twoSecondTest() throws InterruptedException {
        System.out.println("twoSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(2000);
    }

    @Test
    void threeSecondTest() throws InterruptedException {
        System.out.println("threeSecondTest() name => " + Thread.currentThread().getName());
        Thread.sleep(3000);
    }
}

Output

Last testcase execution time : 0 Seconds
Last testcase execution time : 2 Seconds
Last testcase execution time : 2 Seconds
Last testcase execution time : 2 Seconds
Last testcase execution time : 3 Seconds
Last testcase execution time : 3 Seconds
Last testcase execution time : 3 Seconds
threeSecondTest() name => ForkJoinPool-1-worker-10
twoSecondTest() name => ForkJoinPool-1-worker-7
oneSecondTest() name  =>  ForkJoinPool-1-worker-1
twoSecondTest() name => ForkJoinPool-1-worker-6
oneSecondTest() name => ForkJoinPool-1-worker-3
threeSecondTest() name => ForkJoinPool-1-worker-5
oneSecondTest() name => ForkJoinPool-1-worker-4
twoSecondTest() name => ForkJoinPool-1-worker-8
threeSecondTest() name => ForkJoinPool-1-worker-9

junit-platform.properties

# Enable parallelism
junit.jupiter.execution.parallel.enabled = true

# Enable parallelism for both test methods and classes
junit.jupiter.execution.parallel.mode.default =  concurrent

In Maven Command

>>mvn test -Djunit.jupiter.execution.parallel.enabled=true  -Djunit.jupiter.execution.parallel.mode.default=concurrent

Four possibilities in junit-platform.properties config

# Sequential execution: one class at a time, one test method at a time
junit.jupiter.execution.parallel.mode.default = same_thread
junit.jupiter.execution.parallel.mode.classes.default = same_thread

# Classes run in parallel, but test methods within each class run sequentially
junit.jupiter.execution.parallel.mode.default = same_thread
junit.jupiter.execution.parallel.mode.classes.default = concurrent

# Only one class runs at a time, but test methods within that class run in parallel
junit.jupiter.execution.parallel.mode.default = concurrent
junit.jupiter.execution.parallel.mode.classes.default = same_thread

# Multiple classes run in parallel and test methods within those classes run in parallel
junit.jupiter.execution.parallel.mode.default = concurrent
junit.jupiter.execution.parallel.mode.classes.default = concurrent

Eager Rebalance
Assume we have a topic with 10 partitions (0-9), and one consumer (let's call it consumer1) consuming it. When a second consumer appears (consumer2), the rebalance task triggers for both of them (consumer1 gets an event, consumer2 does the initial rebalance). Now consumer1 closes all the existing connections (even those that will be reopened soon) and releases the partition ownership in Zookeeper for all 10 partitions.

Then it runs the partition assignment algorithm, decides which partitions should be claimed, and claims the partition ownership in Zookeeper again. If the claim is successful, consumer1 starts fetching its new partitions.

Meanwhile consumer2 runs the partition assignment algorithm as well and tries to claim its partitions in Zookeeper. The claim will succeed only when consumer1 releases the ownership on those partitions. When the claim is successful, consumer2 starts fetching; if it fails to claim partitions within the given number of retries, you get a "rebalance failed after n retries" exception.

As you can see, instead of just closing connections and releasing ownership for the partitions consumer1 no longer owns, it unnecessarily closes ALL of its connections and restarts with a smaller number of partitions. The same happens when partitions are added (when we consume by a wildcard filter and a new topic appears): ALL connections are closed and then reopened instead of just opening new ones.

What is the disadvantage of the above?
All consumers:

1. Stop consuming in order to give up their partition ownership
2. Re-join the group via the JoinGroup request
3. Receive a brand-new partition assignment via the SyncGroup request, only once the rebalance finishes

There is a short window of unavailability for the entire consumer group between steps 1) and 3) – a “stop the world” event.

Three things determine how long data is unavailable during a rebalance

  1. happy path: only as long as it takes the consumers to join back
  2. not-so-happy path: up to the session_timeout_ms
  3. worst case: until your consumers stabilize

If your rebalance doesn't complete successfully, the rebalance is restarted.
If your consumer doesn't join the group at all within session_timeout_ms, the group will complete the rebalance without it. When it does catch up and join, a new rebalance will start.
If your consumer starts the rebalance dance but doesn't complete it within session_timeout_ms, the group will abort the rebalance, kick it out of the group, and begin a new rebalance.

And when it restarts back up and joins, yet another rebalance will start.
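
session_timeout_ms above refers to the consumer configuration session.timeout.ms; in the Java consumer demo earlier it could be set alongside the other properties (the value below is illustrative, and the broker enforces allowed bounds):

//Illustrative value; if no heartbeat arrives within this window the consumer is removed and a rebalance starts
properties.setProperty("session.timeout.ms", "45000");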

Besides the trickiness of the timeout during a rebalance, the basic cases that can start a new rebalance are:
• if any consumer fails.
• if any consumer restarts.
• if a new consumer joins the group.

So we have both:
1. plenty of common cases that trigger rebalances
2. tricky cases that can disrupt ongoing rebalances

Consumer group rebalance is nothing more than a partition reassignment between the consumers. If only one consumer is joining or leaving the group at a time, it does not make much sense to pause all the other consumers when they are bound to get the exact same partitions after the rebalance finishes.

To Start Kafka Server

.\bin\windows\kafka-server-start.bat .\config\server.properties
.\bin\windows\kafka-server-stop.bat .\config\server.properties

To Start Zookeeper Server

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\zookeeper-server-stop.bat .\config\zookeeper.properties

Create Topic in Kafka with 5 Partitions and Replication Factor of 1

kafka-topics.bat  --bootstrap-server localhost:9092 --topic firsttopic --create --partitions 5  --replication-factor 1

Note: The replication factor cannot be more than 1 on a single-broker localhost setup, since it cannot exceed the number of brokers.

List Topics

kafka-topics.bat  --bootstrap-server localhost:9092 --list

Describe Topic

kafka-topics.bat  --bootstrap-server localhost:9092 --topic firsttopic --describe

Delete Topic

kafka-topics.bat  --bootstrap-server localhost:9092 --topic firsttopic --delete

Producer to push data into Topic in Kafka

kafka-console-producer.bat --broker-list localhost:9092 --topic test

Producer sending data into Topic as Key:Value pair

kafka-console-producer.bat --broker-list localhost:9092 --topic firsttopic  --property parse.key=true --property key.separator=:

Note:

  1. Records with the same key end up in the same partition
  2. The separator must be passed in the command so the producer can distinguish between key and value

Delete Record in Topic

kafka-delete-records.bat --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json
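
The JSON file passed via --offset-json-file has the following shape (topic, partition and offset values below are illustrative; records with offsets lower than the given offset are deleted):

{
  "partitions": [
    { "topic": "firsttopic", "partition": 0, "offset": 3 }
  ],
  "version": 1
}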

If you try to push data to a topic that doesn't exist, the topic is auto-created after a few (around 3) failed attempts.

Consumer to pull data from Topic in Kafka

kafka-console-consumer.bat --topic test --bootstrap-server localhost:9092 --from-beginning

Print Partition, Key, Value in consumer

kafka-console-consumer.bat --topic thirdtopic --bootstrap-server localhost:9092  --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning

Adding consumer to consumer Group

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic third_topic --group my-first-application

Listing consumer Group

kafka-consumer-groups.bat  --list --bootstrap-server localhost:9092

Describing consumer Group

bin/kafka-consumer-groups.bat --describe --group mygroup --bootstrap-server localhost:9092

Reset Offset in Topic in all partitions

kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group mygroup --topic thirdtopic --reset-offsets --to-earliest --execute

Note: Resetting offsets makes the consumer read again from the new offset position.

{"Message": "Hello World from Kafka"}

How Topics, Partitions and Broker are related

Topics are logical categories or streams of data within Kafka. They act as message queues where producers publish data and consumers retrieve it.
Brokers are servers that store and manage topics, and handle communication between producers and consumers.
Partitions are the basic unit of data storage and distribution within Kafka topics. They are the main mechanism of concurrency for topics, and are used to improve performance and scalability.

What is Broker Discovery?
A client that wants to send or receive messages from the Kafka cluster may connect to any broker in the cluster. Every broker in the cluster has metadata about all the other brokers and will help the client connect to them as well, and therefore any broker in the cluster is also called a bootstrap server.

  1. A client connects to a broker in the cluster
  2. The client sends a metadata request to the broker
  3. The broker responds with the cluster metadata, including a list of all brokers in the cluster
  4. The client can now connect to any broker in the cluster to produce or consume data

What is Replication Factor?
The replication factor is the number of copies of a topic's partitions across different brokers. When Kafka Connect creates a topic, the replication factor should be at least 3 for a production system. A replication factor of 3 is commonly used because it balances broker loss against replication overhead.

Topic replication does not increase consumer parallelism.


How to choose the replication factor

It should be at least 2 and at most 4. The recommended number is 3, as it provides the right balance between performance and fault tolerance, and cloud providers usually offer 3 data centers / availability zones to deploy to as part of a region. The advantage of a higher replication factor is better resilience of your system: if the replication factor is N, up to N-1 brokers may fail without impacting availability if acks=0 or acks=1.

The disadvantages of a higher replication factor are higher latency experienced by the producers, as the data needs to be replicated to all the replica brokers before an ack is returned if acks=all, and more disk space required on your system.

If there is a performance issue due to a higher replication factor, you should get a better broker instead of lowering the replication factor.

Maximum Replication Factor = Number of Brokers in the Cluster
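
For example, on a cluster with at least 3 brokers, the earlier topic-creation command could request 3 replicas (the topic name here is illustrative):

kafka-topics.bat --bootstrap-server localhost:9092 --topic mytopic --create --partitions 5 --replication-factor 3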

What is min.insync.replicas?
min.insync.replicas is the minimum number of in-sync copies of the data that must be available for a partition to keep accepting new incoming messages (enforced when the producer uses acks=all). min.insync.replicas is 1 by default.
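
It can be raised per topic with kafka-configs (a sketch, assuming the firsttopic topic from earlier and a cluster that actually has enough replicas):

kafka-configs.bat --bootstrap-server localhost:9092 --entity-type topics --entity-name firsttopic --alter --add-config min.insync.replicas=2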

What is role of Zookeeper in kafka?

  1. Electing a controller. The controller is one of the brokers and is responsible for maintaining the leader/follower relationship for all the partitions. When a node shuts down, it is the controller that tells other replicas to become partition leaders to replace the partition leaders on the node that is going away. Zookeeper is used to elect a controller, make sure there is only one, and elect a new one if it crashes.
  2. Cluster membership – which brokers are alive and part of the cluster? This is also managed through ZooKeeper.
  3. Topic configuration – which topics exist, how many partitions each has, where the replicas are, who the preferred leader is, and what configuration overrides are set for each topic.
  4. (0.9.0) – Quotas – how much data each client is allowed to read and write.
  5. (0.9.0) – ACLs – who is allowed to read from and write to which topic.
  6. (old high-level consumer) – which consumer groups exist, who their members are, and what the latest offset each group got from each partition is.

What is bootstrap.servers?
bootstrap.servers provides the initial hosts that act as the starting point for a Kafka client to discover the full set of alive servers in the cluster. bootstrap.servers is a configuration we place within clients, which is a comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a “bootstrap” Kafka cluster that a Kafka client connects to initially to bootstrap itself.

Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list does not have to contain the full set of servers (you may want more than one, though, in case a server is down).

It is the URL of one of the Kafka brokers which you give to fetch the initial metadata about your Kafka cluster. The metadata consists of the topics, their partitions, the leader brokers for those partitions etc. Depending upon this metadata your producer or consumer produces or consumes the data.

You can have multiple bootstrap servers in your producer or consumer configuration, so that if one of the brokers is not accessible, the client falls back to another.
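
For example, in the Java producer/consumer demos above, the single localhost entry could be replaced by a comma-separated list (the hostnames below are hypothetical):

properties.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");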

For records without a key, Kafka's default (sticky) partitioner keeps writing to the same partition until the current batch is full (batch.size, 16 KB by default) and only then switches to another partition.

What is a Consumer Group?
A consumer group is a set of consumers that cooperate to read a topic: the topic's partitions are divided among the consumers in the group, so each consumer reads a subset of the partitions.

Is one consumer always assigned to exactly one partition of a topic?
The consumers in a group divide the topic partitions as fairly among themselves as possible, such that each partition is consumed by only a single consumer from the group. When the number of consumers is lower than the number of partitions, the same consumers read messages from more than one partition.

If a single consumer reads from all of your partitions, it is known as an exclusive consumer.

Ideally, the number of partitions should be equal to the number of consumers.

If the number of consumers is greater, the excess consumers are idle, wasting client resources. If the number of partitions is greater, some consumers read from multiple partitions, which should not be an issue unless the ordering of messages is important.

Does Kafka ensure ordering of messages across multiple partitions?
Kafka does not guarantee ordering of messages between partitions; it only provides ordering within a partition. Therefore, Kafka can maintain message ordering for a consumer if it is subscribed to only a single partition. If message ordering is required, messages sent from the producer should use the same partition key so they are grouped into the same partition on the Kafka broker.
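
As a sketch using the producer from ProducerDemoWithCallBack above, three records sharing the hypothetical key "order-42" all land in the same partition, so their relative order is preserved for the consumer of that partition:

//All three records share the same key, so the partitioner assigns them to the same partition
producer.send(new ProducerRecord<>("TopicFromJava", "order-42", "order created"));
producer.send(new ProducerRecord<>("TopicFromJava", "order-42", "order paid"));
producer.send(new ProducerRecord<>("TopicFromJava", "order-42", "order shipped"));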

Could there be a scenario where a partition is read multiple times?
Yes, if the partition is read by consumers belonging to different consumer groups. Note: consumers from the same group cannot read the same partition at the same time.