Handle processing peaks in NestJS using queues, and monitor them, featuring Redis and Bull

Queues come in handy for smoothing out processing peaks for tasks that take time to process or would otherwise block the Node.js event loop.

Challenge

Let's say we have an endpoint that processes some data and takes more than a few seconds: complex calculations, video processing, audio transcoding, and so on. If multiple users make requests to that endpoint concurrently, the work can block the event loop, and users will not receive a timely response; instead they see an ever-loading page.
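To see why synchronous CPU-bound work is a problem, here is a small standalone illustration (not part of the app) of a busy-wait starving a timer:

```typescript
// Busy-wait that keeps the event loop occupied for `ms` milliseconds;
// while it runs, no timers or I/O callbacks can fire.
function blockFor(ms: number): void {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // synchronous spin: the event loop is blocked
  }
}

// Schedule a timer for 10 ms, then block the loop for 200 ms:
// the callback cannot run until the synchronous work is done.
const scheduled = Date.now();
setTimeout(() => {
  console.log(`timer fired after ~${Date.now() - scheduled} ms instead of 10 ms`);
}, 10);
blockFor(200);
```

In a server, that "timer" is every other user's request handler.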

One way of solving this is to maintain a queue that holds the incoming requests and processes them asynchronously.
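The idea can be sketched framework-free with a tiny in-memory queue that accepts jobs immediately and works through them one at a time. This is purely an illustration; Bull and Redis give us the same pattern plus persistence, retries, and concurrency control:

```typescript
type SimpleJob<T> = { id: number; data: T };

// Minimal in-memory FIFO queue: add() returns immediately,
// and jobs are processed sequentially in the background.
class SimpleQueue<T> {
  private jobs: SimpleJob<T>[] = [];
  private running = false;
  private nextId = 1;

  constructor(private handler: (job: SimpleJob<T>) => Promise<void>) {}

  add(data: T): SimpleJob<T> {
    const job = { id: this.nextId++, data };
    this.jobs.push(job);
    void this.drain(); // fire and forget; the caller is never blocked
    return job;
  }

  private async drain(): Promise<void> {
    if (this.running) return; // a worker loop is already active
    this.running = true;
    while (this.jobs.length > 0) {
      const job = this.jobs.shift()!;
      await this.handler(job); // one job at a time, peaks get smoothed out
    }
    this.running = false;
  }
}
```

Callers get an immediate response (the job), while the actual work happens later at a steady pace.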

We will build this with the NestJS framework, using Redis to back the queues via the @nestjs/bull package, plus a third-party module to monitor the queues so we can keep an eye on their status.

Pre-requisites

  • Node.js with the NestJS CLI installed (we use yarn as the package manager)
  • Docker and Docker Compose

Setup

1- Create a new NestJS application using the CLI:

nest new testqueues

# In this demo we have used yarn in the next step 
# where it asks for the package manager selection

2- Install dependencies

Run the commands below inside the app folder, i.e. testqueues:

# Install nestjs bull package
yarn add @nestjs/bull bull 

# Install bull-board packages which will be used for queue monitoring
yarn add @bull-board/api @bull-board/express @bull-board/nestjs 

Dockerize

We will use Docker to run the application. Open the app directory in your favorite IDE and create a Dockerfile with the following contents:

FROM node:18-alpine

# Set working directory
WORKDIR /usr/src/app

# Install the NestJS CLI globally
RUN npm install -g @nestjs/cli

# Copy package.json and yarn.lock to working directory
COPY package*.json yarn.lock ./

# Install dependencies
RUN yarn install --frozen-lockfile

# Copy source code to working directory
COPY . .

# Expose the default port for the NestJS application
EXPOSE 3000

# Start the NestJS application in development mode
CMD ["npm", "run", "start:dev"]

Do note that this Dockerfile is suitable for development environments only.

Now create a docker-compose.yml file with the following contents:

version: '3.9'
services:
  app:
    build:
      context: ./
    ports:
      - '3000:3000'
    volumes:
      - ./:/usr/src/app
    command: npm run start:dev
  redis:
    container_name: redis_testqueues
    image: 'redis:latest'
    ports:
      - '6379:6379'

With the above compose file we create two services: app for our NestJS application, and redis for running Redis exposed on port 6379. If either port is unavailable on your machine, change it here and reflect the change in the app code accordingly.

Run below command to start the application:

docker-compose up

Let's Code

Create a src/constants.ts file with a constant holding the name of our Redis queue:

export const REDIS_QUEUE_NAME = 'qdataprocess';

Then import the required modules in src/app.module.ts. Add the following to the imports array (which will most probably be empty right now):

  imports: [
    BullModule.forRoot({
      redis: {
        host: 'redis_testqueues', // use the service name from docker-compose if its different
        port: 6379,
      },
    }),
    BullModule.registerQueue({
      name: REDIS_QUEUE_NAME,
    }),
    BullBoardModule.forRoot({
      route: '/queues',
      adapter: ExpressAdapter, // Or FastifyAdapter from `@bull-board/fastify`
    }),
    BullBoardModule.forFeature({
      name: REDIS_QUEUE_NAME,
      adapter: BullAdapter,
    }),
  ],

Add imports for the modules we have added:

import { ExpressAdapter } from '@bull-board/express';
import { BullBoardModule } from '@bull-board/nestjs';
import { BullModule } from '@nestjs/bull';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { REDIS_QUEUE_NAME } from './constants';

Now that the required modules are imported, let's update the app.service.ts file; we will add a method to enqueue jobs:

import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { REDIS_QUEUE_NAME } from './constants';
import { Queue } from 'bull';

@Injectable()
export class AppService {
  constructor(@InjectQueue(REDIS_QUEUE_NAME) private queue: Queue) {}

  getHello(): string {
    return 'Hello World!';
  }

  async processData() {
    return await this.queue.add(
      'process_data',
      { custom_id: Math.floor(Math.random() * 10000000) },
      { priority: 1 },
    );
  }
}

The method we have added is processData(), which uses queue.add: the first argument is the job name, the second is the job data, and the third holds job options such as priority. Here we just send a random number to showcase how data can be attached to a job and then read later in the consumer of that job (we will get to that in a bit).
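About the priority option: in Bull, a lower priority number means higher priority, so a job added with priority: 1 is picked before one with priority: 2. The ordering rule can be sketched as follows; this is purely an illustration of the semantics, not Bull's actual implementation:

```typescript
type QueuedJob = { name: string; priority: number; seq: number };

// Pick the next job to run: lowest priority number first,
// falling back to insertion order (seq) to break ties.
function pickNext(jobs: QueuedJob[]): QueuedJob | undefined {
  return [...jobs].sort(
    (a, b) => a.priority - b.priority || a.seq - b.seq,
  )[0];
}
```

So if a burst of low-priority jobs is already waiting, a later high-priority job still jumps the line.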

Now update the app.controller.ts file; we will add a /process route that uses our service method:

import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  getHello(): string {
    return this.appService.getHello();
  }

  @Get('process')
  async processData() {
    return this.appService.processData();
  }
}

Our app can now receive requests at the /process endpoint and enqueue jobs; next we will add a consumer, which handles the queued jobs and processes them.

Add a src/process-data.consumer.ts file with the following contents:

import {
  Processor,
  Process,
  OnQueueActive,
  OnQueueCompleted,
} from '@nestjs/bull';
import { Job } from 'bull';
import { REDIS_QUEUE_NAME } from './constants';
import { Logger } from '@nestjs/common';

@Processor(REDIS_QUEUE_NAME)
export class ProcessDataConsumer {
  @Process('process_data')
  async processData() {
    // Perform the job.
    // This is just a sample long-running task that will
    // take between 5 and 10 seconds to finish.
    await new Promise((resolve) =>
      setTimeout(
        () => resolve('Data processed'),
        5000 + Math.floor(Math.random() * 5000),
      ),
    );

    return { done: true };
  }

  @OnQueueActive()
  onActive(job: Job<{ custom_id: number }>) {
    // Log that the job is starting
    Logger.log(`Starting job ${job.id} : ${job.data.custom_id}`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job<unknown>) {
    // Log job completion status
    Logger.log(`Job ${job.id} has been finished`);
  }
}

Key things to note here:

  • @Processor(REDIS_QUEUE_NAME) tags our current class as the consumer for the Redis queue we registered
  • process_data within @Process('process_data') is the name of the job we defined in our service method
  • processData() is the function that does the actual work of processing our data; it can be anything, like sending out an email, running some data operations, processing video frames, or generating thumbnails. For demo purposes we just create a promise that resolves within 5 to 10 seconds
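The simulated delay in processData() can also be factored into a small helper that makes the delay range explicit. This is just a sketch with a hypothetical simulateWork() name, not part of the tutorial code:

```typescript
// Resolve with 'Data processed' after a random delay
// between minMs (inclusive) and maxMs (exclusive).
function simulateWork(minMs: number, maxMs: number): Promise<string> {
  const delay = minMs + Math.floor(Math.random() * (maxMs - minMs));
  return new Promise((resolve) =>
    setTimeout(() => resolve('Data processed'), delay),
  );
}

// In the consumer this would be: await simulateWork(5000, 10000);
```

Swapping the random delay for real work (transcoding, emailing, etc.) is all the consumer needs to change.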

We now have to register this consumer; do so by editing the app.module.ts file and adding ProcessDataConsumer to the providers array (make sure to import the consumer as well):

providers: [AppService, ProcessDataConsumer],

Finally, you can test the application by visiting http://localhost:3000/process, which will add a job. Refresh the page several times to register multiple jobs, then watch the console logs to follow the progress of the jobs,

or go to http://localhost:3000/queues to view the dashboard, which lists all the details of the jobs in the queue.

What we have achieved

  • Created a NestJS based demo app for registering and consuming jobs based on Redis queues
  • Dockerized the application (with hot-reloading supported)
  • Added a dashboard to monitor all the jobs in our queues
