Message Queue Service using Kafka

In this article, we are going to create a Message Queue Service using Kafka and KafkaJS, somewhere similar to SQS, and we will make sure that messages are processed exactly once, in the exact order that they are sent. We will also implement the redeliv…


This content originally appeared on DEV Community and was authored by Chandraprakash Soni

In this article, we are going to create a Message Queue Service using Kafka and KafkaJS, somewhere similar to SQS, and we will make sure that messages are processed exactly once, in the exact order that they are sent. We will also implement the redelivery mechanism to make sure that if something fails in our business logic, we can add it to the queue again.
Find the source code on Github:
https://github.com/icpsoni/kafka-message-queue

Prerequisites

1: Get Kafka
Download Kafka and extract it from here, and navigate to the directory.

$ tar -xzf <file_name>.tgz
$ cd <file_name>tec

2: Start the Kafka Environment
NOTE: Your local environment must have Java 8+ installed.
Run the following commands to start ZooKeeper:

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Open another terminal session and run Kafka Broker service using:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.

3: Create a Topic to Store our Queue Messages
With the following command, we will create a topic called “message-queue” in our Kafka.

$ bin/kafka-topics.sh --create --topic message-queue --bootstrap-server localhost:9092

To check the created topic use this command.

$ bin/kafka-topics.sh --describe --topic message-queue --bootstrap-server localhost:9092

You can create a topic with any number of partitions, In here we are using 1 partition only.

Creating Services Using KafkaJS

We basically need 3 things to make our queue system work perfectly.

  1. Sending Events to Kafka Queue topic.
  2. Subscribing to the topic and reading the Queue Message.
  3. Handling the redelivery.

kafka-config.js

The following snippet contains the basic config of Kafka that we need for our queue system.

// Using KafkaJs nodejs library
import { Kafka } from 'kafkajs';

// kafka broker running on localhost:9092 default port
const kafkaBroker = 'localhost:9092';

// kafka topic used for queue messages
export const kafkaTopic = 'message-queue';

// kafka client with basic config
export const KafkaClient = new Kafka({
  brokers: [kafkaBroker]
});

producer.js

This service will send messages to the Kafka topic we created earlier. We can use sendMessageToQueue function and pass the message object which needs to be sent to Kafka.

import { KafkaClient } from './index.js';
import { kafkaTopic } from "./kafka-config";

export const sendMessageToQueue = async (message) => {
  const producer = KafkaClient.producer();
  await producer.connect();
  await producer.send({
    topic: kafkaTopic,
    messages: [
      {
        value: message // Your message data goes here
      }
    ]
  });
  // Disconnect producer once message sending is done.
  await producer.disconnect();
};

message-queue.js

This is our main service that takes care of receiving the messages from the Kafka queue doing the business logic and handling the redelivery if something goes wrong in business logic.

import { sendMessageToQueue } from "./producer.js";
import { KafkaClient, kafkaTopic, kafkaGroupId } from "./kafka-config.js";

export const consumeMessage = async () => {
  // Creating a Consumer Instance
  const consumer = KafkaClient.consumer({
    groupId: kafkaGroupId,
  });

  await consumer.connect();
  // Subscribing to out Kafka topic
  await consumer.subscribe({ topic: kafkaTopic, fromBeginning: true});

  await consumer.run({
    autoCommit: false, // It won't commit message acknowledge to kafka until we don't do manually
    eachMessage: async ({ topic, partition, message}) => {
      const messageData = message.value.toString();
      try {
        // Do the business Logic
        console.info('Received Message', messageData);
      } catch (error) {
        console.error(error);
        // Resending message to kafka queue for redelivery
        await sendMessageToQueue(messageData);
      } finally {
        const offset = +message.offset + 1;
        // Committing the message offset to Kafka
        await consumer.commitOffsets([{topic: kafkaTopic, partition, offset: offset.toString()}]);
      }
    }
  });
};

Download the source
Bonus: It contains test sample files too.
https://github.com/icpsoni/kafka-message-queue

References:

  1. https://kafka.apache.org/quickstart
  2. https://kafka.js.org/docs/getting-started


This content originally appeared on DEV Community and was authored by Chandraprakash Soni


Print Share Comment Cite Upload Translate Updates
APA

Chandraprakash Soni | Sciencx (2021-05-09T19:18:00+00:00) Message Queue Service using Kafka. Retrieved from https://www.scien.cx/2021/05/09/message-queue-service-using-kafka/

MLA
" » Message Queue Service using Kafka." Chandraprakash Soni | Sciencx - Sunday May 9, 2021, https://www.scien.cx/2021/05/09/message-queue-service-using-kafka/
HARVARD
Chandraprakash Soni | Sciencx Sunday May 9, 2021 » Message Queue Service using Kafka., viewed ,<https://www.scien.cx/2021/05/09/message-queue-service-using-kafka/>
VANCOUVER
Chandraprakash Soni | Sciencx - » Message Queue Service using Kafka. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2021/05/09/message-queue-service-using-kafka/
CHICAGO
" » Message Queue Service using Kafka." Chandraprakash Soni | Sciencx - Accessed . https://www.scien.cx/2021/05/09/message-queue-service-using-kafka/
IEEE
" » Message Queue Service using Kafka." Chandraprakash Soni | Sciencx [Online]. Available: https://www.scien.cx/2021/05/09/message-queue-service-using-kafka/. [Accessed: ]
rf:citation
» Message Queue Service using Kafka | Chandraprakash Soni | Sciencx | https://www.scien.cx/2021/05/09/message-queue-service-using-kafka/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.