How to Process Scheduled Queue Jobs in Node.js with BullMQ and Redis on Heroku
Background job processing is a technique for running tasks that can take a long time to complete in a separate process from the main application server. This allows the main application server to continue to handle requests from users while the background jobs are running.
There are many reasons why you might want to use background job processing in your Node.js server application. For example, you might use it to:
- Process large files or images
- Send email or SMS messages
- Run batch jobs
- Perform complex calculations
- Schedule tasks to run at a later time
There are a number of different ways to implement background job processing in Node.js. One popular approach is to use a message queue. A message queue is a first-in-first-out (FIFO) system that allows you to send messages between different processes. When you want to run a background job, you can add a message to the queue. The message will then be processed by a separate worker process, which is a process that is dedicated to running background jobs.
In this article, I will share how to use the BullMQ npm package together with a Redis server to implement background job processing in Node.js. I’ll also cover how to deploy your background job application to Heroku.
Requirements for this tutorial
To get started, you will need to have the following installed:
- Node.js
- Redis
- The BullMQ npm package and a Redis client library such as ioredis
- The Heroku CLI
Once you have these installed, you can follow the instructions in this article to create your background job application.
Quick introduction to Heroku
You’re welcome to skip this section if you already know or use Heroku.
Heroku is a cloud platform that makes it easy to deploy and scale web applications. It provides a number of features that make it ideal for deploying background job applications, including:
- Automatic scaling: Heroku will automatically scale your application up or down based on demand, so you don’t have to worry about managing your own infrastructure.
- Managed services: Heroku provides a number of managed services, such as Redis and Postgres, so you don’t have to worry about provisioning and managing these services yourself.
To get started with Heroku, you will need to create an account and install the Heroku CLI. Once you have done that, you can create a new Heroku app and deploy your application to it.
The benefit of using Heroku for background job processing is that it is easily scalable, naturally fits long-running processes, and makes it easy to colocate your background job application with your main application server.
Quick introduction to BullMQ and Redis
BullMQ is a message queue library for Node.js. It allows you to send and receive messages between different processes. Redis is an in-memory data store that BullMQ uses as a message queue.
BullMQ and Redis work together to provide a reliable and scalable way to process background jobs. BullMQ handles the queuing and routing of messages, while Redis stores the messages and provides a high-performance, consistent way to access them.
Some key features of BullMQ include:
- Supports multiple queue types, including FIFO, LIFO, and priority queues.
- Allows you to schedule jobs to run at a later time (see the sketch after this list).
- Supports retries for failed jobs.
- Supports rate-limiting for jobs.
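For example, scheduling a job to run at a later time only requires passing a delay option, in milliseconds, when adding the job. A minimal sketch, assuming a hypothetical emails_queue queue and local Redis connection details:
import { Queue } from "bullmq";

// Assumed queue name and connection details, for illustration only
const queue = new Queue("emails_queue", {
  connection: { host: "127.0.0.1", port: 6379 },
});

// Run this job no earlier than 5 minutes from now
await queue.add("send-welcome-email", { userId: 42 }, { delay: 5 * 60 * 1000 });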
Setting up a Heroku application with Redis
To set up a Heroku application with Redis, you will need to:
- Create a new Heroku app.
- Provision a Redis instance.
- Set up the Procfile for a new worker configuration.
Here are the commands you can use to do this:
heroku create my-app
heroku addons:create heroku-redis:hobby-dev
echo "worker: node worker.js" >> Procfile
The first command creates a new Heroku app named my-app. The second command provisions a Redis instance on the Hobby Dev plan. The third command creates the Heroku app infrastructure file, the Procfile, which tells Heroku to provision a background job processing resource (a worker dyno) and to run the node worker.js command for it.
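Once the Redis add-on is provisioned, Heroku exposes the connection string to your app as the REDIS_URL config var, which you can verify with:
heroku config:get REDIS_URL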
The worker.js file is where you will put your code for processing background jobs. We’re getting to it next!
The BullMQ worker
The worker is the process that consumes work from the queue and runs the background jobs. On Heroku, and generally speaking, it’s a separate Node.js runtime process, and as such its deployment is entirely independent from that of your main web API Node.js runtime, if you have one.
It’s also generally considered to be a long-running process, meaning it will run indefinitely until it is stopped. If there’s no work, it simply idles. It will connect to the Redis instance and listen for jobs to be added to the queue. When a job is added to the queue, the worker will process the job and then mark it as complete.
Setting up the BullMQ worker
Setting up the BullMQ worker is as simple as creating a new file, worker.js, and adding the following code:
import { Worker } from "bullmq";

// Redis connection details, read from environment variables
// (adjust the variable names to match your configuration)
const workerConnectionOptions = {
  host: process.env.REDIS_HOST,
  port: Number(process.env.REDIS_PORT),
  password: process.env.REDIS_PASSWORD,
};

// The following sets up the worker:
// 1. Connects to a queue named `uploaded_files_queue`
// 2. Runs the `workerJobHandler` function when a job is added to the queue
// 3. Creates a new worker instance that allows hooking to events in the worker lifecycle
const workerInstance = new Worker("uploaded_files_queue", workerJobHandler, {
  connection: workerConnectionOptions,
});

// The `workerJobHandler` function is the function that will be called
// when a job is added to the queue. It will receive the job as an argument.
// The job will contain the data that was added to the queue when the job
// was created.
async function workerJobHandler(job) {
  console.log(`handling job: [${job.id}]`);
  console.log({ jobName: job.name, jobId: job.id, data: job.data });
  // for example:
  // await processUploadedFile(job.data.fileId)
  return;
}
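Note that on Heroku, the Redis add-on exposes a single connection URL (the REDIS_URL config var we saw earlier) rather than separate host, port, and password values. In that case, a sketch of the same setup using an ioredis connection built from the URL (BullMQ workers require maxRetriesPerRequest to be set to null on their connection):
import IORedis from "ioredis";
import { Worker } from "bullmq";

// Build the Redis connection from Heroku's REDIS_URL config var;
// BullMQ workers require maxRetriesPerRequest to be null
const connection = new IORedis(process.env.REDIS_URL, {
  maxRetriesPerRequest: null,
});

const workerInstance = new Worker("uploaded_files_queue", workerJobHandler, {
  connection,
});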
Acting on events in the worker lifecycle
We can hook into events in the worker lifecycle to perform actions when certain events occur. For example, we can hook into the completed event to perform an action when a job is completed. We can also hook into the failed event to perform an action when a job fails.
Add the following to the worker.js file:
workerInstance.on("completed", async (job) => {
console.log(`[${job.id}] entering job completion stage!`);
console.log(`[${job.id}] has completed!`);
});
workerInstance.on("failed", (job, err) => {
console.error(`[${job.id}] has failed with ${err.message}`);
console.error(err);
});
workerInstance.on("error", (err) => {
console.error(`WorkerInstance has errored with ${err.message}`);
});
Tip: Avoid try/catch for error handling. You don’t need to customize error handling within the worker handler function, because BullMQ automatically catches errors thrown from it. It marks the job as failed, retries it according to the job’s retry configuration, and, after the configured number of retries, moves it to the failed set (BullMQ’s equivalent of a dead letter queue).
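To illustrate, a handler can simply throw (or let a rejected promise propagate) and let BullMQ take care of the bookkeeping; a sketch, reusing the hypothetical processUploadedFile function from the comment above:
async function workerJobHandler(job) {
  // No try/catch needed: if this throws, BullMQ marks the job as
  // failed and retries it per the job's retry configuration
  await processUploadedFile(job.data.fileId);
  console.log(`[${job.id}] processed file ${job.data.fileId}`);
}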
Utilizing CPU cores with BullMQ
Node.js runs your JavaScript on a single thread, so a single process can only utilize one CPU core. This means that if the host assigned to the worker process has a multi-core CPU, you’ll constantly under-utilize your hardware resources. Instead, you can utilize all CPU cores by running multiple instances of the worker. This is often referred to as clustering, and it’s a common pattern in Node.js applications.
We can use the throng npm package, a small wrapper around Node.js’s built-in clustering capabilities, to launch a separate Node.js runtime process for each CPU core.
We will change the worker.js file to accommodate throng:
import throng from "throng";

// Wrap the worker bootstrap code from the previous sections in a
// function named `worker`, which throng will call once per process
function worker() {
  // ... the worker code from the previous section
}

throng({ worker });
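If you want explicit control over how many processes throng launches (for example to honor Heroku’s WEB_CONCURRENCY environment variable, which the Node.js buildpack sets based on dyno size), you can pass a count option; a sketch:
import os from "node:os";
import throng from "throng";

// Prefer Heroku's WEB_CONCURRENCY hint when present,
// otherwise fall back to the number of CPU cores
const count = Number(process.env.WEB_CONCURRENCY) || os.cpus().length;

function worker(id) {
  console.log(`worker process ${id} started`);
  // ... the worker code from the previous section
}

throng({ worker, count });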
Better BullMQ worker with Dependency Injection
Your background job workers, while deployed separately from your main web APIs, will still need to access your application’s services and dependencies. For example, you may want to access your database, or your file storage service, to which you’ll need to pass the necessary configuration, credentials, and potentially other domain logic.
To do this, we can use a dependency injection container to inject dependencies into the worker; the Awilix npm package is a good fit.
The pattern I suggest is as follows:
- The worker is co-located with the Node.js API service/microservice source code it relates to. This way, it can easily access domain logic via high-level abstraction APIs (the services, e.g. FileStorageService, DatabaseService, etc.).
- The worker is instantiated with a dependency injection layer that allows it to request access to any of the necessary dependencies, just as if it were another HTTP API route handler in the main service.
This will end up looking something like this:
import throng from "throng";
import { Config } from "./services/core/Config.js";
import { DatabaseManager } from "./services/core/db.js";
import { initDI } from "./infra/di.js";
import { WorkerFactory } from "./workers/fileUploadWorker.js";

async function initDatabase(config) {
  // Initialize the database
  const database = new DatabaseManager(config);
  // Ensure database connection is ready
  await database.ping();
  // Return the database instance
  return database;
}

// Load configuration
const configManager = new Config();
const config = await configManager.load();

// Initialize the database
const database = await initDatabase(config);

// Initialize DI
const diContainer = await initDI({ config, database });

const Logger = diContainer.resolve("Logger");
Logger.info(`Worker initialized`);

const worker = WorkerFactory({ container: diContainer });
throng({ worker });
In the above code example, we initialize the configuration and the database connection, then pass both to the dependency injection layer to make them available to DI consumers. As you’ll notice, we wrap the worker code in a factory function so that the dependency injection container is available to the worker.
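The WorkerFactory itself isn’t shown above, but a hypothetical sketch of workers/fileUploadWorker.js illustrates the idea: the factory closes over the container, so the job handler can resolve services instead of importing them directly (the registered service names and the config shape here are assumptions):
import { Worker } from "bullmq";

// Hypothetical factory: it closes over the DI container so the
// job handler can resolve services instead of importing them
export function WorkerFactory({ container }) {
  const config = container.resolve("Config");
  const logger = container.resolve("Logger");

  return function worker() {
    new Worker(
      "uploaded_files_queue",
      async (job) => {
        logger.info(`handling job: [${job.id}]`);
        // e.g. resolve a domain service and act on the job's payload:
        // const fileStorage = container.resolve("FileStorageService");
        // await fileStorage.process(job.data.fileId);
      },
      { connection: config.redisConnectionOptions } // assumed config shape
    );
  };
}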
The BullMQ producer client
To make this guide complete, we’ll also briefly review the BullMQ producer client, which is responsible for adding jobs to the queue for the worker to consume.
It is, in fact, as simple as:
import { Queue } from "bullmq";

// Create a new queue instance, reusing the same Redis
// connection options the worker uses
const queue = new Queue("uploaded_files_queue", {
  connection: workerConnectionOptions,
});

// Schedule a new job on the queue with:
// 1. a name that is associated with this job
// 2. any metadata this job should include (a JSON object)
await queue.add(jobName, jobData);
Tip: BullMQ defaults to scheduling jobs with the retry attempts value set to 0, which means it won’t retry failed jobs. You can override this by setting the attempts value to a number greater than 1. Also note that jobs are retried immediately unless a delay is specified; you can override this by setting the backoff value to an object with type exponential and a delay specified in milliseconds.
To accommodate job retries and other queue housekeeping configuration, we can further customize the job options as follows when scheduling jobs:
// This configuration can be provided at either the
// queue level or the job level. In this example
// it is set at the job level.
const jobQueueConfig = {
  // retry a failed job once more (2 attempts in total)
  attempts: 2,
  // wait 30 seconds before the first retry, then back off exponentially
  backoff: {
    type: "exponential",
    delay: 30000,
  },
  // keep completed jobs for 24 hours (age is in seconds)
  removeOnComplete: {
    age: 24 * 3600,
  },
  // keep failed jobs for 24 hours (age is in seconds)
  removeOnFail: {
    age: 24 * 3600,
  },
};

await queue.add(jobName, jobData, jobQueueConfig);
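As the comment above notes, the same options can instead be applied once at the queue level, via BullMQ’s defaultJobOptions, so individual jobs don’t have to repeat them:
const queue = new Queue("uploaded_files_queue", {
  connection: workerConnectionOptions,
  defaultJobOptions: jobQueueConfig,
});

// Individual jobs now inherit the defaults
await queue.add(jobName, jobData);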
Next steps
Don’t forget that you still have to deploy your worker to Heroku. You can do this by running the following command:
git push heroku main
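Keep in mind that Heroku does not automatically start non-web process types, so after deploying you will also need to scale the worker dyno:
heroku ps:scale worker=1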
If you are interested in learning more about BullMQ and Redis, I recommend checking out the following resources:
- BullMQ documentation: https://docs.bullmq.io/
- Redis documentation: https://redis.io/documentation
You may also want to look into alternative queue implementations, such as RabbitMQ with the amqplib npm package.