This content originally appeared on DEV Community and was authored by Mads Quist
Hey 👋
I'm Mads Quist, founder of All Quiet. We've implemented a home-grown message queue based on MongoDB and I'm here to talk about:
- Why we re-invented the wheel
- How we re-invented the wheel
1. Why we re-invented the wheel
Why do we need message queuing?
All Quiet is a modern incident management platform, similar to PagerDuty. Our platform requires features like:
- Sending a double-opt-in email asynchronously after a user registers
- Sending a reminder email 24 hours after registration
- Sending push notifications with Firebase Cloud Messaging (FCM), which can fail due to network or load problems. As push notifications are crucial to our app, we need to retry sending them if there's an issue.
- Accepting emails from outside our integration and processing them into incidents. This process can fail, so we wanted to decouple it and process each email payload on a queue.
Our tech stack
To understand our specific requirements, it's important to get some insights into our tech stack:
- We run a monolithic web application based on .NET 7. The application runs in a Docker container.
- We run multiple containers in parallel.
- An HAProxy instance distributes HTTP requests equally to each container, ensuring a highly available setup.
- We use MongoDB as our underlying database, replicated across availability zones.
- All of the above components are hosted by AWS on generic EC2 VMs.
Why we re-invented the wheel
- We desired a simple queuing mechanism that could run in multiple processes simultaneously while guaranteeing that each message was processed only once.
- We didn't need a pub/sub pattern.
- We didn't aim for a complex distributed system based on CQRS / event sourcing because, you know, the first rule of distributed systems is to not distribute.
- We wanted to keep things as simple as possible, following the philosophy of choosing "boring technology".
Ultimately, it's about minimizing the number of moving parts in your infrastructure. We aim to build fantastic features for our excellent customers, and it's imperative to maintain our services reliably. Managing a single database system to achieve more than five nines of uptime is challenging enough. So why burden yourself with managing an additional HA RabbitMQ cluster?
Why not just use AWS SQS?
Yeah… cloud solutions like AWS SQS, Google Cloud Tasks, or Azure Queue Storage are fantastic! However, they would have resulted in vendor lock-in. We simply aspire to be independent and cost-effective while still providing a scalable service to our clients.
2. How we re-invented the wheel
What is a message queue?
A message queue is a system that stores messages. Producers store messages in the queue, and consumers later dequeue them for processing. This is incredibly beneficial for decoupling components, especially when processing messages is a resource-intensive task.
What characteristics should our queue show?
- Utilizing MongoDB as our data storage
- Guaranteeing that each message is consumed only once
- Allowing multiple consumers to process messages simultaneously
- Ensuring that if message processing fails, retries are possible
- Enabling scheduling of message consumption for the future
- Not needing guaranteed ordering
- Ensuring high availability
- Ensuring messages and their states are durable and can withstand restarts or extended downtimes
MongoDB has significantly evolved over the years and can meet the criteria listed above.
Implementation
In the sections that follow, I'll guide you through the MongoDB-specific implementation of our message queue. While you'll need a client library suitable for your preferred programming language, such as Node.js, Go, or C# in the case of All Quiet, the concepts I'll share are platform agnostic.
Queues
Each queue you want to utilize is represented as a dedicated collection in your MongoDB database.
Message Model
Here's an example of a processed message:
{
    "_id" : NumberLong(638269014234217933),
    "Statuses" : [
        {
            "Status" : "Processed",
            "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"),
            "NextReevaluation" : null
        },
        {
            "Status" : "Processing",
            "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"),
            "NextReevaluation" : null
        },
        {
            "Status" : "Enqueued",
            "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"),
            "NextReevaluation" : null
        }
    ],
    "Payload" : {
        "YourData" : "abc123"
    }
}
Let’s look at each property of the message.
_id
The _id field is the canonical unique identifier property of MongoDB. Here, it contains a NumberLong, not an ObjectId. We need NumberLong instead of ObjectId because:
While ObjectId values should increase over time, they are not necessarily monotonic. This is because they:
- Only contain one second of temporal resolution, so ObjectId values created within the same second do not have a guaranteed ordering, and
- Are generated by clients, which may have differing system clocks.
In our C# implementation, we generate an Id with millisecond precision and guaranteed ordering based on insertion time. Although we don't require strict processing order in a multi-consumer environment (similar to RabbitMQ), it's essential to maintain FIFO order when operating with just one consumer. Achieving this with ObjectId is not feasible. If this isn't crucial for you, you can still use ObjectId.
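To make the idea of a time-based, strictly increasing Id more concrete, here is a minimal JavaScript sketch. It is not our C# implementation: createIdGenerator is a hypothetical helper, and the scaling factor of 10,000 is an arbitrary assumption that leaves room for several Ids per millisecond. It only guarantees ordering within a single process; across several producers you would still depend on reasonably synchronized clocks.

```javascript
// Hypothetical helper, not the All Quiet implementation.
// Returns a function that yields strictly increasing BigInt Ids derived from
// the current time in milliseconds. The values fit into a signed 64-bit integer.
function createIdGenerator(now = () => Date.now()) {
  let last = 0n;
  return function nextId() {
    // Scale the millisecond timestamp so that several Ids fit into one millisecond.
    const candidate = BigInt(now()) * 10000n;
    // If the clock did not advance (or went backwards), step past the last Id instead.
    last = candidate > last ? candidate : last + 1n;
    return last;
  };
}

const nextId = createIdGenerator();
const first = nextId();
const second = nextId();
console.log(first < second); // true
```

How such a value is mapped to a NumberLong depends on the client library you use.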
Statuses
The Statuses property consists of an array containing the message processing history. At index 0, you'll find the current status, which is crucial for indexing.
The status object itself contains three properties:
- Status: Can be "Enqueued", "Processing", "Processed", or "Failed".
- Timestamp: The time at which this status was set.
- NextReevaluation: Records when the next evaluation should occur, which is essential for both retries and future scheduled executions.
Payload
This property contains the specific payload of your message.
Enqueuing a message
Adding a message is a straightforward insert operation into the collection with the status set to "Enqueued".
- For immediate processing, set NextReevaluation to null.
- For future processing, set NextReevaluation to a timestamp in the future, when you want your message to be processed.
db.yourQueueCollection.insertOne({
    // Pass the Id as a string: the value exceeds JavaScript's safe integer range.
    "_id" : NumberLong("638269014234217933"),
    "Statuses" : [
        {
            "Status" : "Enqueued",
            "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"),
            "NextReevaluation" : null
        }
    ],
    "Payload" : {
        "YourData" : "abc123"
    }
});
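The two enqueue variants (immediate and scheduled) could be wrapped in a small helper. The following is a sketch only: buildEnqueuedMessage and its delayMs parameter are hypothetical names, not part of our code base. It merely builds the document that would then be inserted into the queue collection.

```javascript
// Hypothetical helper: builds the document to insert into the queue collection.
// delayMs = 0 means "process immediately"; a positive value schedules the message.
function buildEnqueuedMessage(id, payload, now = new Date(), delayMs = 0) {
  const nextReevaluation = delayMs > 0 ? new Date(now.getTime() + delayMs) : null;
  return {
    _id: id,
    Statuses: [
      { Status: "Enqueued", Timestamp: now, NextReevaluation: nextReevaluation },
    ],
    Payload: payload,
  };
}

// Example: a reminder scheduled 24 hours after registration.
const reminder = buildEnqueuedMessage(
  638269014234217933n,
  { YourData: "abc123" },
  new Date("2023-08-06T06:50:23.421Z"),
  24 * 60 * 60 * 1000
);
console.log(reminder.Statuses[0].NextReevaluation.toISOString());
// 2023-08-07T06:50:23.421Z
```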
Dequeuing a message
Dequeuing is slightly more complex but still relatively straightforward. It heavily relies on the concurrent atomic read and update capabilities of MongoDB.
This essential feature of MongoDB ensures:
- Each message is processed only once.
- Multiple consumers can safely process messages simultaneously.
db.yourQueueCollection.findAndModify({
    "query": {
        "$and": [
            {
                "Statuses.0.Status": "Enqueued"
            },
            {
                "Statuses.0.NextReevaluation": null
            }
        ]
    },
    "update": {
        "$push": {
            "Statuses": {
                "$each": [
                    {
                        "Status": "Processing",
                        "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"),
                        "NextReevaluation": null
                    }
                ],
                "$position": 0
            }
        }
    }
});
So we are reading one message that is in the state "Enqueued" and at the same time modifying it by setting the status "Processing" at position 0. Since this operation is atomic, it guarantees that the message will not be picked up by another consumer.
Marking a message as processed
Once the processing of the message is complete, it's a simple matter of updating the message status to "Processed" using the message's id.
db.yourQueueCollection.findAndModify({
    "query": {
        // Pass the Id as a string: the value exceeds JavaScript's safe integer range.
        "_id": NumberLong("638269014234217933")
    },
    "update": {
        "$push": {
            "Statuses": {
                "$each": [
                    {
                        "Status": "Processed",
                        "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
                        "NextReevaluation": null
                    }
                ],
                "$position": 0
            }
        }
    }
});
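Since every status change prepends an entry to the Statuses array in the same way, the command document can be produced by one function. This is a sketch under assumptions: buildStatusTransition and ALLOWED_STATUSES are hypothetical names, and the function only builds the arguments, it does not talk to the database.

```javascript
const ALLOWED_STATUSES = ["Enqueued", "Processing", "Processed", "Failed"];

// Hypothetical helper: builds the arguments for findAndModify that prepend a
// new status entry to the Statuses array of a single message.
function buildStatusTransition(id, status, now = new Date(), nextReevaluation = null) {
  if (!ALLOWED_STATUSES.includes(status)) {
    throw new Error(`Unknown status: ${status}`);
  }
  return {
    query: { _id: id },
    update: {
      $push: {
        Statuses: {
          $each: [{ Status: status, Timestamp: now, NextReevaluation: nextReevaluation }],
          $position: 0, // keeps the current status at index 0
        },
      },
    },
  };
}

const markProcessed = buildStatusTransition(
  638269014234217933n,
  "Processed",
  new Date("2023-08-06T06:50:24.100Z")
);
console.log(markProcessed.update.$push.Statuses.$each[0].Status); // Processed
```

Rejecting unknown status names early is a design choice of this sketch; it keeps typos from silently producing messages that no consumer will ever match.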
Marking a message as failed
If processing fails, we need to mark the message accordingly. Often, you might want to retry processing the message. This can be achieved by re-enqueuing the message. In many scenarios, it makes sense to reprocess the message after a specific delay, such as 10 seconds, depending on the nature of the processing failure.
db.yourQueueCollection.findAndModify({
    "query": {
        // Pass the Id as a string: the value exceeds JavaScript's safe integer range.
        "_id": NumberLong("638269014234217933")
    },
    "update": {
        "$push": {
            "Statuses": {
                "$each": [
                    {
                        "Status": "Failed",
                        "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
                        // Retry 10 seconds after the failure
                        "NextReevaluation": ISODate("2023-08-06T06:50:34.100+0000")
                    }
                ],
                "$position": 0
            }
        }
    }
});
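How long to wait before a retry is up to you. As an illustration only, the following sketch computes NextReevaluation with an exponential backoff. The policy itself (start at 10 seconds, double on each failure, cap at 10 minutes, give up after 5 attempts) and the name nextReevaluationAfterFailure are assumptions made for this example, not what All Quiet runs in production.

```javascript
// Hypothetical retry policy: exponential backoff starting at 10 seconds,
// capped at 10 minutes, giving up after maxAttempts failures.
function nextReevaluationAfterFailure(failedAttempts, now = new Date(), options = {}) {
  const { baseDelayMs = 10000, maxDelayMs = 600000, maxAttempts = 5 } = options;
  if (failedAttempts >= maxAttempts) {
    return null; // no further retry; the message stays "Failed"
  }
  // 1st failure: base delay, 2nd: twice the base delay, 3rd: four times, ...
  const delayMs = Math.min(baseDelayMs * 2 ** (failedAttempts - 1), maxDelayMs);
  return new Date(now.getTime() + delayMs);
}

const failedAt = new Date("2023-08-06T06:50:24.100Z");
console.log(nextReevaluationAfterFailure(1, failedAt).toISOString());
// 2023-08-06T06:50:34.100Z
```

The number of failed attempts could be derived from the message itself by counting the "Failed" entries in its Statuses history.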
The dequeuing loop
We've established how we can easily enqueue and dequeue items from our "queue," which is, in fact, simply a MongoDB collection. We can even "schedule" messages for the future by leveraging the NextReevaluation field.
What's missing is how we will dequeue regularly. Consumers need to execute the findAndModify command in some kind of loop. The simplest approach would be an endless loop in which we dequeue and process a message. This works, but it puts considerable pressure on the database and the network.
An alternative would be to introduce a delay, e.g., 100ms, between loop iterations. This will significantly reduce the load but will also decrease the speed of dequeuing.
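A polling consumer along these lines could look like the following sketch. All names here (runConsumer, nextPollDelayMs, dequeue, handle, shouldStop) are hypothetical: dequeue is assumed to resolve to a message or null, and handle processes one message. As a small variation on the fixed delay described above, this version only sleeps when the queue was empty, so a backlog is drained without pauses.

```javascript
// Hypothetical helper: decides how long to wait before the next poll.
function nextPollDelayMs(foundMessage, idleDelayMs = 100) {
  // Poll again immediately while there is work; back off when the queue is empty.
  return foundMessage ? 0 : idleDelayMs;
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical consumer loop. The caller supplies:
//   dequeue():     resolves to the next message, or null if none is available
//   handle(msg):   processes one message
//   shouldStop():  returns true once the consumer should shut down
async function runConsumer(dequeue, handle, shouldStop, idleDelayMs = 100) {
  while (!shouldStop()) {
    const message = await dequeue();
    if (message !== null) {
      await handle(message);
    }
    const delayMs = nextPollDelayMs(message !== null, idleDelayMs);
    if (delayMs > 0) {
      await sleep(delayMs);
    }
  }
}
```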
The solution to the problem is what MongoDB refers to as a change stream.
MongoDB Change Streams
What are change streams? I can’t explain it better than the guys at MongoDB:
Change streams allow applications to access real-time data changes […]. Applications can use change streams to subscribe to all data changes on a single collection […] and immediately react to them.
Great! What we can do is listen to newly created documents in our queue collection, which effectively means listening to newly enqueued messages.
This is dead simple:
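// Event handling follows the Node.js driver's ChangeStream API; the
// findAndModify call keeps the shell notation used in the examples above.
const changeStream = db.yourQueueCollection.watch([
    { "$match": { "operationType": "insert" } }
]);
changeStream.on('change', changeEvent => {
    // Dequeue the message. The status condition ensures that only one of the
    // consumers receiving this event can claim it, and that scheduled
    // messages are left alone.
    db.yourQueueCollection.findAndModify({
        "query": {
            "_id": changeEvent.documentKey._id,
            "Statuses.0.Status": "Enqueued",
            "Statuses.0.NextReevaluation": null
        },
        "update": {
            "$push": {
                "Statuses": {
                    "$each": [
                        {
                            "Status": "Processing",
                            "Timestamp": new Date(),
                            "NextReevaluation": null
                        }
                    ],
                    "$position": 0
                }
            }
        }
    });
});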
Scheduled and Orphaned Messages
The change stream approach, however, works for neither scheduled nor orphaned messages, because no change event fires at the moment a message becomes due or orphaned.
- Scheduled messages simply sit in the collection with the status "Enqueued" and a "NextReevaluation" field set to the future.
- Orphaned messages are those that were in the "Processing" status when their consumer process died. They remain in the collection with the status "Processing", but no consumer will ever change their status to "Processed" or "Failed".
For these use cases, we need to revert to our simple loop. However, we can use a rather generous delay between iterations.
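What such a loop has to look for can be expressed as a single filter. The sketch below is an assumption-laden illustration: buildRecoveryQuery is a hypothetical name, and treating a message as orphaned after it has been "Processing" for longer than a fixed timeout (5 minutes here) is my simplification for this example. The filter would replace the "query" part of the dequeue command shown earlier.

```javascript
// Hypothetical helper: builds a filter for messages the change stream cannot
// deliver. processingTimeoutMs is an assumed threshold after which a message
// stuck in "Processing" is considered orphaned.
function buildRecoveryQuery(now = new Date(), processingTimeoutMs = 5 * 60 * 1000) {
  const orphanCutoff = new Date(now.getTime() - processingTimeoutMs);
  return {
    $or: [
      // Immediate messages whose insert event was missed, e.g. during a restart
      { "Statuses.0.Status": "Enqueued", "Statuses.0.NextReevaluation": null },
      // Scheduled messages and failed messages awaiting a retry that are now due
      {
        "Statuses.0.Status": { $in: ["Enqueued", "Failed"] },
        "Statuses.0.NextReevaluation": { $lte: now },
      },
      // Orphaned messages: claimed by a consumer that never finished
      { "Statuses.0.Status": "Processing", "Statuses.0.Timestamp": { $lte: orphanCutoff } },
    ],
  };
}

const recoveryQuery = buildRecoveryQuery(new Date("2023-08-06T07:00:00.000Z"));
console.log(recoveryQuery.$or[2]["Statuses.0.Timestamp"].$lte.toISOString());
// 2023-08-06T06:55:00.000Z
```

The timeout needs to be comfortably longer than your slowest message handler; otherwise a message that is still being processed would be picked up a second time.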
Wrapping it up
"Traditional" databases, like MySQL, PostgreSQL, or MongoDB (which I also view as traditional), are incredibly powerful today. If used correctly (ensure your indexes are optimized!), they are swift, scale impressively, and are cost-effective on traditional hosting platforms.
Many use cases can be addressed using just a database and your preferred programming language. It's not always necessary to have the "right tool for the right job," meaning maintaining a diverse set of tools like Redis, Elasticsearch, RabbitMQ, etc. Often, the maintenance overhead isn't worth it.
While the solution proposed might not match the performance of, for instance, RabbitMQ, it's usually sufficient and can scale to a point that would mark significant success for your startup.
Software engineering is about navigating trade-offs. Choose yours wisely.
Mads Quist | Sciencx (2024-07-04T04:33:19+00:00) Why We Built a MongoDB-Message Queue and Reinvented the Wheel. Retrieved from https://www.scien.cx/2024/07/04/why-we-built-a-mongodb-message-queue-and-reinvented-the-wheel/