How to keep a history of MQTT data with Node.js

This content originally appeared on DEV Community 👩‍💻👨‍💻 and was authored by Alexey Timin

The MQTT protocol is very popular in IoT applications. It is a simple way to connect different data sources
with your application by using a publish/subscribe model. Sometimes you may want to keep a history of your MQTT data to
use it for model training, diagnostics or metrics. If your data sources provide data in different formats that cannot
be interpreted as time series of floats, ReductStore is what you need.

Let's make a simple MQTT application to see how it works.

Prerequisites

For this usage example we have the following requirements:

  • Linux AMD64
  • Docker and Docker Compose
  • NodeJS >= 16

If you're an Ubuntu user, use this command to install the dependencies:

$ sudo apt-get update
$ sudo apt-get install docker-compose nodejs

Run MQTT Broker and ReductStore with Docker Compose

The easiest way to run the broker and the storage is to use Docker Compose. So we should create a docker-compose.yml
file in the example's folder with the services:

version: "3"
services:
  reduct-storage:
    image: reductstore/reductstore:latest
    volumes:
      - ./data:/data
    ports:
      - "8383:8383"

  mqtt-broker:
    image: eclipse-mosquitto:1.6
    ports:
      - "1883:1883"

Then run the configuration:

docker-compose up

Docker Compose downloads the images and runs the containers. Note that we published port 1883 for the MQTT
protocol and port 8383 for the ReductStore HTTP API.
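Before moving on, it can help to confirm that both published ports accept connections. The following is only a minimal sketch using Node's built-in net module; the checkPort helper and the one-second timeout are illustrative assumptions and not part of the original example. It only tests that a TCP connection can be opened, not that the services behind the ports are healthy.

```javascript
const net = require('net');

// Hypothetical helper: resolves to true if a TCP connection to host:port
// can be opened within timeoutMs, and to false otherwise.
function checkPort(host, port, timeoutMs = 1000) {
    return new Promise((resolve) => {
        const socket = net.connect({host, port});
        const finish = (isOpen) => {
            socket.destroy();
            resolve(isOpen);
        };
        socket.setTimeout(timeoutMs);
        socket.once('connect', () => finish(true));
        socket.once('timeout', () => finish(false));
        socket.once('error', () => finish(false));
    });
}

// Check the two ports published in docker-compose.yml
async function main() {
    const services = [['MQTT broker', 1883], ['ReductStore', 8383]];
    for (const [name, port] of services) {
        const isOpen = await checkPort('localhost', port);
        console.log('%s on port %d is %s', name, port,
            isOpen ? 'reachable' : 'NOT reachable');
    }
}

main();
```

If one of the services is reported as not reachable, check the output of docker-compose up for errors before running the scripts in this tutorial.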

Write NodeJS script

Now we're ready to get our hands dirty with code. Let's initialize the NPM package and install the MQTT client and
the ReductStore JavaScript client SDK.

$ npm init
$ npm install --save reduct-js async-mqtt 

When we have all the dependencies installed, we can write the script:

const MQTT = require('async-mqtt');
const {Client} = require('reduct-js');

MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
    await mqttClient.subscribe('mqtt_data');

    const reductClient = new Client('http://localhost:8383');
    const bucket = await reductClient.getOrCreateBucket('mqtt');

    mqttClient.on('message', async (topic, msg) => {
        // The topic name is used as the entry name in the bucket
        const record = await bucket.beginWrite(topic);
        await record.write(msg);
        console.log('Received message "%s" from topic "%s" was written', msg,
            topic);
    });

}).catch(error => console.error(error));

Let's look at the code in detail. First, we have to connect to the MQTT broker and subscribe to a topic. The topic name
is just an arbitrary string that producers must know. In our case, it is mqtt_data:


MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
    await mqttClient.subscribe('mqtt_data');

    // rest of code
}).catch(error => console.error(error));

If the MQTT connection is successful, we can start dealing with ReductStore. To start writing data there, we need a
bucket. We create a bucket with the name mqtt or get an existing one:

const reductClient = new Client('http://localhost:8383');
const bucket = await reductClient.getOrCreateBucket('mqtt');

The last step is to write the received message to the storage. To catch incoming messages, we register a callback
for the message event. Then we write each message to the entry mqtt_data:

mqttClient.on('message', async (topic, msg) => {
    const record = await bucket.beginWrite(topic);
    await record.write(msg);
    console.log('Received message "%s" from topic "%s" was written', msg,
        topic);
});

When we call bucket.beginWrite, we create an entry in the bucket if it doesn't exist yet. Then we write the data to the
entry with the current timestamp. Now our MQTT data is safe and sound in the storage, and we can access it by using
the same SDK.
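Because the script passes the MQTT topic straight to bucket.beginWrite, the entry name is whatever the topic happens to be. MQTT topics are often hierarchical, for example factory/line-1/temperature. The sketch below assumes that entry names should be limited to letters, digits, underscores and hyphens; that restriction is an assumption and should be checked against the ReductStore documentation for your version. The topicToEntry helper is hypothetical and not part of reduct-js.

```javascript
// Hypothetical helper: maps an MQTT topic to a conservative entry name.
// ASSUMPTION: only letters, digits, "_" and "-" are safe in entry names.
function topicToEntry(topic) {
    if (topic.length === 0) {
        throw new Error('MQTT topic must not be empty');
    }
    // Replace every character outside the assumed safe set with "_"
    return topic.replace(/[^A-Za-z0-9_-]/g, '_');
}

console.log(topicToEntry('mqtt_data'));
// prints "mqtt_data"
console.log(topicToEntry('factory/line-1/temperature'));
// prints "factory_line-1_temperature"
```

With such a helper, the callback could call bucket.beginWrite(topicToEntry(topic)) instead of bucket.beginWrite(topic). Keep in mind that the mapping is lossy: the topics a/b and a_b would end up in the same entry.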

Publish data to MQTT topic

When you launch the script, it does nothing because there is no data from MQTT yet. You have to publish something to
the topic mqtt_data. I prefer to use mosquitto_pub. For Ubuntu users, it is part of the mosquitto-clients package:

$ sudo apt-get install mosquitto-clients
$ mosquitto_pub -t mqtt_data -m "Hello, world!"

Getting data from ReductStore

Now you know how to get data from MQTT and write it to ReductStore, but we still need a little Node.js script to read
the data from the storage:

const {Client} = require('reduct-js');

const client = new Client('http://localhost:8383');

client.getBucket('mqtt').then(async (bucket) => {
    const record  = await bucket.beginRead('mqtt_data');
    console.log('Last record: %s', await record.readAsString());

    // Get data for the last hour
    const stopTime = BigInt(Date.now() * 1000);
    const startTime = stopTime - 3_600_000_000n;

    for await (const record of bucket.query('mqtt_data', startTime, stopTime)) {
        const data = await record.read();
        console.log('Found record "%s" with timestamp "%d"', data, record.time);
    }

}).catch(error => console.error(error));

Reading the latest record in the entry is very easy:

const record  = await bucket.beginRead('mqtt_data');
const data = await record.readAsString();

To get the data for a time interval, we can use the query method. It returns an async iterator, so we can use a
for await loop to iterate over the records:

const stopTime = BigInt(Date.now() * 1000);
const startTime = stopTime - 3_600_000_000n;

for await (const record of bucket.query('mqtt_data', startTime, stopTime)) {
    const data = await record.read();
    console.log('Found record "%s" with timestamp "%d"', data, record.time);
}

Note that the storage uses timestamps with microsecond precision, so we can't use the Date class or the number type
directly. That is why we use BigInt.
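The conversion between JavaScript dates and microsecond timestamps can be wrapped in a few small helpers. This is only a sketch: dateToMicros, microsToDate and lastHours are hypothetical names that are not part of reduct-js, and converting back to a Date drops anything finer than a millisecond.

```javascript
const MICROS_PER_MILLI = 1000n;
const MICROS_PER_HOUR = 3_600_000_000n;

// Converts a Date to a microsecond UNIX timestamp as a BigInt
function dateToMicros(date) {
    return BigInt(date.getTime()) * MICROS_PER_MILLI;
}

// Converts a microsecond timestamp back to a Date
// (the sub-millisecond part is truncated)
function microsToDate(micros) {
    return new Date(Number(micros / MICROS_PER_MILLI));
}

// Returns the [start, stop) range covering the last N hours.
// "hours" must be an integer because BigInt() rejects fractions.
function lastHours(hours, now = new Date()) {
    const stop = dateToMicros(now);
    const start = stop - BigInt(hours) * MICROS_PER_HOUR;
    return {start, stop};
}

const {start, stop} = lastHours(1, new Date('2022-12-29T22:00:00Z'));
console.log(microsToDate(start).toISOString());
// prints "2022-12-29T21:00:00.000Z"
console.log(stop - start);
// prints "3600000000n"
```

The start and stop values can be passed to bucket.query in place of the startTime and stopTime variables, and microsToDate(record.time) turns a record timestamp into a readable date.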

Conclusion

As you can see, the MQTT protocol and ReductStore are very simple technologies that can be used together very easily
in Node.js. You can find the source code of the example here. If you have any questions or problems running it, feel
free to make an issue.

I hope this tutorial has been helpful. Thanks!

Alexey Timin | Sciencx (2022-12-29T22:23:37+00:00) How to keep a history of MQTT data with Node.js. Retrieved from https://www.scien.cx/2022/12/29/how-to-keep-a-history-of-mqtt-data-with-node-js/

MLA
" » How to keep a history of MQTT data with Node.js." Alexey Timin | Sciencx - Thursday December 29, 2022, https://www.scien.cx/2022/12/29/how-to-keep-a-history-of-mqtt-data-with-node-js/
HARVARD
Alexey Timin | Sciencx Thursday December 29, 2022 » How to keep a history of MQTT data with Node.js., viewed ,<https://www.scien.cx/2022/12/29/how-to-keep-a-history-of-mqtt-data-with-node-js/>
VANCOUVER
Alexey Timin | Sciencx - » How to keep a history of MQTT data with Node.js. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2022/12/29/how-to-keep-a-history-of-mqtt-data-with-node-js/
CHICAGO
" » How to keep a history of MQTT data with Node.js." Alexey Timin | Sciencx - Accessed . https://www.scien.cx/2022/12/29/how-to-keep-a-history-of-mqtt-data-with-node-js/
IEEE
" » How to keep a history of MQTT data with Node.js." Alexey Timin | Sciencx [Online]. Available: https://www.scien.cx/2022/12/29/how-to-keep-a-history-of-mqtt-data-with-node-js/. [Accessed: ]
rf:citation
» How to keep a history of MQTT data with Node.js | Alexey Timin | Sciencx | https://www.scien.cx/2022/12/29/how-to-keep-a-history-of-mqtt-data-with-node-js/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.