The what, why, and how of Message Queues

If you have ever heard of message queues like Apache Kafka and been confused about the what, why, and how of messaging queues in modern applications, this one is for you.

Dakshraj Sharma
The Startup

--

If you have had any amount of interest in things like scalable architectures and microservices, there is a good chance you have come across messaging queues. Perhaps you have heard of companies making use of Apache Kafka, or an alternative like RabbitMQ, those two being the most popular messaging queues in use today. And, perhaps, just like me, you were confused by what messaging queues did and how they helped modern distributed architectures. Let’s try to understand that.

First things first, let’s get the absolute basics out of the way. What even is a messaging queue?

What is a message queue?

Message queues or brokers are components used for inter-process communication or for inter-microservice communication, whereby messages between multiple services are passed not through direct transfer of data, but through a common “queue” or buffer in the form of these messaging queues. The paradigm is similar to the publisher/subscriber model, whereby multiple publishers or “producers” push messages into “queues”, and subscribers or “consumers” can listen for new messages coming into the queue. As the consumer “consumes” or picks up a message from the queue, the message is removed from the queue. The model is asynchronous in nature, such that the interaction of producers and consumers with the queue is completely independent of one another.

Maybe you’re still confused. Let’s try to understand some use cases for such messaging queues to aid your understanding.

What are some use cases for messaging queues?

As you might have correctly guessed, messaging queues aren’t meant for real-time communication. To take a made-up, unrealistic scenario, using them as an intermediate, say, for an HTTP request where the user has to wait on the response is probably not a good idea. Using messaging queues in a producer-consumer model gives us no guarantee of when the consumer actually will take up the message for processing. All we have is a guarantee that the message will eventually get consumed and processed. That’s not exactly true, because, in real-world large-scale systems, things like queue overflows may be a real problem. But for our understanding, we will be assuming a relatively fail-safe system. Owing to these asynchronous nature, messaging queues are good for processes that are not crucial but nice to have done. The processes which are performed by the consumers should, ideally, not be core to the functioning of the overall architecture and functionality of the application, but they may be processes that help improve the functionality and/or performance of the application.

Some of the use cases for messaging queues in real-world scenarios today are listed below:

Sending emails

Emails are used for a large number of purposes, such as marketing campaigns, account verification, password resets, et cetera. If you think about these use cases, you may realize that none of them need immediate processing. Delays in the dispatch of these emails are completely acceptable and do not detract from the core functionality of the applications utilizing emails for various purposes. Message queues can help in such a case.

  • Multiple “producer” services that generate emails and require them to be published can push email objects (by objects I mean some formatted objects that contain all necessary data such as the content, receiver, subject, etc. for the email) into the queue.
  • A single consumer microservice, dedicated to sending emails, works completely independently of where the email comes from. This service consumes messages from the queue one by one and sends emails as specified in the email object.
  • This approach is well scalable, as in case the number of incoming messages to queue gets too large, we may horizontally scale (read: add more number of) consumer email service instances that may all listen to the same queue, sending emails.

Data post-processing

Assume you have a Blogging application that needs to handle large amounts of image data, from images uploaded by users themselves. Users can’t be expected to provide images that are web-optimized or small in size. However, declining users from uploading certain images due to size may not be the best user experience, and you may want to allow users to upload any images they want if your architecture and storage capacity is capable of handling it.

One solution for allowing user’s flexibility while not impacting the load times of your application negatively (owing to larger assets) is to post-process and optimize all images that are uploaded by the user. This is not at all a crucial operation: while it may impact user experience slightly, the optimization is in no way crucial to the application functionality, nor is it necessary that the action is performed instantly. Service may be employed in the application architecture whose sole purpose is optimizing images that get uploaded on the application. A message queue can help in this scenario. The flow of control and messages for the same maybe something like this:

  • The user publishes a blog post with some high-quality, large images.
  • The image gets pushed to storage (something like AWS S3 or Azure Blob Storage).
  • A hook is triggered in the application, pushing a message with information on the newly uploaded image into the “image optimization” queue.
  • The image optimization service can listen to the queue. It pulls an image from S3, optimizes it, and them reuploads the optimized image to replace the unoptimized one on S3.

Batch updates for databases

Databases are used for many purposes. Not all of them are crucial to application usability. Assume you have an application like Spotify. This application may have databases storing user data, which the user can update and/or view at any time. This data is important and any changes in this data may need to be reflected immediately. On the other hand, your application may have a statistical/machine learning engine, which analyzes user actions for any reason, be it for optimizing recommendations or for generating stats. This operation may be considered non-crucial in terms of how immediate the updates need to be. In general, delays in the integration of the latest user activity into the machine learning algorithms may we completely acceptable. Here, a message queue may be used for optimizing database querying.

The establishment of a database connection has some overhead irrespective of the amount of data that is transferred. Even if a persistent connection is used, transit times become a factor for large-scale systems. In general, if possible, batching is suggested for operations such as record insertion. In the scenario represented above, each action by the user such as song listen, like, playlist creation, etc. may be used for optimizing user recommendations. However, creating a database request for every single such operation would be insanity, for lack of a better word.

Instead, a better approach maybe this:

  • For every action, push the action data to a message queue. This will be magnitudes faster than sending the data directly to the database.
  • A consumer service may cache these activities’ data on a per-user basis as they come into the queue.
  • Caching here is acceptable for two reasons. One, this data is non-critical. Losing this data will not break the application, and the user will never be affected or even know. Two, using a cache is much faster for temporary data (Of course cost is also a factor, but let’s not dive into that for the purpose of this article).
  • On a regular interval, the consuming service may use the cached data, transform it into a single database insertion query, and commit that change to the database.

How the machine learning/data warehousing engine later uses the data is a completely different story, but I hope are able to get a grasp on the several use cases for message queues. Now that you have an idea of the basic concept, let’s look into some of the main features of messaging queues.

Features / Advantages of message queues

  • Decoupling: Operations of the consumer and producer are completely independent of one another.
  • Scalability: The number of producers and consumers can easily be scaled as required.
  • Buffering and load management: Message queues act essentially as data buffers. In case of a spike in the amount of data that needs to be processed by the consumer service, the latter needs not be made aware of the fact. The message queue buffers the data for the service, and the service only needs to process the data one by one, rather than having to manage a large amount of data all at once. This makes your architecture more robust overall.
  • Resiliency: Even if your consumer processes go down, it doesn’t mean that your application breaks. Messages for the consumer will remain queued in the messaging queue. Whenever the consumer service comes back up, it can start processing the messages without having to do any additional setup or work.
  • Delivery guarantees: Message queues offer a general guarantee that any message that gets pushed to a queue will get delivered to a consumer eventually.
  • Order guarantee: Being a queue, an order is associated to the incoming messages implicitly. The same order is followed in the consumption and processing of the data.

Now, hopefully, you have a slightly better understanding of the what and why of messaging queues, compared to when you started reading this article. Now, let’s try using a messaging queue in a small application to understand how it would be used in an actual application.

Using a messaging queue in code

Disclaimer: The application that I’ll be using is a very simple one, and it honestly does nothing of value. However, it should allow you to understand where a messaging queue may fit into a real application.

The architecture for the application will be something like this:

For the messaging queues, I’ll be using RabbitMQ, as it is one of the easier to use messaging queue systems. The concepts should be transferrable to any other system such as Apache Kafka, which works on the same principles. For the application components, I use docker and then docker-compose for putting up the containers. For the application code, I use NodeJS in the example. However, of course, the concept will be language agnostic.

Setup

Let’s see the directory structure used for the application. All the code for the repository will be available here.

|-consumer/ 
| |-Dockerfile
| |-package.json
| |-app.js
|-producer/
| |-Dockerfile
| |-package.json
| |-app.js
|-secondary_producer/
| |-Dockerfile
| |-package.json
| |-app.js
|-docker-compose.yml

The package.json file in NodeJS applications is used for managing dependencies and application metadata. The dependencies for the three applications are identical, and the package.json files for each of the three applications are almost identical. You may change the “name” key to match the application, but even that’s not mandatory.

{ 
"name": "producer",
"version": "1.0.0",
"description": "",
"main": "app.js",
"scripts": { "start": "node app.js", },
"author": "Dakshraj Sharma",
"license": "ISC",
"dependencies": { "amqplib": "^0.5.6" }
}

Now, for the Dockerfiles. The Dockerfiles for each of the three applications is exactly the same since they are each nothing but simple NodeJS applications.

FROM node:alpineWORKDIR /usr/app/COPY ./package*.json ./RUN npm installRUN npm install -g pm2COPY ./ ./CMD [ "pm2-runtime", "npm", "--", "start" ]

This Dockerfile does the following things:

  • Pulls the node:alpine image
  • Creates a directory /usr/app for the application
  • Copies to local package.json file to the container’s /usr/app directory and then installs dependencies by running ‘npm install’.
  • Installs pm2 using ‘npm install -g pm2’ to ensure that the application restarts if it errors.
  • Copies code file (essentially only app.js) to /usr/app.
  • Runs the application using ‘pm2-runtime npm — start’. If you want to learn more about pm2, the official website may be a good place to start, but that may be beyond the scope of this article.

Docker-compose is responsible for running the four containers of our application together and allows connectivity between them. The content of the docker-compose.yml file is very simple:

docker-compose.yaml

We simply create four ‘services’ or containers.

A RabbitMQ container from the latest rabbitmq image.

  • Password and username for connecting to rabbitmq are passed as environment variables.
RABBITMQ_DEFAULT_USER: rabbitmqRABBITMQ_DEFAULT_PASS: rabbitmq

The service specification for the remaining three services is exactly the same, barring the directory paths and service names.

  • Build from the Dockerfile in the specified path:
  • Tell docker-compose to restart the container if it goes down and that the container should depend on the rabbitmq container.
build:context: ./consumer/dockerfile: Dockerfile
  • Set environment variables for the rabbitmq connection (will be used in the code) via ‘process.env’.
RABBITMQ_HOST: rabbitmqRABBITMQ_USER: rabbitmqRABBITMQ_PASS: rabbitmqRABBITMQ_QUEUE: messages
  • Note the RABBITMQ_HOST variable. In a real-world scenario, this would be replaced by the IP/URL of the machine that is running RabbitMQ. If a local installation is being used, the localhost may be specified. Since we are using docker-compose, specifying rabbitmq as an address will automatically be directed to the container/service named rabbitmq.
  • Bookmark /app/node_modules so we do not copy it repeatedly. Also, mount the local files to /usr/app so code changes may be reflected in real-time if using nodemon.
volumes:- /usr/app/node_modules- ./secondary_producer:/usr/app/

The code

Let’s write up our first producer, inside ‘producer/app.js’. To connect with RabbitMQ, we will use the NPM library ‘amqplib’, which will be available for us to use since it was added as a dependency to our package.json already (see above).

const amqp = require('amqplib');

We then store the environment variables as local const values. Using the host, username, and password variables, we also form the connection string using Javascript template strings and store it as RABBITMQ_CONNECTION_STRING.

const RABBITMQ_HOST = process.env.RABBITMQ_HOST; // Hostconst RABBITMQ_USER = process.env.RABBITMQ_USER; //Usernameconst RABBITMQ_PASS = process.env.RABBITMQ_PASS; // Password// Connection string: ampq://username:password@hostconst RABBITMQ_CONNECTION_STRING = `amqp://${RABBITMQ_USER}:${RABBITMQ_PASS}@${RABBITMQ_HOST}/`;// The name of the queue we will connect toconst RABBITMQ_QUEUE = process.env.RABBITMQ_QUEUE;

You might be confused as to why we need to have a queue name value when we already have the host defined. This is because a single RabbitMQ can have many queues, independent of each other. Each of these queues is identified by a string identifier, which we have declared as an environment variable for the current application.

We define a function that returns a random number. In a real-world scenario, this may have been any function that supplied some data which had to be passed into the messaging queue.

const getRandomNumber = () => { return Math.random(); };

Fairly basic stuff upto here. Now, let’s try to connect to the RabbitMQ instance, set up a queue if one doesn’t already exist, and then push in data to the queue. Connection and channel creation are asynchronous and return promises, so we use then and catch blocks.

producer/app.js

To summarize what the code is doing:

  • Create a new connection to the RabbitMQ instance through
amqp.connect(RABBITMQ_CONNECTION_STRING) 
.then((conn) => {
return conn.createChannel(); // Create channel
})
  • Create a queue in the channel to send to
.then((chan) => { 
return chan.assertQueue(
RABBITMQ_QUEUE,
{ durable: false, }
) // specify queue to send to
  • Send data into the queue
.then(() => { 
chan.sendToQueue(
RABBITMQ_QUEUE,
Buffer.from("ANY MESSAGE HERE");
);
})

For messages to be sent into the queue, any buffered bytes work. So, essentially, you could send a JSON.stringified object into the queue, which could be later parsed at the consumer end as a JSON object.

Now we have a producer up. Let’s create the secondary producer in ‘secondary_producer/app.js’ with nearly identical code, which will send a random number into the queue 50 times, every 1.5 second. No explanation should be needed for the code as it is exactly the same as the code for the first producer. Note that we ensure here that the second producer also will send messages to the same queue.

secondary_producer/app.js

Now, then, we have two producers that will dump, in total, 100 messages into the queue.

Let’s now create a consumer for the messages that may do something with the messages.

consumer/app.js

To summarize what the consumer is doing:

  • Environment variables, connection to the RabbitMQ instance, channel creation, and queue specification remains exactly the same as for the producers.
  • To consume messages from channel, we use the code:
.then(() => {
// consume messages from specific queue
chan.consume(RABBITMQ_QUEUE, (message) => {
// do something with the message
setTimeout(() => {
if (message !== null) {
// Increment consumer count
consumer_count++;

// Log the received value to console
console.log(
`[ RECEIVED: ${message.content.toString()} | COUNT: ${consumer_count} ] `.toUpperCase()
);
}
}, 2000); // Wait for two seconds to simulate some blocking operation
});
});

Something to note here is the fact that we do not use a loop in our consumer. This means that consumer connections work somewhat like WebSocket connections, whereby the consumer constantly listens on the channel for incoming messages. Whenever the consumer service is free to do work, it reaches into the queue and gets the oldest message (first in the queue) and processes it as required. In case there is no message, the consumer will wait on the queue.

To run the app, run

docker-compose up --build

Of course, it requires you to have docker and docker-compose installed locally. However, NodeJS and RabbitMQ installations are not needed. ( Why? Because docker is awesome like that)

Running docker-compose output you’ll see an output that looks something like this:

Eventually, when all of the messages have been processed, there will be no more console logging. The producers do not send any more data into the queue, and the consumer continues waiting on the queue.

If you look hard enough, you’ll see how the order of messages being sent and being received is perfectly in line, as expected from the queue. Further, we also see that the consumer works on the messages one by one, independent of how the messages arrive, logging a RECEIVED message exactly every 2 seconds.

That was all for this article. Although I have not discussed complex concepts and strategies used by modern MQ systems like Apache Kafka, such as distributed commit logging and partitioning of queues, you hopefully now have greater insight into what a messaging queue is and can imagine many more use cases where such a system may prove to be useful. Perhaps you could think of how you could integrate an MQ system into your own projects. The principles discussed here should be effectively transferrable to any other MQ system.

Until I find something else I feel worthy of sharing, take care and happy learning :)

--

--

Dakshraj Sharma
The Startup

Fullstack Developer and Devops Engineer | Learning all the time.