All posts by vogler

Replicate and Archive Data from an MQTT Broker to MonsterMQ

Here’s a straightforward example of how data replication can be achieved with the Frankenstein Automation-Gateway (automation-gateway.com), transferring data from a remote MQTT broker to a local MonsterMQ broker.

The local MonsterMQ Broker is configured so that data is stored in TimescaleDB to maintain a historical record. This process converts the current state of the UNS into archived historical data.

The gateway will also create a Frankenstein OPC UA server, allowing you to access the data from the MQTT broker. However, since the gateway treats the payloads as data-agnostic, all values in the OPC UA server are exposed as the string data type.

Monster.yaml

Create a file monster.yaml with this content:

TCP: 1883 
WS: 1884
SSL: false
MaxMessageSizeKb: 64
QueuedMessagesEnabled: false

SessionStoreType: POSTGRES
RetainedStoreType: POSTGRES

ArchiveGroups:
  - Name: "source"
    Enabled: true
    TopicFilter: [ "source/#" ]
    RetainedOnly: false
    LastValType: NONE
    ArchiveType: POSTGRES

Postgres:
  Url: jdbc:postgresql://timescale:5432/postgres
  User: system
  Pass: manager
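
Once the broker is running, the archive group “source” stores matching messages in TimescaleDB. Assuming MonsterMQ creates a table named after the archive group (sourcearchive, analogous to the testarchive and grafanaarchive tables mentioned further below) with time, topic, and payload columns, a quick check could look like this:

select time, array_to_string(topic,'/') as topic, payload_json
from sourcearchive
order by time desc
limit 10;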

Frankenstein.yaml

Create a file frankenstein.yaml with this content and adapt the Host of the source broker and the Topic paths you want to replicate from the source to your local MonsterMQ broker. Note the addressing scheme: in a topic like mqtt/source/path/Enterprise/Dallas/#, the prefix mqtt/source/path/ addresses the MQTT driver with Id “source”, and everything after path/ is the topic filter on that broker.

Servers:
  OpcUa:
    - Id: "opcsrv"
      Port: 4840
      EndpointAddresses:
        - linux0 # Change this to your hostname!
      Topics:
        - Topic: mqtt/source/path/Enterprise/Dallas/#
Drivers:
  Mqtt:
    - Id: "source"
      Enabled: true
      LogLevel: INFO
      Host: test.monstermq.com # Change this to your source MQTT Broker!
      Port: 1883
      Format: Raw
Loggers:
  Mqtt:
    - Id: "source"
      Enabled: true
      LogLevel: INFO
      Host: 172.17.0.1 # Default Docker bridge IP of the host – adapt if your local MonsterMQ broker runs elsewhere
      Port: 1883
      Format: Raw
      BulkMessages: false
      Logging:
        - Topic: mqtt/source/path/Enterprise/Dallas/#

Docker Compose

Create a docker-compose.yaml file with this content and then start it with docker-compose up -d

services:
  timescale:
    image: timescale/timescaledb:latest-pg16
    container_name: timescale
    restart: unless-stopped
    ports:
      - "5432:5432"
    volumes:
      - timescale_data:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: system
      POSTGRES_PASSWORD: manager
  monstermq:
    image: rocworks/monstermq:latest
    container_name: monstermq
    restart: unless-stopped
    ports:
      - 1883:1883
      - 1884:1884
    volumes:
      - ./log:/app/log
      - ./monster.yaml:/app/config.yaml
    command: ["+cluster", "-log FINE"]
  frankenstein:
    image: rocworks/automation-gateway:1.37.1
    container_name: frankenstein
    restart: always
    ports:
      - 1885:1883
      - 4840:4840
    environment:
      JAVA_OPTS: '-Xmx1024m'
    volumes:
      - ./frankenstein.yaml:/app/config.yaml
      - ./security:/app/security
volumes:
  timescale_data:      
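
Once all three containers are up, you can verify the replication end to end. Here is a minimal sketch using the Mosquitto command-line tools – assuming the replicated values arrive under the source/… namespace that the archive group in monster.yaml filters on:

# Watch the replicated data arriving on the local MonsterMQ broker
> mosquitto_sub -h localhost -p 1883 -t "source/#" -v

The same values should then also show up in the sourcearchive table queried above, and in any OPC UA client connected to opc.tcp://<your-hostname>:4840.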

From OPC UA & MQTT/SparkplugB to Snowflake with Frankenstein Automation-Gateway

In this example we take SparkplugB messages from an MQTT broker, decode them, and write them to Snowflake. We also take some OPC UA nodes and write them to the same table.

Create a database and a schema for your destination table.

CREATE DATABASE IF NOT EXISTS scada;
USE DATABASE scada;
CREATE OR REPLACE SCHEMA scada;

Create the table for the incoming data:

CREATE TABLE IF NOT EXISTS scada.gateway (
  system character varying(1000) NOT NULL,
  address character varying(1000) NOT NULL,
  sourcetime timestamp with time zone NOT NULL,
  servertime timestamp with time zone NOT NULL,
  numericvalue float,
  stringvalue text,
  status character varying(30),
  CONSTRAINT gateway_pk PRIMARY KEY (system, address, sourcetime)
  );

Generate a private key:

> openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake.p8 -nocrypt

Generate a public key:

> openssl rsa -in snowflake.p8 -pubout -out snowflake.pub

Set the public key to your user:

> ALTER USER xxxxxx SET RSA_PUBLIC_KEY='MIIBIjANBgkqh…';

Replace MIIBIjANBgkqh… with your public key from the snowflake.pub file (without the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- lines).
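
To extract that single-line key body from the PEM file, something like this should do – a small sketch using standard tools that simply drops the BEGIN/END lines and the line breaks:

> grep -v "PUBLIC KEY" snowflake.pub | tr -d '\n'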

Details about creating keys can be found in the Snowflake documentation on key-pair authentication.

Prepare the Gateway

Add a Snowflake logger section to the gateway’s config.yaml. As described above, the SparkplugB messages from the MQTT broker and the values from some OPC UA nodes are both written to the same table.

Drivers:
  OpcUa:
    - Id: "test1"
      Enabled: true
      LogLevel: INFO
      EndpointUrl: "opc.tcp://test.monstermq.com:4840/server"
      UpdateEndpointUrl: true
      SecurityPolicy: None

  Mqtt:
    - Id: "test2"
      Enabled: true
      LogLevel: INFO
      Host: test.monstermq.com
      Port: 1883
      Format: SparkplugB

Loggers:
  Snowflake:
    - Id: "snowflake"
      Enabled: true
      LogLevel: INFO
      PrivateKeyFile: "snowflake.p8"
      Account: xx00000
      Url: https://xx00000.eu-central-1.snowflakecomputing.com:443
      User: xxxxxx
      Role: accountadmin
      Scheme: https
      Port: 443
      Database: SCADA
      Schema: SCADA
      Table: GATEWAY
      Logging:
        - Topic: opc/test1/path/Objects/Mqtt/#
        - Topic: mqtt/test2/path/spBv1.0/vogler/DDATA/+/#

You can find the Url in the Snowflake web console under Admin/Accounts by hovering over the “Locator” column.

Start the Gateway

> cd automation-gateway
> git checkout snowflake
> cd source/app
> ../gradlew run


Note: Using gradlew to start the gateway is not recommended for production. Instead, consider using a Docker image or the files from build/distribution for a more robust setup.
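
Once values are flowing, a quick sanity check in a Snowflake worksheet against the table created above might look like this:

SELECT system, address, sourcetime, numericvalue, stringvalue
FROM scada.gateway
ORDER BY sourcetime DESC
LIMIT 10;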

Exploring My Home Automation Architecture

My home automation setup has been an ongoing project, evolving over the last 10–15 years. It started with a simple goal: to track data from my photovoltaic (PV) system, power meters, and temperature sensors, all connected through Raspberry Pi devices. The data initially went into Oracle; over time, the setup has grown into a more complex architecture that incorporates multiple layers of data collection, processing, and visualization.

1. Data Collection with Raspberry Pi and MQTT

At the core of my setup are Raspberry Pi devices that connect various sensors: power generation (PV) read via Bluetooth, power consumption from meters that emit digital pulses, and temperature sensors. These Pis act as data collectors, feeding data into a local Mosquitto broker. The local broker on each device serves as a short-term buffer before the data is synchronized to my central MonsterMQ broker, using a persistent session and QoS > 0.
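
For illustration, here is roughly what that buffering looks like as a Mosquitto bridge configuration – a sketch, not my exact config; the hostname and topic are placeholders:

# /etc/mosquitto/conf.d/bridge.conf (sketch)
connection central            # name of the bridge connection
address monstermq.local:1883  # placeholder: address of the central MonsterMQ broker
topic home/# out 1            # forward everything below home/ with QoS 1
cleansession false            # persistent session, so queued messages survive outages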

2. MonsterMQ Broker as the Central Hub

The MonsterMQ broker is the central point where data from all sources is collected. It serves as a bridge, collecting data from the local Mosquitto brokers and preparing it for further processing and storage. Before building my own broker, MonsterMQ, I used Mosquitto. Now that I have my own broker, I use MonsterMQ, both to ensure it gets thoroughly tested and to leverage its features. Additionally, in the future, I can use MonsterMQ to store incoming values directly in Apache Kafka. As a database engineer, I appreciate MonsterMQ because it allows me to view the broker’s current state by querying a PostgreSQL database. This lets me see connected clients, their connection details, source IP addresses, and all subscriptions with their parameters.
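
For example (the table names here are purely illustrative – check the actual MonsterMQ schema):

select * from sessions;       -- connected clients and their connection details
select * from subscriptions;  -- all subscriptions with their parameters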

3. Automation-Gateway for Data Flexibility

To expand the possibilities of what I can do with the data, I use the Automation-Gateway. This tool collects values from MonsterMQ and serves two primary functions:

  • Integration with Apache Kafka: By publishing data to Apache Kafka, I maintain a reliable stream that acts as an intermediary between my data sources and the storage databases. This setup provides resilience, allowing me to manage and maintain the databases independently while keeping the data history intact in Kafka.
  • OPC UA Server Exposure: The Automation-Gateway also exposes data as an OPC UA server, making it accessible to industrial platforms and clients that communicate over OPC UA. This takes just a simple YAML configuration file (see the sketch after this list).
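
A minimal sketch of such a configuration, modeled on the OPC UA server section from the replication example above – the Id, port, and topic path are placeholders, not my actual setup:

Servers:
  OpcUa:
    - Id: "home"
      Port: 4840
      Topics:
        - Topic: mqtt/monster/path/home/# # placeholder driver id and topic path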

4. Experimental Integrations: Ignition and WinCC Unified

On top of this setup, I’ve added experimental connections to Ignition and WinCC Unified. Both platforms connect to the Automation-Gateway OPC UA server. Just for testing, those systems publish values to my public MQTT broker at test.monstermq.com. While these integrations aren’t necessary, they’re helpful for testing and exploring new capabilities.

5. Long-Term Data Storage with TimescaleDB, QuestDB, and InfluxDB

Data from Kafka is stored in multiple databases:

  • InfluxDB: my home automation started with Oracle and then moved to InfluxDB.
  • TimescaleDB: since I am an advanced SQL user, I needed a database with strong SQL capabilities, so I added TimescaleDB and imported the historical data into it.

Number of records as of today: 1,773,659,197

Additionally, the Automation-Gateway now also writes the data to QuestDB, which I use for experimental data logging and for exploring an alternative time-series database; it may eventually replace the other databases. I was blown away by how quickly I was able to import 1.5 billion historical data points into QuestDB.

These databases serve as long-term storage solutions, allowing me to create detailed dashboards in Grafana. By keeping Kafka as the layer between my data sources and the databases, I ensure flexibility for database maintenance, as Kafka retains the historical data.

6. Data Logging with MonsterMQ

The public MonsterMQ broker is configured to write data published under “grafana/#” directly into a TimescaleDB table. This lets you see updates in Grafana whenever new data is published. With this Grafana dashboard configuration, if you publish a JSON object with a ‘value’ key and a numeric value, such as {“value”: 42}, it will appear on the dashboard almost instantly. Here is a public dashboard.

select 
  time, array_to_string(topic,'/') as topic, 
  (payload_json->>'value')::numeric as value 
from grafanaarchive
where $__timeFilter(time)
and payload_json->>'value' is not null
and payload_json->>'value' ~ '^[0-9]+(\.[0-9]+)?$'
order by time asc

7. SparkplugB Decoding with MonsterMQ

The public MonsterMQ broker is configured to decode and expand SparkplugB messages. Expanded messages can be found under the topic “spBv1.0e”. Ignition publishes some of my home automation data via SparkplugB to the public broker, and you’re welcome to publish your own SparkplugB messages there as well.

Final Thoughts

This setup is the result of years of experimentation and adaptation to new tools. In theory, I could simplify parts of it, for example, by replacing more components with the Automation-Gateway. But I appreciate having Kafka as the buffer between data sources and databases – it offers flexibility for maintenance and helps preserve historical data.

Feel free to test the public MonsterMQ broker at test.monstermq.com. And if you’re curious, publish a JSON object with a “value” key to grafana/something to see it immediately reflected in the Grafana dashboard!
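
For example, with the Mosquitto client tools:

> mosquitto_pub -h test.monstermq.com -p 1883 -t "grafana/something" -m '{"value": 42}'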

MonsterMQ with Grafana

Because MonsterMQ can store topic values directly in PostgreSQL/Timescale, you can instantly create dashboards with Grafana! 📊

Here’s a simple example:

👉 Check out the live dashboard

It’s super easy to get started. Use the publicly available MonsterMQ broker at test.monstermq.com on port 1883. Just publish a JSON string to any topic under “Test”, like “Test/Sensor1”, with a payload like {“value”: 1}, and you’ll see the value reflected in the Grafana dashboard in real time.
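
For example:

> mosquitto_pub -h test.monstermq.com -p 1883 -t "Test/Sensor1" -m '{"value": 1}'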

I’m currently publishing temperature sensor values from my home automation to the public broker using automation-gateway.com, just so you can see some values in the dashboard.

So, if you need a broker to store your IoT data directly into TimescaleDB without the need for any additional components, consider using MonsterMQ. It’s free and available at MonsterMQ.com.

Here is the example docker-compose.yml file of the publicly available test.monstermq.com broker:

services:
  timescale:
    image: timescale/timescaledb:latest-pg16
    restart: unless-stopped
    ports:
      - "5432:5432"
    volumes:
      - /data/timescale:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: system
      POSTGRES_PASSWORD: xxx
  monstermq:
    image: rocworks/monstermq:latest
    restart: unless-stopped
    ports:
      - 1883:1883
    volumes:
      - ./config.yaml:/app/config.yaml
    command: ["+cluster", "-log INFO"]
  pgadmin:
    image: dpage/pgadmin4
    restart: unless-stopped
    environment:
      PGADMIN_DEFAULT_EMAIL: andreas.vogler@rocworks.at
      PGADMIN_DEFAULT_PASSWORD: xxx
    volumes:
      - /data/pgadmin:/var/lib/pgadmin/storage
    ports:
      - "8080:80"
  grafana:
    image: grafana/grafana
    restart: unless-stopped
    ports:
    - 80:3000

Here is the MonsterMQ config.yaml file:

Port: 1883
SSL: false
WS: true
TCP: true

SessionStoreType: POSTGRES
RetainedStoreType: POSTGRES

SparkplugMetricExpansion:
  Enabled: true

ArchiveGroups:
  - Name: "All"
    Enabled: true
    TopicFilter: [ "#" ]
    RetainedOnly: false
    LastValType: POSTGRES
    ArchiveType: NONE
  - Name: "Test"
    Enabled: true
    TopicFilter: [ "Test/#" ]
    RetainedOnly: false
    LastValType: NONE
    ArchiveType: POSTGRES

Postgres:
  Url: jdbc:postgresql://timescale:5432/monster
  User: system
  Pass: xxx

Give it a try and let me know what you think!

#iot #mqtt #monstermq #timescale #postgresql #grafana

Public MonsterMQ 👽 Broker for testing !

👉 I’ve just installed MonsterMQ on a public virtual machine hosted by Hetzner – thanks to Jeremy Theocharis’ awesome post! You can try it out at test.monstermq.com via TCP or WebSockets on port 1883. No password, no security. If you want to leave me a message, use your name as the ClientId 😊

🔍 Want to take a look at the TimescaleDB behind it? Connect to the database on the default port 5432 using the (readonly) user “monster” with the password “monster”.

😲 I’ve intentionally set it to store all messages, not just retained ones, in a table “alllastval” for testing purposes.

📈 Additionally, messages published on topics matching “Test/#” will be archived in a history table “testarchive”!
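
For a quick look, for example with psql (the database name monster is taken from the config shown above; the column names are assumed to match the Grafana query shown earlier):

> psql -h test.monstermq.com -U monster monster
monster=> select * from testarchive order by time desc limit 5;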

ℹ️ Keep in mind, it’s hosted on a small machine, and every published value is being written and updated in a PostgreSQL table. So, please don’t expect massive throughput or run performance tests.

I’d love for you to try it out. If you find any issues, let me know, or drop an issue on GitHub!

Publish OPC UA and MQTT Data to the Cloud with Automation-Gateway

Publish OPC UA and MQTT Data to the Cloud with Automation-Gateway – inspired by a user’s request 💡

If you have a local 𝗢𝗣𝗖 𝗨𝗔 server or 𝗠𝗤𝗧𝗧 broker and want to bring that data to a 𝗰𝗹𝗼𝘂𝗱-𝗯𝗮𝘀𝗲𝗱 dashboard, Automation-Gateway.com makes it simple. You can easily publish your data to 𝗜𝗻𝗳𝗹𝘂𝘅𝗗𝗕 Cloud and visualize it in 𝗚𝗿𝗮𝗳𝗮𝗻𝗮 — all without complex setups.

I recently added support for InfluxDB V2 to the gateway, allowing you to configure an Influx token and bucket for data publishing. With just a few steps, your local OPC UA or MQTT data can be 𝘀𝘁𝗼𝗿𝗲𝗱 𝗶𝗻 𝘁𝗵𝗲 𝗰𝗹𝗼𝘂𝗱 and displayed in Grafana in real time.
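
As a sketch, such a logger section in the gateway’s config.yaml might look roughly like this – the exact key names for the InfluxDB V2 logger are assumptions here, so treat Url, Token, Bucket, and Organization as placeholders to check against the documentation:

Loggers:
  InfluxDB:
    - Id: "influx"
      Enabled: true
      Url: https://eu-central-1-1.aws.cloud2.influxdata.com # your InfluxDB Cloud URL
      Token: "xxx"          # assumption: API token key
      Bucket: "iot"         # assumption: destination bucket key
      Organization: "home"  # assumption: organization key
      Logging:
        - Topic: mqtt/source/path/home/#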

MQTT Server Interface for WinCC OA? Made with Kotlin 😲


Starting with WinCC OA Version 3.20, you can write your business logic in JavaScript and run it using Node.js, with direct access to the WinCC OA Runtime.

🙈 With that, I have developed a Kotlin program that acts as an MQTT Broker. When you subscribe to a topic (where the topic name matches a datapoint name), the program will send value changes from the corresponding WinCC OA datapoint to your MQTT client.

❓ But wait, Kotlin is like Java, it runs on the JVM, it is not JavaScript!

💡 Did you know that a Node.js Runtime built with GraalVM exists? It allows you to mix Java and JavaScript. And it also works with WinCC OA.

🤩 You can use JVM-based languages and their huge ecosystem to develop business logic with WinCC OA. I have developed a Java library which makes it easier to use the WinCC OA JavaScript functions from Java.

👉 Here it is: https://github.com/vogler75/winccoa-graalvm – please note that the program is provided as an example; it lacks security features and has not been tested for production use. However, it can be extended and customized to meet specific requirements.
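
For a quick test of such an MQTT interface, for example (host, port, and datapoint name are placeholders):

> mosquitto_sub -h <winccoa-host> -p 1883 -t "ExampleDP_Arg1." -v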

⚡ Please be aware that the GraalVM Node.js Runtime is not officially supported by WinCC Open Architecture.

Zenoh & Automation-Gateway.com

OPC UA data to Zenoh? Have you ever used zenoh.io? It’s really cool.

👉 I have implemented a Zenoh publisher in my fun open-source project, the Automation-Gateway.com. It can now bring data from OPC UA/MQTT/PLC4X to Zenoh with just a few lines of configuration.

😎 The cool thing about Zenoh is that you can publish data from multiple sources, on multiple machines, to the data-centric Zenoh network – there is no central server. You can then start a Zenoh client somewhere and subscribe to data coming from any of those sources.

🤫 It is like a distributed UNS.

🤩 And there is even more: they have a Zenoh MQTT bridge, so MQTT clients can connect to it and subscribe to all the available data in the Zenoh network. A new machine/HMI/publisher can be added on the fly, and its data will be visible immediately through the Zenoh MQTT bridge.
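
As a quick sketch of that setup – assuming the bridge listens on the standard MQTT port 1883 by default:

> zenoh-bridge-mqtt &
> mosquitto_sub -h localhost -p 1883 -t "#" -v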

👉 See the screenshots. I deployed the gateway to two machines to publish data from an S7 PLC and from a WinCC OA system to the Zenoh network. Then I started a Zenoh MQTT bridge and subscribed to some of the data with an MQTT client.

🤠 Disclaimer: I did not take care of security, and I did not do any performance tests.