Continuously flowing chain of replies from a series of queries using RxJS

I am exploring the world of RxJS and seeking guidance from experienced individuals.

My goal is to establish a synchronized flow of responses, along with their corresponding requests, from a stream of payload data.

The desired approach involves sending each request sequentially, with every subsequent request awaiting the response from its predecessor.

Although my initial attempt (shown in this jsbin) resulted in simultaneous transmission of all requests:

 // JavaScript code snippet

// Define variables for request and response streams
var requestStream, responseStream;

// Populate the request stream with some sample data
requestStream = Rx.Observable.from(['a','b','c','d','e']);

// Use flatMap to handle the asynchronous nature of sendRequest function
responseStream = requestStream.flatMap(
  sendRequest,
  (val, response) => { return {val, response}; }
);

// Subscribe to the response stream to get the desired output
responseStream.subscribe(
  item => {
    console.log(item);
  },
  err => {
    console.err(err);
  },
  () => {
    console.log('Done');
  }
);

// Function to handle request processing asynchronously
function sendRequest(val) {
  return new Promise((resolve, reject) => {
    setTimeout(() => { resolve('result for ' + val); }, 1000);
  });
};

An alternative method that partially addresses the issue can be found in this jsbin, although it does not utilize a stream for request data:

 // JavaScript code snippet

// Define variables for data and response stream
var data, responseStream;
data = ['a', 'b', 'c', 'd', 'e'];

// Create a custom observable to process the requests one by one
responseStream = Rx.Observable.create(observer => {
  var sendNext = function() {
    var val = data.shift();
    if (!val) {
      observer.onCompleted();
      return;
    }
    sendRequest(val).then(response => {
      observer.onNext({val, response});
      sendNext();
    });
  };
  sendNext();
});

// Subscribe to the response stream to observe the output
responseStream.subscribe(
  item => {
    console.log(item);
  },
  err => {
    console.err(err);
  },
  () => {
    console.log('Done');
  }
);

// Function to simulate request handling with random response times
function sendRequest(val) {
  return new Promise((resolve, reject) => {
    setTimeout(() => { resolve('response for ' + val); }, Math.random() * 2500 + 500);
  });
};

Your insights are greatly appreciated!

EDIT:

To clarify my objective, I aim to achieve the following sequence: "Send A, receive response for A, send B, receive response for B, send C, and so on..." Utilizing concatMap and defer as suggested by user3743222 seems to accomplish this task (check out this jsbin):

 // JavaScript code snippet

// Implementing concatMap and defer for sequential processing
responseStream = requestStream.concatMap(
  (val) => {
    return Rx.Observable.defer(() => {
      return sendRequest(val);
    });
  },
  (val, response) => { return {val, response}; }
);

Answer №1

To achieve the desired behavior, consider replacing flatMap with concatMap in your initial code snippet and observe if it produces the result you are seeking.

responseStream = requestStream.concatMap(//By substituting `flatMap`
  sendRequest,
  (val, response)=>{ return {val, response}; }
);

concatMap shares a similar signature with flatMap, but the key distinction lies in how it waits for the current observable to finish before moving on to the next one. This means:

  • A value from requestStream will be fed into the concatMap operator.
  • The concatMap operator will create a new sendRequest observable, passing any resulting values (likely a tuple (val, response)) through the selector function and emitting the resultant object downstream.
  • Upon completion of the current sendRequest, the next value from requestStream is processed.
  • Essentially, requests are handled sequentially, one after the other.

Alternatively, if delaying the execution of sendRequest is preferred, consider using defer.

responseStream = requestStream.concatMap(//Swapping out `flatMap`
  function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},
  (val, response)=>{ return {val, response}; }
);

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

The Express.js server seems to be having trouble rendering a static JavaScript file

Currently, I am in the process of constructing a website and have implemented an express.js server to collect data submitted through a form. Prior to configuring the server, I had already developed the site using static js and css files. Once the connectio ...

Obtain the coordinates of the pixel in an image on the canvas when a mouse

I am currently working on a project that involves using canvas. I have set a picture as the background of the canvas and would like to be able to get the original pixel point when clicking in the image area. In order to achieve this, I need to convert canv ...

Exchange one HTML element with a different HTML element

I've been attempting to change an HTML tag using PHP or jQuery. The current default tag is: <li class="dropdown"> <a href="index.html" class="dropdown-toggle"> Home</a></li> My desired replacement for the above HTML tag is: ...

Navigate to the editing page with Thymeleaf in the spring framework, where the model attribute is passed

My goal is to redirect the request to the edit page if the server response status is failed. The updated code below provides more clarity with changed variable names and IDs for security reasons. Controller: @Controller @RequestMapping("abc") public clas ...

How can we access a value within a deeply nested JSON object in Node.js when the key values in between are not

In the nested configuration object provided below, I am seeking to retrieve the value associated with key1, which in this case is "value1". It's important to note that while key1 remains static, the values for randomGeneratedNumber and randomGenerated ...

Learn how to use the Firebase Adapter for Next Auth to easily sign in using your email and password

I am currently using next-auth along with a Firebase adapter for authentication, but I am uncertain about the correct way to sign in users. I do not want to utilize my Google account for signing in; instead, I have user accounts within a Firebase project a ...

Access a PHP file using XMLHttpRequest to run additional JavaScript code

My main page, referred to as Main.php, contains a button that triggers the display of results from Results.php within a div (divResults) on Main.php. The HTML content "These Are The Results" returned by Results.php is successfully displayed in the divResu ...

How to implement a self-invoking function in React JS like you would in regular JavaScript?

Is it possible to invoke the function good without triggering it from an event? I want it to run as soon as the page loads, similar to a self-invoking JavaScript function. Check out this example import React from 'react'; class App extends Reac ...

Discover the process of loading one controller from another controller in Angular JS

Is it possible to load an entire controller1 from a different controller2, not just a function? ...

Saving iFrame as Image using Codemirror and html2canvas

Here are a few experiments I conducted with html2canvas: Fiddle 1 (Using html2canvas): Fiddle 2 (Using html2canvas without Codemirror): Fiddle 3 (Using html2canvas with Codemirror): Fiddle 4 (Using html2canvas with Codemirror): I recently wante ...

typescript defining callback parameter type based on callback arguments

function funcOneCustom<T extends boolean = false>(isTrue: T) { type RETURN = T extends true ? string : number; return (isTrue ? "Nice" : 20) as RETURN; } function funcCbCustom<T>(cb: (isTrue: boolean) => T) { const getFirst = () => ...

Understanding the Importance and Benefits of Using the Classnames Utility in React Components

Can you break down for me the purpose of utilizing the Classnames utility in React code? I've reviewed the Classnames documentation, but I'm still struggling to comprehend why it is used in code like this: import classnames from 'classnames ...

Safeguarding intellectual property rights

I have some legally protected data in my database and I've noticed that Google Books has a system in place to prevent copying and printing of content. For example, if you try to print a book from this link, it won't appear: How can I protect my ...

Combine the values in the rows over a period of time

I have a set of three times in the format of Minute:Seconds:Milliseconds that I need to add together to get the total time. For example, let's say I have: 0:31.110 + 0:50.490 + 0:32.797, which equals 1:54.397. So how can I achieve this using JavaScr ...

Connecting the mat-progress bar to a specific project ID in a mat-table

In my Job Execution screen, there is a list of Jobs along with their status displayed. I am looking to implement an Indeterminate mat-progress bar that will be visible when a Job is executing, and it should disappear once the job status changes to stop or ...

What is the best approach to synchronize checkboxes with boolean values in my data model?

I've spent hours searching through similar questions, but haven't found a solution that perfectly matches my issue. What I need is to have a checkbox automatically checked based on a true/false value in my data using data binding. While I can suc ...

Using the foreach Loop in Javascript and AngularJs

Having trouble with a foreach loop because you're not sure of the column name to access specific data? Here's a solution to display all columns along with their corresponding data: angular.forEach(data, function(value, key) { console.log( &a ...

Mapping prop passed to client component in NEXT 13: A step-by-step guide

Hello, I'm currently navigating through the Next 13 APP directory and have encountered a scenario where everything functions smoothly when I integrate the server component as shown below: const Tasks = async () => { const { tasks } = await getAll ...

Receive JSON data with camel-case in a Web API 2.0 using a model in pascal-case style

My attempt to execute a PUT call on my Web API involves configuring the WebApiConfig.cs file to send data back to my Web project in camel case format. config.Formatters.JsonFormatter.SerializerSettings.ContractResolver = new CamelCasePropertyNamesCont ...

Is the npm mqtt module continuously running but not performing any tasks?

I'm relatively new to the world of JS, node.js, and npm. I am attempting to incorporate a mqtt broker into a project for one of my classes. To gain a better understanding of how it functions, I installed the mqtt module from npm. However, when I tried ...