Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Typescript: is there a better way to get typed streams?

Object streams are great, but they're currently untyped, which means that you can pipe nonsensical streams together. Boo!

Goal (ish)

// Sketch of the desired API (bodies elided): stream classes that are
// parameterized by the type of chunk they carry.

// A readable stream producing `Foo` chunks.
class FooReadable extends Readable<Foo> {
  ...
}
// A writable stream consuming `Foo` chunks.
class FooWritable extends Writable<Foo> {
  ...
}
// A writable stream consuming `Bar` chunks.
class BarWritable extends Writable<Bar> {
  ...
}

const fooReadable = new FooReadable();
const fooWritable = new FooWritable();
const barWritable = new BarWritable();

// Piping should type-check only when the chunk types on both sides agree.
fooReadable.pipe(fooWritable); // Okay
fooReadable.pipe(barWritable); // Error!

I've figured out how to make this work using pre-ES6 style constructors, but I really want to write classes that extend some abstract typed class (like above).

The solution should really be native streams under the hood -- no re-implementing all of the classes from scratch.

Here's my current solution, using Overwrite to replace the bits of the existing types that I don't want, plus a factory method to fudge the types a bit.

import { Writable, WritableOptions, } from "stream";

/**
 * Factory for a native `Writable` that is presented to callers as a
 * `TypedWritable<T>`.
 *
 * The stream itself is an ordinary Node `Writable` built from `opts`; only the
 * static type differs.
 *
 * @param opts Options forwarded unchanged to the `Writable` constructor.
 * @returns The newly constructed stream, typed as `TypedWritable<T>`.
 */
export function createWritable<T>(
    opts: TypedWritableOptions<T>): TypedWritable<T> {
  // Deliberately routed through `any`: the compiler is not asked to relate
  // `Writable` to the overwritten type.
  const stream: any = new Writable(opts);
  return stream;
}

// `Writable` with its `_write` / `_writev` members swapped for the versions
// in `WritableReplacement<T>`, whose chunks are typed as `T`.
export type TypedWritable<T> = Overwrite<Writable, WritableReplacement<T>>;
// `WritableOptions` with its `write` / `writev` members swapped for the
// versions in `WritableOptionsReplacement<T>`, whose chunks are typed as `T`.
export type TypedWritableOptions<T> =
    Overwrite<WritableOptions, WritableOptionsReplacement<T>>


// Given types S and D, returns the equivalent of `S & D`, but if any
// properties are shared between S and D, the D versions completely
// replace those in S.
//
// The surviving S properties are selected with `Pick` rather than a bare
// mapped type over `Exclude<keyof S, keyof D>`. `Pick` is a homomorphic mapped
// type, so it keeps each property's `?` and `readonly` modifiers; the bare
// form drops them, which would turn every optional property of S into a
// required one.
type Overwrite<S, D> = Pick<S, Exclude<keyof S, keyof D>> & D;

// Typed versions of the `Writable` members that handle chunks. Intended to be
// layered over `Writable` so that chunks are seen as `T`.
interface WritableReplacement<T> {
  // Handles a single chunk; `callback` is invoked with an optional error.
  _write(
      chunk: T,
      encoding: string,
      callback: (error?: Error | null) => void,
      ): void;
  // Optional batched variant: receives several chunk/encoding pairs at once.
  _writev?(
      chunks: Array<{ chunk: T, encoding: string }>,
      callback: (error?: Error | null) => void,
      ): void;
}

// Typed versions of the chunk-handling callbacks accepted in the options
// object. Intended to be layered over `WritableOptions` so that chunks are
// seen as `T`. Both callbacks are optional.
interface WritableOptionsReplacement<T> {
  // Handles a single chunk; `callback` is invoked with an optional error.
  write?(
      chunk: T,
      encoding: string,
      callback: (error?: Error | null) => void,
      ): void;
  // Batched variant: receives several chunk/encoding pairs at once.
  writev?(
      chunks: Array<{ chunk: T, encoding: string }>,
      callback: (error?: Error | null) => void,
      ): void;
}
like image 343
Ender Avatar asked Oct 17 '25 22:10

Ender


2 Answers

So, here's the better way of doing this.

First, define (somewhere in your codebase) a JS file that simply re-exports the appropriate stream class:

// Readable.js
export { Readable } from 'stream';

Then create an accompanying .d.ts file and write whatever type definition you'd like. Here's mine, copied mostly-faithfully from @types/node/index.d.ts.

// Readable.d.ts
import { BasicCallback } from './core';

// Declaration merging: the interface below gives `Readable<T>` instances every
// member of `ReadStream<T>`, while the class declaration adds the constructor
// and the optional implementation hooks.
export interface Readable<T> extends ReadStream<T> {}
export class Readable<T> {
  // The options' callbacks receive this stream type as their `this`.
  constructor(opts?: ReadableOptions<Readable<T>>);

  // Optional hooks a subclass may define.
  // NOTE(review): `BasicCallback` comes from './core' and is not shown here;
  // presumably an error-first callback. Confirm.
  _read?(size: number): void;
  _destroy?(error: Error | null, callback: BasicCallback): void;
}

/**
 * Instance surface of a readable stream whose chunks are typed as `T`.
 *
 * The chunk type flows through `read`, `unshift`, `push`, the `'data'` event
 * overloads and the async iterator.
 */
export interface ReadStream<T> {
  readable: boolean;
  readonly readableHighWaterMark: number;
  readonly readableLength: number;
  /**
   * Pulls a chunk out of the internal buffer.
   *
   * @returns The next chunk, or `null` when no data is currently available.
   */
  read(size?: number): T | null;
  setEncoding(encoding: string): this;
  pause(): this;
  resume(): this;
  isPaused(): boolean;
  // The type parameter is named `W` so that it does not shadow the
  // interface's chunk type `T`.
  unpipe<W extends NodeJS.WritableStream>(destination?: W): this;
  unshift(chunk: T): void;
  wrap(oldStream: NodeJS.ReadableStream): this;
  /** Pushing `null` is accepted in addition to a chunk of type `T`. */
  push(chunk: T | null, encoding?: string): boolean;
  destroy(error?: Error): void;

  /**
   * Event emitter
   * Typed overloads are provided for the following events:
   * 1. close
   * 2. data
   * 3. end
   * 4. readable
   * 5. error
   * Any other event name falls through to the untyped overload.
   */
  addListener(event: 'close', listener: () => void): this;
  addListener(event: 'data', listener: (chunk: T) => void): this;
  addListener(event: 'end', listener: () => void): this;
  addListener(event: 'readable', listener: () => void): this;
  addListener(event: 'error', listener: (err: Error) => void): this;
  addListener(event: string | symbol, listener: (...args: any[]) => void): this;

  emit(event: 'close'): boolean;
  emit(event: 'data', chunk: T): boolean;
  emit(event: 'end'): boolean;
  emit(event: 'readable'): boolean;
  emit(event: 'error', err: Error): boolean;
  emit(event: string | symbol, ...args: any[]): boolean;

  on(event: 'close', listener: () => void): this;
  on(event: 'data', listener: (chunk: T) => void): this;
  on(event: 'end', listener: () => void): this;
  on(event: 'readable', listener: () => void): this;
  on(event: 'error', listener: (err: Error) => void): this;
  on(event: string | symbol, listener: (...args: any[]) => void): this;

  once(event: 'close', listener: () => void): this;
  once(event: 'data', listener: (chunk: T) => void): this;
  once(event: 'end', listener: () => void): this;
  once(event: 'readable', listener: () => void): this;
  once(event: 'error', listener: (err: Error) => void): this;
  once(event: string | symbol, listener: (...args: any[]) => void): this;

  prependListener(event: 'close', listener: () => void): this;
  prependListener(event: 'data', listener: (chunk: T) => void): this;
  prependListener(event: 'end', listener: () => void): this;
  prependListener(event: 'readable', listener: () => void): this;
  prependListener(event: 'error', listener: (err: Error) => void): this;
  prependListener(event: string | symbol, listener: (...args: any[]) => void): this;

  prependOnceListener(event: 'close', listener: () => void): this;
  prependOnceListener(event: 'data', listener: (chunk: T) => void): this;
  prependOnceListener(event: 'end', listener: () => void): this;
  prependOnceListener(event: 'readable', listener: () => void): this;
  prependOnceListener(event: 'error', listener: (err: Error) => void): this;
  prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;

  removeListener(event: 'close', listener: () => void): this;
  removeListener(event: 'data', listener: (chunk: T) => void): this;
  removeListener(event: 'end', listener: () => void): this;
  removeListener(event: 'readable', listener: () => void): this;
  removeListener(event: 'error', listener: (err: Error) => void): this;
  removeListener(event: string | symbol, listener: (...args: any[]) => void): this;

  /** Async iteration yields chunks of type `T`. */
  [Symbol.asyncIterator](): AsyncIterableIterator<T>;
}

// Constructor options for a readable stream. `This` is the type bound to
// `this` inside the optional `read` and `destroy` callbacks. Every field is
// optional.
export interface ReadableOptions<This> {
  highWaterMark?: number;
  encoding?: string;
  objectMode?: boolean;
  // Callback form of the stream's read hook; `this` is typed as `This`.
  read?(this: This, size: number): void;
  // Callback form of the stream's destroy hook; `this` is typed as `This`.
  // NOTE(review): `BasicCallback` is imported from './core' and not shown
  // here; presumably an error-first callback. Confirm.
  destroy?(
      this: This,
      error: Error | null,
      callback: BasicCallback,
      ): void;
}
like image 144
Ender Avatar answered Oct 19 '25 11:10

Ender


I approach this in a different way, by extending the original classes (Readable, Writable and Transform), like this:

import { Stream } from "stream";

/**
 * Object-mode `Transform` whose per-chunk work is delegated to an async
 * function mapping an input chunk of type `K` to an output chunk of type `T`.
 */
export class GenericTransform<K, T> extends Stream.Transform {
  private readonly processChunk: (chunk: K, enc: string) => Promise<T>;

  /**
   * @param processChunk Async mapper invoked once per incoming chunk; its
   *     resolved value is pushed downstream.
   */
  constructor(processChunk: (chunk: K, enc: string) => Promise<T>) {
    super({ objectMode: true });
    this.processChunk = processChunk;
  }

  /**
   * Runs `processChunk` on the chunk and pushes the result.
   *
   * `cb` is called exactly once: with an `Error` if processing or pushing
   * failed, otherwise with no arguments.
   */
  // eslint-disable-next-line no-underscore-dangle
  public async _transform(chunk: K, enc: string, cb: (error?: Error | null) => void): Promise<void> {
    try {
      this.push(await this.processChunk(chunk, enc));
    } catch (err) {
      // A promise can reject with any value; the stream callback expects an
      // Error, so wrap anything that is not one.
      cb(err instanceof Error ? err : new Error(String(err)));
      return;
    }
    // Kept outside the try block so that an exception thrown from within
    // `cb` cannot lead to `cb` being invoked a second time.
    cb();
  }
}

/**
 * Convenience factory: wraps an async chunk mapper in a `GenericTransform`.
 *
 * @param transform Async function mapping an input chunk to an output chunk.
 * @returns A new `GenericTransform` driven by `transform`.
 */
export const createTransform = <K, T>(
  transform: (chunk: K, enc: string) => Promise<T>,
): GenericTransform<K, T> => {
  const stream = new GenericTransform<K, T>(transform);
  return stream;
};

It can be used as follows:

  // Pipe the source through a typed transform, then on into the destination.
  inputStream.pipe<GenericTransform<InputType, OutputType>>(
    createTransform<InputType, OutputType>(
      async (chunks: InputType): Promise<OutputType> => {
        await doWhatEver(chunks);
        // NOTE(review): the chunk is passed through unchanged, which only
        // type-checks when InputType is assignable to OutputType.
        return chunks;
      }
    )
  ).pipe(outputStream); // was `.outputStream`, a property access rather than a pipe

In the same way, typed input and output streams can be created with similar generic classes.

like image 35
Ivan Perea Fuentes Avatar answered Oct 19 '25 13:10

Ivan Perea Fuentes



Donate For Us

If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!