Kafka
One common task in an application is transferring data from a source system to a target system. Over time this communication becomes complex and messy. Kafka simplifies building real-time streaming data pipelines and real-time streaming applications. The Kafka cluster sits between the source system and the target system: a producer moves data from the source system into the cluster, and a consumer moves data from the cluster to the target system.

        Source System -> {Producer} -> [Kafka Cluster] -> {Consumer} -> Target System

Kafka provides seamless integration across applications hosted on multiple platforms by acting as an intermediary.

  1. A sequence of messages is called a data stream. A topic is a particular stream of data. Topics are organized inside the cluster. Topics are like tables in a database and are identified by the topic name.
  2. Cluster contains topics -> a topic contains a data stream -> a data stream is made of a sequence of messages
  3. To add (write) data to a topic we use a Kafka producer, and to read data we use Kafka consumers.
  4. Topics
    1. What is a Topic? Topics are the categories used to organize messages. A topic is like a table in a database and is identified by its topic name (table name).
    2. Why is it needed? It provides a logical channel for producers to publish messages and consumers to receive them, e.g. for processing payments, tracking assets, monitoring patients or tracking customer interactions.
    3. How does it work? Producers append messages to a topic and consumers read from it. A topic is a log of events. Logs are easy to understand because they are simple data structures with well-known semantics.

  5. Partitions
    1. What are Partitions? Topics are split into multiple partitions. Messages sent to a topic end up in these partitions, and within each partition the messages are ordered by an id (the Kafka partition offset). A partition in Kafka is the storage unit that allows a topic log to be separated into multiple logs and distributed over the Kafka cluster.
      Data in a partition is immutable: once written to a partition it cannot be changed. By default, data is kept for one week (the retention period is configurable).
    2. Why are Partitions needed? Partitions allow Kafka to scale horizontally by distributing data across multiple brokers. Multiple consumers can read from different partitions in parallel, and multiple producers can write to different partitions simultaneously. Each partition can have multiple replicas spread across different brokers.
    3. How do Partitions work? A single topic log is broken into multiple logs, which are then spread across one or more brokers. This allows Kafka to scale and handle large amounts of data efficiently.

  6. Broker

    1. What is a Broker? A broker is a server that manages the flow of messages between producers and consumers in Apache Kafka. Kafka brokers store data in topics, which are divided into partitions. Each broker hosts a set of partitions. Brokers handle requests from clients to write and read events to and from partitions.
    2. How does a Broker work? One broker acts as the Kafka controller, which performs administrative tasks: maintaining the state of the other brokers, checking the health of brokers and reassigning work.
    3. Why is a Broker required? Producers connect to a broker to write events, while consumers connect to read events.
  7. Offset
    1. What is an Offset? An offset is a unique identifier for a message within a Kafka partition. It is an integer that represents the position of a message in a partition’s log. The first message in a partition has an offset of 0, the second message has an offset of 1, and so on.
    2. Why is it needed? Offsets enable Kafka to provide sequential, ordered, and replayable data processing. This numerical value helps Kafka keep track of progress within a partition. It also allows Kafka to scale horizontally while staying fault-tolerant.
    3. How does it work? When a producer publishes a message to a Kafka topic, it is appended to the end of the partition’s log and assigned a new offset. Consumers maintain their current offset, which marks how far they have read in each partition.
  8. Producer
    1. What is a Producer? A producer writes data to a Kafka broker, from where it is picked up by consumers. A producer can send anything, but it is typically serialized into a byte array. A message can also include a key, timestamp, compression type, and headers.
    2. How does a Producer work? A producer sends messages to a Kafka broker, which appends each message to the target partition and assigns it an offset.
    3. Why is a Producer needed? It allows applications to send streams of data to the Kafka cluster.
    4. The producer uses a partitioner to decide which partition the data is written to. The producer does not choose the broker; the message ends up on whichever broker hosts that partition.
    5. A producer can attach a key to each message. If the key is null, messages are spread across partitions in a round-robin fashion (recent client versions fill a batch for one partition before moving on to the next). If the key is not null, all messages with the same key end up in the same partition. Keys therefore make per-key message ordering possible.
  9. Consumer
    1. What is a Consumer? A consumer reads data from a Kafka broker.
    2. How does a Consumer work? A consumer issues fetch requests to brokers for the partitions it wants to consume. It specifies a log offset and receives a chunk of the log that starts at that offset position. The consumer must know the format of the message in advance.
    3. Why is a Consumer needed? It allows applications to receive streams of data from the Kafka broker.
  10. Consumer Group
    1. What is a Consumer Group? A collection of consumer applications that work together to process data from topics in parallel.
    2. How does a Consumer Group work? A consumer group divides the partitions of a topic among its consumers. Each consumer is assigned a subset of partitions, and within a group each partition is processed by only one consumer.
    3. Why is a Consumer Group needed? It allows multiple consumers to work together to process events from a topic in parallel. This is important for scalability, as it enables consumers to read many events simultaneously.
  11. Messages

    1. What is a Message? Kafka messages are created by the producer using a serialization mechanism.
    2. How does a Message work? Kafka messages are stored as serialized bytes and have a key-value structure.
    3. Why is a Message needed? It is the basic unit of data in Kafka.
    4. The key is optional (it may be null) and is used to choose the partition; the value is the actual message. Both key and value are in binary format.
  12. Partitioner

    1. What is a Partitioner? Partitioning is a key part of Kafka’s ability to scale and handle large amounts of data. Partitioning is done by the partitioner.
    2. How does a Partitioner work? A Kafka partitioner uses hashing to determine which partition a message should be sent to. It hashes the message key, which allows related messages to be grouped together and processed in the correct order. For example, if a Kafka producer uses a user ID as the key for messages about various users, all messages related to a specific user will be sent to the same partition.
  13. Zookeeper

    1. What is ZooKeeper? ZooKeeper is a software tool that helps maintain naming and configuration data, and provides synchronization within distributed systems.
    2. How does ZooKeeper work? ZooKeeper keeps track of which brokers are part of the Kafka cluster. Kafka brokers use it to determine which broker is the leader of a given partition and topic and to perform leader elections. ZooKeeper stores configurations for topics and permissions, and sends notifications to Kafka in case of changes (e.g. new topic, broker dies, broker comes up, topic deleted). Newer Kafka versions can run without ZooKeeper by using the built-in KRaft mode.
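
The relationship between keys, partitions and offsets described in the list above can be sketched with a small in-memory model. This is only an illustration, not the Kafka client API: the class MiniTopic and its methods are made up for this sketch, and String.hashCode() stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model of a single topic; NOT the Kafka client API.
class MiniTopic {
    // One append-only list per partition; the list index plays the role of the offset.
    private final List<List<String>> partitions = new ArrayList<>();

    public MiniTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Same key -> same partition (assumption: hashCode() replaces Kafka's murmur2 hash).
    public int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // Appends the value to the end of the partition log and returns its offset.
    public long send(String key, String value) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(value);
        return log.size() - 1;
    }

    // Returns every message of a partition starting at the given offset.
    public List<String> read(int partition, long fromOffset) {
        List<String> log = partitions.get(partition);
        return List.copyOf(log.subList((int) fromOffset, log.size()));
    }

    public static void main(String[] args) {
        MiniTopic topic = new MiniTopic(3);
        long first = topic.send("user-42", "login");
        long second = topic.send("user-42", "add-to-cart");
        int partition = topic.partitionFor("user-42");
        System.out.println("offsets: " + first + ", " + second);           // offsets: 0, 1
        System.out.println("replay from 0: " + topic.read(partition, 0));  // [login, add-to-cart]
        System.out.println("resume from 1: " + topic.read(partition, 1));  // [add-to-cart]
    }
}
```

Because both messages share the key user-42, they land in the same partition and keep their relative order; a consumer that remembers offset 1 can resume from there, or replay from offset 0.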

Define an interface for creating an object, but let subclasses decide which class to instantiate. Factory Method lets a class defer instantiation to subclasses.

  1. In the Factory Method pattern we have Product (abstract), ConcreteProduct, Creator (abstract) and ConcreteCreator
  2. ConcreteCreator creates the ConcreteProduct by implementing the abstract factory method of Creator, whose return type is Product
  3. If a new Product has to be added, the pattern fully supports the Open Closed Principle (open for extension, closed for changes).
  4. Open for extension – a new product is added by adding a new ConcreteProduct and ConcreteCreator class
  5. Closed for changes – no changes are needed in any other code, unlike Simple Factory or static factory method, which require a change in a switch case, enum or if block

JobProfile.java

abstract class JobProfile {
    public abstract String mandatorySkills();

    public Integer defaultWorkHrs(){
        return 8;
    }
}

JavaProfile.java

public class JavaProfile extends JobProfile{
    @Override
    public String mandatorySkills() {
        return "Java, Springboot, Microservices";
    }
}

SQLProfile.java

public class SQLProfile extends JobProfile{
    @Override
    public String mandatorySkills() {
        return "Cosmos, MySQL, MSSQL";
    }
}

JobProfileCreator.java

abstract class JobProfileCreator {
    public JobProfile getJobProfile(){
        JobProfile objJobProfile = createJobProfileFactory();
        return objJobProfile;
    }

    public abstract JobProfile createJobProfileFactory();
}

JavaProfileCreator.java

public class JavaProfileCreator extends JobProfileCreator {
    @Override
    public JobProfile createJobProfileFactory() {
       return new JavaProfile();
    }
}

SQLProfileCreator.java

public class SQLProfileCreator extends JobProfileCreator {
    @Override
    public JobProfile createJobProfileFactory() {
        return new SQLProfile();
    }
}

Consultancy.java

public class Consultancy {
    public static void main(String[] args) {
        getProfileDetails(new JavaProfileCreator());
    }

    public static void getProfileDetails(JobProfileCreator jobProfileCreator){
        JobProfile objJobProfile = jobProfileCreator.getJobProfile();
        System.out.println(objJobProfile.mandatorySkills() + " with "+ objJobProfile.defaultWorkHrs() + "hrs of Work");
    }
}

Output

Java, Springboot, Microservices with 8hrs of Work
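
To make the Open Closed claim concrete, the following sketch adds a new product without touching any existing class. PythonProfile, PythonProfileCreator and OpenClosedDemo are hypothetical names introduced only for this illustration; JobProfile and JobProfileCreator are repeated in compact form so the block compiles on its own.

```java
// Compact copies of the abstractions from the listing above.
abstract class JobProfile {
    public abstract String mandatorySkills();

    public Integer defaultWorkHrs() {
        return 8;
    }
}

abstract class JobProfileCreator {
    public JobProfile getJobProfile() {
        return createJobProfileFactory();
    }

    public abstract JobProfile createJobProfileFactory();
}

// New product (hypothetical): added without editing any existing class.
class PythonProfile extends JobProfile {
    @Override
    public String mandatorySkills() {
        return "Python, Django, Pandas";
    }
}

// New creator (hypothetical) that knows how to build the new product.
class PythonProfileCreator extends JobProfileCreator {
    @Override
    public JobProfile createJobProfileFactory() {
        return new PythonProfile();
    }
}

class OpenClosedDemo {
    // Client code depends only on the abstract creator, so it stays unchanged.
    static String describe(JobProfileCreator creator) {
        JobProfile profile = creator.getJobProfile();
        return profile.mandatorySkills() + " with " + profile.defaultWorkHrs() + "hrs of Work";
    }

    public static void main(String[] args) {
        System.out.println(describe(new PythonProfileCreator()));
        // prints: Python, Django, Pandas with 8hrs of Work
    }
}
```

The only change needed to support the new profile is the two new classes; there is no switch, enum or if block to extend.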

  1. In Simple Factory we have a factory class (LoggerFactory.java) and we call its createLogger method, which returns different implementations of Logger
  2. Logger is an abstract class which has different implementations

Logger.java

public abstract class Logger {
  abstract void log(String logstring);
}

ConsoleLogger.java

public class ConsoleLogger extends Logger{
    @Override
    void log(String logstring) {
        System.out.println("Logging to Console - "+ logstring);
    }
}

DBLogger.java

public class DBLogger extends Logger{
    @Override
    void log(String logstring) {
        System.out.println("Logging to Database - "+ logstring);
    }
}

FileLogger.java

public class FileLogger extends Logger{
    @Override
    void log(String logstring) {
        System.out.println("Logging to File - "+ logstring);
    }
}

LoggerFactory.java

public class LoggerFactory {
    public enum LoggerType {
        DATABASE, FILE, CONSOLE;
    }

    //The same code could be written using if else block instead of switch case
    public Logger createLogger(LoggerType loggerType) {
        Logger logger;

        switch (loggerType) {
            case FILE:
                logger = new FileLogger();
                break;
            case DATABASE:
                logger = new DBLogger();
                break;
            case CONSOLE:
                logger = new ConsoleLogger();
                break;
            default:
                logger = new ConsoleLogger();
                break;
        }

        return logger;
    }
}

ClientApp.java

public class ClientApp {
    public static void main(String[] args) {
        LoggerFactory objLoggerFactory = new LoggerFactory();
        Logger logger = objLoggerFactory.createLogger(LoggerFactory.LoggerType.CONSOLE);
        logger.log("Hello there");
    }
}

Output

Logging to Console - Hello there

Factory allows the consumer to create new objects without having to know the details of how they’re created, or what their dependencies are – they only have to give the information they actually want.

Account.java

abstract class Account {
   abstract Integer calculateInterest();
}

CreditAccount.java

public class CreditAccount extends Account{
    @Override
    Integer calculateInterest() {
        return 11;
    }
}

SalaryAccount.java

public class SalaryAccount extends Account{
    @Override
    Integer calculateInterest() {
        return 5;
    }
}

SavingsAccount.java

public class SavingsAccount extends Account{
   @Override
    Integer calculateInterest() {
        return 7;
    }
}

AccountFactory.java

import java.util.HashMap;
import java.util.Map;

public class AccountFactory {
    static Map<String, Account> hmAccountMap = new HashMap<>();

    static {
        hmAccountMap.put("SavingsAcc", new SavingsAccount());
        hmAccountMap.put("CreditAcc", new CreditAccount());
        hmAccountMap.put("SalaryAcc", new SalaryAccount());
    }

    public static Account getAccount(String accountType){
        return hmAccountMap.get(accountType);
    }
}

CalcInterest.java

public class CalcInterest{
    public static void main(String[] args) {
        Account objAccount = AccountFactory.getAccount("SavingsAcc");
        // Print the value directly; wrapping it in Optional.of would print "Optional[7]"
        System.out.println("Interest rate is - " + objAccount.calculateInterest());
    }
}

Using Streams for AccountFactory Class
AccountFactory.java

public static Optional<Account> getAccount(String accountType) {
        return hmAccountMap.entrySet().stream()
                                      .filter(accParam -> accParam.getKey().equals(accountType))
                                      .findFirst()
                                      .map(Map.Entry::getValue);
}

CalcInterest.java

import java.util.Optional;

public class CalcInterest{
    public static void main(String[] args) {
        // getAccount now returns Optional<Account>, so unwrap it before use
        Optional<Account> objAccount = AccountFactory.getAccount("SavingsAcc");
        objAccount.ifPresent(account -> System.out.println("Interest rate is - " + account.calculateInterest()));
    }
}

Output

Interest rate is - 7
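
One point worth noting about the map-based factory above is that it stores ready-made objects, so every caller receives the same shared instance. A possible variation, sketched below under the assumption that a fresh object per call is wanted, stores constructor references instead. SupplierAccountFactory is a hypothetical name; the Account classes are repeated in reduced form so the block compiles on its own.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

abstract class Account {
    abstract Integer calculateInterest();
}

class SavingsAccount extends Account {
    @Override
    Integer calculateInterest() {
        return 7;
    }
}

class CreditAccount extends Account {
    @Override
    Integer calculateInterest() {
        return 11;
    }
}

class SupplierAccountFactory {
    // Constructor references are stored instead of ready-made objects.
    private static final Map<String, Supplier<Account>> REGISTRY = Map.of(
            "SavingsAcc", SavingsAccount::new,
            "CreditAcc", CreditAccount::new);

    // Returns a new instance per call, or an empty Optional for an unknown type.
    static Optional<Account> getAccount(String accountType) {
        return Optional.ofNullable(REGISTRY.get(accountType)).map(Supplier::get);
    }

    public static void main(String[] args) {
        Account first = getAccount("SavingsAcc").orElseThrow();
        Account second = getAccount("SavingsAcc").orElseThrow();
        System.out.println("Interest rate is - " + first.calculateInterest()); // 7
        System.out.println("Same instance? " + (first == second));             // false
        System.out.println("Unknown type found? " + getAccount("Unknown").isPresent()); // false
    }
}
```

Whether shared or fresh instances are preferable depends on whether the products hold state; for stateless objects like the ones in this example, either approach works.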

Static factory method is a static method that returns an instance of a class. The key idea is to gain control over object creation and delegate it from constructor to static method.

As with a factory, the decision about which object to create is usually made outside the static factory method (in the common case, but not always), whereas the key idea of Factory Method is to delegate the decision of which class to instantiate to the factory method itself. The classic Singleton implementation, for example, is a special case of static factory method. Commonly used static factory methods include:

  • valueOf
  • getInstance(used in singleton)
  • newInstance

When to use?

  1. Static factory methods can have meaningful names, hence explicitly conveying what they do
  2. Static factory methods can return the same type that implements the method(s), a subtype, and also primitives, so they offer a more flexible range of returning types
  3. Static factory methods can encapsulate all the logic required for pre-constructing fully initialized instances, so they can be used for moving this additional logic out of constructors. This prevents constructors from performing further tasks other than just initializing fields
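
A minimal sketch of the first and third points, using a hypothetical Temperature class that is not part of the examples in this section: two constructors that both take a single double could not coexist, whereas two named static factories can, and the conversion logic stays out of the constructor.

```java
final class Temperature {
    private final double celsius;

    // Private constructor: it only initializes the field.
    private Temperature(double celsius) {
        this.celsius = celsius;
    }

    // The name states which unit the argument is in.
    static Temperature ofCelsius(double celsius) {
        return new Temperature(celsius);
    }

    // The conversion happens here, before the constructor is called.
    static Temperature ofFahrenheit(double fahrenheit) {
        return new Temperature((fahrenheit - 32) * 5 / 9);
    }

    double celsius() {
        return celsius;
    }

    public static void main(String[] args) {
        System.out.println(Temperature.ofFahrenheit(212).celsius()); // 100.0
        System.out.println(Temperature.ofCelsius(36.6).celsius());   // 36.6
    }
}
```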

Another example of a static factory is as follows

Optional<String> value1 = Optional.empty();
Optional<String> value2 = Optional.of("Baeldung");
Optional<String> value3 = Optional.ofNullable(null);

Logger.java

public class Logger {
    public String logType;
    public String fileLocation;

    public Logger(String logType) {
        this.logType = logType;
    }

    public Logger() {
    }

    public static Logger getDefaultLogger(){
        return new Logger("Console");
    }

    public static Logger getFileLogger(String fileLocation){
        Logger logger =  new Logger("File");
        logger.fileLocation = fileLocation;
        return logger;
    }

    @Override
    public String toString() {
        return "Logger{" +
                "logType=" + logType +
                ", fileLocation='" + fileLocation + '\'' +
                '}';
    }
}

ClientApp.java

public class ClientApp {
    public static void main(String[] args) {
        Logger objLogger = Logger.getDefaultLogger();
        System.out.println(objLogger);
    }
}

Output

Logger{logType=Console, fileLocation='null'}

Logger.java

public class Logger {
    public String logType;
    public String fileLocation;

    public Logger(String logType) {
        this.logType = logType;
    }

    public Logger() {
    }

    public static Logger getLoggerInstance(LoggerType loggerType){
        Logger logger;
        switch(loggerType) {
            case CONSOLE:
                logger = new Logger("Console Logger");
                logger.fileLocation = "JVM Memory";
                break;
            case DATABASE:
                logger = new Logger("Database Logger");
                logger.fileLocation = "DB Connection";
                break;
            case FILE:
                logger = new Logger("File Logger");
                logger.fileLocation = "C:/logs";
                break;
            case SPLUNK:
                logger = new Logger("Splunk Logger");
                logger.fileLocation = "Splunk URL";
                break;
            default:
                logger = new Logger();

        }

        return logger;
    }

    @Override
    public String toString() {
        return "Logger{" +
                "logType=" + logType +
                ", fileLocation='" + fileLocation + '\'' +
                '}';
    }
}

ClientApp.java

public class ClientApp {
    public static void main(String[] args) {
        Logger objLogger = Logger.getLoggerInstance(LoggerType.CONSOLE);
        System.out.println(objLogger);
    }
}

LoggerType.java

public enum LoggerType {
    CONSOLE, FILE, DATABASE, SPLUNK
}

Output

Logger{logType=Console Logger, fileLocation='JVM Memory'}

  1. RAM is main (primary) memory and the hard disk is secondary memory; the processor executes instructions loaded from main memory.
  2. Programs are stored on the hard disk, and when a program is started an instance of it is loaded into main memory (RAM) as a process
  3. A thread is part of a process. The process is the unit of resources and the thread is the unit of execution
  4. On a single-core processor, the processor takes turns executing tasks, which is called context switching. A task is nothing but a piece of code in execution. A task is a subset of a process.
  5. The OS scheduler governs context switching. Thread state is stored during a context switch so the thread can be resumed from where it left off. Each thread has its own thread stack
  6. A Java thread is nothing but a wrapper over an OS thread (kernel thread), because only OS threads can be scheduled on the processor
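
The points about threads and per-thread stacks can be illustrated with a small sketch. ThreadStackSketch is a made-up class name; the example assumes classic platform threads, which is what new Thread(...) creates.

```java
class ThreadStackSketch {
    // Each thread runs this method on its own stack, so 'local' is private to that thread.
    static int sumUpTo(int n) {
        int local = 0;
        for (int i = 1; i <= n; i++) {
            local += i;
        }
        return local;
    }

    // Starts two threads inside the same process and waits for both to finish.
    static int[] runInTwoThreads() {
        int[] results = new int[2]; // heap memory, shared by all threads of the process
        Thread t1 = new Thread(() -> results[0] = sumUpTo(10), "worker-1");
        Thread t2 = new Thread(() -> results[1] = sumUpTo(100), "worker-2");
        t1.start(); // hands the thread over to the scheduler
        t2.start();
        try {
            t1.join(); // blocks the calling thread until worker-1 has finished
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return results;
    }

    public static void main(String[] args) {
        int[] results = runInTwoThreads();
        System.out.println(results[0] + " " + results[1]); // 55 5050
    }
}
```

The two loops never interfere with each other because each invocation of sumUpTo keeps its local variable on the stack of the thread that runs it; only the results array lives in memory shared by the whole process.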

Communication Models

  1. Synchronous + Blocking – calling customer service and waiting on the line for the response
  2. Asynchronous – asking my friend to call customer service while I carry on with my work
  3. Non-Blocking – asking customer service for a call back while I carry on with my work
  4. Asynchronous + Non-Blocking – I call customer service and ask them to call my friend back, while I carry on with my work
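
The analogy above maps roughly onto CompletableFuture from the Java standard library. The sketch below is one possible reading of it: callCustomerService is a hypothetical stand-in for a slow call, "asynchronous" is modelled as running the call on another thread, and "non-blocking" as registering a callback instead of waiting.

```java
import java.util.concurrent.CompletableFuture;

class CallCenterSketch {
    // Hypothetical stand-in for the slow customer service call.
    static String callCustomerService() {
        return "ticket resolved";
    }

    public static void main(String[] args) {
        // Synchronous + blocking: the caller waits for the answer before moving on.
        String direct = callCustomerService();
        System.out.println("blocking: " + direct);

        // Asynchronous: the call is delegated to another thread (the "friend").
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(CallCenterSketch::callCustomerService);

        // Non-blocking: a callback is registered instead of waiting for the result.
        CompletableFuture<Void> done =
                future.thenAccept(answer -> System.out.println("callback: " + answer));

        System.out.println("caller carries on with other work");

        // Only here so that the demo does not exit before the callback has run.
        done.join();
    }
}
```

The relative order of the last two printed lines is not fixed, since the callback runs whenever the asynchronous call completes.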

request -> response
request -> streaming response (stock price in a stock market app, heartbeat for health check in a Spring Boot app)
streaming request -> response (Using Google Docs and updating in drive in regular time intervals)
streaming request -> streaming response (Playing Game Online)

Reactive Stream Specification
Process a stream of messages in a non-blocking, asynchronous manner with the Observer design pattern (observe and react in case of change)

  1. Publisher: Emits a sequence of elements to its subscribers.
    void subscribe(Subscriber<? super T> s)
    
  2. Subscriber: Consumes elements provided by a Publisher.
    void onSubscribe(Subscription s)
    void onNext(T t)
    void onError(Throwable t)
    void onComplete()
    
  3. Subscription: Represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.
    void request(long n)
    void cancel()
    
  4. Processor: Represents a processing stage, which is both a Subscriber and a Publisher. It inherits both the Subscriber and Publisher interfaces.

There is one Publisher at the top, similar to the root of a tree, followed by 0 to N intermediate Processors (subscriber + publisher) and a leaf Subscriber at the end

Publisher, Subscriber and Subscription

public interface Publisher<T> {
   public void subscribe(Subscriber<? super T> s);
}
public interface Subscription {
   public void request(long n);
   public void cancel();
}
public interface Subscriber<T> {
   public void onSubscribe(Subscription s);
   public void onNext(T t);
   public void onError(Throwable t);
   public void onComplete();
}
  1. The Publisher has a subscribe method through which we pass the Subscriber instance. subscribe returns nothing; the Publisher creates the Subscription object itself
  2. The Publisher hands the Subscription object over to the Subscriber by calling its onSubscribe method.
  3. The Subscriber uses the Subscription object to request items (request method) or to cancel the subscription (cancel method). Communication between Publisher and Subscriber happens through the Subscription object
  4. The Subscriber can request N items using the Subscription object. The Publisher delivers them one by one by calling onNext. The Publisher gives at most 3 items if the Subscriber requested 3.
  5. Once the Publisher has transferred all items, it calls onComplete() on the Subscriber to notify it that the work is done
  6. The Publisher calls onError() to notify the Subscriber of an error.

  1. The publisher does not produce data unless the subscriber requests it.
  2. The publisher produces at most as many items as the subscriber requested. The publisher can also produce 0 items!
  3. The subscriber can cancel the subscription. The publisher should stop at that moment, as the subscriber is no longer interested in consuming the data
  4. The publisher can send the error signal
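
These rules can be tried out without any external dependency by using java.util.concurrent.Flow, the JDK's copy of the Reactive Streams interfaces. The following is a simplified, single-threaded sketch: CountingPublisher and RecordingSubscriber are hypothetical names, and several rules of the specification (thread safety, rejecting non-positive requests) are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandSketch {
    // Emits the numbers 1..max, but never more than the subscriber has requested.
    static class CountingPublisher implements Flow.Publisher<Integer> {
        private final int max;

        CountingPublisher(int max) {
            this.max = max;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items; stop early when cancelled or exhausted.
                    for (long i = 0; i < n && !done && next <= max; i++) {
                        subscriber.onNext(next++);
                    }
                    if (!done && next > max) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Records every signal it receives so that the order can be inspected.
    static class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> signals = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { signals.add("onNext(" + item + ")"); }
        @Override public void onError(Throwable t) { signals.add("onError"); }
        @Override public void onComplete() { signals.add("onComplete"); }
    }

    public static void main(String[] args) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new CountingPublisher(5).subscribe(subscriber);
        System.out.println(subscriber.signals); // [] : nothing was requested yet
        subscriber.subscription.request(2);
        System.out.println(subscriber.signals); // [onNext(1), onNext(2)]
        subscriber.subscription.cancel();
        subscriber.subscription.request(3);
        System.out.println(subscriber.signals); // unchanged after cancel
    }
}
```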

  1. The client calls the subscribe method on the PublisherImpl instance and passes an instance of SubscriberImpl
  2. The PublisherImpl subscribe method in turn calls the onSubscribe method on the SubscriberImpl instance that was passed in.
  3. SubscriberImpl thereby receives the subscription that was created for it inside the PublisherImpl subscribe method
  4. PublisherImpl has the following methods
    • subscribe – takes the subscriber as argument, creates a new subscription and notifies the subscriber by calling its onSubscribe method
      public void subscribe(Subscriber<? super String> subscriber) {
              var subscription = new SubscriptionImpl(subscriber);
              subscriber.onSubscribe(subscription);
      }
      
  5. SubscriberImpl has following methods
    • onSubscribe – receives the subscription created in the PublisherImpl subscribe method. It is called from PublisherImpl
    • onNext – called from SubscriptionImpl to pass the data during iteration
    • onError – called from SubscriptionImpl when an error occurs
    • onComplete – called from SubscriptionImpl when all data has been sent
    @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
        }
    
        @Override
        public void onNext(String emailId) {
            logger.info("Received {}", emailId);
        }
    
        @Override
        public void onError(Throwable throwable) {
            logger.info("---------------------------------------------");
            logger.info("Received error {}", throwable.getMessage());
        }
    
        @Override
        public void onComplete() {
            logger.info("Subscription ended");
        }
    
  6. SubscriptionImpl has the following methods
    • request – requests data through the SubscriptionImpl instance by passing the number of records
    • cancel – cancels the subscription
    @Override
        public void request(long requestedItemCnt) {
            if(isCancelled){
                return;
            }
    
            logger.info("Subscriber has requested {} items ", requestedItemCnt);
    
            if(requestedItemCnt >MAX_ITEMS){
                this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available"));
                this.isCancelled = true;
                return;
            }
    
            //Check if all items(MAX_ITEMS) were sent
            for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){
                count++;
                this.subscriber.onNext(this.faker.internet().emailAddress());
            }
    
            //If all items were sent complete subscription
            if(count == MAX_ITEMS){
                logger.info("No More data from Producer");
                this.subscriber.onComplete();
                isCancelled = true;
            }
        }
    
        @Override
        public void cancel() {
            logger.info("Cancelling Subscription... . . .");
            isCancelled = true;
        }
    

pom.xml

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty-http</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>${faker.version}</version>
        </dependency>

PublisherImpl.java

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class PublisherImpl implements Publisher<String> {
    @Override
    public void subscribe(Subscriber<? super String> subscriber) {
        var subscription = new SubscriptionImpl(subscriber);
        subscriber.onSubscribe(subscription);
    }
}

SubscriptionImpl.java

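import com.github.javafaker.Faker;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionImpl implements Subscription {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);
    private static final int MAX_ITEMS = 10;

    private final Subscriber<? super String> subscriber;
    private final Faker faker;
    private boolean isCancelled = false;
    // Per-subscription counter; a static field would be shared by all subscriptions
    private int count = 0;

    public SubscriptionImpl(Subscriber<? super String> subscriber){
        this.subscriber = subscriber;
        this.faker = Faker.instance();
    }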

    @Override
    public void request(long requestedItemCnt) {
        if(isCancelled){
            return;
        }

        logger.info("Subscriber has requested {} items ", requestedItemCnt);

        if(requestedItemCnt >MAX_ITEMS){
            this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available"));
            this.isCancelled = true;
            return;
        }

        //Check if all items(MAX_ITEMS) were sent
        for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){
            count++;
            this.subscriber.onNext(this.faker.internet().emailAddress());
        }


        //If all items were sent complete subscription
        if(count == MAX_ITEMS){
            logger.info("No More data from Producer");
            this.subscriber.onComplete();
            isCancelled = true;
        }
    }

    @Override
    public void cancel() {
        logger.info("Cancelling Subscription... . . .");
        isCancelled = true;
    }
}

SubscriberImpl.java

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriberImpl implements Subscriber<String> {
    private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class);
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(String emailId) {
        logger.info("Received {}", emailId);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.info("---------------------------------------------");
        logger.info("Received error {}", throwable.getMessage());
    }

    @Override
    public void onComplete() {
        logger.info("Subscription ended");
    }

    public Subscription getSubscription() {
        return subscription;
    }
}

Main.java

import org.mugil.publisher.PublisherImpl;
import org.mugil.subscriber.SubscriberImpl;

import java.time.Duration;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        getMessages();
    }

    public static void getMessages() throws InterruptedException {
        var objPublisher = new PublisherImpl();
        var objSubscriber = new SubscriberImpl();

        objPublisher.subscribe(objSubscriber);
        objSubscriber.getSubscription().request(1);
        Thread.sleep(Duration.ofSeconds(2));

        objSubscriber.getSubscription().request(2);
        Thread.sleep(Duration.ofSeconds(2));

        objSubscriber.getSubscription().request(3);
        Thread.sleep(Duration.ofSeconds(2));

        objSubscriber.getSubscription().cancel();

        objSubscriber.getSubscription().request(1);
    }
}

Output

18:00:57.240 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 1 items 
18:00:57.534 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received jani.mante@gmail.com
18:00:59.537 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 2 items 
18:00:59.541 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received sunny.quigley@yahoo.com
18:00:59.544 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received hang.gutkowski@yahoo.com
18:01:01.546 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 3 items 
18:01:01.548 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received malik.thiel@hotmail.com
18:01:01.549 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received andre.purdy@gmail.com
18:01:01.550 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received kim.greenfelder@gmail.com
18:01:03.560 [main] INFO org.mugil.publisher.SubscriptionImpl -- Cancelling Subscription... . . .

Mono is a Publisher that represents a single value or no value at all. It emits at most one value through onNext() and then terminates with the onComplete() signal. In case of failure, it emits only a single onError() signal. Most Mono implementations are expected to call onComplete on their Subscriber immediately after having called onNext. A combination of onNext and onError is explicitly forbidden.

Overloaded subscribe methods available in Mono, which accept lambdas

subscribe();  //1

subscribe(Consumer<? super T> consumer);  //2

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer);  //3

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);  //4

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); //5

//1

        Publisher<String> rctMono  = Mono.just("Hello React"); // Simple Mono Publisher using Just
        var subs = new SubscriberImpl();
        rctMono.subscribe(subs);
        subs.getSubscription().request(10);

Output

08:21:46.241 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received Hello React
08:21:46.252 [main] INFO org.mugil.subscriber.SubscriberImpl -- Subscription ended

//5


       Mono<Integer> rctMono2 = Mono.just(1)
                                    .map(i -> i / 0); // throws ArithmeticException once subscribed
       rctMono2.subscribe(i -> System.out.println(i),   // consumer (onNext)
                          err -> System.out.println("Error Msg -" + err.getMessage()), // errorConsumer (onError), optional
                          () -> System.out.println("Completed"), // completeConsumer (onComplete), optional
                          subscription -> subscription.request(1)); // subscriptionConsumer (onSubscribe), optional

Output

Error Msg -/ by zero

Simple code which returns Mono based on switch case

    public static Mono<String> getUserName(Integer num){
        return switch (num){
            case 1 -> Mono.just("How are you");
            case 2 -> Mono.empty();
            default -> Mono.error(new RuntimeException("Invalid Input"));
        };
    }

Mono.just takes a value that already exists in memory, so its argument is evaluated eagerly. In the code below sumOfNums is invoked while the Mono is being created, even though nothing subscribes to it.

public static void main(String[] args) {
  List<Integer> lstNums = List.of(1,2,3);
  Mono.just(sumOfNums(lstNums)); // Eager: sumOfNums runs here, before any subscription, so Mono.just is a poor fit for expensive or large computations
}

public static int sumOfNums(List<Integer> lstNums){
   System.out.println("Sum invoked");
   return lstNums.stream().mapToInt(num -> num).sum();
}

Output

Sum invoked
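
The reason "Sum invoked" is printed without any subscriber is ordinary Java argument evaluation rather than anything specific to Reactor. The sketch below uses only the JDK to show the difference between passing a computed value and passing a Supplier. EagerVsLazy and the calls counter are hypothetical names added for illustration, and calling get() here merely stands in for what a subscription would trigger.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration class; it does not use Reactor.
class EagerVsLazy {
    // Counts how many times sumOfNums has actually run
    static final AtomicInteger calls = new AtomicInteger();

    static int sumOfNums(List<Integer> nums) {
        calls.incrementAndGet();
        return nums.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> nums = List.of(1, 2, 3);

        // Eager: the method runs before the value is handed over,
        // which is what happens to the argument of Mono.just(...)
        int eager = sumOfNums(nums);
        System.out.println("eager value = " + eager + ", calls = " + calls.get());

        // Lazy: only the recipe is stored, nothing runs yet
        Supplier<Integer> lazy = () -> sumOfNums(nums);
        System.out.println("after creating supplier, calls = " + calls.get());

        // The work happens only when the value is asked for
        System.out.println("lazy value = " + lazy.get() + ", calls = " + calls.get());
    }
}
```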

Mono.fromSupplier defers the call to sumOfNums until the Mono is subscribed to. Creating the Mono only assembles the pipeline, so without a subscriber sumOfNums is never invoked.

List<Integer> lstNums = List.of(1,2,3);
Mono.fromSupplier(() -> sumOfNums(lstNums)); // Lazy: the Mono is only assembled here, nothing subscribes


public static int sumOfNums(List<Integer> lstNums){
   System.out.println("Sum invoked");
   return lstNums.stream().mapToInt(num -> num).sum();
}

Output

(Nothing is printed: the Mono is never subscribed to, so sumOfNums is never invoked.)

Mono.fromSupplier vs Mono.fromCallable
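fromCallable accepts a Callable, whose call() method is declared with throws Exception, so the lambda may throw checked exceptions. fromSupplier accepts a Supplier, whose get() method declares no checked exceptions. If you substitute a supplier for a callable, you must therefore add a try/catch block inside the lambda to handle the checked exception.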
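
The difference comes from the two JDK functional interfaces themselves, so it can be shown without Reactor. In the sketch below CallableVsSupplier and readGreeting are hypothetical names, and wrapping the exception in UncheckedIOException is just one possible way to handle it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

// Hypothetical illustration class; it does not use Reactor.
class CallableVsSupplier {
    // Hypothetical helper that declares a checked exception
    static String readGreeting(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("source unavailable");
        }
        return "Hello React";
    }

    // Compiles as-is because Callable.call() is declared "throws Exception"
    static Callable<String> asCallable(boolean fail) {
        return () -> readGreeting(fail);
    }

    // Supplier.get() declares no checked exceptions, so the lambda must handle them
    static Supplier<String> asSupplier(boolean fail) {
        return () -> {
            try {
                return readGreeting(fail);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        System.out.println(asCallable(false).call());
        System.out.println(asSupplier(false).get());
    }
}
```

Removing the try/catch from asSupplier makes the class fail to compile with an unreported IOException, which is the situation the note above describes.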

We use property binding to pass a value from the component to a form element in the HTML, and event binding to pass a value from the HTML back to the Angular component.

  1. In the code below we use [value] to pass the value from the component to the HTML.
  2. In the same way we use (input) to get the value back on an event, such as a change to the name text.
  3. {{employee.Name}} is used to display the value. You can remove [value] or (input) to check the behavior.
  4. Instead of this we can use ngModel by importing FormsModule, which takes care of both property binding and event binding.

empform.html

<form>
<input type="text" [value]="employee.Name" (input)="employee.Name=$event.target.value" />
{{employee.Name}}
</form>

EmployeeModel.ts

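export class Employee {
  private _Name: string;

  constructor(name: string) {
    this._Name = name;
  }

  // Accessors so the templates can read and write employee.Name
  get Name(): string { return this._Name; }
  set Name(value: string) { this._Name = value; }
}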

EmployeeController.ts

export class AddemployeeComponent implements OnInit {
.
.
  public employee: Employee;

  constructor() {
    this.employee = new Employee('Mugilvannan');
  }
.
.
}

  1. For ngModel to work inside a form, the name attribute (e.g. employeeName) is mandatory on the form field.
    Otherwise the value won't be reflected on change.
  2. [ngModel] is for value binding and (ngModelChange) is for event binding. Both can be combined into the format called banana-in-a-box: [(ngModel)].
  3. To use ngModel, FormsModule should be added to app.module.ts.
  4. So when should the expanded syntax of ngModel be used? There are times when you want to change the text to uppercase or lowercase once it is entered into a textbox or form field. In such cases we need to call an event handler that does the conversion, so you would use (ngModelChange) instead of [(ngModel)].

addemployee.component.html

  <form>
    <table style="border-collapse:collapse;" border="1" cellpadding="5">
      <tbody>
        <tr>
          <td>Name</td>
          <td><input type="text" name="employeeName" [ngModel]="employee.Name" (ngModelChange)="employee.Name=$event" /></td>
        </tr>
        <tr>
          <td colspan="2">
            <input type="submit" value="Add Employee" (click)="employee.Name='test'" />
          </td>
        </tr>
      </tbody>
    </table>
  </form>
  {{employee.Name}}

addemployee.component.html – Banana-in-a-Box format

.
.
<td><input type="text" name="employeeName" [(ngModel)]="employee.Name"/></td>
.
.