This content originally appeared on DEV Community and was authored by Musole Masu
The goal of data streaming is to process and analyze, in real time, data as it moves from data sources to destinations.
It is widely used in microservices for inter-service communication. In a microservice architecture, the recommendation is to build independent services that can be altered, updated or taken down without affecting the rest of the architecture.
In this tutorial, we are going to learn how to use NATS Streaming in a Kubernetes Cluster. NATS Streaming is a data streaming system powered by NATS.
We will build a Basketball Dunk Contest App with two services: a Dunk Service that handles player registration and dunk shot attempts for registered players, and a Statistic Service that displays the Dunk Contest statistics in real time from the data accompanying event messages. NATS Streaming will be the event transporter between our two services.
Before we dive into the code, make sure you have the following in order to follow along with this tutorial:
- Working knowledge of Node.js / TypeScript, Docker and Kubernetes objects,
- Node.js (preferably the latest LTS version), Docker, and a local Kubernetes cluster installed via Minikube,
- A Docker Hub account.
I will be coding on a Linux machine for this tutorial.
1. Project Structure
Let's set up our project. We will first work on a number of Kubernetes objects related to the project.
Run the following:
$ mkdir dunk-contest
$ cd dunk-contest/
$ mkdir kubernetes
These commands create the project directory dunk-contest/, then navigate inside it to create another directory named kubernetes/.
In the kubernetes/ directory, we are going to add new files with the configurations required to build the following Kubernetes objects:
- Deployment objects for the NATS Streaming Server, for the Dunk Service MongoDB database and for the Statistic Service MongoDB database,
- Service objects for the Pods running the NATS Streaming image, the Mongo image for the Dunk Service and the Mongo image for the Statistic Service.
1.1 Deployment and Service objects
Make sure to work through this part of the tutorial in the kubernetes/ directory!
1.1.1 NATS Deployment and Service
- Add a new YAML file named nats-deployment.yaml and put the configuration below:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nats-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nats
  template:
    metadata:
      labels:
        app: nats
    spec:
      containers:
        - name: nats
          image: nats-streaming:0.22.1
          args:
            [
              "-p",
              "4222",
              "-hbi",
              "5s",
              "-hbt",
              "5s",
              "-hbf",
              "2",
              "-SD",
              "-cid",
              "dunk-contest",
            ]
This config file creates a Deployment that runs and monitors one Pod with a container of the nats-streaming:0.22.1 Docker image. In practice, this Pod acts as the project's NATS Streaming Server, exposing port 4222 to its clients (the Dunk Service and the Statistic Service).
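The args list in the Deployment is easier to read as named options. The sketch below is only an illustration: the NatsServerOptions type and the buildNatsArgs function are hypothetical helpers, not part of NATS or of this tutorial's code. The flag meanings in the comments follow the NATS Streaming server's command-line help.

```typescript
// Hypothetical helper: the names below are illustrative, not a NATS API.
interface NatsServerOptions {
  port: number; // -p: port that clients connect to
  heartbeatInterval: string; // -hbi: how often the server sends a heartbeat to each client
  heartbeatTimeout: string; // -hbt: how long the server waits for a heartbeat response
  heartbeatFailCount: number; // -hbf: failed heartbeats before the server closes the client connection
  debug: boolean; // -SD: enable streaming debug output
  clusterId: string; // -cid: cluster ID that clients must pass when connecting
}

// Build the same argument list used in nats-deployment.yaml.
function buildNatsArgs(opts: NatsServerOptions): string[] {
  const args: string[] = [
    "-p", String(opts.port),
    "-hbi", opts.heartbeatInterval,
    "-hbt", opts.heartbeatTimeout,
    "-hbf", String(opts.heartbeatFailCount),
  ];
  if (opts.debug) {
    args.push("-SD");
  }
  args.push("-cid", opts.clusterId);
  return args;
}

const natsArgs = buildNatsArgs({
  port: 4222,
  heartbeatInterval: "5s",
  heartbeatTimeout: "5s",
  heartbeatFailCount: 2,
  debug: true,
  clusterId: "dunk-contest",
});
console.log(natsArgs.join(" "));
// prints "-p 4222 -hbi 5s -hbt 5s -hbf 2 -SD -cid dunk-contest"
```

The short heartbeat values make the server notice a stopped client within seconds, which is convenient during development. The cluster ID dunk-contest is the value the services must pass when they connect.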
- Add a new YAML file named nats-service.yaml and put the configuration below:
apiVersion: v1
kind: Service
metadata:
  name: nats-service
spec:
  selector:
    app: nats
  ports:
    - name: client
      protocol: TCP
      port: 4222
      targetPort: 4222
This config file creates a Kubernetes object of kind Service, which other pods inside the Kubernetes cluster will use to access the NATS Streaming server Pod on port 4222.
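Inside the cluster, pods in the same namespace can reach a Service through Kubernetes DNS by its metadata.name, so a containerized client could connect to nats-service:4222 rather than localhost. The sketch below shows one way a service might pick its NATS URL. It is an assumption for illustration only: the NATS_URL variable and the resolveNatsUrl function are hypothetical, and this part of the tutorial connects through localhost with port forwarding instead.

```typescript
// Hypothetical helper: NATS_URL is an assumed variable name, not one this tutorial defines.
type Env = Record<string, string | undefined>;

function resolveNatsUrl(env: Env): string {
  // Prefer an explicit override, e.g. "http://localhost:4222" when port-forwarding.
  const override = env["NATS_URL"];
  if (override !== undefined && override.length > 0) {
    return override;
  }
  // Otherwise fall back to the Service name from nats-service.yaml,
  // which Kubernetes DNS resolves for pods in the same namespace.
  return "http://nats-service:4222";
}

console.log(resolveNatsUrl({}));
console.log(resolveNatsUrl({ NATS_URL: "http://localhost:4222" }));
```

Passing the environment in as a parameter keeps the function easy to test; in a real service the argument would typically be process.env.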
1.1.2 MongoDB Deployment and Service
Here we are going to add 4 new config files:
- 1 dunk-mongo-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dunk-mongo-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dunk-mongo
  template:
    metadata:
      labels:
        app: dunk-mongo
    spec:
      containers:
        - name: dunk-mongo
          image: mongo
With this configuration, Kubernetes will create a Deployment object to monitor a MongoDB Pod running the mongo Docker image. This database is dedicated to the Dunk Service.
- 2 dunk-mongo-service.yaml:
apiVersion: v1
kind: Service
metadata:
  name: dunk-mongo-service
spec:
  selector:
    app: dunk-mongo
  ports:
    - name: db
      protocol: TCP
      port: 27017
      targetPort: 27017
This config file creates a Kubernetes object of kind Service that lets other pods in the cluster access the mongo pod of the Dunk Service.
- 3 stats-mongo-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: stats-mongo-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: stats-mongo
  template:
    metadata:
      labels:
        app: stats-mongo
    spec:
      containers:
        - name: stats-mongo
          image: mongo
Kubernetes will use this set of configurations to create a Deployment object to manage the MongoDB Pod of mongo docker image. This mongo database will be dedicated to the Statistic Service only.
- 4 stats-mongo-service.yaml:
apiVersion: v1
kind: Service
metadata:
  name: stats-mongo-service
spec:
  selector:
    app: stats-mongo
  ports:
    - name: db
      protocol: TCP
      port: 27017
      targetPort: 27017
Finally we have this config file to create a Service object that will expose the MongoDB Pod of the Statistic Service to other pods in the kubernetes cluster.
Your kubernetes/ directory tree structure should look like this by now:
.
├── dunk-mongo-deployment.yaml
├── dunk-mongo-service.yaml
├── nats-deployment.yaml
├── nats-service.yaml
├── stats-mongo-deployment.yaml
└── stats-mongo-service.yaml
0 directories, 6 files
Save all created files in the kubernetes/ directory and make sure that your Kubernetes cluster is up and running. Open the terminal and run the following:
$ minikube start
Minikube quickly sets up a local Kubernetes cluster on macOS, Linux, and Windows.
Now, let's tell Kubernetes to create objects using our configuration files. Run the command below in the kubernetes/ directory:
$ kubectl apply -f .
At this point, we should have 3 running pods: one for nats-streaming, one for the MongoDB of the Dunk Service and one for the MongoDB of the Statistic Service. Verify it with this command:
$ kubectl get pods
We have reached the point in the tutorial where we build our services and connect them to the Pods created in this part. Let's do it in the next sections.
1.2 Dunk Service
Here we are going to build an Express application listening on port 4001 for connections. It will have two API endpoints: http://localhost:4001/dunk-contest/register to handle POST requests for player registration, and http://localhost:4001/dunk-contest/attempt/:playerName to handle POST requests for players' dunk shot attempts.
Navigate back to the project directory dunk-contest/ and create a new directory named dunk-service/. In the dunk-service/ directory, generate a package.json and install ts-node-dev, typescript, express, @types/express, node-nats-streaming and mongodb as dependencies:
$ mkdir dunk-service
$ cd dunk-service/
$ npm init -y
$ npm install ts-node-dev typescript express @types/express node-nats-streaming mongodb
Open the package.json file and replace the existing scripts section with the one below:
"scripts": {
  "start": "ts-node-dev src/index.ts"
}
Save the file. In the same directory, create a directory named src/. In src/, add a TypeScript file named nats-connector.ts and paste the following:
import nats, { Stan } from "node-nats-streaming";
class NatsConnector {
  private _client?: Stan;
  get client() {
    if (!this._client) {
      throw new Error("Cannot access NATS Client before connecting!");
    }
    return this._client;
  }
  connectToNats(clusterId: string, clientId: string, url: string) {
    this._client = nats.connect(clusterId, clientId, { url });
    return new Promise<void>((resolve, reject) => {
      this.client.on("connect", () => {
        console.log(`DUNK SERVICE IS CONNECTED TO NATS STREAMING SERVER`);
        resolve();
      });
      this.client.on("error", (err) => {
        reject(err);
      });
    });
  }
}
export const natsConnector = new NatsConnector();
Inside this file:
- We define a variable _client of type Stan, a type imported from the node-nats-streaming library;
- We export an instance of the NatsConnector class, which has a method called connectToNats();
- connectToNats() takes three parameters, the clusterId, the clientId and the url:
  - clusterId: The cluster ID set earlier in the NATS Streaming server deployment configuration file (the -cid argument). The Dunk Service, being a client here, uses it to connect to the NATS server;
  - clientId: An identifier for the Dunk Service as a client of the NATS server;
  - url: The NATS Streaming server endpoint that the Dunk Service uses to reach the NATS pod.
In connectToNats(), we assign to _client the result of calling connect(), a function imported from node-nats-streaming, to which we pass our three parameters as arguments.
connectToNats() returns a promise that resolves if _client successfully connects to the NATS server and rejects otherwise.
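The pattern used by connectToNats(), turning a pair of "connect" and "error" events into a single promise, can be tried without a running NATS server. The sketch below uses a hypothetical FakeClient that only mimics the on() shape of the real client; it is a stand-in for illustration, not the node-nats-streaming API.

```typescript
// Minimal stand-in for a client that emits "connect" or "error".
// It only mimics the on() shape used in nats-connector.ts; it is not the real Stan client.
type Handler = (payload?: unknown) => void;

class FakeClient {
  private handlers: Record<string, Handler[]> = {};

  on(event: string, handler: Handler): this {
    const list = this.handlers[event] ?? [];
    list.push(handler);
    this.handlers[event] = list;
    return this;
  }

  emit(event: string, payload?: unknown): void {
    for (const handler of this.handlers[event] ?? []) {
      handler(payload);
    }
  }
}

// Wrap the two events in one promise: "connect" resolves it, "error" rejects it.
function waitForConnect(client: FakeClient): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    client.on("connect", () => resolve());
    client.on("error", (err) => reject(err));
  });
}

// Successful connection.
const okClient = new FakeClient();
const connected = waitForConnect(okClient).then(() => "connected");
okClient.emit("connect");

// Failed connection.
const badClient = new FakeClient();
const failed = waitForConnect(badClient).catch((err) => `failed: ${String(err)}`);
badClient.emit("error", "no server");

connected.then((message) => console.log(message));
failed.then((message) => console.log(message));
```

Because a promise settles only once, whichever event fires first decides the outcome. That is why callers can simply await connectToNats() before doing anything that needs the client.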
Next, add another TypeScript file named event-publisher.ts and put the following:
import { Stan } from "node-nats-streaming";
export class EventPublisher {
  private client: Stan;
  constructor(client: Stan) {
    this.client = client;
  }
  publishEvent(subject: string, data: any): Promise<void> {
    return new Promise((resolve, reject) => {
      this.client.publish(subject, JSON.stringify(data), (err) => {
        if (err) {
          return reject(err);
        }
        console.log("\x1b[36m%s\x1b[0m", `EVENT ${subject} PUBLISHED!`);
        resolve();
      });
    });
  }
}
In this one, we export the class EventPublisher, which has a variable named client of type Stan, just like in the NatsConnector class. This class has a method called publishEvent() with two parameters:
- subject: The name of the channel through which events pass to reach the clients subscribed to it;
- data: The data or message accompanying published events.
publishEvent() returns a promise that resolves when an event is successfully published and rejects when publishing fails.
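Since publishEvent() sends the data as a JSON string, a subscriber has to parse that string back into an object. The sketch below shows this round trip for the Dunk-Shot payload. The DunkShotEvent interface and both helper functions are hypothetical names introduced for illustration; only the JSON.stringify step comes from the code above.

```typescript
// Shape of the "Dunk-Shot" payload (field names taken from the Dunk Service code).
interface DunkShotEvent {
  PLAYER_NAME: string;
  DUNK_POINT: number;
}

// What publishEvent() does before sending: turn the object into a string.
function encodeEvent(data: DunkShotEvent): string {
  return JSON.stringify(data);
}

// Hypothetical subscriber-side helper: parse the raw message and check its shape.
function decodeDunkShot(raw: string): DunkShotEvent {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("Dunk-Shot message is not a JSON object");
  }
  const candidate = parsed as Record<string, unknown>;
  const name = candidate["PLAYER_NAME"];
  const point = candidate["DUNK_POINT"];
  if (typeof name !== "string" || typeof point !== "number") {
    throw new Error("Dunk-Shot message has missing or mistyped fields");
  }
  return { PLAYER_NAME: name, DUNK_POINT: point };
}

const wire = encodeEvent({ PLAYER_NAME: "LeBron", DUNK_POINT: 5.7225 });
console.log(wire); // prints {"PLAYER_NAME":"LeBron","DUNK_POINT":5.7225}

const received = decodeDunkShot(wire);
console.log(received.PLAYER_NAME, received.DUNK_POINT);
```

Validating on the receiving side is a design choice rather than a requirement. It helps because JSON has no representation for NaN or undefined: a NaN score is serialized as null, and this decoder would reject such a message instead of passing it on silently.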
After this, in the src/ directory, create a directory named routes/ and add two new TypeScript files:
- 1 registerPlayerRoutes.ts, a middleware, and put the code below:
import { Router, Request, Response } from "express";
import { MongoClient } from "mongodb";
interface Players {
  NAME: string;
  HEIGHT: number;
  WEIGHT: number;
  EXPERIENCE: number;
}
const registerPlayerRouter = Router();
registerPlayerRouter.post(
  "/dunk-contest/register",
  async (req: Request, res: Response) => {
    const player: Players = {
      NAME: req.body.name,
      HEIGHT: req.body.height,
      WEIGHT: req.body.weight,
      EXPERIENCE: req.body.experience,
    };
    const mongoClient = await MongoClient.connect(
      "mongodb://localhost:27017/dunk-service"
    );
    const db = mongoClient.db();
    const playerCollection = db.collection("players");
    await playerCollection.insertOne(player);
    console.log("\x1b[36m%s\x1b[0m", "PLAYER REGISTERED WITH SUCCESS");
    const newPlayer = await playerCollection.findOne({
      NAME: req.body.name,
    });
    console.table(newPlayer);
    res.send({});
    mongoClient.close();
  }
);
export { registerPlayerRouter };
In the registerPlayerRoutes.ts file above we did the following:
- Imported Router, Request and Response from express;
- Imported MongoClient from mongodb;
- Implemented a POST request on registerPlayerRouter.post("/dunk-contest/register") to register players in the players collection of the dunk-service MongoDB database and fetch the registered player. MongoClient is used here to connect this process to the appropriate MongoDB Pod.
- 2 attemptDunkRoutes.ts and put the code below:
import { Router, Request, Response } from "express";
import { MongoClient } from "mongodb";
import { natsConnector } from "./../nats-connector";
import { EventPublisher } from "./../event-publisher";
const attemptDunkRouter = Router();
attemptDunkRouter.post(
  "/dunk-contest/attempt/:playerName",
  async (req: Request, res: Response) => {
    const mongoClient = await MongoClient.connect(
      "mongodb://localhost:27017/dunk-service"
    );
    const db = mongoClient.db();
    const playerCollection = db.collection("players");
    const playerFound = await playerCollection.findOne({
      NAME: req.params.playerName,
    });
    // Without a registered player the score would be NaN, so stop here.
    if (!playerFound) {
      await mongoClient.close();
      res.status(404).send({ error: "Player not found" });
      return;
    }
    const dunkPoint: number =
      (playerFound.HEIGHT *
        playerFound.WEIGHT *
        playerFound.EXPERIENCE *
        Math.random()) /
      100;
    await new EventPublisher(natsConnector.client).publishEvent("Dunk-Shot", {
      PLAYER_NAME: playerFound.NAME,
      DUNK_POINT: dunkPoint,
    });
    res.send({});
    mongoClient.close();
  }
);
export { attemptDunkRouter };
With attemptDunkRoutes.ts we worked on a middleware; we did the following:
- Imported Router, Request and Response from express;
- Imported MongoClient from mongodb;
- Imported natsConnector, a NatsConnector instance;
- Imported the class EventPublisher;
- Implemented a POST request on attemptDunkRouter.post("/dunk-contest/attempt/:playerName") to attempt a dunk shot for a player found in the players collection by the player's name, read from req.params.playerName;
- MongoClient is used here to connect this process to the appropriate MongoDB pod;
- With the EventPublisher class, we created a new instance that takes natsConnector.client as argument and calls the publishEvent function to publish an event through the Dunk-Shot channel with PLAYER_NAME and DUNK_POINT as the event message;
- DUNK_POINT is a number calculated from the player's HEIGHT, WEIGHT, EXPERIENCE and a random number.
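The score formula is HEIGHT * WEIGHT * EXPERIENCE * Math.random() / 100. Since Math.random() returns a value from 0 up to, but not including, 1, the score ranges from 0 to just under HEIGHT * WEIGHT * EXPERIENCE / 100. The sketch below isolates the formula in a small function. The computeDunkPoint name and the injectable random parameter are assumptions added here so that the result can be reproduced; the route handler calls Math.random() directly.

```typescript
interface Player {
  NAME: string;
  HEIGHT: number;
  WEIGHT: number;
  EXPERIENCE: number;
}

// Same formula as the route handler. The random source is a parameter
// (hypothetical addition) so the calculation can be checked with fixed values.
function computeDunkPoint(
  player: Player,
  random: () => number = Math.random
): number {
  return (player.HEIGHT * player.WEIGHT * player.EXPERIENCE * random()) / 100;
}

const lebron: Player = { NAME: "LeBron", HEIGHT: 2.18, WEIGHT: 105, EXPERIENCE: 5 };

// With the random factor pinned to 1 we get the upper bound of the score.
const upperBound = computeDunkPoint(lebron, () => 1);
// With the random factor pinned to 0.5 we get half of that.
const halfway = computeDunkPoint(lebron, () => 0.5);
// With the default Math.random() the score falls somewhere in between 0 and the bound.
const actual = computeDunkPoint(lebron);

console.log(upperBound, halfway, actual);
```

For the sample player used later in the test requests (height 2.18, weight 105, experience 5), the upper bound works out to about 11.445, so every attempt scores somewhere between 0 and that value.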
To wrap this service up, move back to the src/ directory, add a TypeScript file named index.ts and paste the code below:
import express from "express";
import { registerPlayerRouter } from "./routes/registerPlayerRoutes";
import { attemptDunkRouter } from "./routes/attemptDunkRoutes";
import { natsConnector } from "./nats-connector";
const app = express();
app.use(express.json());
app.use(registerPlayerRouter);
app.use(attemptDunkRouter);
const start = async () => {
  try {
    await natsConnector.connectToNats(
      "dunk-contest",
      "123",
      "http://localhost:4222"
    );
    natsConnector.client.on("close", () => {
      process.exit();
    });
  } catch (error) {
    console.error(error);
  }
  app.listen(4001, () => {
    console.log("\x1b[36m%s\x1b[0m", "DUNK SERVICE LISTENING ON 4001");
  });
};
start();
In the index.ts file above we did the following:
- Imported express from express;
- Imported registerPlayerRouter and attemptDunkRouter, two middlewares;
- Imported natsConnector, the instance of the NatsConnector class created earlier;
- Called the express function express() and stored the new Express application in the app variable;
- Registered express.json() to parse JSON request bodies, then used the middlewares with app.use(registerPlayerRouter) and app.use(attemptDunkRouter);
- Wrote the start function to connect the Express application to the NATS Streaming server and have it listen for connections on port 4001.
Now generate a tsconfig.json file, which configures how your TypeScript code is compiled to JavaScript. Open your terminal, navigate back to the dunk-service/ directory and run the command below:
$ npx tsc --init
Great, we are almost done with the Dunk Service; we shall come back later to fix a few little things.
The dunk-service/ directory should look like the tree below:
.
├── package.json
├── package-lock.json
├── src
│   ├── event-publisher.ts
│   ├── index.ts
│   ├── nats-connector.ts
│   └── routes
│       ├── attemptDunkRoutes.ts
│       └── registerPlayerRoutes.ts
└── tsconfig.json
2 directories, 8 files
Let's perform a simple test to check the following:
- Dunk Service connection to its dedicated MongoDB running pod;
- Dunk Service connection to the NATS Streaming Server.
Run the test with the steps below:
Step - 1: Access to NATS Streaming server Pod
Get the name of the NATS Streaming server pod by running:
$ kubectl get pods
Copy the pod's name; you will use it in the next command.
Here we are going to make the NATS Streaming server pod running in the Kubernetes cluster accessible from our local machine. Open the terminal and forward a local port on your machine to a port on your pod by running the following:
$ kubectl port-forward <YOUR POD NAME> 4222:4222
Make sure to paste your pod's name where indicated in the command above.
Step - 2: Access to MongoDB pod dedicated to the Dunk Service
Get the name of the Dunk Service MongoDB pod by running:
$ kubectl get pods
Copy the pod's name; you will use it in the next command.
Here we are going to make the MongoDB pod of the Dunk Service running in the Kubernetes cluster accessible from our local machine. Open another terminal and forward a local port on your machine to a port on your pod by running the following:
$ kubectl port-forward <YOUR POD NAME> 27017:27017
Step - 3: Start the Dunk Service (Express application)
Open a third terminal in the dunk-service/ directory and run this command:
$ npm start
By now, the Dunk Service must be connected to the NATS Streaming server pod and to its MongoDB pod.
Step - 4: Open your API Client and do these tests
- Test - 1. POST request on http://localhost:4001/dunk-contest/register
Make a POST request with the header Content-Type: application/json and a body of:
{
  "name": "LeBron",
  "height": 2.18,
  "weight": 105,
  "experience": 5
}
- Test - 2. POST request on http://localhost:4001/dunk-contest/attempt/LeBron
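For readers who prefer a script to an API client, the two test requests can be described in code. This is a sketch under assumptions: the PostRequest shape and both builder functions are hypothetical helpers, and nothing is sent here. With Node.js 18 or later, each description could be passed to the built-in fetch while the Dunk Service and both port forwards are running.

```typescript
interface RegisterBody {
  name: string;
  height: number;
  weight: number;
  experience: number;
}

// Hypothetical description of a request; it mirrors the options fetch() accepts.
interface PostRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body?: string;
}

const BASE_URL = "http://localhost:4001";

// Test 1: register a player with a JSON body.
function registerRequest(player: RegisterBody): PostRequest {
  return {
    url: `${BASE_URL}/dunk-contest/register`,
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(player),
  };
}

// Test 2: attempt a dunk for a player; the name travels in the URL path.
function attemptRequest(playerName: string): PostRequest {
  // encodeURIComponent keeps names containing spaces or slashes in one path segment.
  return {
    url: `${BASE_URL}/dunk-contest/attempt/${encodeURIComponent(playerName)}`,
    method: "POST",
    headers: {},
  };
}

const test1 = registerRequest({ name: "LeBron", height: 2.18, weight: 105, experience: 5 });
const test2 = attemptRequest("LeBron");
console.log(test1.url, test1.body);
console.log(test2.url);

// To actually send a request (requires the running service):
//   await fetch(test1.url, { method: test1.method, headers: test1.headers, body: test1.body });
```

Registration must happen before the attempt, because the attempt route looks the player up by name in the players collection.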
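If everything works, the Dunk Service terminal should print PLAYER REGISTERED WITH SUCCESS followed by a table of the registered player for Test 1, and EVENT Dunk-Shot PUBLISHED! for Test 2.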
2. Conclusion
In this part of the tutorial, we started building our Dunk Contest Application in order to learn how to use NATS Streaming in a microservice architecture, in a Kubernetes cluster set up and running on our local machines.
The completed application will feature two services, the Dunk Service and the Statistic Service, streaming data using NATS Streaming.
In the process, we started a NATS Streaming Server running in the Kubernetes cluster and two MongoDB Pods, each dedicated to a specific service. We also started the Dunk Service, which successfully registered players in its MongoDB Pod and successfully published an event to the NATS Streaming Server, the event being a dunk shot by a registered player.
Please stay tuned for more important NATS Streaming practices that we are going to implement in part 2 of this tutorial, where we will complete our Dunk Contest Application.
If you have questions or comments, feel free to reach out here or on my Twitter; I will be more than happy to answer. Here, you may find the project code.
See you soon.
Musole Masu | Sciencx (2021-12-16T21:21:43+00:00) Data Streaming for Microservices using NATS Streaming – Part 1. Retrieved from https://www.scien.cx/2021/12/16/data-streaming-for-microservices-using-nats-streaming-part-1/