1. publisher does not produce data unless subscriber requests for it.
  2. publisher will produce only <= subscriber requested items. publisher can also produce 0 items!
  3. subscriber can cancel the subscription. producer should stop at that moment as subscriber is no longer interested in consuming the data
  4. producer can send the error signal

  1. PublisherImpl instance would call the subscribe method and pass instance of SubscriberImpl
  2. PublisherImpl subscribe method would inturn call the onSubscribe method using the instance of SubscriberImpl passed.
  3. SubscriberImpl would get the same subscription which it has been passed to publisherImpl subscribe method earlier
  4. PublisherImpl has following methods
    • subscribe – takes subscriber as argument and creates new subscription and notify the subscriber by calling the onSubscribe method
      public void subscribe(Subscriber subscriber) {
              var subscription = new SubscriptionImpl(subscriber);
              subscriber.onSubscribe(subscription);
      }
      
  5. SubscriberImpl has following methods
    • onSubscribe – To get the same subscription passed to publisher subscribe method. This is inturn called from publisherImpl
    • onNext – called from subscriptionImpl to pass the data during iteration
    • onError – called from subscriptionImpl during error
    • onComplete – called from when all data is completed
    @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
        }
    
        @Override
        public void onNext(String emailId) {
            logger.info("Received {}", emailId);
        }
    
        @Override
        public void onError(Throwable throwable) {
            logger.info("---------------------------------------------");
            logger.info("Received error {}", throwable.getMessage());
        }
    
        @Override
        public void onComplete() {
            logger.info("Subscription ended");
        }
    
  6. SubscriptionImplhas following methods
    • request – you can request date using subscriptionImpl instance by passing no of records
    • cancel – cancel subscription
    @Override
        public void request(long requestedItemCnt) {
            if(isCancelled){
                return;
            }
    
            logger.info("Subscriber has requested {} items ", requestedItemCnt);
    
            if(requestedItemCnt >MAX_ITEMS){
                this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available"));
                this.isCancelled = true;
                return;
            }
    
            //Check if all items(MAX_ITEMS) were sent
            for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){
                count++;
                this.subscriber.onNext(this.faker.internet().emailAddress());
            }
    
            //If all items were sent complete subscription
            if(count == MAX_ITEMS){
                logger.info("No More data from Producer");
                this.subscriber.onComplete();
                isCancelled = true;
            }
        }
    
        @Override
        public void cancel() {
            logger.info("Cancelling Subscription... . . .");
            isCancelled = true;
        }
    

pom.xml

  <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty-http</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>${faker.version}</version>
        </dependency>

PublisherImpl.java

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class PublisherImpl implements Publisher {
    @Override
    public void subscribe(Subscriber subscriber) {
        var subscription = new SubscriptionImpl(subscriber);
        subscriber.onSubscribe(subscription);
    }
}

SubscriptionImpl.java

public class SubscriptionImpl implements Subscription {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);
    private final Subscriber<? super String> subscriber;
    private boolean isCancelled = false;
    private final Faker faker;

    private final int MAX_ITEMS = 10;
    private static int count = 0;


    public SubscriptionImpl(Subscriber subscriber){
        this.subscriber = subscriber;
        this.faker = Faker.instance();
    }

    @Override
    public void request(long requestedItemCnt) {
        if(isCancelled){
            return;
        }

        logger.info("Subscriber has requested {} items ", requestedItemCnt);

        if(requestedItemCnt >MAX_ITEMS){
            this.subscriber.onError(new RuntimeException(" Items requested is more than Total Items Available"));
            this.isCancelled = true;
            return;
        }

        //Check if all items(MAX_ITEMS) were sent
        for(int idx=0;idx<requestedItemCnt && count<MAX_ITEMS; idx++){
            count++;
            this.subscriber.onNext(this.faker.internet().emailAddress());
        }


        //If all items were sent complete subscription
        if(count == MAX_ITEMS){
            logger.info("No More data from Producer");
            this.subscriber.onComplete();
            isCancelled = true;
        }
    }

    @Override
    public void cancel() {
        logger.info("Cancelling Subscription... . . .");
        isCancelled = true;
    }
}

SubscriberImpl.java

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriberImpl implements Subscriber<String> {
    private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class);
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(String emailId) {
        logger.info("Received {}", emailId);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.info("---------------------------------------------");
        logger.info("Received error {}", throwable.getMessage());
    }

    @Override
    public void onComplete() {
        logger.info("Subscription ended");
    }

    public Subscription getSubscription() {
        return subscription;
    }
}

Main.java

import org.mugil.publisher.PublisherImpl;
import org.mugil.subscriber.SubscriberImpl;

import java.time.Duration;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        getMessages();
    }

    public static void getMessages() throws InterruptedException {
        var objPublisher = new PublisherImpl();
        var objSubscriber = new SubscriberImpl();

        objPublisher.subscribe(objSubscriber);
        objSubscriber.getSubscription().request(1);
        Thread.sleep(Duration.ofSeconds(2));

        objSubscriber.getSubscription().request(2);
        Thread.sleep(Duration.ofSeconds(2));

        objSubscriber.getSubscription().request(3);
        Thread.sleep(Duration.ofSeconds(2));

        objSubscriber.getSubscription().cancel();

        objSubscriber.getSubscription().request(1);
    }
}

Output

18:00:57.240 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 1 items 
18:00:57.534 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received jani.mante@gmail.com
18:00:59.537 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 2 items 
18:00:59.541 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received sunny.quigley@yahoo.com
18:00:59.544 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received hang.gutkowski@yahoo.com
18:01:01.546 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 3 items 
18:01:01.548 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received malik.thiel@hotmail.com
18:01:01.549 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received andre.purdy@gmail.com
18:01:01.550 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received kim.greenfelder@gmail.com
18:01:03.560 [main] INFO org.mugil.publisher.SubscriptionImpl -- Cancelling Subscription... . . .