Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

NestJS external event bus implementation with Redis

Tags:

cqrs

nestjs

I'm trying to integrate my nestjs application's cqrs setup with a external message service such as Redis. I've found a pull request and a comment on the nestJS github stating that I should be able to integrate my query/event/command bus with external services since version 7.0 of cqrs.

I've been trying to implement this, but I can't find much information from nestjs on the subject. The only thing I could find was an outdated configuration example and an open topic on github for creating tutorials on how to implement this. I managed to replace the default publisher and subscriper by going off the limited help I could find on github about this topic, but I don't really understand how I can use that to connect to the external service or if this is the best approach for this problem.

EventBus

import { RedisEventSubscriber } from '../busses/redisEventSubscriber';
import { RedisEventPublisher } from '../busses/redisEventPublisher';
import { OnModuleInit } from '@nestjs/common';
import { ModuleRef } from "@nestjs/core";
import { CommandBus, EventBus as NestJsEventBus } from "@nestjs/cqrs";

export class EventBus extends NestJsEventBus implements OnModuleInit {

constructor( commandBus: CommandBus, moduleRef: ModuleRef) {
  super(commandBus, moduleRef);
}

onModuleInit() {

  const subscriber = new RedisEventSubscriber();
  subscriber.bridgeEventsTo(this._subject$);
  this.publisher = new RedisEventPublisher();

  }
}

Publisher

export class RedisEventPublisher implements IEventPublisher {

publish<T extends IEvent = IEvent>(event: T) {
  console.log("Event published to Redis")
  }
}

Subscriber

export class RedisEventSubscriber implements IMessageSource {

  bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
    console.log('bridged event to thingy')
  }
}

If anyone who has setup nestjs with an external message system before could share their thoughts or share a resource on how to do this properly, that would be appreciated.

like image 386
Jordi Avatar asked Oct 18 '25 13:10

Jordi


1 Answers

So after a couple days I managed to find two approaches to connecting to an external eventbus. I found out that I don't really need an external command or query bus as these come in through API calls. So if you want to connect to an external eventbus with NestJS here are the two options I found:

  1. Via a custom publisher & subscriber
  2. Via the NestJS Microservice package

The two approaches mainly differ in the way they connect to the external eventbus and how they handle incoming messages. Depending on your needs one might suit you better than the other, but I went with the first option.

Custom publisher & subscriber

In my application I was already using manual publish calls to my eventbus by using the EventBus class from NestJS and calling .publish() for my events. I created a service that wrapped around the local NestJS eventbus together with the custom publisher and custom subscriber.

eventBusService.ts

export class EventBusService implements IEventBusService {
  
  constructor(
    private local: EventBus, // Injected from NestJS CQRS Module
    @Inject('eventPublisher') private publisher: IEventPublisher,
    @Inject('eventSubscriber') subscriber: IMessageSource) {
      subscriber.bridgeEventsTo(this.local.subject$);
   }
  
  publish(event: IEvent): void {
    this.publisher.publish(event);
  };
} 

The eventservice uses a custom subscriber to redirect any incoming events from the remote eventbus onto the local eventBus using .bridgeEventsTo(). The custom subscriber uses a redis NPM package's client to communicate with the eventbus.

subscriber.ts

export class RedisEventSubscriber implements IMessageSource {

  constructor(@Inject('redisClient') private client: RedisClient) {}

  bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
    this.client.subscribe('Foo');
    this.client.on("message", (channel: string, message: string) => {

      const { payload, header } = JSON.parse(message);
      const event = Events[header.name];

      subject.next(new event(data.event.payload));
    });
  }
};

This function also contains the logic that maps an incoming Redis event into a event. For this I created a dictionary that holds all my events in the app.module so that I could look up if the event knows how to handle the incoming event. It then calls subject.next() with a new event so that it is put on the internal NestJS eventbus.

publisher.ts

To update other systems from my own events, I created a publisher that sends data to Redis.

export class RedisEventPublisher implements IEventPublisher {

  constructor(@Inject('redisClient') private client: RedisClient) {}

  publish<T extends IEvent = IEvent>(event: T) {
    const name = event.constructor.name;
    const request = {
      header: {
        name
      },
      payload: {
        event
      }
    }
    this.client.publish('Foo', JSON.stringify(request));
  }
}

Just like the subscriber, this class uses the NPM package client to send events to the Redis eventBus.


Microservice setup

The microservice setup in some parts is quite similar to the custom eventservice approach. It uses the same publisher class, but the subscription setup is done differently. It uses the NestJS Microservice package to setup a microservice that listens to incoming messages and then calls the eventService to send an incoming event to the eventbus.

eventService.ts

export class EventBusService implements IEventBusService {
  
  constructor(
    private eventBus: EventBus,
    @Inject('eventPublisher') private eventPublisher: IEventPublisher,) {
   }
  
  public publish<T extends IEvent>(event: T): void {

    const data = {
      payload: event,
      eventName: event.constructor.name
    }
    
    this.eventPublisher.publish(data);
  };

  async handle(string: string) : Promise<void> {

    const data = JSON.parse(string);
    const event = Events[data.event.eventName];

    if (!event) {
      console.log(`Could not find corresponding event for 
      ${data.event.eventName}`);
    };

    await this.eventBus.publish(new event(data.event.payload));
  }
} 

NestJS has documentation on how to setup the hybrid service which can be found here. The microservice package provides you with a @EventPattern() decorator which you can use to create handlers for incoming eventbus messages, You just add them to a NestJS controller and inject the eventService.

controller.ts

@Controller()
export default class EventController {

  constructor(@Inject('eventBusService') private eventBusService: 
  IEventBusService) {}

  @EventPattern(inviteServiceTopic)
  handleInviteServiceEvents(data: string) {
    this.eventBusService.handle(data)
  }
}

Since I didn't feel like creating a hybrid application just to listen to incoming events, I decided to go with the 1st option. The code was nicely grouped together instead of having a random controller with @EventPattern() decorators.

This took quite some time to figure out, so I hope it helps someone out in the future. :)

like image 144
Jordi Avatar answered Oct 21 '25 16:10

Jordi