Object streams are great, but they're currently untyped, which means that you can pipe nonsensical streams together. Boo!
Goal (ish)
class FooReadable extends Readable<Foo> {
...
}
class FooWritable extends Writable<Foo> {
...
}
class BarWritable extends Writable<Bar> {
...
}
const fooReadable = new FooReadable();
const fooWritable = new FooWritable();
const barWritable = new BarWritable();
fooReadable.pipe(fooWritable); // Okay
fooReadable.pipe(barWritable); // Error!
I've figured out how to make this work using using pre-ES6 style constructors, but I really want to write classes that extend some abstract typed class (like above).
The solution should really be native streams under the hood -- no re-implementing all of the classes from scratch.
Here's my current solution, using Overwrite
to replace the the bits of the existing types that I don't want plus a factory method to fudge the types a bit.
import { Writable, WritableOptions, } from "stream";
export function createWritable<T>(
opts: TypedWritableOptions<T>): TypedWritable<T> {
return new Writable(opts) as any;
}
export type TypedWritable<T> = Overwrite<Writable, WritableReplacement<T>>;
export type TypedWritableOptions<T> =
Overwrite<WritableOptions, WritableOptionsReplacement<T>>
// Given types S and D, returns the equivalent of `S & D`, but if any
// properties are shared between S and D, the D versions completely
// replace those in S.
type Overwrite<S, D> = {
[P in Exclude<keyof S, keyof D>]: S[P]
} & D;
interface WritableReplacement<T> {
_write(
chunk: T,
encoding: string,
callback: (error?: Error | null) => void,
): void;
_writev?(
chunks: Array<{ chunk: T, encoding: string }>,
callback: (error?: Error | null) => void,
): void;
}
interface WritableOptionsReplacement<T> {
write?(
chunk: T,
encoding: string,
callback: (error?: Error | null) => void,
): void;
writev?(
chunks: Array<{ chunk: T, encoding: string }>,
callback: (error?: Error | null) => void,
): void;
}
So, here's the better way of doing this.
First, define (somewhere in your codebase) a JS file that simply re-exports the appropriate stream class:
// Readable.js
export { Readable } from 'stream';
Then create an accompanying .d.ts
file and write whatever type definition you'd like. Here's mine, copied mostly-faithfully from @types/node/index.d.ts
.
// Readable.d.ts
import { BasicCallback } from './core';
export interface Readable<T> extends ReadStream<T> {}
export class Readable<T> {
constructor(opts?: ReadableOptions<Readable<T>>);
_read?(size: number): void;
_destroy?(error: Error | null, callback: BasicCallback): void;
}
export interface ReadStream<T> {
readable: boolean;
readonly readableHighWaterMark: number;
readonly readableLength: number;
read(size?: number): T;
setEncoding(encoding: string): this;
pause(): this;
resume(): this;
isPaused(): boolean;
unpipe<T extends NodeJS.WritableStream>(destination?: T): this;
unshift(chunk: T): void;
wrap(oldStream: NodeJS.ReadableStream): this;
push(chunk: T | null, encoding?: string): boolean;
destroy(error?: Error): void;
/**
* Event emitter
* The defined events on documents including:
* 1. close
* 2. data
* 3. end
* 4. readable
* 5. error
*/
addListener(event: 'close', listener: () => void): this;
addListener(event: 'data', listener: (chunk: T) => void): this;
addListener(event: 'end', listener: () => void): this;
addListener(event: 'readable', listener: () => void): this;
addListener(event: 'error', listener: (err: Error) => void): this;
addListener(event: string | symbol, listener: (...args: any[]) => void): this;
emit(event: 'close'): boolean;
emit(event: 'data', chunk: T): boolean;
emit(event: 'end'): boolean;
emit(event: 'readable'): boolean;
emit(event: 'error', err: Error): boolean;
emit(event: string | symbol, ...args: any[]): boolean;
on(event: 'close', listener: () => void): this;
on(event: 'data', listener: (chunk: T) => void): this;
on(event: 'end', listener: () => void): this;
on(event: 'readable', listener: () => void): this;
on(event: 'error', listener: (err: Error) => void): this;
on(event: string | symbol, listener: (...args: any[]) => void): this;
once(event: 'close', listener: () => void): this;
once(event: 'data', listener: (chunk: T) => void): this;
once(event: 'end', listener: () => void): this;
once(event: 'readable', listener: () => void): this;
once(event: 'error', listener: (err: Error) => void): this;
once(event: string | symbol, listener: (...args: any[]) => void): this;
prependListener(event: 'close', listener: () => void): this;
prependListener(event: 'data', listener: (chunk: T) => void): this;
prependListener(event: 'end', listener: () => void): this;
prependListener(event: 'readable', listener: () => void): this;
prependListener(event: 'error', listener: (err: Error) => void): this;
prependListener(event: string | symbol, listener: (...args: any[]) => void): this;
prependOnceListener(event: 'close', listener: () => void): this;
prependOnceListener(event: 'data', listener: (chunk: T) => void): this;
prependOnceListener(event: 'end', listener: () => void): this;
prependOnceListener(event: 'readable', listener: () => void): this;
prependOnceListener(event: 'error', listener: (err: Error) => void): this;
prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;
removeListener(event: 'close', listener: () => void): this;
removeListener(event: 'data', listener: (chunk: T) => void): this;
removeListener(event: 'end', listener: () => void): this;
removeListener(event: 'readable', listener: () => void): this;
removeListener(event: 'error', listener: (err: Error) => void): this;
removeListener(event: string | symbol, listener: (...args: any[]) => void): this;
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
}
export interface ReadableOptions<This> {
highWaterMark?: number;
encoding?: string;
objectMode?: boolean;
read?(this: This, size: number): void;
destroy?(
this: This,
error: Error | null,
callback: BasicCallback,
): void;
}
I am facing that in a different way by extending original clases (Readable, Writable and Transform) like:
import { Stream } from "stream";
export class GenericTransform<K, T> extends Stream.Transform {
private processChunk: (chunk: K, enc: string) => Promise<T>;
constructor(processChunk: (chunk: K, enc: string) => Promise<T>) {
super({ objectMode: true });
this.processChunk = processChunk;
}
// eslint-disable-next-line no-underscore-dangle
public async _transform(chunk: K, enc: string, cb: (error?: Error | null) => void): Promise<void> {
try {
this.push(await this.processChunk(chunk, enc));
cb();
} catch (err) {
cb(err);
}
}
}
export const createTransform = <K, T>(transform: (chunk: K, enc: string) => Promise<T>): GenericTransform<K, T> => new GenericTransform(transform);
The way to use it is like follows:
inputStream.pipe<GenericTransform<InputType, OutputType>>(
createTransform<InputType, OutputType>(
async (chunks: InputType): Promise<OutputType> => {
await doWhatEver(chunks);
return chunks;
}
)
).outputStream
The same way input stream and output stream could be created with similar generic classes to add typing to them
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With