 

How to asynchronously write to and read from the same file in node?

I have data coming in through a websocket. It sends binary data in 20ms chunks. I need to concatenate each of these chunks so that a backend process can read the data as a continuous stream as it comes in.

//Create the file and append binary as it comes in

    let newPath;
    tmp.file({ postfix: '.raw' }, function (err, path, fd, cleanup) {
        if (err) throw err;
        newPath = path;
        fs.appendFile(newPath, Buffer.from(binary), (err) => {
            if (err) throw err;
        });
    });

//Read the file as it is written

    fs.createReadStream(newPath).pipe(recStream);

For now I just put a half-second delay before createReadStream to make sure there is data in the file.

This certainly does not feel correct and is not working. What is the correct way to go about this?

asked Dec 12 '25 02:12 by Phil Andrews

1 Answer

The best thing to do in this situation would be to tell the server you're receiving data from to pause until you're ready to process more (drain). Assuming that's not an option for you:

Start by writing incoming data to your destination stream. If write(chunk) returns false, the stream's internal buffer is full and it's time to start buffering subsequent data to disk. (The chunk whose write() returned false has still been accepted and buffered, so do not also write it to disk -- false does not mean the write failed, only that the internal buffer has grown past highWaterMark.)

In a temporary folder, create a new file (A) write stream and write the next chunk(s) of incoming data to it. Do this until your destination stream emits a drain event.

When your destination drains:

  1. Swap out buffer files. Close the current buffer file A and create a new temporary file B to begin writing new incoming data to it.
  2. Open a read stream on the temporary file A and start piping data from it into your destination stream. Note that a plain pipe() call will end the destination when it reaches the end of the temp file, which is not what we want, since that is not the end of all incoming data. Pass { end: false } as pipe()'s second argument to suppress this, or implement pipe()'s backpressure handling yourself, minus the call to end().
  3. When the temp file's stream A emits end, delete the file A. Then go back to step 1 and begin the process again with file B. (If no data was written to file B in the meantime, go back to unbuffered operation, writing incoming data directly to the destination stream.)

Once the server signals that it is done sending data and all buffered data has been read out of your temporary files, call end() on the destination stream to signal that there is no more data. All done!

By swapping between temporary buffer files and deleting them once their data is processed, you don't have to worry about reading data as it is written to a file. Plus, you don't have to buffer the entire incoming data stream on disk.

Of course, this does make the assumption that your storage medium is guaranteed to accept writes faster than you will receive data over the network. This is probably safe, but things will likely break down if this assumption is incorrect. Test this using production systems -- what is the peak incoming data rate and how quickly can you write to disk on your prod system?

answered Dec 13 '25 17:12 by josh3736


