- RAM is Secondary Memory and Processor is Main memory.
- Programs always exist in Hard Disk memory and when started, instance of program would be loaded as process in Secondary Memory
- Thread is part of process. Process is unit of resource and thread is unit of execution
- If there is Single core Processor, processor would take turns while executing task called context switching. Task is nothing but piece of code in execution. Task is subset of process.
- OS Scheduler governs the context switching. Thread states are stored during context switching so it could be resumed from where it is left. Each Thread has its own Thread Stack
- Java thread is nothing but wrapper over OS thread or kernel thread. The reason why Java thread is wrapper around OS thread is only with OS thread the scheduling for processing could be done
Monthly Archives: October 2024
Reactive Programming IO Models
Communication Models
- Synchronous + Blocking – Calling Customer Service and waiting for response online
- Asynchronous- Asking My Friend to Call Customer Service and I carry forward with my work
- Non-Blocking- Asking Call back from Customer Service and I carry forward with my work
- Asynchronous + Non Blocking – I am Calling customer Service and asking to call back My Friend and I carry forward with my work
request -> response
request -> streaming response (Stock Price in Stock Market App, Heart Beat for Health Check in Spring Boot Appp)
streaming request -> response (Using Google Docs and updating in drive in regular time intervals)
streaming request -> streaming response (Playing Game Online)
Reactive Stream Specification
Process Stream of Messages in a Non Blocking Asynchronous manner with Observer Design Pattern(Observe and React incase of change)
- Publisher: Emits a sequence of elements to its subscribers.
void subscribe(Subscriber<? super T> s)
- Subscriber: Consumes elements provided by a Publisher.
void onSubscribe(Subscription s) void onNext(T t) void onError(Throwable t) void onComplete()
- Subscription: Represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.
void request(long n) void cancel()
- Processor: Represents a processing stage, which is both a Subscriber and a Publisher.Inherits both Subscriber and Publisher interfaces.
There would be one Publisher at Top, similar to root of tree and there would be 0 to N intermediate processors(subscriber + publisher) and there would be leaf Subscriber
Publisher, Subscriber and Subscription
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
public interface Subscription { public void request(long n); public void cancel(); }
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
- Publisher will have subscribe method through which we would pass the subscriber instance. The Subscription object would be returned by Publisher
- The Publisher hands over subscription object to Subscriber. Subscriber uses onSubscribe method to accept subscription.
- Subscriber could use subscription object using request method and could cancel subscription. Communication between Publisher and Subscriber happens using subscription object
- Subscriber can request N items using subscription object. The Publisher can iterate to N object using onNext method. Publisher only give 3 items if 3 items is requested by subscriber.
- If the Publisher has completed transferring all Items, then Publisher can call onComplete() method in Subscriber to notify the subscriber that its work is done
- Publisher calls onError() method to notify error.
Simple Subscriber Publisher Using Reactive Streams
- publisher does not produce data unless subscriber requests for it.
- publisher will produce only <= subscriber requested items. publisher can also produce 0 items!
- subscriber can cancel the subscription. producer should stop at that moment as subscriber is no longer interested in consuming the data
- producer can send the error signal
- PublisherImpl instance would call the subscribe method and pass instance of SubscriberImpl
- PublisherImpl subscribe method would inturn call the onSubscribe method using the instance of SubscriberImpl passed.
- SubscriberImpl would get the same subscription which it has been passed to publisherImpl subscribe method earlier
- PublisherImpl has following methods
- subscribe – takes subscriber as argument and creates new subscription and notify the subscriber by calling the onSubscribe method
public void subscribe(Subscriber subscriber) { var subscription = new SubscriptionImpl(subscriber); subscriber.onSubscribe(subscription); }
- subscribe – takes subscriber as argument and creates new subscription and notify the subscriber by calling the onSubscribe method
- SubscriberImpl has following methods
- onSubscribe – To get the same subscription passed to publisher subscribe method. This is inturn called from publisherImpl
- onNext – called from subscriptionImpl to pass the data during iteration
- onError – called from subscriptionImpl during error
- onComplete – called from when all data is completed
@Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; } @Override public void onNext(String emailId) { logger.info("Received {}", emailId); } @Override public void onError(Throwable throwable) { logger.info("---------------------------------------------"); logger.info("Received error {}", throwable.getMessage()); } @Override public void onComplete() { logger.info("Subscription ended"); }
- SubscriptionImplhas following methods
- request – you can request date using subscriptionImpl instance by passing no of records
- cancel – cancel subscription
@Override public void request(long requestedItemCnt) { if(isCancelled){ return; } logger.info("Subscriber has requested {} items ", requestedItemCnt); if(requestedItemCnt >MAX_ITEMS){ this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available")); this.isCancelled = true; return; } //Check if all items(MAX_ITEMS) were sent for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){ count++; this.subscriber.onNext(this.faker.internet().emailAddress()); } //If all items were sent complete subscription if(count == MAX_ITEMS){ logger.info("No More data from Producer"); this.subscriber.onComplete(); isCancelled = true; } } @Override public void cancel() { logger.info("Cancelling Subscription... . . ."); isCancelled = true; }
pom.xml
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty-http</artifactId> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> </dependency> <dependency> <groupId>com.github.javafaker</groupId> <artifactId>javafaker</artifactId> <version>${faker.version}</version> </dependency>
PublisherImpl.java
import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; public class PublisherImpl implements Publisher { @Override public void subscribe(Subscriber subscriber) { var subscription = new SubscriptionImpl(subscriber); subscriber.onSubscribe(subscription); } }
SubscriptionImpl.java
public class SubscriptionImpl implements Subscription { private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class); private final Subscriber<? super String> subscriber; private boolean isCancelled = false; private final Faker faker; private final int MAX_ITEMS = 10; private static int count = 0; public SubscriptionImpl(Subscriber subscriber){ this.subscriber = subscriber; this.faker = Faker.instance(); } @Override public void request(long requestedItemCnt) { if(isCancelled){ return; } logger.info("Subscriber has requested {} items ", requestedItemCnt); if(requestedItemCnt >MAX_ITEMS){ this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available")); this.isCancelled = true; return; } //Check if all items(MAX_ITEMS) were sent for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){ count++; this.subscriber.onNext(this.faker.internet().emailAddress()); } //If all items were sent complete subscription if(count == MAX_ITEMS){ logger.info("No More data from Producer"); this.subscriber.onComplete(); isCancelled = true; } } @Override public void cancel() { logger.info("Cancelling Subscription... . . ."); isCancelled = true; } }
SubscriberImpl.java
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SubscriberImpl implements Subscriber<String> { private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class); private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; } @Override public void onNext(String emailId) { logger.info("Received {}", emailId); } @Override public void onError(Throwable throwable) { logger.info("---------------------------------------------"); logger.info("Received error {}", throwable.getMessage()); } @Override public void onComplete() { logger.info("Subscription ended"); } public Subscription getSubscription() { return subscription; } }
Main.java
import org.mugil.publisher.PublisherImpl; import org.mugil.subscriber.SubscriberImpl; import java.time.Duration; public class Main { public static void main(String[] args) throws InterruptedException { getMessages(); } public static void getMessages() throws InterruptedException { var objPublisher = new PublisherImpl(); var objSubscriber = new SubscriberImpl(); objPublisher.subscribe(objSubscriber); objSubscriber.getSubscription().request(1); Thread.sleep(Duration.ofSeconds(2)); objSubscriber.getSubscription().request(2); Thread.sleep(Duration.ofSeconds(2)); objSubscriber.getSubscription().request(3); Thread.sleep(Duration.ofSeconds(2)); objSubscriber.getSubscription().cancel(); objSubscriber.getSubscription().request(1); } }
Output
18:00:57.240 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 1 items 18:00:57.534 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received jani.mante@gmail.com 18:00:59.537 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 2 items 18:00:59.541 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received sunny.quigley@yahoo.com 18:00:59.544 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received hang.gutkowski@yahoo.com 18:01:01.546 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 3 items 18:01:01.548 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received malik.thiel@hotmail.com 18:01:01.549 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received andre.purdy@gmail.com 18:01:01.550 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received kim.greenfelder@gmail.com 18:01:03.560 [main] INFO org.mugil.publisher.SubscriptionImpl -- Cancelling Subscription... . . .
Mono Subscriber
Mono is a type of instant Publisher that represents a single or empty value. This means it can emit only one value at most for the onNext() request and then terminates with the onComplete() signal. In case of failure, it only emits a single onError() signal.Most Mono implementations are expected to immediately call onComplete on their Subscriber after having called onNext.a combination of onNext and onError is explicitly forbidden.
Overridden Lambda implementation available in mono
subscribe(); //1 subscribe(Consumer<? super T> consumer); //2 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); //3 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); //4 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer); //5
//1
Publisher<String> rctMono = Mono.just("Hello React"); // Simple Mono Publisher using Just var subs = new SubscriberImpl(); rctMono.subscribe(subs); subs.getSubscription().request(10);
Output
08:21:46.241 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received Hello React 08:21:46.252 [main] INFO org.mugil.subscriber.SubscriberImpl -- Subscription ended
//5
Mono<Integer> rctMono2 = Mono.just(1) .map(i -> i/0); rctMono2.subscribe(i -> System.out.println(i), //Consumer err -> System.out.println("Error Msg -" + err.getMessage()), //onError, Not Mandatory () -> System.out.println("Completed"), //onComplete, Not Mandatory subscription -> subscription.request(1)); //onRequest, Not Mandatory
Output
Error Msg -/ by zero
Simple code which returns Mono based on switch case
public static Mono<String> getUserName(Integer num){ return switch (num){ case 1 -> Mono.just("How are you"); case 2 -> Mono.empty(); default -> Mono.error(new RuntimeException("Invalid Input")); }; }
Using mono.just would invoke the sumOfNums as it always fetches value from JVM memory rather than streaming
public static void main(String[] args) { List<Integer> lstNums = List.of(1,2,3); Mono.just(sumOfNums(lstNums)); //Mono.just takes the value from memory so wont be suitable for streaming data incase large data should be handled } public static int sumOfNums(List<Integer> lstNums){ System.out.println("Sum invoked"); return lstNums.stream().mapToInt(num -> num).sum(); }
Output
Sum invoked
Using mono.fromSupplier would invoke the sumOfNums during Terminal Operation rather than Intermediate Operation
List<Integer> lstNums = List.of(1,2,3); Mono.fromSupplier(() -> sumOfNums(lstNums)); // Intermediate Operation public static int sumOfNums(List<Integer> lstNums){ System.out.println("Sum invoked"); return lstNums.stream().mapToInt(num -> num).sum(); }
Output
Mono.fromSupplier vs Mono.fromCallable
fromCallable calls a checked exception where as fromSupplier doesnot throws checked exception. So if you substitute supplier in place of callable you should also write try catch block to handle exception