Introduction to Streams in Node.js

Before we go into the specifics of what streams are, it's important to understand why they are significant.

Why are streams important?

Let's say you are asked to serve the contents of a text file that is roughly ~400MB in size:

const http = require("http");
const fs = require("fs");

const server = http.createServer((req, res) => {
  // readFile buffers the entire file into memory before the callback runs
  fs.readFile("big.txt", "utf8", (err, data) => {
    if (err) {
      res.statusCode = 500;
      return res.end("Error reading file");
    }
    res.end(data);
  });
});

server.listen(8000);

The traditional method above works, but there is a caveat to this approach. Here's a gif that demonstrates what happens to the process memory when you visit port 8000:

We can clearly see that this approach leads to a spike in process memory consumption, and it only gets worse as the file grows larger. This happens because the whole file is read into a buffer (temporary memory, typically RAM) before it is sent to the client.
Using this method, if the data is larger than the available memory, the process can run out of memory and crash. So, when working with large data, this approach is not recommended. What's the recommended approach? STREAMS.

What are Streams

Streams are a design pattern for breaking big data operations into manageable chunks. They are one of Node.js' fundamental concepts for handling I/O operations, i.e. reading/writing files, network communication, bidirectional and event-based communication, etc. The built-in stream module provides an API for implementing the stream interface.

What distinguishes streams from the traditional method used earlier is that instead of the program reading the file into memory all at once, it reads the data in smaller chunks and processes them without keeping the whole thing in memory. This makes transferring large amounts of data seamless.

Many streaming services like YouTube or Apple Music do not require you to load the entire audio or video into RAM at once. Instead, the material (audio or video) is delivered to the browser/phone as a continuous stream of data chunks, enabling users to start viewing and/or listening almost instantly.

Streams are instances of the Node.js EventEmitter class. As a result, the Stream API emits events in response to certain actions. For instance, a Writable stream emits the finish event once all data has been written, while a Readable stream emits the data event whenever a chunk of data is available to be consumed. Other examples are close, end, error, pipe, pause, resume, etc.
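
Since every stream is an EventEmitter, you can verify this and attach listeners directly. Here is a minimal sketch (it assumes Node.js 12+ for Readable.from, and the strings used are just placeholder data):

const { Readable } = require("stream");
const { EventEmitter } = require("events");

// Build a readable stream from a plain array (placeholder data)
const readable = Readable.from(["hello", " ", "world"]);

console.log(readable instanceof EventEmitter); // true

readable.on("data", (chunk) => console.log("data event:", chunk));
readable.on("end", () => console.log("end event: no more data"));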

Types of Streams

There are four main types of streams in Node.js:

  • Readable
  • Writable
  • Duplex
  • Transform

In this article, we will focus mainly on readable and writable streams using the Node.js fs module.
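
Duplex and Transform streams won't be covered in depth here, but for reference, all four types are exposed by the built-in stream module. Below is a minimal sketch of a Transform stream (the sample data is made up) that upper-cases whatever passes through it:

const { Readable, Writable, Duplex, Transform } = require("stream");

// A tiny Transform stream: readable and writable, modifying data as it passes through
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Pipe some sample data through it and print the result to the terminal
Readable.from(["streams are neat\n"]).pipe(upperCase).pipe(process.stdout);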

Readable Streams

A readable stream acts as a source from which data can be read. The source can be a file on your disk, a cloud location, etc. Below, we use a readable stream as a solution to the first example above:

const http = require("http");
const fs = require("fs");

const server = http.createServer((req, res) => {
  const stream = fs.createReadStream(__dirname + "/big.txt");
  stream.setEncoding("utf-8");

  stream.on("data", (chunk) => {
    console.log("data received", chunk.length);
    res.write(chunk); // send each chunk to the client as soon as it is read
  });
  stream.on("end", () => {
    console.log("No more data.");
    res.end(); // end the response once the whole file has been streamed
  });
  stream.on("error", () => {
    console.log("An error occurred");
    res.end("An error occurred");
  });
});

server.listen(8000);

In the snippet above, we are serving the content of the big.txt file to a client. By default, the stream data is received as Buffer objects.

  • The setEncoding("utf-8") method converts the buffer objects to a string.
  • The data event handler will execute each time a chunk of data has been read.
  • The end event handler will execute once there is no more data to be read.
  • The error event handler fires when an error occurs while reading the data.

Let's have a look at what transpired in the process memory:

Unlike the first approach, we did not buffer all the data in memory at once; instead, we received it in smaller chunks, one at a time. Compared to the former approach, memory usage grew only slightly and stayed roughly constant until there was no data left to read. This method is powerful because we can stream a large file, e.g. ~6GB, without worrying much about our process memory.

Writable Streams

This type of stream is the inverse of a readable stream. Writable streams act as a destination that data can be written to, not as a source of data. This type of stream can be used to write data to a file, an HTTP response, or the terminal (process.stderr and process.stdout). Here's a quick rundown of how writable streams work:

const fs = require("fs");

const writeStream = fs.createWriteStream("./file.txt");

// Register the event handlers once, outside the loop
writeStream.on("finish", () => {
  console.log("Finished writing");
});

writeStream.on("error", (error) => {
  console.log(error.stack);
});

writeStream.on("close", () => {
  process.stdout.write("done"); // prints done to the terminal
});

// Write the same sentence repeatedly, then signal that we are done writing
for (let i = 0; i <= 18000; i++) {
  writeStream.write("Nothing in life is to be feared. It is only to be understood.\n");
}
writeStream.end();
  • To create a writable stream from the fs module, the createWriteStream method is called with the path of the file to write to as a parameter. If the file does not exist, a new file is created at the supplied path; otherwise, the content passed to .write() overwrites the old content of that file.
  • The finish event is emitted once the end() method has been called and all the data has been flushed.
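
To make the relationship between end() and finish concrete, here is a minimal sketch (demo.txt is just a hypothetical file path):

const fs = require("fs");

const ws = fs.createWriteStream("./demo.txt"); // hypothetical output file

ws.on("finish", () => console.log("finish: all data has been flushed"));

ws.write("first chunk\n");
ws.end("last chunk\n"); // end() accepts an optional final chunk; finish fires after this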

Backpressure in Streams

If we are asked to read the contents of the file /big.txt and convert the data to uppercase, using our understanding of readable and writable streams we might choose the approach shown below:

const { createWriteStream, createReadStream } = require("fs");

const readStream = createReadStream("/big.txt");
readStream.setEncoding("utf-8");
const writeStream = createWriteStream("./format-text.txt");

readStream.on("data", (chunk) => {
  writeStream.write(chunk.toUpperCase());
});

readStream.on("error", (e) => {
  console.log("An error has occurred");
  console.log(e);
});

readStream.on("end", () => {
  writeStream.end();
});

Reading a vast amount of data while performing an operation on it at the same time (in our case, converting each letter to upper case) can cause a delay before the data is written to the file (the receiving end). This can cause data from the incoming source to pile up faster than the destination can consume it. This data-handling problem is called backpressure.

Therefore, backpressure is simply an accumulation of data behind a buffer during data transmission. Most times, this causes the Node process to consume a lot of memory. The highWaterMark is the maximum amount of data that the buffer should hold at a given time. The highWaterMark can be set manually; however, it is crucial to note that a bigger highWaterMark means more RAM consumption.
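
For example, the highWaterMark can be passed as an option when a stream is created. A quick sketch (the file names and sizes here are arbitrary):

const { createReadStream, createWriteStream } = require("fs");

// highWaterMark is specified in bytes for file streams
const readStream = createReadStream("./big.txt", { highWaterMark: 1024 * 1024 }); // ~1 MB chunks
const writeStream = createWriteStream("./copy.txt", { highWaterMark: 64 * 1024 }); // 64 KB buffer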

To solve this, we will utilize the following flow-control events and methods:

  • pause(): A readable stream method used to pause the flow of data when there is backpressure.
  • drain: A writable stream event emitted once the buffer has been emptied and it is safe to write again.
  • resume(): A readable stream method that causes data to flow again; it is usually called once the drain event is emitted.
const { createWriteStream, createReadStream } = require("fs");

const readStream = createReadStream("/big.txt");
readStream.setEncoding("utf-8");
const writeStream = createWriteStream("./format-text.txt");

readStream.on("data", (chunk) => {
  // write() returns a boolean representing the state of the internal buffer
  const result = writeStream.write(chunk.toUpperCase());

  if (!result) {
    // There is backpressure
    console.log("There is backpressure, pausing to drain the hose...");
    readStream.pause();
  }
});

readStream.on("end", () => {
  writeStream.end();
});

writeStream.on("drain", () => {
  console.log("hose is drained");
  readStream.resume();
});

Piping Streams

pipe() is a method that exists on readable streams. It takes data from a readable stream and writes it to a writable stream: source.pipe(destination). This method also handles backpressure for us, and pipes can be chained (an example of chaining is sketched after the snippet below). Let's look at a program that reads user input from the terminal and writes it to a file.

const { createWriteStream } = require("fs");

// process.stdin is already a readable stream, no call needed
const readableStream = process.stdin;
const writableStream = createWriteStream("./file.txt");

// Every line typed into the terminal is written to file.txt
readableStream.pipe(writableStream);
console.log("Program finished"); // prints immediately; pipe() keeps running in the background
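As mentioned above, pipes can also be chained. Here is a quick sketch, assuming the built-in zlib module and a big.txt file in the working directory, that reads the file, gzips it on the fly, and writes the compressed output to disk:

const { createReadStream, createWriteStream } = require("fs");
const zlib = require("zlib");

// read -> compress -> write, with pipe() handling the backpressure between each step
createReadStream("./big.txt")
  .pipe(zlib.createGzip())
  .pipe(createWriteStream("./big.txt.gz"));

For more robust error handling, the stream module also provides a pipeline() utility that cleans up all the streams in the chain if any of them fails.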

Conclusion

In this article, I've explained the basics of streams in Node.js, the problem they solve, and when and how to use them.

Thank you for taking the time to read this. If you found this post useful, please share it and give it some thumbs up. I'd love to read your comments as well.