{"id":5281,"date":"2024-10-15T12:31:27","date_gmt":"2024-10-15T12:31:27","guid":{"rendered":"https:\/\/codethataint.com\/blog\/?p=5281"},"modified":"2024-10-15T13:01:55","modified_gmt":"2024-10-15T13:01:55","slug":"simple-subscriber-publisher-using-reactive-streams","status":"publish","type":"post","link":"https:\/\/codethataint.com\/blog\/simple-subscriber-publisher-using-reactive-streams\/","title":{"rendered":"Simple Subscriber Publisher Using Reactive Streams"},"content":{"rendered":"<ol>\n<li>publisher does not produce data unless subscriber requests for it.<\/li>\n<li>publisher will produce only <= subscriber requested items. publisher can also produce 0 items!<\/li>\n<li>subscriber can cancel the subscription. producer should stop at that moment as subscriber is no longer interested in consuming the data<\/li>\n<li>producer can send the error signal<\/li>\n<\/ol>\n<p><img decoding=\"async\" src=\"https:\/\/codethataint.com\/blog\/wp-content\/uploads\/2024\/10\/simplesubpub.png\" alt=\"\" \/><\/p>\n<ol>\n<li><strong>PublisherImpl<\/strong> instance would call the subscribe method and pass instance of  <strong>SubscriberImpl<\/strong><\/li>\n<li><strong>PublisherImpl<\/strong> subscribe method would inturn call the onSubscribe method using the instance of  <strong>SubscriberImpl<\/strong> passed.<\/li>\n<li><strong>SubscriberImpl<\/strong> would get the same subscription which it has been passed to <strong>publisherImpl<\/strong> subscribe method earlier<\/li>\n<li><strong>PublisherImpl <\/strong>has following methods\n<ul>\n<li>subscribe &#8211; takes subscriber as argument and creates new subscription and notify the subscriber by calling the onSubscribe method\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\npublic void subscribe(Subscriber subscriber) {\r\n        var subscription = new SubscriptionImpl(subscriber);\r\n        subscriber.onSubscribe(subscription);\r\n}\r\n<\/pre>\n<\/li>\n<\/ul>\n<li><strong>SubscriberImpl <\/strong>has following methods\n<ul>\n<li>onSubscribe &#8211; To get the same subscription passed to publisher subscribe method. This is inturn called from publisherImpl<\/li>\n<li>onNext &#8211; called from subscriptionImpl to pass the data during iteration<\/li>\n<li>onError &#8211; called from subscriptionImpl during error<\/li>\n<li>onComplete &#8211; called from when all data is completed<\/li>\n<\/ul>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\n@Override\r\n    public void onSubscribe(Subscription subscription) {\r\n        this.subscription = subscription;\r\n    }\r\n\r\n    @Override\r\n    public void onNext(String emailId) {\r\n        logger.info(&quot;Received {}&quot;, emailId);\r\n    }\r\n\r\n    @Override\r\n    public void onError(Throwable throwable) {\r\n        logger.info(&quot;---------------------------------------------&quot;);\r\n        logger.info(&quot;Received error {}&quot;, throwable.getMessage());\r\n    }\r\n\r\n    @Override\r\n    public void onComplete() {\r\n        logger.info(&quot;Subscription ended&quot;);\r\n    }\r\n<\/pre>\n<\/li>\n<li><strong>SubscriptionImpl<\/strong>has following methods\n<ul>\n<li>request &#8211; you can request date using subscriptionImpl instance by passing no of records<\/li>\n<li>cancel &#8211; cancel subscription<\/li>\n<\/ul>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\n@Override\r\n    public void request(long requestedItemCnt) {\r\n        if(isCancelled){\r\n            return;\r\n        }\r\n\r\n        logger.info(&quot;Subscriber has requested {} items &quot;, requestedItemCnt);\r\n\r\n        if(requestedItemCnt &gt;MAX_ITEMS){\r\n            this.subscriber.onError(new RuntimeException(&quot; Items requested is more than Total Items Available&quot;));\r\n            this.isCancelled = true;\r\n            return;\r\n        }\r\n\r\n        \/\/Check if all items(MAX_ITEMS) were sent\r\n        for(int idx=0;idx&lt;requestedItemCnt &amp;&amp; count&lt;MAX_ITEMS; idx++){\r\n            count++;\r\n            this.subscriber.onNext(this.faker.internet().emailAddress());\r\n        }\r\n\r\n        \/\/If all items were sent complete subscription\r\n        if(count == MAX_ITEMS){\r\n            logger.info(&quot;No More data from Producer&quot;);\r\n            this.subscriber.onComplete();\r\n            isCancelled = true;\r\n        }\r\n    }\r\n\r\n    @Override\r\n    public void cancel() {\r\n        logger.info(&quot;Cancelling Subscription... . . .&quot;);\r\n        isCancelled = true;\r\n    }\r\n<\/pre>\n<\/li>\n<\/ol>\n<p><strong>pom.xml<\/strong><\/p>\n<pre class=\"brush: xml; title: ; notranslate\" title=\"\">\r\n  &lt;dependency&gt;\r\n            &lt;groupId&gt;io.projectreactor&lt;\/groupId&gt;\r\n            &lt;artifactId&gt;reactor-core&lt;\/artifactId&gt;\r\n        &lt;\/dependency&gt;\r\n        &lt;dependency&gt;\r\n            &lt;groupId&gt;io.projectreactor.netty&lt;\/groupId&gt;\r\n            &lt;artifactId&gt;reactor-netty-core&lt;\/artifactId&gt;\r\n        &lt;\/dependency&gt;\r\n        &lt;dependency&gt;\r\n            &lt;groupId&gt;io.projectreactor.netty&lt;\/groupId&gt;\r\n            &lt;artifactId&gt;reactor-netty-http&lt;\/artifactId&gt;\r\n        &lt;\/dependency&gt;\r\n        &lt;dependency&gt;\r\n            &lt;groupId&gt;ch.qos.logback&lt;\/groupId&gt;\r\n            &lt;artifactId&gt;logback-classic&lt;\/artifactId&gt;\r\n            &lt;version&gt;${logback.version}&lt;\/version&gt;\r\n        &lt;\/dependency&gt;\r\n        &lt;dependency&gt;\r\n            &lt;groupId&gt;com.github.javafaker&lt;\/groupId&gt;\r\n            &lt;artifactId&gt;javafaker&lt;\/artifactId&gt;\r\n            &lt;version&gt;${faker.version}&lt;\/version&gt;\r\n        &lt;\/dependency&gt;\r\n<\/pre>\n<p><strong>PublisherImpl.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\nimport org.reactivestreams.Publisher;\r\nimport org.reactivestreams.Subscriber;\r\n\r\npublic class PublisherImpl implements Publisher {\r\n    @Override\r\n    public void subscribe(Subscriber subscriber) {\r\n        var subscription = new SubscriptionImpl(subscriber);\r\n        subscriber.onSubscribe(subscription);\r\n    }\r\n}\r\n<\/pre>\n<p><strong>SubscriptionImpl.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\npublic class SubscriptionImpl implements Subscription {\r\n    private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);\r\n    private final Subscriber&lt;? super String&gt; subscriber;\r\n    private boolean isCancelled = false;\r\n    private final Faker faker;\r\n\r\n    private final int MAX_ITEMS = 10;\r\n    private static int count = 0;\r\n\r\n\r\n    public SubscriptionImpl(Subscriber subscriber){\r\n        this.subscriber = subscriber;\r\n        this.faker = Faker.instance();\r\n    }\r\n\r\n    @Override\r\n    public void request(long requestedItemCnt) {\r\n        if(isCancelled){\r\n            return;\r\n        }\r\n\r\n        logger.info(&quot;Subscriber has requested {} items &quot;, requestedItemCnt);\r\n\r\n        if(requestedItemCnt &gt;MAX_ITEMS){\r\n            this.subscriber.onError(new RuntimeException(&quot; Items requested is more than Total Items Available&quot;));\r\n            this.isCancelled = true;\r\n            return;\r\n        }\r\n\r\n        \/\/Check if all items(MAX_ITEMS) were sent\r\n        for(int idx=0;idx&lt;requestedItemCnt &amp;&amp; count&lt;MAX_ITEMS; idx++){\r\n            count++;\r\n            this.subscriber.onNext(this.faker.internet().emailAddress());\r\n        }\r\n\r\n\r\n        \/\/If all items were sent complete subscription\r\n        if(count == MAX_ITEMS){\r\n            logger.info(&quot;No More data from Producer&quot;);\r\n            this.subscriber.onComplete();\r\n            isCancelled = true;\r\n        }\r\n    }\r\n\r\n    @Override\r\n    public void cancel() {\r\n        logger.info(&quot;Cancelling Subscription... . . .&quot;);\r\n        isCancelled = true;\r\n    }\r\n}\r\n<\/pre>\n<p><strong>SubscriberImpl.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\nimport org.reactivestreams.Subscriber;\r\nimport org.reactivestreams.Subscription;\r\nimport org.slf4j.Logger;\r\nimport org.slf4j.LoggerFactory;\r\n\r\npublic class SubscriberImpl implements Subscriber&lt;String&gt; {\r\n    private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class);\r\n    private Subscription subscription;\r\n\r\n    @Override\r\n    public void onSubscribe(Subscription subscription) {\r\n        this.subscription = subscription;\r\n    }\r\n\r\n    @Override\r\n    public void onNext(String emailId) {\r\n        logger.info(&quot;Received {}&quot;, emailId);\r\n    }\r\n\r\n    @Override\r\n    public void onError(Throwable throwable) {\r\n        logger.info(&quot;---------------------------------------------&quot;);\r\n        logger.info(&quot;Received error {}&quot;, throwable.getMessage());\r\n    }\r\n\r\n    @Override\r\n    public void onComplete() {\r\n        logger.info(&quot;Subscription ended&quot;);\r\n    }\r\n\r\n    public Subscription getSubscription() {\r\n        return subscription;\r\n    }\r\n}\r\n<\/pre>\n<p><strong>Main.java<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\nimport org.mugil.publisher.PublisherImpl;\r\nimport org.mugil.subscriber.SubscriberImpl;\r\n\r\nimport java.time.Duration;\r\n\r\npublic class Main {\r\n    public static void main(String&#x5B;] args) throws InterruptedException {\r\n        getMessages();\r\n    }\r\n\r\n    public static void getMessages() throws InterruptedException {\r\n        var objPublisher = new PublisherImpl();\r\n        var objSubscriber = new SubscriberImpl();\r\n\r\n        objPublisher.subscribe(objSubscriber);\r\n        objSubscriber.getSubscription().request(1);\r\n        Thread.sleep(Duration.ofSeconds(2));\r\n\r\n        objSubscriber.getSubscription().request(2);\r\n        Thread.sleep(Duration.ofSeconds(2));\r\n\r\n        objSubscriber.getSubscription().request(3);\r\n        Thread.sleep(Duration.ofSeconds(2));\r\n\r\n        objSubscriber.getSubscription().cancel();\r\n\r\n        objSubscriber.getSubscription().request(1);\r\n    }\r\n}\r\n<\/pre>\n<p><strong>Output<\/strong><\/p>\n<pre>\r\n18:00:57.240 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 1 items \r\n18:00:57.534 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received jani.mante@gmail.com\r\n18:00:59.537 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 2 items \r\n18:00:59.541 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received sunny.quigley@yahoo.com\r\n18:00:59.544 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received hang.gutkowski@yahoo.com\r\n18:01:01.546 [main] INFO org.mugil.publisher.SubscriptionImpl -- Subscriber has requested 3 items \r\n18:01:01.548 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received malik.thiel@hotmail.com\r\n18:01:01.549 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received andre.purdy@gmail.com\r\n18:01:01.550 [main] INFO org.mugil.subscriber.SubscriberImpl -- Received kim.greenfelder@gmail.com\r\n18:01:03.560 [main] INFO org.mugil.publisher.SubscriptionImpl -- Cancelling Subscription... . . .\r\n<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>publisher does not produce data unless subscriber requests for it. publisher will produce only<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[369],"tags":[],"class_list":["post-5281","post","type-post","status-publish","format-standard","hentry","category-reactive-programming"],"_links":{"self":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts\/5281","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/comments?post=5281"}],"version-history":[{"count":6,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts\/5281\/revisions"}],"predecessor-version":[{"id":5290,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/posts\/5281\/revisions\/5290"}],"wp:attachment":[{"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/media?parent=5281"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/categories?post=5281"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/codethataint.com\/blog\/wp-json\/wp\/v2\/tags?post=5281"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}