- publisher does not produce data unless subscriber requests for it.
- publisher will produce at most as many items as the subscriber requested. publisher can also produce 0 items!
- subscriber can cancel the subscription. producer should stop at that moment as subscriber is no longer interested in consuming the data
- producer can send the error signal

- PublisherImpl instance would call the subscribe method and pass instance of SubscriberImpl
- PublisherImpl subscribe method would in turn call the onSubscribe method using the instance of SubscriberImpl passed.
- SubscriberImpl would thereby receive the subscription that PublisherImpl created for it inside the subscribe method
- PublisherImpl has following methods
- subscribe – takes subscriber as argument and creates new subscription and notify the subscriber by calling the onSubscribe method
// Wraps the subscriber in a new SubscriptionImpl and immediately hands that
// subscription back to the subscriber through onSubscribe.
public void subscribe(Subscriber subscriber) {
    subscriber.onSubscribe(new SubscriptionImpl(subscriber));
}
- subscribe – takes subscriber as argument and creates new subscription and notify the subscriber by calling the onSubscribe method
- SubscriberImpl has following methods
- onSubscribe – receives the subscription created in the publisher's subscribe method. This is in turn called from PublisherImpl
- onNext – called from subscriptionImpl to pass the data during iteration
- onError – called from subscriptionImpl during error
- onComplete – called from subscriptionImpl when all data has been sent
@Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; } @Override public void onNext(String emailId) { logger.info("Received {}", emailId); } @Override public void onError(Throwable throwable) { logger.info("---------------------------------------------"); logger.info("Received error {}", throwable.getMessage()); } @Override public void onComplete() { logger.info("Subscription ended"); }
- SubscriptionImpl has following methods
- request – you can request data using the subscriptionImpl instance by passing the number of records
- cancel – cancel subscription
@Override public void request(long requestedItemCnt) { if(isCancelled){ return; } logger.info("Subscriber has requested {} items ", requestedItemCnt); if(requestedItemCnt >MAX_ITEMS){ this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available")); this.isCancelled = true; return; } //Check if all items(MAX_ITEMS) were sent for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){ count++; this.subscriber.onNext(this.faker.internet().emailAddress()); } //If all items were sent complete subscription if(count == MAX_ITEMS){ logger.info("No More data from Producer"); this.subscriber.onComplete(); isCancelled = true; } } @Override public void cancel() { logger.info("Cancelling Subscription... . . ."); isCancelled = true; }
pom.xml
<!-- Project Reactor core. No <version> is given for the three Reactor artifacts, so their
     versions are presumably managed elsewhere (e.g. a BOM in dependencyManagement): confirm.
     The listings import org.reactivestreams; presumably it arrives transitively through
     this artifact: confirm. -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<!-- Reactor Netty artifacts; not referenced by the Java listings shown alongside this fragment. -->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-http</artifactId>
</dependency>
<!-- Logback; presumably the backend for the SLF4J loggers used in the listings.
     Requires a logback.version property to be defined (not shown here). -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<!-- Java Faker; SubscriptionImpl uses Faker to generate the e-mail addresses it emits.
     Requires a faker.version property to be defined (not shown here). -->
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>${faker.version}</version>
</dependency>
PublisherImpl.java
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
/**
 * Minimal {@link Publisher} of {@code String} items.
 *
 * <p>Subscribing does not produce any data by itself: each call to {@link #subscribe} only
 * creates a new {@link SubscriptionImpl} bound to the given subscriber and passes it to the
 * subscriber's {@code onSubscribe}. Items are emitted later, when the subscriber calls
 * {@code request} on that subscription.
 */
public class PublisherImpl implements Publisher<String> {

    /**
     * Creates a new subscription for {@code subscriber} and notifies it via {@code onSubscribe}.
     *
     * @param subscriber the consumer to attach; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     *     (Reactive Streams rule 1.9)
     */
    @Override
    public void subscribe(Subscriber<? super String> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("subscriber must not be null");
        }
        subscriber.onSubscribe(new SubscriptionImpl(subscriber));
    }
}
SubscriptionImpl.java
/**
 * Demo {@link Subscription} that emits fake e-mail addresses to one subscriber, strictly on
 * demand, up to {@code MAX_ITEMS} items in total.
 *
 * <p>Lifecycle: the subscription ends when all {@code MAX_ITEMS} items were emitted
 * ({@code onComplete}), when an invalid amount is requested ({@code onError}), or when
 * {@link #cancel()} is called. After that, further {@link #request(long)} calls are ignored.
 *
 * <p>State is kept in plain fields without synchronization, so all calls are expected to come
 * from a single thread.
 */
public class SubscriptionImpl implements Subscription {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);

    /** Total number of items a single subscription will ever emit. */
    private static final int MAX_ITEMS = 10;

    private final Subscriber<? super String> subscriber;
    private final Faker faker;

    /** True once the subscription was cancelled, completed or failed. */
    private boolean isCancelled = false;

    /**
     * Number of items emitted so far by THIS subscription. This must be an instance field:
     * a static counter would be shared by every subscription, so a second subscriber would
     * start with the budget already (partly) used up.
     */
    private int count = 0;

    /**
     * @param subscriber the consumer that receives {@code onNext}/{@code onError}/
     *     {@code onComplete} signals from this subscription
     */
    public SubscriptionImpl(Subscriber<? super String> subscriber) {
        this.subscriber = subscriber;
        this.faker = Faker.instance();
    }

    /**
     * Emits up to {@code requestedItemCnt} items, never exceeding {@code MAX_ITEMS} in total,
     * and completes the subscription once {@code MAX_ITEMS} items have been sent.
     *
     * <p>A non-positive request is reported through {@code onError} with an
     * {@link IllegalArgumentException}; a request larger than {@code MAX_ITEMS} is reported
     * through {@code onError} with a {@link RuntimeException}. Either error terminates the
     * subscription.
     *
     * @param requestedItemCnt number of items the subscriber is asking for
     */
    @Override
    public void request(long requestedItemCnt) {
        if (isCancelled) {
            return;
        }
        logger.info("Subscriber has requested {} items ", requestedItemCnt);

        // Reactive Streams rule 3.9: a non-positive demand must be signalled as an error.
        if (requestedItemCnt <= 0) {
            // Mark terminated before signalling so a re-entrant request() is ignored.
            isCancelled = true;
            subscriber.onError(new IllegalArgumentException(
                    "Requested item count must be positive but was " + requestedItemCnt));
            return;
        }
        if (requestedItemCnt > MAX_ITEMS) {
            isCancelled = true;
            subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available"));
            return;
        }

        // Stop early if the subscriber cancels from inside onNext.
        for (long idx = 0; idx < requestedItemCnt && count < MAX_ITEMS && !isCancelled; idx++) {
            count++;
            subscriber.onNext(faker.internet().emailAddress());
        }

        // If all items were sent (and nobody cancelled meanwhile) complete the subscription.
        if (!isCancelled && count == MAX_ITEMS) {
            logger.info("No More data from Producer");
            isCancelled = true;
            subscriber.onComplete();
        }
    }

    /** Marks the subscription as cancelled; later {@link #request(long)} calls are ignored. */
    @Override
    public void cancel() {
        logger.info("Cancelling Subscription... . . .");
        isCancelled = true;
    }
}
SubscriberImpl.java
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo {@link Subscriber} of {@code String} items that simply logs every signal it receives.
 *
 * <p>The subscription received in {@link #onSubscribe} is stored and exposed through
 * {@link #getSubscription()} so that calling code can drive demand ({@code request}) and
 * cancellation itself.
 */
public class SubscriberImpl implements Subscriber<String> {
    private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class);

    /** The first subscription handed to this subscriber; {@code null} until subscribed. */
    private Subscription subscription;

    /**
     * Stores the subscription. If this subscriber already holds one, the newly offered
     * subscription is cancelled and the existing one is kept (Reactive Streams rule 2.5)
     * instead of being silently overwritten.
     *
     * @param subscription the subscription created by the publisher
     * @throws NullPointerException if {@code subscription} is {@code null} (rule 2.13)
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("subscription must not be null");
        }
        if (this.subscription != null) {
            subscription.cancel();
            return;
        }
        this.subscription = subscription;
    }

    /**
     * Logs the received item.
     *
     * @param emailId the emitted e-mail address
     * @throws NullPointerException if {@code emailId} is {@code null} (rule 2.13)
     */
    @Override
    public void onNext(String emailId) {
        if (emailId == null) {
            throw new NullPointerException("emailId must not be null");
        }
        logger.info("Received {}", emailId);
    }

    /**
     * Logs a separator line followed by the error message.
     *
     * @param throwable the error signalled by the subscription
     * @throws NullPointerException if {@code throwable} is {@code null} (rule 2.13)
     */
    @Override
    public void onError(Throwable throwable) {
        if (throwable == null) {
            throw new NullPointerException("throwable must not be null");
        }
        logger.info("---------------------------------------------");
        logger.info("Received error {}", throwable.getMessage());
    }

    /** Logs that the stream has ended. */
    @Override
    public void onComplete() {
        logger.info("Subscription ended");
    }

    /**
     * @return the stored subscription, or {@code null} if {@link #onSubscribe} has not been
     *     called yet
     */
    public Subscription getSubscription() {
        return subscription;
    }
}
Main.java
import org.mugil.publisher.PublisherImpl;
import org.mugil.subscriber.SubscriberImpl;
import java.time.Duration;
/**
 * Demo driver: subscribes a {@code SubscriberImpl} to a {@code PublisherImpl} and then drives
 * the demand by hand.
 */
public class Main {

    public static void main(String[] args) throws InterruptedException {
        getMessages();
    }

    /**
     * Requests 1, 2 and 3 items with a two-second pause after each request, cancels the
     * subscription, and finally issues one more request against the cancelled subscription.
     *
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    public static void getMessages() throws InterruptedException {
        var publisher = new PublisherImpl();
        var subscriber = new SubscriberImpl();
        publisher.subscribe(subscriber);

        var subscription = subscriber.getSubscription();
        var pause = Duration.ofSeconds(2);

        // Growing demand: 1, then 2, then 3 items, pausing after every request.
        for (long demand = 1; demand <= 3; demand++) {
            subscription.request(demand);
            Thread.sleep(pause);
        }

        subscription.cancel();
        // Issued after cancel() on purpose.
        subscription.request(1);
    }
}
Output
18:00:57.240 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 1 items
18:00:57.534 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received jani.mante@gmail.com
18:00:59.537 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 2 items
18:00:59.541 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received sunny.quigley@yahoo.com
18:00:59.544 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received hang.gutkowski@yahoo.com
18:01:01.546 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 3 items
18:01:01.548 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received malik.thiel@hotmail.com
18:01:01.549 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received andre.purdy@gmail.com
18:01:01.550 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received kim.greenfelder@gmail.com
18:01:03.560 [main] INFO org.mugil.publisher.SubscriptionImpl -- Cancelling Subscription... . . .