
Retained messages are the devil of MQTT 😈 😱

MQTT brokers handle retained messages differently – and that can have serious implications…

🔸 HiveMQ Community Edition doesn’t support persisting retained messages to disk. After a restart, they’re gone, so the Enterprise Edition is the better choice if retained messages have to survive a restart. Unfortunately, I don’t have access to the Enterprise Edition to run further tests.

🔸 Mosquitto writes retained messages to disk only periodically(!), and the default interval is 30 minutes. While it writes from memory to disk, publishing pauses; the broker appears to freeze for the duration of the write. Writing 1M retained messages to disk takes time… On top of that, I got it to crash reproducibly at about 4,460,000 retained values, and after the crash it would not start again until I removed the database file and lost all retained messages.

🔸 EMQX handled the test with a maximum of 2000 value changes per second. At that rate it was already using about 250% CPU (of four cores), and I occasionally received “Mnesia is overloaded” warnings. I don’t know whether messages are queued or dropped at that point. Writing to disk didn’t seem fast. In addition, a broker startup after storing 1 million retained messages took about 30 seconds, likely because all values are loaded into memory.

🔸 BifroMQ handled the test at around 8000 values per second (v/s), using only about 20% of four CPU cores. I noticed that throughput decreases as the number of published topics increases. Interestingly, a wildcard subscription receives at most 10 retained messages. This is meant to protect the system, but the number feels a bit low, and I couldn’t find a configuration setting to change it.

🔸 MonsterMQ handled the test at about 7000 v/s, using about 18% of four CPU cores for the broker itself plus about 70% for the PostgreSQL database. MonsterMQ stores retained messages in a PostgreSQL table (written in bulk). With a high volume of retained messages, the write queues can fill up, especially if your PostgreSQL instance can’t keep up. In that case, the broker will not persist all values.
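
To make the "full queues" point more concrete, here is a minimal sketch of a bounded buffer in front of a slow bulk writer. It is not MonsterMQ's actual code: the function names, the queue capacity, the batch size, and the drop-on-full policy are all assumptions chosen for illustration.

```python
import queue


def enqueue_retained(buffer, topic, payload):
    """Try to buffer a retained message; return False if the queue is full."""
    try:
        buffer.put_nowait((topic, payload))
        return True
    except queue.Full:
        return False


def drain_batch(buffer, max_batch):
    """Collect up to max_batch buffered messages for one bulk write."""
    batch = []
    while len(batch) < max_batch:
        try:
            batch.append(buffer.get_nowait())
        except queue.Empty:
            break
    return batch


def simulate(published, capacity, batch_size, publishes_per_write):
    """Publish messages while a hypothetical database writes one batch
    for every `publishes_per_write` publishes.

    Returns (persisted, dropped).
    """
    buffer = queue.Queue(maxsize=capacity)
    persisted = 0
    dropped = 0
    for i in range(published):
        if not enqueue_retained(buffer, f"root/{i}", b"value"):
            dropped += 1  # queue full: this value is never persisted
        if (i + 1) % publishes_per_write == 0:
            persisted += len(drain_batch(buffer, batch_size))
    # Publishing has stopped, so the writer can now empty the queue.
    while True:
        batch = drain_batch(buffer, batch_size)
        if not batch:
            break
        persisted += len(batch)
    return persisted, dropped
```

If the writer only manages 50 rows per 100 publishes, `simulate(1000, capacity=100, batch_size=50, publishes_per_write=100)` loses 450 of 1000 values; with `batch_size=100` nothing is dropped. The numbers are artificial, but the pattern is the one to watch for: once the database falls behind, the buffer only delays the loss.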

🧨 Be careful when using retained messages!

👉 Understand and test your broker’s behavior before relying on them for data durability.

Just to note: These results are based on my own tests and specific use cases, and were performed on modest hardware. I was publishing retained messages to hierarchically structured topics (e.g., root/0/0/0/0/0/0/0/0/0/0), using the numbers 0 to 9 on each level, without any subscribers.
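
If you want to reproduce a similar topic layout, the following sketch generates topic names of that shape. The enumeration order, the default of ten levels, and the function name are assumptions; the original test tool may have walked the hierarchy differently. Publishing is left out on purpose, since it depends on the MQTT client library you use.

```python
import itertools


def topic_names(levels=10, root="root"):
    """Yield topics like root/0/0/.../0, counting through 0-9 on each level."""
    for digits in itertools.product("0123456789", repeat=levels):
        yield root + "/" + "/".join(digits)


# Take the first three topics without materializing all 10**10 combinations.
first_topics = list(itertools.islice(topic_names(), 3))
```

`itertools.product` produces the combinations lazily, so you can slice off the first million topics with `itertools.islice(topic_names(), 1_000_000)` and publish each one with the retain flag set.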

Replicate and Archive Data from an MQTT Broker to MonsterMQ

Here’s a straightforward example of how to replicate data with Frankenstein (Automation-Gateway.com), transferring it from a remote broker to a local MonsterMQ Broker.

The local MonsterMQ Broker is configured to store the data in TimescaleDB to maintain a historical record. This turns the current state of the Unified Namespace (UNS) into archived historical data.

The setup also creates a Frankenstein OPC UA Server, allowing you to access the data from the MQTT broker via OPC UA. However, since the setup is data-agnostic, all values in the OPC UA Server are exposed with the string data type.

Monster.yaml

Create a file monster.yaml with this content:

TCP: 1883 
WS: 1884
SSL: false
MaxMessageSizeKb: 64
QueuedMessagesEnabled: false

SessionStoreType: POSTGRES
RetainedStoreType: POSTGRES

ArchiveGroups:
  - Name: "source"
    Enabled: true
    TopicFilter: [ "source/#" ]
    RetainedOnly: false
    LastValType: NONE
    ArchiveType: POSTGRES

Postgres:
  Url: jdbc:postgresql://timescale:5432/postgres
  User: system
  Pass: manager
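
The TopicFilter above uses the MQTT multi-level wildcard `#`. As a rough illustration of which topics a filter such as `source/#` covers, here is a small matcher that follows the standard MQTT wildcard rules. It is a simplified sketch, not MonsterMQ's implementation: the function name is made up, and the special handling of topics starting with `$` is left out.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            # Multi-level wildcard: matches the parent level and everything below.
            return True
        if i >= len(topic_parts):
            return False  # filter is longer than the topic
        if part != "+" and part != topic_parts[i]:
            return False  # literal level differs
    # No "#" seen, so both must have the same number of levels.
    return len(filter_parts) == len(topic_parts)
```

With this, `topic_matches("source/#", "source/Enterprise/Dallas/temp")` is true, while a topic published under a different first level, such as `other/Enterprise/Dallas/temp`, would not be archived by this group.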

Frankenstein.yaml

Create a file frankenstein.yaml with this content, and adapt the Host of the source broker and the Topic paths that you want to replicate from the source to your local MonsterMQ Broker.

Servers:
  OpcUa:
    - Id: "opcsrv"
      Port: 4840
      EndpointAddresses:
        - linux0 # Change this to your hostname!
      Topics:
        - Topic: mqtt/source/path/Enterprise/Dallas/#
Drivers:
  Mqtt:
    - Id: "source"
      Enabled: true
      LogLevel: INFO
      Host: test.monstermq.com # Change this to your source MQTT Broker!
      Port: 1883
      Format: Raw
Loggers:
  Mqtt:
    - Id: "source"
      Enabled: true
      LogLevel: INFO
      Host: 172.17.0.1 # Local MonsterMQ Broker via the Docker host (default bridge gateway; adjust if yours differs)
      Port: 1883
      Format: Raw
      BulkMessages: false
      Logging:
        - Topic: mqtt/source/path/Enterprise/Dallas/#

Docker Compose

Create a docker-compose.yaml file with this content, then start it with docker-compose up -d.

services:
  timescale:
    image: timescale/timescaledb:latest-pg16
    container_name: timescale
    restart: unless-stopped
    ports:
      - "5432:5432"
    volumes:
      - timescale_data:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: system
      POSTGRES_PASSWORD: manager
  monstermq:
    image: rocworks/monstermq:latest
    container_name: monstermq
    restart: unless-stopped
    ports:
      - 1883:1883
      - 1884:1884
    volumes:
      - ./log:/app/log
      - ./monster.yaml:/app/config.yaml
    command: ["+cluster", "-log FINE"]
  frankenstein:
    image: rocworks/automation-gateway:1.37.1
    container_name: frankenstein
    restart: always
    ports:
      - 1885:1883
      - 4840:4840
    environment:
      JAVA_OPTS: '-Xmx1024m'
    volumes:
      - ./frankenstein.yaml:/app/config.yaml
      - ./security:/app/security
volumes:
  timescale_data:
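
Once the containers are up, a quick way to see whether the published ports answer is a plain TCP connect. The sketch below is only a reachability check under the assumption that you run it on the Docker host; the function names and the service labels are made up, and the port numbers are taken from the compose file above. An open port does not prove that MQTT, PostgreSQL, or OPC UA is actually working behind it.

```python
import socket

# Host ports published in the compose file above (labels are just for display).
PORTS = {
    "TimescaleDB": 5432,
    "MonsterMQ MQTT": 1883,
    "MonsterMQ WebSocket": 1884,
    "Frankenstein MQTT": 1885,
    "Frankenstein OPC UA": 4840,
}


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host="localhost"):
    """Map each service label to whether its published port accepts connections."""
    return {name: port_open(host, port) for name, port in PORTS.items()}
```

Calling `check_services()` returns a dictionary of labels and booleans. Port 1885 may well report as closed: the compose file publishes it, but the frankenstein.yaml shown above only defines an OPC UA server.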