Thread – a path of execution that a program follows while it runs

  1. The code below shows how a race condition (lost update) occurs when two threads, created from two different classes, update the same instance variable at the same time.
  2. There are 3 classes: one for printing reports, one for removing the printed reports, and a third (TotalReportCount) that tracks the report count.
  3. When the number of reports to be printed is small (for example, fewer than 100), the output is usually 0, as expected. With a larger number of reports (for example, totalReportsToBePrinted = 1000000), the output changes from run to run.
  4. For small values, the first thread usually finishes before the second one gets going, so the updates do not overlap. For large values both threads run at the same time, and because a compound assignment such as -= is not atomic (it reads the value, changes it, and writes it back), one thread can overwrite the other's update, which leaves the count inconsistent.
  5. The same code returns 0 every time if one thread (PrintExcelReports) completes before the other (RemovePrintedReports) starts, which is what happens when we join the first thread before starting the second, as below.
    .
    .
            t1.start();
            t1.join();
            
            t2.start();
            t2.join();
    
    .
    .
    

PrintExcelReports.java

public class PrintExcelReports implements Runnable {
    TotalReportCount totalReportCount;

    public PrintExcelReports(TotalReportCount totalReportCount) {
        this.totalReportCount = totalReportCount;
    }

    int totalReportsToBePrinted = 1000000;

    @Override
    public void run() {
        for(int i=0;i<totalReportsToBePrinted;i++){
            totalReportCount.totalReportsCntVal += i; //adds; RemovePrintedReports subtracts the same amounts, so the expected total is 0
        }
    }
}

RemovePrintedReports.java

public class RemovePrintedReports implements Runnable {
    TotalReportCount totalReportCount;

    public RemovePrintedReports(TotalReportCount totalReportCount) {
        this.totalReportCount = totalReportCount;
    }

    int totalReportsToBePrinted = 1000000;

    @Override
    public void run() {
        for(int i=0;i<totalReportsToBePrinted;i++){
            totalReportCount.totalReportsCntVal -= i;
        }
    }
}

ReportCurrentStatus.java

public class ReportCurrentStatus {
    public static void main(String[] args) throws InterruptedException {
        TotalReportCount objTotalReportCount = new TotalReportCount();
        objTotalReportCount.totalReportsCntVal = 0;

        PrintExcelReports objPrinter1 = new PrintExcelReports(objTotalReportCount);
        RemovePrintedReports objPrinter2 = new RemovePrintedReports(objTotalReportCount);

        Thread t1 = new Thread(objPrinter1);
        Thread t2 = new Thread(objPrinter2);

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println(objTotalReportCount.totalReportsCntVal);
    }
}

Output when totalReportsToBePrinted is greater than 1000

RANDOM NUMBER

Output when totalReportsToBePrinted is less than 100

0
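
The lost updates described above can also be avoided while both threads still run at the same time, by making each update atomic. The following is a minimal sketch, not the original classes: AtomicReportCount and runConcurrently are hypothetical names, and an AtomicLong stands in for the totalReportsCntVal field.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the report-count example: same add/subtract
// workload, but the shared counter is an AtomicLong.
class AtomicReportCount {

    static long runConcurrently(int iterations) {
        AtomicLong total = new AtomicLong(0);

        Thread adder = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                total.addAndGet(i);    // atomic read-modify-write
            }
        });
        Thread remover = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                total.addAndGet(-i);   // atomic read-modify-write
            }
        });

        adder.start();
        remover.start();               // both threads run at the same time
        try {
            adder.join();
            remover.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(1_000_000)); // prints 0
    }
}
```

Because every addAndGet call completes as one indivisible step, no update can be overwritten, so the total returns to 0 regardless of how the two threads interleave.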

Simple Program using Runnable and Callable

HelloThread1.java

public class HelloThread1 implements Runnable{
    @Override
    public void run() {
        System.out.println("Hello World from Thread Name (" + Thread.currentThread().getName() +") using Runnable ");
    }
}

HelloThread2.java

import java.util.concurrent.Callable;

public class HelloThread2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "Hello World from Thread Name (" + Thread.currentThread().getName() +") using Callable";
    }
}

Main.java

public class Main {
    public static void main(String[] args) throws Exception {
        HelloThread1 objThread1 = new HelloThread1(); //Instance of Runnable
        HelloThread2 objThread2 = new HelloThread2(); //Instance of Callable

        //run() and call() are invoked directly here, so both execute on the main thread
        objThread1.run();
        System.out.println(objThread2.call());
    }
}

Output

Hello World from Thread Name (main) using Runnable 
Hello World from Thread Name (main) using Callable
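
The output above shows "main" because run() and call() were invoked as ordinary methods. The sketch below contrasts that with handing the same kind of Runnable to a Thread and calling start(). The class name RunVersusStart and the thread name "worker-1" are assumptions made for illustration.

```java
class RunVersusStart {

    // Calls run() directly: an ordinary method call on the caller's thread.
    static String nameWhenCalledDirectly() {
        String[] seen = new String[1];
        Runnable task = () -> seen[0] = Thread.currentThread().getName();
        task.run();
        return seen[0];
    }

    // Hands the Runnable to a Thread and calls start(): runs on the new thread.
    static String nameWhenStarted() {
        String[] seen = new String[1];
        Runnable task = () -> seen[0] = Thread.currentThread().getName();
        Thread worker = new Thread(task, "worker-1");
        worker.start();
        try {
            worker.join();   // wait, so the value written by the worker is visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("run()   executed on " + nameWhenCalledDirectly());
        System.out.println("start() executed on " + nameWhenStarted());
    }
}
```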

Simple Program to print numbers using threads
NumberPrinter.java

public class NumberPrinter implements Runnable{
    int number;
    public NumberPrinter(int number){
        this.number = number;
    }

    public void run(){
        System.out.println("Printing Number from Thread "+ this.number);
    }
}

Main.java

public class Main {
    public static void main(String[] args) {
        for (int idx=1;idx<=5;idx++){
            Thread objthread = new Thread(new NumberPrinter(idx));
            objthread.start();
        }
    }
}

Output (the order varies from run to run)

Printing Number from Thread 5
Printing Number from Thread 1
Printing Number from Thread 4
Printing Number from Thread 3
Printing Number from Thread 2

Simple Program using Executor Service taking Runnable as Argument

ExecutorService is a framework for running tasks on a pool of threads instead of creating each thread by hand. Pools are created through the Executors factory methods, such as newFixedThreadPool, newCachedThreadPool and newScheduledThreadPool. The submit() method takes a Runnable or Callable object (both are functional interfaces) as its argument. The thread example above can be rewritten as below
Main.java

public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService objExecService = Executors.newFixedThreadPool(2);

        //Lambda expression passed as argument, as Runnable is a functional interface
        objExecService.submit(() -> {
            System.out.println(Thread.currentThread().getName());
        });

        objExecService.shutdown();
    }
}

Output

pool-1-thread-1

Same code with Runnable instance passed as argument to submit

  .
  . 
  //Instance of Runnable passed as argument
  HelloThread1 objHT1 = new HelloThread1();
  objExecService.submit(objHT1);
  .
  .

Output

Hello World from Thread Name (pool-1-thread-1) using Runnable 

Same code with Runnable as Anonymous Class passed as argument

ExecutorService exec = Executors.newFixedThreadPool(2);

//Instance of Runnable passed as Anonymous class
exec.execute(new Runnable() {
  public void run() {
    System.out.println("Hello world");
  }
});

exec.shutdown();

Simple Program using Executor Service taking Callable as Argument

public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService objExecService = Executors.newFixedThreadPool(2);
        Future<String> objFuture = objExecService.submit(new HelloThread2());
        System.out.println(objFuture.get());
        objExecService.shutdown();
    }
}

Output

Hello World from Thread Name (pool-1-thread-1) using Callable

Using Lambda Expression as Argument to submit

.
.
ExecutorService objExecService = Executors.newFixedThreadPool(2);

Future<String> objFuture = objExecService.submit(() -> {
  Thread.sleep(3000);
  return Thread.currentThread().getName();
});

System.out.println(objFuture.get());
.
.

The above could be rewritten in anonymous class as below

ExecutorService objExecService = Executors.newFixedThreadPool(2);

Future<String> objFuture = objExecService.submit(new Callable<String>() {
 @Override
 public String call() throws Exception {
   Thread.sleep(3000);
   return Thread.currentThread().getName();
 }
});

System.out.println(objFuture.get());
objExecService.shutdown();

Program for Creating Thread Pool and executing Task

ThreadPoolExample.java

public class ThreadPoolExample {
    public static void main(String args[]) {
       ExecutorService service = Executors.newFixedThreadPool(10); //create 10 worker threads in Thread Pool
       for (int i =0; i<100; i++){
           service.submit(new Task(i)); //submit the task to be executed
       }
       service.shutdown();
    }  
}

Task.java

final class Task implements Runnable {
    private int taskId;  
    public Task(int id){
        this.taskId = id;
    }
  
    @Override
    public void run() {
        System.out.println("Task ID : " + this.taskId +" performed by " 
                           + Thread.currentThread().getName());
    }  
}

Output

Task ID : 0 performed by pool-1-thread-1
Task ID : 3 performed by pool-1-thread-4
Task ID : 2 performed by pool-1-thread-3
Task ID : 1 performed by pool-1-thread-2
Task ID : 5 performed by pool-1-thread-6
Task ID : 4 performed by pool-1-thread-5
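
Note that shutdown() only stops the pool from accepting new tasks; it does not wait for the submitted tasks to finish. A minimal sketch of waiting for completion with awaitTermination follows. The class name PoolShutdownSketch, the counter, and the 10-second timeout are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolShutdownSketch {

    // Submits taskCount trivial tasks and returns how many of them completed.
    static int runTasks(int taskCount) {
        ExecutorService service = Executors.newFixedThreadPool(10);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            service.submit(() -> { completed.incrementAndGet(); });
        }

        service.shutdown();   // no new tasks accepted; queued tasks still run
        try {
            // Block until all tasks are done or the timeout expires.
            if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
                service.shutdownNow();   // give up on tasks that are still pending
            }
        } catch (InterruptedException e) {
            service.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runTasks(100));
    }
}
```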

A thread can be in one of the following states

  1. NEW – a newly created thread that has not yet started the execution
  2. RUNNABLE – either running or ready for execution but it’s waiting for resource allocation
  3. BLOCKED – waiting to acquire a monitor lock to enter or re-enter a synchronized block/method
  4. WAITING – waiting for some other thread to perform a particular action without any time limit
  5. TIMED_WAITING – waiting for some other thread to perform a specific action for a specified period
  6. TERMINATED – has completed its execution

NEW Thread (or a Born Thread) is a thread that’s been created but not yet started.
It remains in this state until we start it using the start() method

NewState.java

public class NewState implements Runnable{
    public void run(){
        System.out.println("I am in new State");
    }
}

Main.java

public class Main {
    public static void main(String[] args) throws InterruptedException {
       Thread objThread = new Thread(new NewState());
       System.out.println(objThread.getState());
    }
}

Output

NEW

RUNNABLE: when we create a new thread and call the start() method on it, it moves from the NEW to the RUNNABLE state. Threads in this state are either running or ready to run, but
they may be waiting for resource allocation (such as CPU time) from the system. In a multi-threaded environment, the thread scheduler (on most modern JVMs, the operating system's scheduler) gives each thread a slice of processor time. So a thread runs for a while, then hands control over to other RUNNABLE threads.

RunnableState.java

public class RunnableState implements Runnable{
    public void run(){
        System.out.println("I would be in Runnable State");
    }
}

Main.java

public class Main {
    public static void main(String[] args) throws InterruptedException {
       Thread objRThread = new Thread(new RunnableState());
       objRThread.start();
       System.out.println(objRThread.getState());
    }
}

Output

RUNNABLE
I would be in Runnable State

TERMINATED is the state of a dead thread. A thread is in the TERMINATED state when it has either finished execution or was terminated abnormally.
TerminatedState.java

public class TerminatedState implements Runnable{
    public void run(){
        Thread objNewState = new Thread(new NewState());
        objNewState.start();
    }
}

Main.java

public class Main {
    public static void main(String[] args) throws InterruptedException {
       Thread objTState = new Thread(new TerminatedState());
       objTState.start();
       Thread.sleep(1000); //sleep() is static: it pauses the current (main) thread, giving objTState time to finish
       System.out.println("T1 : "+ objTState.getState());
    }
}

Output

I am in new State
T1 : TERMINATED
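
Sleeping for a fixed time only works if the thread happens to finish within that time. A more deterministic way to observe the state is to join() the thread first, as in this minimal sketch (the class and method names are hypothetical):

```java
class TerminatedAfterJoin {

    // A thread that was created but never started is in the NEW state.
    static Thread.State stateBeforeStart() {
        Thread worker = new Thread(() -> { });
        return worker.getState();
    }

    // join() returns only after the thread has died, so the state is TERMINATED.
    static Thread.State stateAfterJoin() {
        Thread worker = new Thread(() -> { });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker.getState();
    }

    public static void main(String[] args) {
        System.out.println(stateBeforeStart()); // prints NEW
        System.out.println(stateAfterJoin());   // prints TERMINATED
    }
}
```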

A thread is in the BLOCKED state when it is waiting to acquire a monitor lock, that is, when it tries to enter a synchronized section of code that is currently locked by another thread.
BlockedState.java

public class BlockedState implements Runnable{
    public void run(){
      blockedResource();
    }

    public static synchronized void blockedResource(){
        while(true){
            //Do Nothing
        }
    }
}

Main.java

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread objB1Thread = new Thread(new BlockedState());
        Thread objB2Thread = new Thread(new BlockedState());

        objB1Thread.start();
        objB2Thread.start();

        Thread.sleep(1000);

        System.out.println(objB1Thread.getState());
        System.out.println(objB2Thread.getState());
        System.exit(0);
    }
}

Output

RUNNABLE
BLOCKED
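
The example above relies on an endless loop and System.exit(0). The same state can be observed with threads that finish cleanly. The sketch below is one possible variant under stated assumptions: the names BlockedStateSketch, holder and contender are hypothetical, and the 2-second polling limit is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BlockedStateSketch {
    private static final Object MONITOR = new Object();

    // Returns the state observed for a thread that is waiting for MONITOR.
    static Thread.State observeContender() {
        CountDownLatch holderHasLock = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            synchronized (MONITOR) {
                holderHasLock.countDown();   // tell main the monitor is taken
                try {
                    release.await();         // keep the monitor until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread contender = new Thread(() -> {
            synchronized (MONITOR) {
                // entered only after the holder has left its block
            }
        });

        Thread.State observed;
        try {
            holder.start();
            holderHasLock.await();
            contender.start();

            // Poll (with an upper bound) until the contender reaches the monitor.
            observed = contender.getState();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (observed != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                Thread.sleep(10);
                observed = contender.getState();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();             // let both threads finish
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observeContender());
    }
}
```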

A thread is in the WAITING state when it's waiting, with no time limit, for some other thread to perform a particular action. According to the Javadoc, a thread enters this state by calling any one of the following without a timeout
Object.wait() (or) Thread.join() (or) LockSupport.park()

WaitingState.java

public class WaitingState implements Runnable{
    public void run(){
        Thread objWaitState = new Thread(new SleepState());

        objWaitState.start();

        try {
            objWaitState.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

SleepState.java

public class SleepState implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Main.java

public class Main {
    public static void main(String[] args) throws InterruptedException {
       Thread objWaitingThread = new Thread(new WaitingState());
       objWaitingThread.start();
       Thread.sleep(1000); //sleep() is static: it pauses the current (main) thread, not objWaitingThread
       System.out.println("T1 : "+ objWaitingThread.getState());
       System.out.println("Main : "+Thread.currentThread().getState());
    }
}

Output

T1 : WAITING
Main : RUNNABLE

A thread is in the TIMED_WAITING state when it's waiting for another thread to perform a particular action within a stipulated amount of time. According to the Javadoc, there are five ways to put a thread in the TIMED_WAITING state:
Thread.sleep(long millis) (or) Object.wait(long timeout) / Object.wait(long timeout, int nanos) (or) Thread.join(long millis) (or) LockSupport.parkNanos (or) LockSupport.parkUntil

TimedWaitState.java

public class TimedWaitState implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Main.java

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread objTWState = new Thread(new TimedWaitState());
        objTWState.start();
        Thread.sleep(2000);
        System.out.println("T1 : "+ objTWState.getState());
    }
}

Output

T1 : TIMED_WAITING

Reservation System

  1. We use ReentrantLock to lock the shared resource (totalSeats)
  2. In case anything goes wrong (an exception being thrown, etc.), the lock must be released no matter what, which is why unlock() is called in a finally block.
  3. The reserveSeats method should be called from separate threads

ReservationSystem.java

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReservationSystem {
    private Integer totalSeats;
    private final Lock lock = new ReentrantLock();

    public ReservationSystem(Integer totalSeats){
        this.totalSeats = totalSeats;
    }

    public Integer getTotalSeats(){
        return totalSeats;
    }

    public void reserveSeats(String userName, int numOfSeats){
        lock.lock();

        try{
            if(numOfSeats >0 && totalSeats>=numOfSeats){ //>= so that the last remaining seats can also be booked
                totalSeats -= numOfSeats;
                System.out.println(userName + " has reserved "+ numOfSeats + " with " + totalSeats + " still available");
            }else{
                System.out.println("Seats not Available");
            }
        }finally {
            lock.unlock();
        }
    }
}

BookSeat.java

public class BookSeat {
    public static void main(String[] args) {
        ReservationSystem objResSys = new ReservationSystem(100);

        System.out.println("Total available Seats "+ objResSys.getTotalSeats());

        Thread objThread1 = new Thread(() -> {objResSys.reserveSeats("User1", 10);});
        Thread objThread2 = new Thread(() -> {objResSys.reserveSeats("User2", 20);});
        Thread objThread3 = new Thread(() -> {objResSys.reserveSeats("User3", 5);});


        objThread1.start();
        objThread2.start();
        objThread3.start();

        try {
            objThread1.join();
            objThread2.join();
            objThread3.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println("Remaining available Seats "+ objResSys.getTotalSeats());
    }
}


Output

Total available Seats 100
User2 has reserved 20 with 80 still available
User1 has reserved 10 with 70 still available
User3 has reserved 5 with 65 still available
Remaining available Seats 65
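
In the example above, lock() waits for as long as it takes to obtain the lock. If a caller should give up after a while instead, ReentrantLock also offers tryLock with a timeout. The following is a minimal sketch of that variant; the class TimedReservation and the timeoutMillis parameter are hypothetical and not part of the original ReservationSystem.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TimedReservation {
    private final ReentrantLock lock = new ReentrantLock();
    private int totalSeats;

    TimedReservation(int totalSeats) {
        this.totalSeats = totalSeats;
    }

    // Returns true only if the lock was obtained in time and the seats were available.
    boolean reserveSeats(int numOfSeats, long timeoutMillis) {
        boolean acquired;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (!acquired) {
            return false;             // another thread held the lock for too long
        }
        try {
            if (numOfSeats > 0 && totalSeats >= numOfSeats) {
                totalSeats -= numOfSeats;
                return true;
            }
            return false;
        } finally {
            lock.unlock();            // released no matter what happens above
        }
    }

    int getTotalSeats() {
        lock.lock();
        try {
            return totalSeats;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TimedReservation reservation = new TimedReservation(100);
        System.out.println(reservation.reserveSeats(10, 500)); // prints true
        System.out.println(reservation.getTotalSeats());       // prints 90
    }
}
```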

Banking System

  1. We have a BankAccount class with 2 fields – balance and accountNumber
  2. We have a BankTransaction class implementing Runnable
  3. We create account objects with some initial balance and pass them as a parameter to the runnable BankTransaction objects

BankAccount.java

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BankAccount {
    private Integer balance;
    private Integer accountNumber;

    private final Lock reLock = new ReentrantLock();

    public BankAccount(Integer balance, Integer accountNumber){
        this.balance = balance;
        this.accountNumber = accountNumber;
    }

    public void debitAmount(Integer amount){
        reLock.lock();

        try{
            balance -= amount;
        }finally {
            reLock.unlock();
        }

    }

    public void creditAmount(Integer amount){
        reLock.lock();

        try{
            balance += amount;
        }finally {
            reLock.unlock();
        }
    }

    public Integer getAccountNumber(){
        return this.accountNumber;
    }

    public Integer getBalance(){
        return this.balance;
    }

}

BankTransaction.java

public class BankTransaction implements Runnable{
    public Integer transAmount;
    public BankAccount bankAccount;

    public BankTransaction(Integer transAmount, BankAccount bankAccount){
        this.transAmount  = transAmount;
        this.bankAccount  = bankAccount;
    }


    @Override
    public void run() {
        if(transAmount >= 0){
            bankAccount.creditAmount(transAmount);
        }else{
            bankAccount.debitAmount(Math.abs(transAmount));
        }
    }
}

BankSystem.java

public class BankSystem {
    public static void main(String[] args) {
        BankAccount objAcc1 = new BankAccount(1000, 101);
        BankAccount objAcc2 = new BankAccount(2000, 102);

        Thread objThread1 = new Thread(new BankTransaction(50, objAcc1));
        Thread objThread2 = new Thread(new BankTransaction(-150, objAcc2));
        Thread objThread3 = new Thread(new BankTransaction(250, objAcc2));
        Thread objThread4 = new Thread(new BankTransaction(250, objAcc1));

        objThread1.start();
        objThread2.start();
        objThread3.start();
        objThread4.start();

        try{
            objThread1.join();
            objThread2.join();
            objThread3.join();
            objThread4.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println("Final Balance in Account " + objAcc1.getAccountNumber() + " with balance " + objAcc1.getBalance());
        System.out.println("Final Balance in Account " + objAcc2.getAccountNumber() + " with balance " + objAcc2.getBalance());
    }
}

Output

Final Balance in Account 101 with balance 1300
Final Balance in Account 102 with balance 2100

Communication Models

  1. Synchronous + Blocking – Calling customer service and waiting on the line for a response
  2. Asynchronous – Asking my friend to call customer service while I carry on with my work
  3. Non-Blocking – Asking customer service for a call back while I carry on with my work
  4. Asynchronous + Non-Blocking – Calling customer service, asking them to call back my friend, and carrying on with my work
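
Two of these models can be sketched in plain Java with CompletableFuture. This is only an illustration under stated assumptions: callCustomerService is a hypothetical slow call (simulated with a 200 ms sleep), and the class name is made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CommunicationModelsSketch {

    // Hypothetical slow call standing in for "customer service".
    static String callCustomerService() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ticket resolved";
    }

    // Synchronous + blocking: the caller waits on the line for the answer.
    static String synchronousBlocking() {
        return callCustomerService();
    }

    // Asynchronous + non-blocking: the call runs on another thread and the
    // answer arrives through a callback.
    static String asynchronousNonBlocking() {
        AtomicReference<String> callbackResult = new AtomicReference<>();
        CompletableFuture<Void> pending = CompletableFuture
                .supplyAsync(CommunicationModelsSketch::callCustomerService)
                .thenAccept(callbackResult::set);

        // The caller is free to carry on with other work here.

        pending.join();   // only so that the demo can return the callback result
        return callbackResult.get();
    }

    public static void main(String[] args) {
        System.out.println(synchronousBlocking());
        System.out.println(asynchronousNonBlocking());
    }
}
```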

request -> response
request -> streaming response (Stock Price in Stock Market App, Heart Beat for Health Check in Spring Boot App)
streaming request -> response (Using Google Docs and updating in drive in regular time intervals)
streaming request -> streaming response (Playing Game Online)

Reactive Stream Specification
Process a stream of messages in a non-blocking, asynchronous manner using the Observer design pattern (observe and react in case of change)

  1. Publisher: Emits a sequence of elements to its subscribers.
    void subscribe(Subscriber<? super T> s)
    
  2. Subscriber: Consumes elements provided by a Publisher.
    void onSubscribe(Subscription s)
    void onNext(T t)
    void onError(Throwable t)
    void onComplete()
    
  3. Subscription: Represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.
    void request(long n)
    void cancel()
    
  4. Processor: Represents a processing stage, which is both a Subscriber and a Publisher. It inherits both the Subscriber and Publisher interfaces.

There is one Publisher at the top, similar to the root of a tree, followed by 0 to N intermediate Processors (each acting as subscriber + publisher) and finally a leaf Subscriber

Publisher, Subscriber and Subscription

public interface Publisher<T> {
   public void subscribe(Subscriber<? super T> s);
}
public interface Subscription {
   public void request(long n);
   public void cancel();
}
public interface Subscriber<T> {
   public void onSubscribe(Subscription s);
   public void onNext(T t);
   public void onError(Throwable t);
   public void onComplete();
}
  1. The Publisher has a subscribe method through which we pass the Subscriber instance. subscribe itself returns nothing (void); the Publisher creates the Subscription object.
  2. The Publisher hands over the Subscription object to the Subscriber by calling the Subscriber's onSubscribe method.
  3. The Subscriber uses the Subscription object to ask for data with the request method and can cancel the subscription with cancel. Communication between Publisher and Subscriber happens through the Subscription object.
  4. The Subscriber can request N items using the Subscription object. The Publisher delivers each item by calling onNext. The Publisher gives at most 3 items if 3 items are requested by the Subscriber.
  5. Once the Publisher has transferred all items, it calls the Subscriber's onComplete() method to notify the Subscriber that its work is done.
  6. The Publisher calls onError() to notify the Subscriber of an error.

  1. The publisher does not produce data unless the subscriber requests it.
  2. The publisher produces at most as many items as the subscriber requested. The publisher can also produce 0 items!
  3. The subscriber can cancel the subscription. The publisher should stop at that moment, as the subscriber is no longer interested in consuming the data.
  4. The publisher can send the error signal.
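
These rules can be tried out without any extra dependency, because the JDK (9 and later) ships the same four interfaces under java.util.concurrent.Flow. The sketch below uses those JDK interfaces rather than org.reactivestreams, and it is deliberately simplified: it runs on a single thread and skips parts of the specification, such as rejecting non-positive requests. All class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandSketch {

    // Publisher with three fixed items; it emits only in response to request(n).
    static class FixedPublisher implements Flow.Publisher<String> {
        private final List<String> items = List.of("a", "b", "c");

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Never emit more than was requested or than is available.
                    for (long i = 0; i < n && !done && next < items.size(); i++) {
                        subscriber.onNext(items.get(next++));
                    }
                    if (!done && next == items.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;   // stop emitting from now on
                }
            });
        }
    }

    // Subscriber that records every signal it receives.
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String item) { events.add("next:" + item); }
        @Override public void onError(Throwable t) { events.add("error"); }
        @Override public void onComplete() { events.add("complete"); }
    }

    static List<String> demo() {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new FixedPublisher().subscribe(subscriber);   // nothing is emitted yet
        subscriber.subscription.request(2);           // emits a, b
        subscriber.subscription.request(5);           // only c is left, then complete
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next:a, next:b, next:c, complete]
    }
}
```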

  1. The subscribe method is called on the PublisherImpl instance, passing an instance of SubscriberImpl
  2. The PublisherImpl subscribe method in turn calls the onSubscribe method on the SubscriberImpl instance passed in.
  3. SubscriberImpl thereby receives the subscription that PublisherImpl created for it inside the subscribe method
  4. PublisherImpl has the following method
    • subscribe – takes the subscriber as argument, creates a new subscription and notifies the subscriber by calling the onSubscribe method
      public void subscribe(Subscriber subscriber) {
              var subscription = new SubscriptionImpl(subscriber);
              subscriber.onSubscribe(subscription);
      }
      
  5. SubscriberImpl has the following methods
    • onSubscribe – receives the subscription created in the publisher's subscribe method. This is in turn called from PublisherImpl
    • onNext – called from SubscriptionImpl to pass each item of data during iteration
    • onError – called from SubscriptionImpl in case of an error
    • onComplete – called from SubscriptionImpl when all the data has been sent
    @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
        }
    
        @Override
        public void onNext(String emailId) {
            logger.info("Received {}", emailId);
        }
    
        @Override
        public void onError(Throwable throwable) {
            logger.info("---------------------------------------------");
            logger.info("Received error {}", throwable.getMessage());
        }
    
        @Override
        public void onComplete() {
            logger.info("Subscription ended");
        }
    
  6. SubscriptionImpl has the following methods
    • request – requests data through the SubscriptionImpl instance by passing the number of records
    • cancel – cancels the subscription
    @Override
        public void request(long requestedItemCnt) {
            if(isCancelled){
                return;
            }
    
            logger.info("Subscriber has requested {} items ", requestedItemCnt);
    
            if(requestedItemCnt >MAX_ITEMS){
                this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available"));
                this.isCancelled = true;
                return;
            }
    
            //Check if all items(MAX_ITEMS) were sent
            for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){
                count++;
                this.subscriber.onNext(this.faker.internet().emailAddress());
            }
    
            //If all items were sent complete subscription
            if(count == MAX_ITEMS){
                logger.info("No More data from Producer");
                this.subscriber.onComplete();
                isCancelled = true;
            }
        }
    
        @Override
        public void cancel() {
            logger.info("Cancelling Subscription... . . .");
            isCancelled = true;
        }
    

pom.xml

  <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty-http</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>${faker.version}</version>
        </dependency>

PublisherImpl.java

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class PublisherImpl implements Publisher<String> {
    @Override
    public void subscribe(Subscriber<? super String> subscriber) {
        var subscription = new SubscriptionImpl(subscriber);
        subscriber.onSubscribe(subscription);
    }
}

SubscriptionImpl.java

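import com.github.javafaker.Faker;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionImpl implements Subscription {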
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);
    private final Subscriber<? super String> subscriber;
    private boolean isCancelled = false;
    private final Faker faker;

    private static final int MAX_ITEMS = 10;
    private int count = 0; //kept per subscription, so every new subscription starts counting from 0


    public SubscriptionImpl(Subscriber<? super String> subscriber){
        this.subscriber = subscriber;
        this.faker = Faker.instance();
    }

    @Override
    public void request(long requestedItemCnt) {
        if(isCancelled){
            return;
        }

        logger.info("Subscriber has requested {} items ", requestedItemCnt);

        if(requestedItemCnt >MAX_ITEMS){
            this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available"));
            this.isCancelled = true;
            return;
        }

        //Check if all items(MAX_ITEMS) were sent
        for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){
            count++;
            this.subscriber.onNext(this.faker.internet().emailAddress());
        }


        //If all items were sent complete subscription
        if(count == MAX_ITEMS){
            logger.info("No More data from Producer");
            this.subscriber.onComplete();
            isCancelled = true;
        }
    }

    @Override
    public void cancel() {
        logger.info("Cancelling Subscription... . . .");
        isCancelled = true;
    }
}

SubscriberImpl.java

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriberImpl implements Subscriber<String> {
    private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class);
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(String emailId) {
        logger.info("Received {}", emailId);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.info("---------------------------------------------");
        logger.info("Received error {}", throwable.getMessage());
    }

    @Override
    public void onComplete() {
        logger.info("Subscription ended");
    }

    public Subscription getSubscription() {
        return subscription;
    }
}

Main.java

import org.mugil.publisher.PublisherImpl;
import org.mugil.subscriber.SubscriberImpl;

import java.time.Duration;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        getMessages();
    }

    public static void getMessages() throws InterruptedException {
        var objPublisher = new PublisherImpl();
        var objSubscriber = new SubscriberImpl();

        objPublisher.subscribe(objSubscriber);
        objSubscriber.getSubscription().request(1);
        Thread.sleep(Duration.ofSeconds(2));

        objSubscriber.getSubscription().request(2);
        Thread.sleep(Duration.ofSeconds(2));

        objSubscriber.getSubscription().request(3);
        Thread.sleep(Duration.ofSeconds(2));

        objSubscriber.getSubscription().cancel();

        objSubscriber.getSubscription().request(1);
    }
}

Output

18:00:57.240 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 1 items 
18:00:57.534 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received jani.mante@gmail.com
18:00:59.537 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 2 items 
18:00:59.541 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received sunny.quigley@yahoo.com
18:00:59.544 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received hang.gutkowski@yahoo.com
18:01:01.546 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 3 items 
18:01:01.548 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received malik.thiel@hotmail.com
18:01:01.549 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received andre.purdy@gmail.com
18:01:01.550 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received kim.greenfelder@gmail.com
18:01:03.560 [main] INFO org.mugil.publisher.SubscriptionImpl -- Cancelling Subscription... . . .

Mono is a specialized Publisher that represents a single or empty value. It emits at most one value through onNext() and then terminates with the onComplete() signal. In case of failure, it emits only a single onError() signal. Most Mono implementations are expected to call onComplete on their Subscriber immediately after having called onNext. A combination of onNext and onError is explicitly forbidden.

Overloaded lambda-based subscribe() methods available in Mono

subscribe();  //1

subscribe(Consumer<? super T> consumer);  //2

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer);  //3

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);  //4

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); //5

Publisher.subscribe(Subscriber), using the SubscriberImpl from above

        Publisher<String> rctMono  = Mono.just("Hello React"); // Simple Mono Publisher using Just
        var subs = new SubscriberImpl();
        rctMono.subscribe(subs);
        subs.getSubscription().request(10);

Output

08:21:46.241 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received Hello React
08:21:46.252 [main] INFO org.mugil.subscriber.SubscriberImpl -- Subscription ended

//5


       Mono<Integer> rctMono2 = Mono.just(1)
                                    .map(i -> i/0);
       rctMono2.subscribe(i -> System.out.println(i),   //onNext consumer
                          err -> System.out.println("Error Msg -" + err.getMessage()), //onError, Not Mandatory
                          () -> System.out.println("Completed"), //onComplete, Not Mandatory
                          subscription -> subscription.request(1)); //subscription consumer (requests 1 item), Not Mandatory

Output

Error Msg -/ by zero

A simple method that returns a different Mono for each switch case

    public static Mono<String> getUserName(Integer num){
        return switch (num){
            case 1 -> Mono.just("How are you");
            case 2 -> Mono.empty();
            default -> Mono.error(new RuntimeException("Invalid Input"));
        };
    }

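Mono.just evaluates its argument eagerly: sumOfNums runs as soon as the Mono.just(...) line executes, even though nothing subscribes, because the value must already exist in JVM memory when the Mono is created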

public static void main(String[] args) {
  List<Integer> lstNums = List.of(1,2,3);
  Mono.just(sumOfNums(lstNums)); // Mono.just wraps a value that is already in memory, so it is a poor fit when large data should be streamed or produced lazily
}

public static int sumOfNums(List<Integer> lstNums){
   System.out.println("Sum invoked");
   return lstNums.stream().mapToInt(num -> num).sum();
}

Output

Sum invoked
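
The eager call above is ordinary Java argument evaluation: the argument is computed before the method that receives it even runs. A minimal sketch of the difference using only java.util.function.Supplier instead of Reactor; EagerVsLazyDemo, the just helper and the invocations counter are hypothetical and exist only for this illustration.

```java
import java.util.List;
import java.util.function.Supplier;

class EagerVsLazyDemo {
    static int invocations = 0; // counts how often sumOfNums has run

    static int sumOfNums(List<Integer> nums) {
        invocations++;
        return nums.stream().mapToInt(Integer::intValue).sum();
    }

    /** Hypothetical stand-in for an eager factory: the value must already exist. */
    static <T> Supplier<T> just(T value) {
        return () -> value;
    }

    public static void main(String[] args) {
        List<Integer> nums = List.of(1, 2, 3);

        Supplier<Integer> eager = just(sumOfNums(nums));        // sumOfNums runs right here
        System.out.println("after just: " + invocations);       // after just: 1

        Supplier<Integer> lazy = () -> sumOfNums(nums);          // nothing runs yet
        System.out.println("after lambda: " + invocations);     // after lambda: 1

        System.out.println("lazy value: " + lazy.get());         // lazy value: 6
        System.out.println("after get: " + invocations);         // after get: 2
        System.out.println("eager value: " + eager.get());       // eager value: 6
    }
}
```

The lambda form does no work until get() is called, which is the same idea as deferring the computation until a subscriber arrives.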

Mono.fromSupplier defers the call: sumOfNums is invoked only when the Mono is subscribed to (the terminal operation), not when the pipeline is assembled (the intermediate operation)

  List<Integer> lstNums = List.of(1,2,3);
  Mono.fromSupplier(() -> sumOfNums(lstNums)); // Intermediate operation only: no subscriber, so sumOfNums is never invoked


  public static int sumOfNums(List<Integer> lstNums){
        System.out.println("Sum invoked");
        return lstNums.stream().mapToInt(num -> num).sum();
  }

Output

(Nothing is printed, because no subscriber triggers the supplier.)

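Mono.fromSupplier vs Mono.fromCallable
fromCallable accepts a Callable, whose call() method is declared to throw a checked Exception, whereas fromSupplier accepts a Supplier, whose get() method cannot throw checked exceptions. So if you substitute a Supplier for a Callable, you must also write a try-catch block inside the lambda to handle any checked exception.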
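
The difference comes from the two JDK functional interfaces themselves, so it can be shown without Reactor. A minimal sketch, assuming a hypothetical readConfig method that declares a checked IOException; CallableVsSupplierDemo and its helpers are illustration-only names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

class CallableVsSupplierDemo {
    /** Hypothetical method that declares a checked exception. */
    static String readConfig(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("config missing");
        }
        return "config-ok";
    }

    /** Compiles as-is: Callable.call() is declared with "throws Exception". */
    static Callable<String> asCallable(boolean fail) {
        return () -> readConfig(fail);
    }

    /** Supplier.get() declares no checked exceptions, so the lambda must catch and wrap. */
    static Supplier<String> asSupplier(boolean fail) {
        return () -> {
            try {
                return readConfig(fail);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        System.out.println(asCallable(false).call()); // config-ok
        System.out.println(asSupplier(false).get());  // config-ok
        try {
            asSupplier(true).get();
        } catch (UncheckedIOException e) {
            System.out.println("wrapped: " + e.getCause().getMessage()); // wrapped: config missing
        }
    }
}
```

Removing the try-catch from asSupplier makes the class fail to compile, which is exactly the extra work the Supplier form requires.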

Reading a File by extending the Thread class

  1. ReadFile.java has a run() method that reads the file inside a try-with-resources block
  2. In the main method, start() is called on each ReadFile instance
  3. The threads run asynchronously (the order of execution cannot be guaranteed), as the output below shows

TestThread.java

package com.mugil.test;

import com.mugil.runnables.ReadFile;

public class TestThread {
	public static void main(String[] args) {
		ReadFile objReadFileThread1 = new ReadFile();
		ReadFile objReadFileThread2 = new ReadFile();
		ReadFile objReadFileThread3 = new ReadFile();
				
		objReadFileThread1.start();
		objReadFileThread2.start();
		objReadFileThread3.start();
	}
}

ReadFile.java

package com.mugil.runnables;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class ReadFile extends Thread {

 public void run() {

  try (BufferedReader reader = new BufferedReader(new FileReader(new File("E:\\JavaProjects\\JavaThreads\\src\\Sample.txt")))) {
   String line = null;

   while ((line = reader.readLine()) != null) {
    System.out.println(Thread.currentThread().getName() + " reading line " + line);
   }

  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }

 }
}

Output

Thread-2 reading line Line1
Thread-0 reading line Line1
Thread-0 reading line Line2
Thread-0 reading line Line3
Thread-1 reading line Line1
Thread-1 reading line Line2
Thread-2 reading line Line2
Thread-1 reading line Line3
Thread-1 reading line Line4
Thread-1 reading line Line5
Thread-0 reading line Line4
Thread-0 reading line Line5
Thread-2 reading line Line3
Thread-2 reading line Line4
Thread-2 reading line Line5

Reading a File by implementing the Runnable interface

  1. In the code below, ReadFile implements the Runnable interface instead of extending Thread
  2. run() is called on each ReadFile instance instead of start()
  3. Calling run() directly executes the code in the currently running thread rather than creating a new thread, which is why the output shows "main reading line" rather than "Thread-N reading line"

TestThread.java

package com.mugil.test;

import com.mugil.runnables.ReadFile;

public class TestThread {
	public static void main(String[] args) {
		ReadFile objReadFileThread1 = new ReadFile();
		ReadFile objReadFileThread2 = new ReadFile();
		ReadFile objReadFileThread3 = new ReadFile();
				
		objReadFileThread1.run();
		objReadFileThread2.run();
		objReadFileThread3.run();
	}
}

ReadFile.java

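package com.mugil.runnables;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class ReadFile implements Runnable {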

 public void run() {

  try (BufferedReader reader = new BufferedReader(new FileReader(new File("E:\\JavaProjects\\JavaThreads\\src\\Sample.txt")))) {
   String line = null;

   while ((line = reader.readLine()) != null) {
    System.out.println(Thread.currentThread().getName() + " reading line " + line);
   }

  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
}

Output

main reading line Line1
main reading line Line2
main reading line Line3
main reading line Line4
main reading line Line5
main reading line Line1
main reading line Line2
main reading line Line3
main reading line Line4
main reading line Line5
main reading line Line1
main reading line Line2
main reading line Line3
main reading line Line4
main reading line Line5

Methods for managing a thread (start(), join() and so on) are defined on the Thread class, not on Runnable. To run a Runnable on its own thread, pass the Runnable instance to the Thread constructor, like the one below
TestThread.java

.
.
.
Thread objThread = new Thread(runObj);
objThread.start();
.
.
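
A compact, self-contained sketch of the same point: the identical Runnable reports a different thread name depending on whether it is invoked through run() or handed to a Thread and started. RunVsStartDemo and the thread name "worker-1" are hypothetical, chosen only for this illustration.

```java
class RunVsStartDemo {
    /** Calls run() directly, so the task executes on the calling thread. */
    static String nameSeenByRun() {
        String[] seen = new String[1];
        Runnable task = () -> seen[0] = Thread.currentThread().getName();
        task.run();
        return seen[0];
    }

    /** Wraps the Runnable in a Thread and calls start(), so a new thread executes it. */
    static String nameSeenByStart() throws InterruptedException {
        String[] seen = new String[1];
        Runnable task = () -> seen[0] = Thread.currentThread().getName();
        Thread worker = new Thread(task, "worker-1");
        worker.start();
        worker.join(); // wait for the worker; join also makes its write to seen[0] visible here
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("run():   " + nameSeenByRun());   // run():   main
        System.out.println("start(): " + nameSeenByStart()); // start(): worker-1
    }
}
```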

A real-world analogy: let’s say you have to get two very important tasks done in one day:

  • Get a passport
  • Get a presentation done

Now, the problem is that task-1 requires you to go to an extremely bureaucratic government office that makes you wait for 4 hours in a line to get your passport. Meanwhile, task-2 is required by your office, and it is a critical task. Both must be finished on a specific day.

Case 1: Sequential Execution
Ordinarily, you will drive 2 hours to the passport office, wait in line for 4 hours, get the task done, drive back 2 hours, go home, stay awake 5 more hours, and get the presentation done.

Case 2: Concurrent Execution
But you’re smart. You plan ahead. You carry a laptop with you, and while waiting in line, you start working on your presentation. This way, once you get back home, you just need to work 1 extra hour instead of 5.

In this case, both tasks are done by you, just in pieces. You interrupted the passport task while waiting in the line and worked on presentation. When your number was called, you interrupted presentation task and switched to passport task. The saving in time was essentially possible due to interruptability of both the tasks.

Concurrency, IMO, can be understood as the “isolation” property in ACID. Two database transactions are considered isolated if their sub-transactions can be interleaved in any way and the final result is the same as if the two transactions had run sequentially. Remember that for both the passport and presentation tasks, you are the sole executor.

Case 3: Parallel Execution
Now, since you are such a smart fella, you’re obviously a higher-up, and you have got an assistant. So, before you leave to start the passport task, you call him and tell him to prepare first draft of the presentation. You spend your entire day and finish passport task, come back and see your mails, and you find the presentation draft. He has done a pretty solid job and with some edits in 2 more hours, you finalize it.

Now, since your assistant is just as smart as you, he was able to work on it independently, without needing to constantly ask you for clarifications. Thus, due to the independentability of the tasks, they were performed at the same time by two different executors.

Still with me? Alright…

Case 4: Concurrent But Not Parallel
Remember your passport task, where you have to wait in the line? Since it is your passport, your assistant cannot wait in line for you. Thus, the passport task has interruptability (you can stop it while waiting in the line, and resume it later when your number is called), but no independentability (your assistant cannot wait in your stead).

Case 5: Parallel But Not Concurrent
Suppose the government office has a security check to enter the premises. Here, you must remove all electronic devices and submit them to the officers, and they only return your devices after you complete your task.

In this case, the passport task is neither independentable nor interruptible. Even if you are waiting in line, you cannot work on something else because you do not have the necessary equipment.

Similarly, say the presentation is so highly mathematical in nature that you require 100% concentration for at least 5 hours. You cannot do it while waiting in line for passport task, even if you have your laptop with you.

In this case, the presentation task is independentable (either you or your assistant can put in 5 hours of focused effort), but not interruptible.

Case 6: Concurrent and Parallel Execution
Now, say that in addition to assigning your assistant to the presentation, you also carry a laptop with you to passport task. While waiting in the line, you see that your assistant has created the first 10 slides in a shared deck. You send comments on his work with some corrections. Later, when you arrive back home, instead of 2 hours to finalize the draft, you just need 15 minutes.

This was possible because presentation task has independentability (either one of you can do it) and interruptability (you can stop it and resume it later). So you concurrently executed both tasks, and executed the presentation task in parallel.

Let’s say that, in addition to being overly bureaucratic, the government office is corrupt. Thus, you can show your identification, enter it, start waiting in line for your number to be called, bribe a guard and someone else to hold your position in the line, sneak out, come back before your number is called, and resume waiting yourself.

In this case, you can perform both the passport and presentation tasks concurrently and in parallel. You can sneak out, and your position is held by your assistant. Both of you can then work on the presentation, etc.

Back to Computer Science
In the computing world, here are example scenarios typical of each of these cases:

Case 1: Interrupt processing.
Case 2: When there is only one processor, but all executing tasks have wait times due to I/O.
Case 3: Often seen when we are talking about MapReduce or Hadoop clusters.
Case 4: I think Case 4 is rare. It’s uncommon for a task to be concurrent but not parallel. But it could happen. For example, suppose your task requires access to a special computational chip which can be accessed through only processor-1. Thus, even if processor-2 is free and processor-1 is performing some other task, the special computation task cannot proceed on processor-2.
Case 5: Also rare, but not quite as rare as Case 4. Non-concurrent code can be a critical region protected by mutexes. Once it is started, it must execute to completion. However, two different critical regions can progress simultaneously on two different processors.
Case 6: IMO, most discussions about parallel or concurrent programming are basically talking about Case 6. This is a mix and match of both parallel and concurrent executions.

1 server, 1 job queue (with 5 jobs) -> no concurrency, no parallelism (only one job is serviced to completion at a time; the next job in the queue has to wait until the current job is done, and there is no other server to service it)

1 server, 2 or more different queues (with 5 jobs per queue) -> concurrency (the server shares its time among the first jobs in all queues, equally or weighted), still no parallelism, since at any instant one and only one job is being serviced.

2 or more servers, one queue -> parallelism (2 jobs done at the same instant) but no concurrency (no server is sharing its time; the 3rd job has to wait until one of the servers completes).

2 or more servers, 2 or more different queues -> concurrency and parallelism
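
The "one queue, one server" and "one queue, two servers" rows can be sketched with a fixed thread pool, treating each pool thread as a server and the pool's internal work queue as the job queue. This is a rough illustration under those assumptions: ServersAndQueuesDemo and twoJobsOverlap are hypothetical names, and on a single-core machine two threads overlap through time slicing rather than true simultaneity.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ServersAndQueuesDemo {
    /**
     * Submits two jobs to a pool with the given number of threads ("servers").
     * Each job announces that it has started and then waits briefly for the other one.
     * Returns true only if both jobs were in progress at the same time.
     */
    static boolean twoJobsOverlap(int servers) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(servers);
        CountDownLatch bothStarted = new CountDownLatch(2);
        Callable<Boolean> job = () -> {
            bothStarted.countDown();
            // true if the other job also started before the timeout expired
            return bothStarted.await(500, TimeUnit.MILLISECONDS);
        };
        try {
            Future<Boolean> first = pool.submit(job);
            Future<Boolean> second = pool.submit(job);
            return first.get() && second.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 server:  jobs overlap = " + twoJobsOverlap(1));
        System.out.println("2 servers: jobs overlap = " + twoJobsOverlap(2));
    }
}
```

With one server the second job cannot start until the first has finished waiting, so the first job times out; with two servers both jobs start together and see each other.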

In other words, concurrency is sharing time to complete a job: it MAY take the same amount of time to complete the job, but at least it gets started early. The important thing is that jobs can be sliced into smaller jobs, which allows interleaving. Parallelism is achieved simply with more CPUs, servers, people, etc. that run in parallel. If resources are shared, pure parallelism cannot be achieved, but this is where concurrency has its best practical use: taking up another job that doesn’t need that resource.