I have the code:
const readStream = fs.createReadStream(readFilename, {
    highWaterMark: 10 * 1024
});
const writeStream = fs.createWriteStream(writeFilename, {
    highWaterMark: 1 * 1024
});
readStream.pipe(writeStream);
As you can see, the buffer (highWaterMark) size is different for both. The read has a higher buffer, when read pipes to write it is indeed too much for the write buffer to handle. It reserves 9 * 1014 in memory and after it has handled the entire load it calls drain. This is fine.
However. When writing to write manually via writable.write, false is returned so you may alter the read stream to have a lower buffer (if that's what you wish).
My question is, since I'm piping directly, is there anyway to listen to the write event on the writable? The only thing that I can seem to listen to is the drain event after it has already taken in too much.
The general answer is "no, because there's no need to", but the less military one would be "kinda, but in another way and with consequences".
First there's a misunderstanding to what the drain event means in a piped stream: 
Writable buffer is depleted but that's only node.js internal buffer, not the actual pipe to the filesystem.pipe method actually creates a lot of listeners and pause/resume logic around both streams.Readable is listening on Writable#drain event to push some more data into the buffer.Second, as said - Writable does not implement any confirmations that the specific chunk has been written, that's simply because on string and Buffer chunks it would be very hard to tell when those are actually written (even impossible at some point in a simple case of gzip stream when a part of a chunk may be written to actual disk).
There is a way to get close enough though (get nearly precise confirmation per chunk):
const {PassThrough} = require("stream");
fs.createReadStream(readFilename, {
    highWaterMark: 10 * 1024
})
/* we pipe the readable to a buffer in a passthrough stream */
.pipe(new PassThrough({
    highWaterMark: 1024
}))
/* pipe returns the stream we piped to */
/* now we pipe again, but to a stream with no highWaterMark */
.pipe(
    new PassThrough({
        highWaterMark: 1
    })
    .on("data", () => { 
        /* here's your confirmation called just before this chunk is written and after the last one has started to be written */ 
    })
)
/* and there we push to the write stream */
.pipe(
    fs.createWriteStream(writeFilename, {
        highWaterMark: 1
    })
);
Sure, that will definitely come with a performance impact and I don't know how big but it will keep the reading side more or less efficient and writable will get the buffer it needs - but with some extra CPU and perhaps some micro latency for every chunk.
It's up to you to test.
See more on streams, especially PassThrough here.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With