
MQTT Deep Dive: The IoT Messaging Protocol


Welcome to IOT-5 in the IoT Patterns & Strategies Roadmap! In Phase 2, we covered MQTT basics — pub/sub model, Mosquitto setup, QoS levels, and a v3.1.1 vs v5.0 comparison. Now it's time to go production-grade.

Running MQTT for a hobby project with 10 devices is trivial. Running it for 100,000+ devices with guaranteed delivery, authentication, encryption, load balancing, and zero downtime — that's where things get interesting.

This post covers everything you need to design, secure, and scale MQTT for real production IoT systems.

What You'll Learn

✅ Design MQTT topic hierarchies that scale with your device fleet
✅ Choose the right QoS level for each data type with real trade-off analysis
✅ Configure clean vs persistent sessions for different device behaviors
✅ Secure MQTT with TLS, mTLS, username/password, and JWT authentication
✅ Implement fine-grained authorization with ACLs
✅ Set up broker clustering with EMQX for high availability
✅ Scale MQTT to handle millions of concurrent connections
✅ Monitor broker health and detect problems before they escalate


Topic Design Patterns

Your topic hierarchy is the backbone of your MQTT system. A bad topic design leads to wildcard explosions, impossible filtering, and painful migrations. A good one scales to millions of devices.

Topic Hierarchy Best Practices

{org}/{site}/{zone}/{device-type}/{device-id}/{data-type}

Example:

acme/factory-berlin/zone-a/temperature-sensor/ts-0042/telemetry
acme/factory-berlin/zone-a/temperature-sensor/ts-0042/status
acme/factory-berlin/zone-a/temperature-sensor/ts-0042/alerts
acme/factory-berlin/zone-b/motor/motor-007/telemetry

Rules:

  1. Use lowercase with hyphens: temperature-sensor, not TemperatureSensor or temperature_sensor
  2. Be specific from left to right — broadest category first, most specific last
  3. Never start with / — leading slash creates an empty first level (/topic has 2 levels, topic has 1)
  4. Keep levels meaningful — every level should serve a filtering purpose
  5. Separate data types — telemetry, commands, status, and alerts should be different topics
  6. Include device ID — enables per-device subscriptions and ACLs
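These rules can be enforced programmatically. A minimal sketch in TypeScript (the buildTopic helper and its validation regex are illustrative, not part of any MQTT library):

```typescript
// Build a topic from ordered levels, rejecting any level that breaks
// the naming rules above (lowercase, hyphen-separated, non-empty).
const LEVEL_RE = /^[a-z0-9]+(-[a-z0-9]+)*$/;

function buildTopic(levels: string[]): string {
  for (const level of levels) {
    if (!LEVEL_RE.test(level)) {
      throw new Error(`Invalid topic level "${level}": use lowercase with hyphens`);
    }
  }
  return levels.join('/'); // no leading slash, so no empty first level
}

buildTopic(['acme', 'factory-berlin', 'zone-a', 'temperature-sensor', 'ts-0042', 'telemetry']);
// "acme/factory-berlin/zone-a/temperature-sensor/ts-0042/telemetry"
```

Running this validation at device-provisioning time catches casing and separator mistakes before they ever reach the broker.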

Wildcard Subscriptions

MQTT provides two wildcards. Understanding them is critical for designing topics that work at scale:

| Wildcard | Symbol | Matches | Example |
| --- | --- | --- | --- |
| Single-level | + | Exactly one level | factory/+/temperature matches factory/zone-a/temperature but NOT factory/zone-a/rack-1/temperature |
| Multi-level | # | Zero or more levels | factory/# matches factory/zone-a/temperature AND factory/zone-a/rack-1/motor/vibration |
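To make the matching semantics concrete, here is a small matcher implementing both wildcards (a simplified sketch; it ignores spec edge cases such as $-prefixed system topics):

```typescript
// Returns true if `topic` matches the subscription `filter`,
// implementing + (exactly one level) and # (zero or more trailing levels).
function topicMatches(filter: string, topic: string): boolean {
  const f = filter.split('/');
  const t = topic.split('/');
  for (let i = 0; i < f.length; i++) {
    if (f[i] === '#') return true;   // matches everything remaining, including nothing
    if (i >= t.length) return false; // topic ran out of levels before the filter did
    if (f[i] !== '+' && f[i] !== t[i]) return false;
  }
  return f.length === t.length;      // no leftover topic levels allowed
}

topicMatches('factory/+/temperature', 'factory/zone-a/temperature');        // true
topicMatches('factory/+/temperature', 'factory/zone-a/rack-1/temperature'); // false
topicMatches('factory/#', 'factory/zone-a/rack-1/motor/vibration');         // true
```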

Topic Design Patterns for Common Scenarios

Pattern 1: Telemetry + Commands (Bidirectional)

# Device publishes telemetry (device → cloud)
devices/{device-id}/telemetry
 
# Cloud publishes commands (cloud → device)
devices/{device-id}/commands
 
# Device publishes command responses
devices/{device-id}/commands/response

Pattern 2: Request-Response over MQTT

MQTT is inherently pub/sub, but you can implement request-response:

# Requester publishes with correlation ID
requests/{target-device}/rpc
  payload: { "id": "req-123", "method": "getConfig", "replyTo": "responses/requester-1" }
 
# Target publishes response to the replyTo topic
responses/requester-1
  payload: { "id": "req-123", "result": { "interval": 60, "threshold": 35 } }

MQTT v5.0 has native support for this with Response Topic and Correlation Data properties.
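On the requester side, the pattern reduces to bookkeeping: remember each outstanding correlation ID and resolve it when the matching response arrives on the replyTo topic. A transport-agnostic sketch (class and method names are invented for illustration):

```typescript
// Tracks outstanding requests by correlation ID and resolves the matching
// promise when a response arrives on the reply topic.
class RequestTracker {
  private pending = new Map<string, (result: unknown) => void>();

  // Call before publishing the request; returns a promise for the response.
  track(id: string): Promise<unknown> {
    return new Promise((resolve) => this.pending.set(id, resolve));
  }

  // Call from the subscriber callback on the replyTo topic.
  handleResponse(payload: { id: string; result: unknown }): boolean {
    const resolve = this.pending.get(payload.id);
    if (!resolve) return false; // unknown or already-handled correlation ID
    this.pending.delete(payload.id);
    resolve(payload.result);
    return true;
  }
}
```

In production you would also attach a timeout per entry so that requests to offline devices don't accumulate forever.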

Pattern 3: Shared Subscriptions (MQTT v5.0)

Shared subscriptions distribute messages across multiple subscribers — native load balancing:

# Standard subscription — all subscribers get every message
devices/+/telemetry
 
# Shared subscription — messages distributed round-robin
$share/processing-group/devices/+/telemetry

Before v5.0, you had to implement this yourself with application-level load balancing or use broker-specific extensions (EMQX and HiveMQ supported shared subscriptions before the spec).
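In code, a shared subscription is nothing more than a prefixed topic filter; a trivial helper (the function name is illustrative):

```typescript
// Wrap a topic filter in the $share prefix so the broker load-balances
// deliveries across all subscribers in `group`.
function sharedFilter(group: string, filter: string): string {
  return `$share/${group}/${filter}`;
}

sharedFilter('processing-group', 'devices/+/telemetry');
// "$share/processing-group/devices/+/telemetry"
```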

Anti-Patterns to Avoid

❌ devices/telemetry          → No device ID — can't filter per device
❌ ts-0042                     → No hierarchy — impossible to use wildcards
❌ /devices/ts-0042/data      → Leading slash creates empty level
❌ devices/TS_0042/Data       → Inconsistent casing
❌ devices/ts-0042            → Single topic for everything — can't separate telemetry from commands

QoS Levels In Depth

IOT-3 introduced QoS 0, 1, and 2. Now let's dig into the real trade-offs and when each one actually breaks down.

QoS 0: At Most Once (Fire and Forget)

Behavior:

  • Client sends PUBLISH, moves on immediately
  • No PUBACK, no retry
  • Message can be lost if network hiccups

When to use:

  • High-frequency sensor telemetry (temperature every second)
  • Data where one lost reading doesn't matter (the next one arrives in 1 second)
  • Bandwidth-constrained links where overhead matters

When it breaks: If your network drops packets frequently and you're using QoS 0 for critical alerts, you'll miss them silently.

QoS 1: At Least Once

Behavior:

  • Client waits for PUBACK
  • Retries if no acknowledgment within timeout
  • Broker stores message until delivery confirmed
  • Duplicates are possible — subscriber may get the same message twice

When to use:

  • Sensor readings where accuracy matters (energy meters, billing)
  • State changes (door open/closed, device online/offline)
  • Any data you can't afford to lose but can handle duplicates

Handling duplicates in your application:

class IdempotentProcessor {
  private processed = new Set<string>();
  private maxSetSize = 100_000;
 
  process(message: { messageId: string; payload: any }): boolean {
    // Skip if already processed
    if (this.processed.has(message.messageId)) {
      console.log(`Duplicate skipped: ${message.messageId}`);
      return false;
    }
 
    // Evict oldest entries if set grows too large
    if (this.processed.size >= this.maxSetSize) {
      const first = this.processed.values().next().value;
      this.processed.delete(first);
    }
 
    this.processed.add(message.messageId);
    return true; // Process this message
  }
}

QoS 2: Exactly Once

Behavior:

  • Four-packet handshake: PUBLISH → PUBREC → PUBREL → PUBCOMP
  • Guarantees message delivered exactly once
  • Both client and broker maintain state for the packet ID

When to use:

  • Financial transactions (billing events)
  • Critical commands (shutdown, valve open/close)
  • Audit logs where duplicates corrupt compliance data

The cost:

  • 4x the packets compared to QoS 0
  • Higher latency (must complete full handshake)
  • More broker memory (stores in-flight state)
  • Lower throughput — broker handles fewer messages per second

QoS Selection Matrix

| Data Type | QoS | Why |
| --- | --- | --- |
| Temperature every 1s | 0 | High frequency, next reading replaces lost one |
| Energy meter reading (billing) | 1 | Can't lose, idempotent by timestamp |
| Device status (online/offline) | 1 + retained | Must persist for late subscribers |
| Emergency shutdown command | 2 | Must execute exactly once |
| Firmware update trigger | 2 | Duplicate trigger could corrupt update |
| Heartbeat / keepalive | 0 | Missing one is fine, absence triggers LWT |
| Config change from cloud | 1 | Retry acceptable, device validates config |

QoS Downgrade

Important: the delivered QoS is negotiated per subscriber, not fixed per message. The broker delivers at the minimum of the publish QoS and the QoS the subscriber requested. If a publisher sends at QoS 2 but the subscriber subscribed at QoS 1, the broker downgrades delivery to QoS 1 for that subscriber.

Publisher → PUBLISH QoS 2 → Broker → Subscriber (subscribed QoS 1) → delivers at QoS 1

This means the publisher's effort of QoS 2 is wasted for subscribers who don't need it. Design your system so publishers and subscribers agree on the minimum QoS needed.
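The downgrade rule is simply a minimum over the two QoS values:

```typescript
// The QoS a given subscriber actually receives is the lower of the
// publisher's QoS and the maximum QoS that subscriber requested.
function effectiveQos(publishQos: 0 | 1 | 2, subscribeQos: 0 | 1 | 2): number {
  return Math.min(publishQos, subscribeQos);
}

effectiveQos(2, 1); // 1: the publisher's QoS 2 handshake is wasted on this subscriber
```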


Session Management

Sessions determine what happens when a device disconnects and reconnects. Getting this right is critical for devices that sleep (battery-powered) or have flaky connectivity.

Clean Session vs Persistent Session

| Aspect | Clean Session (clean=true) | Persistent Session (clean=false) |
| --- | --- | --- |
| Subscriptions | Lost on disconnect | Remembered by broker |
| Queued messages | Discarded | Stored until reconnect |
| In-flight QoS 1/2 | Lost | Resumed on reconnect |
| Client ID | Can be empty (v3.1.1) | Must be unique and stable |
| Broker memory | Low | Grows per disconnected device |
| Best for | Ephemeral clients, dashboards | Battery-powered sensors, mobile devices |

MQTT v5.0 Session Improvements

MQTT v5.0 replaces the boolean cleanSession with two fields for finer control:

import mqtt from 'mqtt';
 
// MQTT v5.0 — fine-grained session control
const client = mqtt.connect('mqtt://broker:1883', {
  protocolVersion: 5,
  clientId: 'sensor-ts-0042',
 
  // Clean Start: true = discard existing session, false = resume
  clean: false,
 
  properties: {
    // Session Expiry Interval: how long broker keeps session after disconnect
    // 0 = delete immediately (equivalent to clean=true in v3.1.1)
    // 4294967295 (0xFFFFFFFF) = never expire
    sessionExpiryInterval: 3600, // Keep session for 1 hour after disconnect
  },
});
 
client.on('connect', (connack) => {
  if (connack.sessionPresent) {
    console.log('Resumed existing session — subscriptions intact');
    // No need to re-subscribe — broker remembers
  } else {
    console.log('New session — subscribing to topics');
    client.subscribe('devices/ts-0042/commands', { qos: 1 });
  }
});

Why this matters: In v3.1.1, persistent sessions live forever (or until broker restarts). If you have 100,000 devices that connect once and never come back, the broker accumulates 100,000 dead sessions consuming memory. V5.0's sessionExpiryInterval lets sessions auto-expire.

Message Expiry (MQTT v5.0)

Queued messages for offline devices can grow stale. MQTT v5.0 lets publishers set expiry:

client.publish('devices/ts-0042/commands', JSON.stringify({
  action: 'recalibrate',
  timestamp: Date.now(),
}), {
  qos: 1,
  properties: {
    messageExpiryInterval: 300, // Expire after 5 minutes
  },
});
 
// If device reconnects after 10 minutes, this command is gone
// — it wouldn't make sense to recalibrate with stale parameters anyway

Securing MQTT

An unsecured MQTT broker is an open door to your entire IoT fleet. Anyone can subscribe to # and see all your data, or publish fake commands to your actuators.

Layer 1: Transport Security (TLS)

Encrypt all traffic between clients and broker:

Mosquitto TLS configuration:

# /etc/mosquitto/conf.d/tls.conf
 
# Disable plaintext listener
# listener 1883
 
# TLS listener
listener 8883
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
tls_version tlsv1.3
 
# Require client certificates (mTLS)
require_certificate true
use_identity_as_username true

Layer 2: Authentication

Option A: Username/Password

Simple but limited — passwords can leak, hard to rotate at scale.

# /etc/mosquitto/conf.d/auth.conf
allow_anonymous false
password_file /etc/mosquitto/passwd
# Create password file
mosquitto_passwd -c /etc/mosquitto/passwd sensor-ts-0042
# Enter password when prompted

Option B: Client Certificates (mTLS)

Each device has a unique certificate — strongest authentication for IoT:

# Generate CA (Certificate Authority)
openssl genrsa -out ca.key 4096
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 \
  -out ca.crt -subj "/CN=IoT CA"
 
# Generate device certificate
openssl genrsa -out device-ts-0042.key 2048
openssl req -new -key device-ts-0042.key \
  -out device-ts-0042.csr -subj "/CN=sensor-ts-0042"
openssl x509 -req -in device-ts-0042.csr -CA ca.crt -CAkey ca.key \
  -CAcreateserial -out device-ts-0042.crt -days 365 -sha256

Option C: JWT Token Authentication

For devices that authenticate through an external identity provider:

import mqtt from 'mqtt';
import jwt from 'jsonwebtoken';
 
// Generate short-lived JWT for device authentication
function getDeviceToken(deviceId: string): string {
  return jwt.sign(
    {
      sub: deviceId,
      permissions: ['publish:devices/+/telemetry', 'subscribe:devices/+/commands'],
      iat: Math.floor(Date.now() / 1000),
    },
    process.env.JWT_SECRET!,
    { expiresIn: '1h' }
  );
}
 
// Device connects with JWT as password
const client = mqtt.connect('mqtts://broker:8883', {
  clientId: 'sensor-ts-0042',
  username: 'sensor-ts-0042',
  password: getDeviceToken('sensor-ts-0042'),
});

This requires a broker plugin to validate JWTs. EMQX and HiveMQ both support JWT auth out of the box.

Layer 3: Authorization (ACLs)

Authentication tells you who the client is. Authorization controls what they can do.

# /etc/mosquitto/conf.d/acl.conf
acl_file /etc/mosquitto/acl
 
# /etc/mosquitto/acl — Access Control List
# Sensors can only publish to their own topics
user sensor-ts-0042
topic write devices/sensor-ts-0042/telemetry
topic write devices/sensor-ts-0042/status
topic read devices/sensor-ts-0042/commands
 
# Dashboard users can read everything but write nothing
user dashboard-user
topic read devices/#
 
# Admin can do everything
user admin
topic readwrite #
 
# Pattern-based ACL (dynamic per client)
pattern write devices/%c/telemetry
pattern read devices/%c/commands
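Mosquitto expands %c to the connecting client's ID (and %u to the username) before matching the topic. The check can be sketched like this, simplified to exact matches (real brokers also apply wildcard matching):

```typescript
// Check a pattern-based ACL rule like "pattern write devices/%c/telemetry"
// against a concrete client ID and topic.
function patternAllows(pattern: string, clientId: string, topic: string): boolean {
  const expanded = pattern.replace(/%c/g, clientId); // substitute the client ID
  return expanded === topic;
}

patternAllows('devices/%c/telemetry', 'sensor-ts-0042', 'devices/sensor-ts-0042/telemetry'); // true
patternAllows('devices/%c/telemetry', 'sensor-ts-0042', 'devices/sensor-ts-0099/telemetry'); // false
```

The pattern rules mean you never have to enumerate per-device ACL entries: one pattern line covers the whole fleet.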

EMQX ACL with built-in database:

# EMQX supports ACL rules via API
curl -X POST http://localhost:18083/api/v5/authorization/sources/built_in_database/rules/users \
  -H "Content-Type: application/json" \
  -d '{
    "username": "sensor-ts-0042",
    "rules": [
      {"action": "publish", "topic": "devices/sensor-ts-0042/telemetry", "permission": "allow"},
      {"action": "publish", "topic": "devices/sensor-ts-0042/status", "permission": "allow"},
      {"action": "subscribe", "topic": "devices/sensor-ts-0042/commands", "permission": "allow"},
      {"action": "all", "topic": "#", "permission": "deny"}
    ]
  }'

Security Checklist

| Layer | Minimum | Recommended | Production |
| --- | --- | --- | --- |
| Transport | TLS 1.2 | TLS 1.3 | TLS 1.3 + certificate pinning |
| Authentication | Username/password | Client certificates (mTLS) | mTLS + short-lived tokens |
| Authorization | Basic ACL file | Pattern-based ACLs | Dynamic ACLs via database/API |
| Payload | None | None (TLS encrypts transport) | Application-layer encryption for sensitive data |
| Monitoring | Broker logs | Failed auth alerts | Real-time anomaly detection |

Broker Clustering

A single MQTT broker is a single point of failure. For production, you need clustering — multiple broker nodes that share state and distribute load.

Why Cluster?

| Metric | Single Broker | Clustered (3 nodes) |
| --- | --- | --- |
| Concurrent connections | ~100K (depends on hardware) | ~300K+ |
| Availability | One node failure = total outage | Tolerates 1 node failure |
| Throughput | Limited by single CPU | Horizontal scaling |
| Maintenance | Downtime for updates | Rolling upgrades |

EMQX Cluster Setup

EMQX is one of the most widely used open-source MQTT brokers for production clustering. It supports automatic discovery, session transfer, and horizontal scaling.

Docker Compose for EMQX cluster:

# docker-compose.emqx-cluster.yml
version: "3.8"
 
services:
  emqx1:
    image: emqx/emqx:5.5
    container_name: emqx1
    environment:
      - EMQX_NODE_NAME=emqx@emqx1
      - EMQX_CLUSTER__DISCOVERY_STRATEGY=static
      - EMQX_CLUSTER__STATIC__SEEDS=[emqx@emqx1,emqx@emqx2,emqx@emqx3]
    ports:
      - "1883:1883"    # MQTT
      - "8083:8083"    # WebSocket
      - "8883:8883"    # MQTT/TLS
      - "18083:18083"  # Dashboard
    networks:
      - emqx-net
 
  emqx2:
    image: emqx/emqx:5.5
    container_name: emqx2
    environment:
      - EMQX_NODE_NAME=emqx@emqx2
      - EMQX_CLUSTER__DISCOVERY_STRATEGY=static
      - EMQX_CLUSTER__STATIC__SEEDS=[emqx@emqx1,emqx@emqx2,emqx@emqx3]
    ports:
      - "1884:1883"
    networks:
      - emqx-net
 
  emqx3:
    image: emqx/emqx:5.5
    container_name: emqx3
    environment:
      - EMQX_NODE_NAME=emqx@emqx3
      - EMQX_CLUSTER__DISCOVERY_STRATEGY=static
      - EMQX_CLUSTER__STATIC__SEEDS=[emqx@emqx1,emqx@emqx2,emqx@emqx3]
    ports:
      - "1885:1883"
    networks:
      - emqx-net
 
  haproxy:
    image: haproxy:2.8
    container_name: mqtt-lb
    ports:
      - "1880:1883"
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    depends_on:
      - emqx1
      - emqx2
      - emqx3
    networks:
      - emqx-net
 
networks:
  emqx-net:
    driver: bridge

HAProxy configuration for MQTT load balancing:

# haproxy.cfg
global
    log stdout format raw local0
 
defaults
    mode tcp
    timeout connect 5s
    timeout client 120s
    timeout server 120s
 
frontend mqtt_frontend
    bind *:1883
    default_backend mqtt_backend
 
backend mqtt_backend
    balance leastconn
    # Sticky sessions by client IP — important for MQTT persistent connections
    stick-table type ip size 200k expire 30m
    stick on src
 
    server emqx1 emqx1:1883 check inter 5s fall 3 rise 2
    server emqx2 emqx2:1883 check inter 5s fall 3 rise 2
    server emqx3 emqx3:1883 check inter 5s fall 3 rise 2

Start the cluster:

docker compose -f docker-compose.emqx-cluster.yml up -d
 
# Check cluster status
docker exec emqx1 emqx ctl cluster status
# Cluster status: running
# Node: emqx@emqx1 (running)
# Node: emqx@emqx2 (running)
# Node: emqx@emqx3 (running)

How Cluster Routing Works

When a device publishes to Node 1, but subscribers are connected to Node 2 and Node 3:

Each node maintains a global route table — a mapping of topic subscriptions to nodes. When a message arrives, the receiving node forwards it to all nodes that have matching subscribers. This is how MQTT clustering achieves horizontal scaling without clients needing to know which node holds their data.
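That route table can be sketched as a map from topic filter to the set of nodes with matching subscribers. This is a simplification (node names are illustrative, and the lookup here is exact-match rather than wildcard-aware), but it captures the forwarding decision:

```typescript
// Simplified cluster route table: which nodes hold subscribers for a filter.
class RouteTable {
  private routes = new Map<string, Set<string>>();

  // Record that `node` has at least one subscriber for `filter`.
  addSubscription(filter: string, node: string): void {
    if (!this.routes.has(filter)) this.routes.set(filter, new Set());
    this.routes.get(filter)!.add(node);
  }

  // On PUBLISH, the receiving node forwards the message to every other
  // node that has a matching subscription.
  nodesToForward(topic: string, receivingNode: string): string[] {
    const nodes = this.routes.get(topic) ?? new Set<string>();
    return [...nodes].filter((n) => n !== receivingNode);
  }
}

const routes = new RouteTable();
routes.addSubscription('devices/ts-0042/telemetry', 'emqx2');
routes.addSubscription('devices/ts-0042/telemetry', 'emqx3');
routes.nodesToForward('devices/ts-0042/telemetry', 'emqx1'); // forwards to emqx2 and emqx3
```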

HiveMQ Alternative

HiveMQ is the enterprise alternative — commercial with a free community edition:

# docker-compose.hivemq-cluster.yml
version: "3.8"
services:
  hivemq1:
    image: hivemq/hivemq-ce:latest
    container_name: hivemq1
    environment:
      - HIVEMQ_CLUSTER_TRANSPORT_TYPE=TCP
    ports:
      - "1883:1883"
      - "8080:8080"
    networks:
      - hivemq-net
 
  hivemq2:
    image: hivemq/hivemq-ce:latest
    container_name: hivemq2
    environment:
      - HIVEMQ_CLUSTER_TRANSPORT_TYPE=TCP
    ports:
      - "1884:1883"
    networks:
      - hivemq-net
 
networks:
  hivemq-net:
    driver: bridge

Broker Comparison

| Feature | EMQX | HiveMQ CE | HiveMQ Enterprise | Mosquitto |
| --- | --- | --- | --- | --- |
| Clustering | Yes (built-in) | Yes (limited) | Yes (full) | No |
| Max connections | 100M+ (tested) | ~100K | Millions | ~100K |
| Shared subscriptions | Yes | Yes | Yes | No (v2.0) |
| Rule engine | Yes (SQL-like) | No | Yes | No |
| WebSocket | Yes | Yes | Yes | Yes |
| Dashboard | Yes (web UI) | No | Yes | No |
| License | Apache 2.0 | Apache 2.0 | Commercial | EPL/EDL |
| Best for | Production IoT | Dev/small prod | Enterprise | Dev/prototyping |

Scaling MQTT

Connection Scaling

Each MQTT connection consumes broker memory for:

  • TCP socket buffer (~4 KB)
  • Session state (~1-2 KB for clean sessions)
  • Subscription routing entries (~100 bytes per subscription)
  • In-flight message buffers (QoS 1/2)

Rule of thumb: 1 million connections ≈ 8-16 GB RAM (depending on QoS and message rates).
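Plugging the per-connection figures above into a rough calculator (order-of-magnitude only; it ignores QoS in-flight buffers and OS overhead, which is why the rule of thumb lands higher):

```typescript
// Rough broker memory estimate for a fleet of persistent connections,
// using the per-connection figures listed above.
function estimateMemoryMB(connections: number, subsPerClient: number): number {
  const perConnBytes =
    4096 +               // TCP socket buffers (~4 KB)
    1536 +               // session state (~1-2 KB)
    subsPerClient * 100; // subscription routing entries (~100 bytes each)
  return (connections * perConnBytes) / (1024 * 1024);
}

estimateMemoryMB(1_000_000, 2); // roughly 5.5 GB before in-flight buffers are counted
```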

Scaling Strategies

Topic Aliasing (MQTT v5.0)

Long topic strings are sent with every PUBLISH. Topic aliases replace them with a short integer after the first use:

// Without topic alias — sends full topic string every time
client.publish('acme/factory-berlin/zone-a/temperature-sensor/ts-0042/telemetry', data);
// Overhead: 67 bytes of topic per message
 
// With topic alias — sends alias number after first message
const client = mqtt.connect('mqtts://broker:8883', {
  protocolVersion: 5,
  properties: {
    topicAliasMaximum: 100, // Support up to 100 aliases
  },
});
 
// First PUBLISH sends full topic + assigns alias
// Subsequent PUBLISH sends only alias (2 bytes instead of 67)

At 10,000 messages/second, topic aliasing saves ~650 KB/second of bandwidth.

Connection Pooling for Backend Services

Backend services that process MQTT data shouldn't create one connection per task:

import mqtt from 'mqtt';
 
class MqttConnectionPool {
  private connections: mqtt.MqttClient[] = [];
  private roundRobinIndex = 0;
 
  constructor(brokerUrl: string, poolSize: number = 5) {
    for (let i = 0; i < poolSize; i++) {
      const client = mqtt.connect(brokerUrl, {
        clientId: `backend-worker-${i}`,
        clean: true,
      });
      this.connections.push(client);
    }
  }
 
  // Round-robin publish across connections
  publish(topic: string, payload: string, opts?: mqtt.IClientPublishOptions): void {
    const client = this.connections[this.roundRobinIndex];
    client.publish(topic, payload, opts || {});
    this.roundRobinIndex = (this.roundRobinIndex + 1) % this.connections.length;
  }
 
  // All connections subscribe (for shared subscriptions)
  subscribeAll(topic: string, opts?: mqtt.IClientSubscribeOptions): void {
    for (const client of this.connections) {
      client.subscribe(topic, opts || {});
    }
  }
}
 
// 5 connections handle publishing across multiple topics
const pool = new MqttConnectionPool('mqtts://broker:8883', 5);
 
pool.publish('factory/zone-a/aggregated', JSON.stringify(data));

Monitoring Broker Health

You can't fix what you can't see. MQTT brokers expose metrics that tell you when trouble is brewing — before your devices start dropping.

Key Metrics to Monitor

| Metric | Warning Threshold | Critical Threshold | What It Means |
| --- | --- | --- | --- |
| Connected clients | > 80% capacity | > 95% capacity | Running out of connection slots |
| Message rate (in/out) | Unusual spike | 2x normal | Possible message storm or loop |
| Subscription count | > 1M per node | > 5M per node | Memory pressure |
| Retained messages | > 100K | > 500K | Storage growing unbounded |
| Inflight messages | > 10K | > 50K | QoS handshakes backing up |
| Memory usage | > 70% | > 90% | Approaching OOM |
| Packet drop rate | > 0.1% | > 1% | Broker overloaded |
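Turning thresholds like these into alert levels is a one-liner per metric; a sketch:

```typescript
type Level = 'ok' | 'warning' | 'critical';

// Classify a metric reading against its warning and critical thresholds.
function classify(value: number, warn: number, crit: number): Level {
  if (value >= crit) return 'critical';
  if (value >= warn) return 'warning';
  return 'ok';
}

classify(0.92, 0.7, 0.9); // 'critical': memory usage above 90%
classify(0.75, 0.7, 0.9); // 'warning'
```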

EMQX Prometheus Integration

EMQX exposes Prometheus metrics out of the box:

# prometheus.yml
scrape_configs:
  - job_name: 'emqx'
    metrics_path: '/api/v5/prometheus/stats'
    static_configs:
      - targets: ['emqx1:18083', 'emqx2:18083', 'emqx3:18083']
    # Authentication for EMQX dashboard API
    basic_auth:
      username: admin
      password: public

Grafana dashboard query examples:

# Connected clients over time
emqx_connections_count
 
# Message throughput (messages per second)
rate(emqx_messages_received_total[5m])
 
# QoS 1/2 inflight messages
emqx_messages_qos1_inflight + emqx_messages_qos2_inflight
 
# Subscription count
emqx_suboptions_count

$SYS Topics (Mosquitto)

Mosquitto exposes broker stats via special $SYS topics:

const monitor = mqtt.connect('mqtt://broker:1883');
 
monitor.subscribe('$SYS/#');
 
monitor.on('message', (topic, message) => {
  const value = message.toString();
 
  switch (topic) {
    case '$SYS/broker/clients/connected':
      console.log(`Connected clients: ${value}`);
      break;
    case '$SYS/broker/messages/received':
      console.log(`Total messages received: ${value}`);
      break;
    case '$SYS/broker/load/messages/received/1min':
      console.log(`Messages/min: ${value}`);
      break;
    case '$SYS/broker/heap/current':
      console.log(`Heap memory: ${(parseInt(value) / 1024 / 1024).toFixed(1)} MB`);
      break;
  }
});

Health Check Endpoint

Build a simple health check that validates broker connectivity:

import mqtt from 'mqtt';
 
async function checkBrokerHealth(brokerUrl: string): Promise<{
  status: 'healthy' | 'degraded' | 'down';
  latencyMs: number;
  details: string;
}> {
  return new Promise((resolve) => {
    const start = Date.now();
    const testTopic = `health-check/${Date.now()}`;
    const timeout = setTimeout(() => {
      client.end(true); // Force-close so the failed connection doesn't leak
      resolve({ status: 'down', latencyMs: -1, details: 'Connection timeout' });
    }, 5000);
 
    const client = mqtt.connect(brokerUrl, {
      clientId: `health-check-${Date.now()}`,
      connectTimeout: 5000,
    });
 
    client.on('connect', () => {
      client.subscribe(testTopic, { qos: 1 });
 
      client.on('message', () => {
        const latency = Date.now() - start;
        clearTimeout(timeout);
        client.end();
 
        resolve({
          status: latency < 100 ? 'healthy' : 'degraded',
          latencyMs: latency,
          details: `Round-trip: ${latency}ms`,
        });
      });
 
      // Publish and expect to receive our own message
      client.publish(testTopic, 'ping', { qos: 1 });
    });
 
    client.on('error', (err) => {
      clearTimeout(timeout);
      client.end();
      resolve({ status: 'down', latencyMs: -1, details: err.message });
    });
  });
}
 
// Usage
const health = await checkBrokerHealth('mqtt://broker:1883');
console.log(`Broker: ${health.status} (${health.latencyMs}ms)`);

MQTT with TypeScript and Python

TypeScript/Node.js (mqtt.js)

Full production client with reconnection, error handling, and graceful shutdown:

import mqtt, { MqttClient, IClientOptions } from 'mqtt';
 
interface DeviceConfig {
  deviceId: string;
  brokerUrl: string;
  username?: string;
  password?: string;
  caCert?: Buffer;
  clientCert?: Buffer;
  clientKey?: Buffer;
}
 
class MqttDeviceClient {
  private client: MqttClient;
  private deviceId: string;
 
  constructor(config: DeviceConfig) {
    this.deviceId = config.deviceId;
 
    const options: IClientOptions = {
      clientId: config.deviceId,
      clean: false, // Persistent session
      reconnectPeriod: 5000, // Retry every 5 seconds
      connectTimeout: 10000,
      keepalive: 60, // Send PINGREQ every 60 seconds
 
      // Last Will and Testament
      will: {
        topic: `devices/${config.deviceId}/status`,
        payload: Buffer.from(JSON.stringify({
          status: 'offline',
          timestamp: Date.now(),
        })),
        qos: 1,
        retain: true,
      },
    };
 
    // Add auth
    if (config.username) {
      options.username = config.username;
      options.password = config.password;
    }
 
    // Add TLS
    if (config.caCert) {
      options.ca = config.caCert;
      options.cert = config.clientCert;
      options.key = config.clientKey;
      options.rejectUnauthorized = true;
    }
 
    this.client = mqtt.connect(config.brokerUrl, options);
    this.setupEventHandlers();
  }
 
  private setupEventHandlers(): void {
    this.client.on('connect', () => {
      console.log(`[${this.deviceId}] Connected to broker`);
 
      // Publish online status (retained)
      this.client.publish(
        `devices/${this.deviceId}/status`,
        JSON.stringify({ status: 'online', timestamp: Date.now() }),
        { qos: 1, retain: true }
      );
 
      // Subscribe to commands
      this.client.subscribe(`devices/${this.deviceId}/commands`, { qos: 1 });
    });
 
    this.client.on('message', (topic, message) => {
      try {
        const payload = JSON.parse(message.toString());
        this.handleCommand(payload);
      } catch (err) {
        console.error(`[${this.deviceId}] Invalid message on ${topic}:`, err);
      }
    });
 
    this.client.on('reconnect', () => {
      console.log(`[${this.deviceId}] Reconnecting...`);
    });
 
    this.client.on('error', (err) => {
      console.error(`[${this.deviceId}] Error:`, err.message);
    });
 
    this.client.on('offline', () => {
      console.log(`[${this.deviceId}] Offline`);
    });
  }
 
  publishTelemetry(data: Record<string, number>): void {
    this.client.publish(
      `devices/${this.deviceId}/telemetry`,
      JSON.stringify({ ...data, timestamp: Date.now() }),
      { qos: 0 } // Telemetry: QoS 0 is fine
    );
  }
 
  publishAlert(alert: { type: string; message: string; severity: string }): void {
    this.client.publish(
      `devices/${this.deviceId}/alerts`,
      JSON.stringify({ ...alert, timestamp: Date.now() }),
      { qos: 1 } // Alerts: QoS 1 — don't lose them
    );
  }
 
  private handleCommand(payload: any): void {
    console.log(`[${this.deviceId}] Command received:`, payload);
 
    switch (payload.action) {
      case 'recalibrate':
        // ... execute recalibration
        break;
      case 'updateConfig':
        // ... apply new configuration
        break;
      case 'reboot':
        // ... schedule graceful reboot
        break;
    }
 
    // Acknowledge command
    this.client.publish(
      `devices/${this.deviceId}/commands/response`,
      JSON.stringify({
        commandId: payload.id,
        status: 'executed',
        timestamp: Date.now(),
      }),
      { qos: 1 }
    );
  }
 
  async shutdown(): Promise<void> {
    // Publish offline status before disconnecting
    this.client.publish(
      `devices/${this.deviceId}/status`,
      JSON.stringify({ status: 'offline', timestamp: Date.now() }),
      { qos: 1, retain: true }
    );
 
    return new Promise((resolve) => {
      this.client.end(false, () => {
        console.log(`[${this.deviceId}] Disconnected gracefully`);
        resolve();
      });
    });
  }
}
 
// Usage
const device = new MqttDeviceClient({
  deviceId: 'sensor-ts-0042',
  brokerUrl: 'mqtts://broker.example.com:8883',
  username: 'sensor-ts-0042',
  password: process.env.DEVICE_TOKEN,
});
 
// Publish telemetry every 5 seconds
setInterval(() => {
  device.publishTelemetry({
    temperature: 23.5 + Math.random() * 2,
    humidity: 65 + Math.random() * 5,
  });
}, 5000);
 
// Graceful shutdown
process.on('SIGTERM', async () => {
  await device.shutdown();
  process.exit(0);
});

Python (paho-mqtt)

# mqtt_device.py
import json
import time
import ssl
import paho.mqtt.client as mqtt
 
class MqttDeviceClient:
    def __init__(self, device_id: str, broker_host: str, broker_port: int = 8883):
        self.device_id = device_id
        self.client = mqtt.Client(
            client_id=device_id,
            protocol=mqtt.MQTTv5,
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        )
 
        # Last Will and Testament
        self.client.will_set(
            topic=f"devices/{device_id}/status",
            payload=json.dumps({"status": "offline", "timestamp": time.time()}),
            qos=1,
            retain=True,
        )
 
        # TLS configuration
        self.client.tls_set(
            ca_certs="/etc/mqtt/ca.crt",
            certfile=f"/etc/mqtt/{device_id}.crt",
            keyfile=f"/etc/mqtt/{device_id}.key",
            tls_version=ssl.PROTOCOL_TLSv1_2,
        )
 
        # Callbacks
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect
 
        # Connect with auto-reconnect
        self.client.connect(broker_host, broker_port, keepalive=60)
 
    def _on_connect(self, client, userdata, flags, reason_code, properties):
        print(f"[{self.device_id}] Connected: {reason_code}")
 
        # Publish online status
        client.publish(
            f"devices/{self.device_id}/status",
            json.dumps({"status": "online", "timestamp": time.time()}),
            qos=1,
            retain=True,
        )
 
        # Subscribe to commands
        client.subscribe(f"devices/{self.device_id}/commands", qos=1)
 
    def _on_message(self, client, userdata, msg):
        try:
            payload = json.loads(msg.payload.decode())
            print(f"[{self.device_id}] Command: {payload}")
            self._handle_command(payload)
        except json.JSONDecodeError:
            print(f"[{self.device_id}] Invalid JSON on {msg.topic}")
 
    def _on_disconnect(self, client, userdata, flags, reason_code, properties):
        print(f"[{self.device_id}] Disconnected: {reason_code}")
 
    def _handle_command(self, payload: dict):
        action = payload.get("action")
        if action == "recalibrate":
            pass  # Execute recalibration
        elif action == "updateConfig":
            pass  # Apply new configuration
 
        # Acknowledge
        self.client.publish(
            f"devices/{self.device_id}/commands/response",
            json.dumps({
                "commandId": payload.get("id"),
                "status": "executed",
                "timestamp": time.time(),
            }),
            qos=1,
        )
 
    def publish_telemetry(self, data: dict):
        self.client.publish(
            f"devices/{self.device_id}/telemetry",
            json.dumps({**data, "timestamp": time.time()}),
            qos=0,
        )
 
    def start(self):
        self.client.loop_start()
 
    def stop(self):
        self.client.publish(
            f"devices/{self.device_id}/status",
            json.dumps({"status": "offline", "timestamp": time.time()}),
            qos=1,
            retain=True,
        )
        self.client.loop_stop()
        self.client.disconnect()
 
 
# Usage
device = MqttDeviceClient("sensor-ts-0042", "broker.example.com")
device.start()
 
try:
    while True:
        device.publish_telemetry({
            "temperature": 23.5,
            "humidity": 65.0,
        })
        time.sleep(5)
except KeyboardInterrupt:
    device.stop()

Common Beginner Mistakes

Mistake 1: Using the Same Client ID for Multiple Devices

❌ All devices connect with clientId "my-device"
   → Broker disconnects previous client when new one connects with same ID!
 
✅ Each device has a unique clientId: "sensor-ts-0042", "sensor-ts-0043"

MQTT spec requires unique client IDs. If two clients connect with the same ID, the broker disconnects the first one. This causes a "ping-pong" where both devices keep reconnecting and kicking each other off.
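A simple way to avoid the collision is to derive the client ID from something already unique on the device. The helper names below are illustrative, not from any library:

```python
import uuid

def make_client_id(device_serial: str) -> str:
    """Derive a stable, collision-free client ID from the device's serial number."""
    return f"sensor-{device_serial}"

def random_client_id(prefix: str = "device") -> str:
    """Fallback for devices without a serial: generate once, persist it to
    flash/config, and reuse the same ID on every reconnect."""
    return f"{prefix}-{uuid.uuid4().hex[:12]}"
```

The important part is stability: a fresh random ID on every reconnect breaks persistent sessions, so generate it once and store it.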

Mistake 2: Subscribing to # in Production

❌ client.subscribe('#')   → Receives ALL messages on the entire broker
 
✅ client.subscribe('factory/zone-a/+/telemetry')   → Only what you need

Subscribing to # on a busy broker with 100,000 devices will flood your client with millions of messages it can't process.
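To see exactly what a filter covers, here's a minimal sketch of MQTT topic-filter matching (simplified — it ignores the spec's special handling of `$`-prefixed system topics):

```python
def topic_matches(filter_: str, topic: str) -> bool:
    """Return True if `topic` matches the subscription `filter_` (+ and # wildcards)."""
    f_parts, t_parts = filter_.split("/"), topic.split("/")
    for i, part in enumerate(f_parts):
        if part == "#":
            return True                      # '#' matches this level and everything below
        if i >= len(t_parts):
            return False                     # filter is longer than the topic
        if part != "+" and part != t_parts[i]:
            return False                     # '+' matches exactly one level; else literal match
    return len(f_parts) == len(t_parts)

topic_matches("factory/zone-a/+/telemetry", "factory/zone-a/ts-0042/telemetry")  # True
topic_matches("factory/zone-a/+/telemetry", "factory/zone-b/m-007/telemetry")    # False
```

`#` returns True for every topic on the broker — which is precisely why subscribing to it in production buries your client.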

Mistake 3: Not Using Retained Messages for Status

❌ Device publishes status on connect, new subscriber misses it
   → Dashboard shows "unknown" until device publishes next status
 
✅ Device publishes status with retain=true
   → New subscriber immediately gets last known status
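Retained status pairs naturally with a Last Will: the broker publishes the retained "offline" message on the device's behalf if the connection drops ungracefully. A sketch using paho-mqtt 2.x (broker host and topics are placeholders):

```python
import json
import time

def status_payload(status: str) -> str:
    """Status document published retained — and registered as the Last Will."""
    return json.dumps({"status": status, "timestamp": time.time()})

if __name__ == "__main__":
    import paho.mqtt.client as mqtt

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="sensor-ts-0042")
    # Register the will BEFORE connecting: on an ungraceful drop, the broker
    # publishes this retained "offline" status for us.
    client.will_set("devices/sensor-ts-0042/status",
                    status_payload("offline"), qos=1, retain=True)
    client.connect("broker.example.com", 1883, keepalive=60)
    # Once connected, overwrite the retained status with "online".
    client.publish("devices/sensor-ts-0042/status",
                   status_payload("online"), qos=1, retain=True)
```

One caveat: the will payload's timestamp is captured at `will_set` time, not at disconnect time, so treat it as "last seen", not "went offline at".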

Mistake 4: Ignoring the Keep-Alive Mechanism

❌ keepalive=0 (disabled) → Broker can't detect dead connections
   → Ghost sessions accumulate, consuming broker memory
 
✅ keepalive=60 → Broker expects PINGREQ within 1.5× keepalive (90s)
   → Dead connections detected and cleaned up automatically

Mistake 5: No Reconnection Strategy

❌ Single connect attempt → connection fails → device goes silent forever
 
✅ Exponential backoff reconnection:
   Attempt 1: wait 1s → Attempt 2: wait 2s → Attempt 3: wait 4s → ... → max 60s

Most MQTT client libraries (mqtt.js, paho-mqtt) handle reconnection automatically, but you still need to configure the reconnect delay (reconnectPeriod in mqtt.js, reconnect_delay_set in paho-mqtt) and re-subscribe in the connect callback when using clean sessions — the broker has forgotten your subscriptions.
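That schedule is easy to sketch as a pure function; paho-mqtt implements the same idea internally via `client.reconnect_delay_set(min_delay=1, max_delay=60)`:

```python
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay before reconnect attempt `attempt` (0-based): 1s, 2s, 4s, ... capped at 60s."""
    return min(cap, base * (2 ** attempt))

# attempt 0 → 1.0s, attempt 3 → 8.0s, attempt 10 → 60.0s (capped)
```

In a real fleet, add random jitter to each delay so thousands of devices don't stampede the broker in lockstep after it restarts.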


Summary and Key Takeaways

Design topic hierarchies with wildcards in mind — {org}/{site}/{zone}/{device-type}/{device-id}/{data-type}
Use shared subscriptions (v5.0) for load balancing — distribute messages across consumer workers natively
Match QoS to data criticality — QoS 0 for high-frequency telemetry, QoS 1 for events and state changes, QoS 2 for critical commands only
Persistent sessions + session expiry — let battery-powered devices sleep without losing messages
Always use TLS (1.2 or newer) — plaintext MQTT is an open invitation for attackers
mTLS is the strongest IoT authentication — unique certificate per device, no passwords to leak
Implement ACLs — devices should only access their own topics, never #
Cluster your broker for production — EMQX with 3+ nodes handles failover and horizontal scaling
Monitor connected clients, message rates, and memory — catch problems before they become outages
Topic aliasing (v5.0) saves bandwidth — replace long topic strings with 2-byte integers


What's Next

Now that you can design, secure, and scale MQTT, it's time to look at managing the devices themselves — provisioning, configuration, firmware updates, and fleet management at scale.

Next: IOT-6: Deep Dive: Device Management & OTA Updates — Device lifecycle management, device shadow/digital twin pattern, OTA firmware updates with rollback strategies, remote diagnostics, and fleet management at scale.



This is Part 5 of the IoT Patterns & Strategies series. Start from the beginning or jump to any topic that interests you.
