- publisher does not produce data unless subscriber requests for it.
- publisher will produce only <= subscriber requested items. publisher can also produce 0 items!
- subscriber can cancel the subscription. producer should stop at that moment as subscriber is no longer interested in consuming the data
- producer can send the error signal
- PublisherImpl instance would call the subscribe method and pass instance of SubscriberImpl
- PublisherImpl subscribe method would inturn call the onSubscribe method using the instance of SubscriberImpl passed.
- SubscriberImpl would get the same subscription which it has been passed to publisherImpl subscribe method earlier
- PublisherImpl has following methods
- subscribe – takes subscriber as argument and creates new subscription and notify the subscriber by calling the onSubscribe method
public void subscribe(Subscriber subscriber) { var subscription = new SubscriptionImpl(subscriber); subscriber.onSubscribe(subscription); }
- subscribe – takes subscriber as argument and creates new subscription and notify the subscriber by calling the onSubscribe method
- SubscriberImpl has following methods
- onSubscribe – To get the same subscription passed to publisher subscribe method. This is inturn called from publisherImpl
- onNext – called from subscriptionImpl to pass the data during iteration
- onError – called from subscriptionImpl during error
- onComplete – called from when all data is completed
@Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; } @Override public void onNext(String emailId) { logger.info("Received {}", emailId); } @Override public void onError(Throwable throwable) { logger.info("---------------------------------------------"); logger.info("Received error {}", throwable.getMessage()); } @Override public void onComplete() { logger.info("Subscription ended"); }
- SubscriptionImplhas following methods
- request – you can request date using subscriptionImpl instance by passing no of records
- cancel – cancel subscription
@Override public void request(long requestedItemCnt) { if(isCancelled){ return; } logger.info("Subscriber has requested {} items ", requestedItemCnt); if(requestedItemCnt >MAX_ITEMS){ this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available")); this.isCancelled = true; return; } //Check if all items(MAX_ITEMS) were sent for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){ count++; this.subscriber.onNext(this.faker.internet().emailAddress()); } //If all items were sent complete subscription if(count == MAX_ITEMS){ logger.info("No More data from Producer"); this.subscriber.onComplete(); isCancelled = true; } } @Override public void cancel() { logger.info("Cancelling Subscription... . . ."); isCancelled = true; }
pom.xml
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty-http</artifactId> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> </dependency> <dependency> <groupId>com.github.javafaker</groupId> <artifactId>javafaker</artifactId> <version>${faker.version}</version> </dependency>
PublisherImpl.java
import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; public class PublisherImpl implements Publisher { @Override public void subscribe(Subscriber subscriber) { var subscription = new SubscriptionImpl(subscriber); subscriber.onSubscribe(subscription); } }
SubscriptionImpl.java
public class SubscriptionImpl implements Subscription { private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class); private final Subscriber<? super String> subscriber; private boolean isCancelled = false; private final Faker faker; private final int MAX_ITEMS = 10; private static int count = 0; public SubscriptionImpl(Subscriber subscriber){ this.subscriber = subscriber; this.faker = Faker.instance(); } @Override public void request(long requestedItemCnt) { if(isCancelled){ return; } logger.info("Subscriber has requested {} items ", requestedItemCnt); if(requestedItemCnt >MAX_ITEMS){ this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available")); this.isCancelled = true; return; } //Check if all items(MAX_ITEMS) were sent for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){ count++; this.subscriber.onNext(this.faker.internet().emailAddress()); } //If all items were sent complete subscription if(count == MAX_ITEMS){ logger.info("No More data from Producer"); this.subscriber.onComplete(); isCancelled = true; } } @Override public void cancel() { logger.info("Cancelling Subscription... . . ."); isCancelled = true; } }
SubscriberImpl.java
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SubscriberImpl implements Subscriber<String> { private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class); private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; } @Override public void onNext(String emailId) { logger.info("Received {}", emailId); } @Override public void onError(Throwable throwable) { logger.info("---------------------------------------------"); logger.info("Received error {}", throwable.getMessage()); } @Override public void onComplete() { logger.info("Subscription ended"); } public Subscription getSubscription() { return subscription; } }
Main.java
import org.mugil.publisher.PublisherImpl; import org.mugil.subscriber.SubscriberImpl; import java.time.Duration; public class Main { public static void main(String[] args) throws InterruptedException { getMessages(); } public static void getMessages() throws InterruptedException { var objPublisher = new PublisherImpl(); var objSubscriber = new SubscriberImpl(); objPublisher.subscribe(objSubscriber); objSubscriber.getSubscription().request(1); Thread.sleep(Duration.ofSeconds(2)); objSubscriber.getSubscription().request(2); Thread.sleep(Duration.ofSeconds(2)); objSubscriber.getSubscription().request(3); Thread.sleep(Duration.ofSeconds(2)); objSubscriber.getSubscription().cancel(); objSubscriber.getSubscription().request(1); } }
Output
18:00:57.240 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 1 items 18:00:57.534 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received jani.mante@gmail.com 18:00:59.537 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 2 items 18:00:59.541 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received sunny.quigley@yahoo.com 18:00:59.544 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received hang.gutkowski@yahoo.com 18:01:01.546 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 3 items 18:01:01.548 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received malik.thiel@hotmail.com 18:01:01.549 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received andre.purdy@gmail.com 18:01:01.550 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received kim.greenfelder@gmail.com 18:01:03.560 [main] INFO org.mugil.publisher.SubscriptionImpl -- Cancelling Subscription... . . .