IoT Edge Computing & Fog Architecture

Welcome to Phase 3 of the IoT Patterns & Strategies Roadmap! In Phase 1, we explored IoT architecture layers and device types. In Phase 2, we dug into protocols — MQTT, CoAP, AMQP, and WebSocket. Now the question is: where does the processing happen?
Sending every sensor reading from 100,000 devices straight to the cloud sounds simple until you see the bandwidth bill, the latency numbers, and what happens when the internet goes down. Edge computing flips the model — process data close to where it's generated, send only what the cloud actually needs.
By the end of this post, you'll understand when to use edge, fog, or cloud processing, how to implement common edge patterns, and how to keep your system running when connectivity drops.
What You'll Learn
✅ Understand why edge computing is critical for IoT (latency, bandwidth, resilience)
✅ Compare edge vs fog vs cloud architectures with clear trade-offs
✅ Implement data filtering, aggregation, and store-and-forward at the edge
✅ Evaluate edge runtimes: AWS Greengrass, Azure IoT Edge, K3s, Docker
✅ Design fog computing layers for hierarchical IoT systems
✅ Build offline-first IoT applications that sync when connectivity returns
✅ Run ML inference at the edge for real-time decision making
Why Edge Computing Matters for IoT
The Cloud-Only Problem
Consider a factory with 10,000 sensors, each sending readings every second:
10,000 sensors × 1 reading/sec × 200 bytes = 2 MB/sec ≈ 172 GB/day

Now multiply that by network costs, cloud compute costs, and the 50-200ms round-trip latency to the cloud. A vibration sensor detecting equipment failure needs a response in under 10ms — not 200ms after a round trip to us-east-1.
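The arithmetic is worth sanity-checking; a few lines reproduce the figures above:

```typescript
// Back-of-the-envelope bandwidth for 10,000 sensors at 1 reading/sec, 200 bytes each
const sensors = 10_000;
const readingsPerSec = 1;
const bytesPerReading = 200;

const bytesPerSec = sensors * readingsPerSec * bytesPerReading; // 2,000,000 B/s = 2 MB/s
const gbPerDay = (bytesPerSec * 86_400) / 1e9;                  // 86,400 sec/day, decimal GB

console.log(`${bytesPerSec / 1e6} MB/sec, ${gbPerDay.toFixed(1)} GB/day`);
```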
Three Reasons to Process at the Edge
1. Latency — Some decisions can't wait for the cloud.
| Scenario | Required Latency | Cloud Feasible? |
|---|---|---|
| Predictive maintenance alert | < 100ms | Maybe |
| Robotic arm collision avoidance | < 10ms | No |
| Smart traffic light adjustment | < 50ms | No |
| Environmental monitoring report | < 5 min | Yes |
| Monthly energy consumption summary | Hours | Yes |
2. Bandwidth — Raw sensor data is expensive to transmit.
3. Resilience — The internet will go down. Your factory shouldn't stop.
Edge processing ensures critical operations continue during network outages. Decisions that affect safety, quality control, and real-time adjustments happen locally — the cloud gets a summary when connectivity returns.
Edge vs Fog vs Cloud
These three terms get used interchangeably, but they represent distinct layers with different responsibilities:
Comparison Table
| Aspect | Edge | Fog | Cloud |
|---|---|---|---|
| Location | On or next to device | Local network (gateway, on-prem server) | Remote data center |
| Latency | < 10ms | 10-100ms | 50-500ms |
| Compute power | Limited (ARM, microcontroller) | Moderate (x86 server, GPU) | Unlimited (auto-scale) |
| Storage | GB range | TB range | PB range |
| Connectivity | Can work offline | Needs local network | Needs internet |
| Example hardware | Raspberry Pi, Jetson Nano, ESP32 | On-prem server, industrial PC | AWS, Azure, GCP |
| Processing type | Filtering, thresholds, simple ML | Aggregation, regional analytics, model inference | Training, global analytics, historical queries |
| Cost | Low per device, scales linearly | Moderate (shared infrastructure) | Pay-per-use, can spike |
When to Use Each Layer
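There is no single rule, but the trade-offs above compress into a rough decision helper. The thresholds below are illustrative, taken from the latency table earlier in this post, not a standard:

```typescript
type Layer = 'edge' | 'fog' | 'cloud';

interface Requirement {
  maxLatencyMs: number;          // Tightest response-time budget for the decision
  mustSurviveOffline: boolean;   // Must it keep working with no internet?
  needsCrossDeviceData: boolean; // Does it correlate data from many devices?
}

// Rough heuristic: hard real-time → edge, LAN-scale coordination → fog, rest → cloud
function pickLayer(req: Requirement): Layer {
  if (req.maxLatencyMs < 10) return 'edge';
  if (req.needsCrossDeviceData || req.maxLatencyMs < 100 || req.mustSurviveOffline) {
    return 'fog';
  }
  return 'cloud';
}

// Robotic arm collision avoidance: < 10ms budget
console.log(pickLayer({ maxLatencyMs: 5, mustSurviveOffline: true, needsCrossDeviceData: false }));  // edge
// Smart traffic light coordinating several intersections
console.log(pickLayer({ maxLatencyMs: 50, mustSurviveOffline: false, needsCrossDeviceData: true })); // fog
```

Real deployments usually put the same workload on multiple layers (e.g., edge makes the fast call, fog aggregates, cloud keeps history), so treat this as a starting point for the conversation, not a router.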
Edge Computing Patterns
Pattern 1: Data Filtering
The simplest edge pattern — drop data you don't need before it leaves the device.
// Edge data filter — runs on gateway device
interface SensorReading {
  deviceId: string;
  timestamp: number;
  temperature: number;
  humidity: number;
  vibration: number;
}

class EdgeDataFilter {
  private lastSent: Map<string, SensorReading> = new Map();

  // Only forward readings that changed significantly
  shouldForward(reading: SensorReading): boolean {
    const last = this.lastSent.get(reading.deviceId);
    if (!last) {
      this.lastSent.set(reading.deviceId, reading);
      return true; // Always forward first reading
    }
    const tempDelta = Math.abs(reading.temperature - last.temperature);
    const humidDelta = Math.abs(reading.humidity - last.humidity);
    const vibDelta = Math.abs(reading.vibration - last.vibration);
    // Forward only if change exceeds thresholds
    if (tempDelta > 0.5 || humidDelta > 2.0 || vibDelta > 10.0) {
      this.lastSent.set(reading.deviceId, reading);
      return true;
    }
    return false; // Skip — not significant enough
  }
}

// Usage on edge gateway
const filter = new EdgeDataFilter();
const readings = getIncomingSensorReadings(); // From MQTT subscription
for (const reading of readings) {
  if (filter.shouldForward(reading)) {
    publishToCloud(reading); // Forward to cloud MQTT broker
  }
  // Silently drop insignificant readings
}

Result: If temperature fluctuates ±0.1°C, you send one reading instead of hundreds. Typical reduction: 70-95% fewer messages.
Pattern 2: Edge Aggregation
Instead of sending every raw reading, compute summaries locally:
interface AggregatedReading {
  deviceId: string;
  windowStart: number;
  windowEnd: number;
  temperature: {
    min: number;
    max: number;
    avg: number;
    count: number;
  };
  humidity: {
    min: number;
    max: number;
    avg: number;
    count: number;
  };
}

class EdgeAggregator {
  private windows: Map<string, SensorReading[]> = new Map();
  private windowDuration = 60_000; // 1 minute windows

  addReading(reading: SensorReading): void {
    const key = reading.deviceId;
    if (!this.windows.has(key)) {
      this.windows.set(key, []);
    }
    this.windows.get(key)!.push(reading);
  }

  getAllDeviceIds(): string[] {
    return [...this.windows.keys()];
  }

  flush(deviceId: string): AggregatedReading | null {
    const readings = this.windows.get(deviceId);
    if (!readings || readings.length === 0) return null;
    const temps = readings.map(r => r.temperature);
    const humids = readings.map(r => r.humidity);
    const result: AggregatedReading = {
      deviceId,
      windowStart: readings[0].timestamp,
      windowEnd: readings[readings.length - 1].timestamp,
      temperature: {
        min: Math.min(...temps),
        max: Math.max(...temps),
        avg: temps.reduce((a, b) => a + b, 0) / temps.length,
        count: temps.length,
      },
      humidity: {
        min: Math.min(...humids),
        max: Math.max(...humids),
        avg: humids.reduce((a, b) => a + b, 0) / humids.length,
        count: humids.length,
      },
    };
    this.windows.set(deviceId, []); // Reset window
    return result;
  }
}

// Every 60 seconds, flush aggregates to cloud
const aggregator = new EdgeAggregator();
setInterval(() => {
  for (const deviceId of aggregator.getAllDeviceIds()) {
    const summary = aggregator.flush(deviceId);
    if (summary) {
      publishToCloud(summary); // 1 message instead of 60
    }
  }
}, 60_000);

60 raw readings per minute → 1 aggregated message. The cloud gets min/max/avg — enough for dashboards and trend analysis.
Pattern 3: Threshold Alerting
React to critical conditions instantly at the edge without waiting for the cloud:
interface Alert {
  deviceId: string;
  type: 'warning' | 'critical';
  metric: string;
  value: number;
  threshold: number;
  timestamp: number;
  message: string;
}

interface ThresholdRule {
  metric: string;
  warningThreshold: number;
  criticalThreshold: number;
  direction: 'above' | 'below';
}

class EdgeAlertEngine {
  private rules: ThresholdRule[] = [
    { metric: 'temperature', warningThreshold: 35, criticalThreshold: 45, direction: 'above' },
    { metric: 'temperature', warningThreshold: 5, criticalThreshold: 0, direction: 'below' },
    { metric: 'vibration', warningThreshold: 50, criticalThreshold: 80, direction: 'above' },
    { metric: 'humidity', warningThreshold: 80, criticalThreshold: 95, direction: 'above' },
  ];

  // Debounce: don't spam alerts
  private lastAlert: Map<string, number> = new Map();
  private cooldownMs = 30_000; // 30 second cooldown per device+metric

  evaluate(reading: SensorReading): Alert[] {
    const alerts: Alert[] = [];
    for (const rule of this.rules) {
      const value = reading[rule.metric as keyof SensorReading] as number;
      if (typeof value !== 'number') continue;
      const key = `${reading.deviceId}:${rule.metric}:${rule.direction}`;
      const lastTime = this.lastAlert.get(key) || 0;
      if (Date.now() - lastTime < this.cooldownMs) continue;
      let triggered = false;
      let type: 'warning' | 'critical' = 'warning';
      let threshold = rule.warningThreshold;
      if (rule.direction === 'above') {
        if (value >= rule.criticalThreshold) {
          triggered = true;
          type = 'critical';
          threshold = rule.criticalThreshold;
        } else if (value >= rule.warningThreshold) {
          triggered = true;
          type = 'warning';
        }
      } else {
        if (value <= rule.criticalThreshold) {
          triggered = true;
          type = 'critical';
          threshold = rule.criticalThreshold;
        } else if (value <= rule.warningThreshold) {
          triggered = true;
          type = 'warning';
        }
      }
      if (triggered) {
        this.lastAlert.set(key, Date.now());
        alerts.push({
          deviceId: reading.deviceId,
          type,
          metric: rule.metric,
          value,
          threshold,
          timestamp: Date.now(),
          message: `${rule.metric} ${rule.direction} ${threshold}: ${value}`,
        });
      }
    }
    return alerts;
  }
}

// On edge gateway
const alertEngine = new EdgeAlertEngine();

function processReading(reading: SensorReading) {
  const alerts = alertEngine.evaluate(reading);
  for (const alert of alerts) {
    // React locally — no cloud needed
    if (alert.type === 'critical') {
      triggerLocalBuzzer();
      activateEmergencyShutdown(alert.deviceId);
    }
    // Also forward alert to cloud for logging
    publishToCloud(alert);
  }
}

Key benefit: The buzzer sounds in < 5ms instead of waiting 200ms+ for a cloud round trip.
Pattern 4: Store-and-Forward
Buffer data locally when the network is down, flush when it recovers:
import * as fs from 'fs';
import * as path from 'path';

interface QueuedMessage {
  topic: string;
  payload: string;
  timestamp: number;
  retries: number;
}

class StoreAndForward {
  private queue: QueuedMessage[] = [];
  private maxQueueSize = 10_000;
  private persistPath: string;
  private isOnline = true;

  constructor(persistDir: string) {
    this.persistPath = path.join(persistDir, 'message-queue.json');
    this.loadFromDisk(); // Recover after restart
  }

  async send(topic: string, payload: string): Promise<void> {
    if (this.isOnline) {
      try {
        await this.publishToCloud(topic, payload);
        return; // Sent successfully
      } catch {
        this.isOnline = false;
        console.log('Network down — switching to store-and-forward');
      }
    }
    // Store locally
    this.enqueue({ topic, payload, timestamp: Date.now(), retries: 0 });
  }

  private enqueue(msg: QueuedMessage): void {
    if (this.queue.length >= this.maxQueueSize) {
      // Drop oldest non-critical messages (FIFO eviction)
      this.queue.shift();
    }
    this.queue.push(msg);
    this.saveToDisk(); // Survive device restarts
  }

  async flush(): Promise<void> {
    if (this.queue.length === 0) return;
    console.log(`Flushing ${this.queue.length} queued messages...`);
    const failed: QueuedMessage[] = [];
    for (const msg of this.queue) {
      try {
        await this.publishToCloud(msg.topic, msg.payload);
      } catch {
        if (msg.retries < 3) {
          msg.retries++;
          failed.push(msg);
        } else {
          console.log(`Dropping message after 3 retries: ${msg.topic}`);
        }
      }
    }
    this.queue = failed;
    this.saveToDisk();
    if (failed.length === 0) {
      this.isOnline = true;
      console.log('All queued messages flushed successfully');
    }
  }

  private saveToDisk(): void {
    fs.writeFileSync(this.persistPath, JSON.stringify(this.queue));
  }

  private loadFromDisk(): void {
    try {
      const data = fs.readFileSync(this.persistPath, 'utf-8');
      this.queue = JSON.parse(data);
      console.log(`Loaded ${this.queue.length} messages from disk`);
    } catch {
      this.queue = [];
    }
  }

  private async publishToCloud(topic: string, payload: string): Promise<void> {
    // Actual MQTT/HTTP publish to cloud broker
    throw new Error('Not implemented — use your MQTT client here');
  }
}

// Usage
const saf = new StoreAndForward('/var/edge/data');

// Normal operation — sends or queues automatically
await saf.send('factory/zone-1/temperature', JSON.stringify(reading));

// Periodic connectivity check and flush
setInterval(async () => {
  if (await checkConnectivity()) {
    await saf.flush();
  }
}, 30_000); // Check every 30 seconds

Fog Computing Architecture
Fog computing sits between edge and cloud — typically on-premise servers or gateways that aggregate data from multiple edge devices. Think of it as a regional processing hub.
When You Need Fog
Edge devices are constrained — a Raspberry Pi can't run complex ML models or correlate data from 500 sensors. But sending everything to the cloud is wasteful. The fog layer handles:
- Cross-device correlation: "Is the temperature spike on sensor A related to the vibration increase on sensor B?"
- Regional ML inference: Run pre-trained models on an on-prem GPU server
- Local dashboards: Operators on the factory floor need real-time views without internet dependency
- Protocol translation: Bridge MQTT, CoAP, and Modbus devices into a unified data stream
Fog Node Implementation
A fog node typically runs as a service on an on-premise server, collecting from multiple edge devices:
import mqtt from 'mqtt';

interface ZoneData {
  zone: string;
  devices: Map<string, SensorReading[]>;
  lastAggregation: number;
}

class FogNode {
  private zones: Map<string, ZoneData> = new Map();
  private edgeBroker: mqtt.MqttClient;
  private cloudBroker: mqtt.MqttClient;

  constructor(edgeBrokerUrl: string, cloudBrokerUrl: string) {
    // Connect to local MQTT broker (edge devices publish here)
    this.edgeBroker = mqtt.connect(edgeBrokerUrl);
    // Connect to cloud MQTT broker (fog publishes aggregates here)
    this.cloudBroker = mqtt.connect(cloudBrokerUrl);
    this.setupSubscriptions();
    this.startAggregationLoop();
  }

  private setupSubscriptions(): void {
    // Subscribe to all zone data from edge devices
    this.edgeBroker.subscribe('factory/+/+/data');
    this.edgeBroker.on('message', (topic, message) => {
      // topic: factory/{zone}/{device}/data
      const parts = topic.split('/');
      const zone = parts[1];
      const deviceId = parts[2];
      const reading: SensorReading = JSON.parse(message.toString());
      this.ingestReading(zone, deviceId, reading);
    });
  }

  private ingestReading(zone: string, deviceId: string, reading: SensorReading): void {
    if (!this.zones.has(zone)) {
      this.zones.set(zone, {
        zone,
        devices: new Map(),
        lastAggregation: Date.now(),
      });
    }
    const zoneData = this.zones.get(zone)!;
    if (!zoneData.devices.has(deviceId)) {
      zoneData.devices.set(deviceId, []);
    }
    zoneData.devices.get(deviceId)!.push(reading);
    // Run cross-device anomaly detection
    this.detectCrossDeviceAnomalies(zone);
  }

  private detectCrossDeviceAnomalies(zone: string): void {
    const zoneData = this.zones.get(zone);
    if (!zoneData) return;
    // Example: correlate temperature and vibration across devices
    const latestReadings: SensorReading[] = [];
    for (const [, readings] of zoneData.devices) {
      if (readings.length > 0) {
        latestReadings.push(readings[readings.length - 1]);
      }
    }
    if (latestReadings.length === 0) return; // Avoid divide-by-zero below
    const avgTemp = latestReadings.reduce((sum, r) => sum + r.temperature, 0)
      / latestReadings.length;
    const maxVibration = Math.max(...latestReadings.map(r => r.vibration));
    // Zone-level alert: high temp + high vibration = potential equipment failure
    if (avgTemp > 40 && maxVibration > 60) {
      this.cloudBroker.publish(
        `alerts/factory/${zone}/anomaly`,
        JSON.stringify({
          type: 'cross_device_anomaly',
          zone,
          avgTemp,
          maxVibration,
          timestamp: Date.now(),
          message: `Zone ${zone}: high temp (${avgTemp.toFixed(1)}°C) + vibration (${maxVibration})`,
        })
      );
    }
  }

  private startAggregationLoop(): void {
    // Every 5 minutes, aggregate and send to cloud
    setInterval(() => {
      for (const [zone, zoneData] of this.zones) {
        const summary = this.aggregateZone(zone, zoneData);
        this.cloudBroker.publish(
          `factory/${zone}/summary`,
          JSON.stringify(summary)
        );
        // Reset device readings after aggregation
        for (const [deviceId] of zoneData.devices) {
          zoneData.devices.set(deviceId, []);
        }
        zoneData.lastAggregation = Date.now();
      }
    }, 5 * 60_000);
  }

  private aggregateZone(zone: string, data: ZoneData) {
    const deviceSummaries: Record<string, any> = {};
    for (const [deviceId, readings] of data.devices) {
      if (readings.length === 0) continue;
      const temps = readings.map(r => r.temperature);
      deviceSummaries[deviceId] = {
        readingCount: readings.length,
        temperature: {
          min: Math.min(...temps),
          max: Math.max(...temps),
          avg: temps.reduce((a, b) => a + b, 0) / temps.length,
        },
      };
    }
    return {
      zone,
      windowStart: data.lastAggregation,
      windowEnd: Date.now(),
      deviceCount: data.devices.size,
      devices: deviceSummaries,
    };
  }
}

// Start fog node
const fog = new FogNode(
  'mqtt://local-broker:1883', // Edge devices publish here
  'mqtt://cloud-broker:8883'  // Fog publishes aggregates here
);

Edge Runtime Options
When you decide to run code at the edge, you need a runtime — a way to deploy, manage, and update software on edge devices. Here's how the main options compare:
Comparison Table
| Feature | AWS Greengrass | Azure IoT Edge | K3s | Docker on Edge |
|---|---|---|---|---|
| Provider | AWS | Microsoft | Rancher/SUSE | Docker Inc |
| Cloud integration | Deep AWS (IoT Core, Lambda, S3) | Deep Azure (IoT Hub, Functions) | Any (cloud-agnostic) | Any |
| Programming model | Lambda functions, containers | Modules (Docker containers) | Pods (Kubernetes) | Containers |
| Min hardware | 1 GHz ARM, 128 MB RAM | 1 GHz x86/ARM, 256 MB RAM | 512 MB RAM, 1 CPU | 512 MB RAM |
| OTA updates | Built-in (Greengrass deployments) | Built-in (IoT Hub deployments) | Helm charts, kubectl | docker compose pull |
| Local messaging | Local MQTT broker + IPC | Route messages between modules | Any (install your own) | Any |
| Offline support | Yes (local Lambda execution) | Yes (modules run locally) | Yes (pods run locally) | Yes |
| Device management | AWS IoT Device Shadow | Azure Device Twin | Manual or Rancher | Manual |
| Cost | Free tier + pay for cloud services | Free tier + pay for cloud services | Free (open source) | Free (open source) |
| Best for | AWS-heavy shops | Azure-heavy shops | Cloud-agnostic, Kubernetes teams | Simple deployments, prototypes |
AWS IoT Greengrass
Greengrass lets you run Lambda functions and Docker containers on edge devices, with seamless integration to AWS IoT Core:
# greengrass-deployment.yaml
# Greengrass V2 component recipe
RecipeFormatVersion: "2020-01-25"
ComponentName: com.factory.temperature-filter
ComponentVersion: "1.0.0"
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      run: |
        python3 {artifacts:path}/filter.py
    Artifacts:
      - URI: s3://my-bucket/components/filter.py
ComponentDependencies:
  aws.greengrass.Nucleus:
    VersionRequirement: ">=2.0.0"

# filter.py — Greengrass V2 component
import json

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import QOS

ipc_client = GreengrassCoreIPCClientV2()

def on_message(event):
    # Local pub/sub delivers a SubscriptionResponseMessage
    message = json.loads(event.binary_message.message.decode())
    temperature = message.get('temperature', 0)
    # Only forward significant readings to cloud
    if temperature > 35 or temperature < 5:
        ipc_client.publish_to_iot_core(
            topic_name=f"factory/alerts/{message['deviceId']}",
            qos=QOS.AT_LEAST_ONCE,
            payload=json.dumps({
                'alert': 'temperature_anomaly',
                'value': temperature,
                'deviceId': message['deviceId'],
            }),
        )

# Subscribe to a local pub/sub topic
ipc_client.subscribe_to_topic(
    topic='factory/+/temperature',
    on_stream_event=on_message,
)

Azure IoT Edge
Azure IoT Edge runs containerized modules on edge devices, managed through Azure IoT Hub:
// deployment.json — Azure IoT Edge deployment manifest
{
  "modulesContent": {
    "$edgeAgent": {
      "properties.desired": {
        "modules": {
          "temperatureFilter": {
            "type": "docker",
            "settings": {
              "image": "myregistry.azurecr.io/temp-filter:1.0",
              "createOptions": "{\"HostConfig\":{\"Privileged\":true}}"
            },
            "status": "running",
            "restartPolicy": "always"
          }
        }
      }
    },
    "$edgeHub": {
      "properties.desired": {
        "routes": {
          "filterToCloud": "FROM /messages/modules/temperatureFilter/outputs/alerts INTO $upstream",
          "sensorToFilter": "FROM /messages/modules/sensor/* INTO BrokeredEndpoint(\"/modules/temperatureFilter/inputs/data\")"
        }
      }
    }
  }
}

K3s — Lightweight Kubernetes at the Edge
K3s is a CNCF-certified Kubernetes distribution built for edge and IoT. It strips Kubernetes down to a single binary under 100 MB:
# Install K3s on an edge device (single command)
curl -sfL https://get.k3s.io | sh -

# Check it's running
sudo k3s kubectl get nodes
# NAME         STATUS   ROLES                  AGE   VERSION
# edge-gw-01   Ready    control-plane,master   30s   v1.28.4+k3s1

# edge-processor.yaml — Deploy edge processing as a Kubernetes pod
apiVersion: apps/v1
kind: Deployment
metadata:
  name: edge-data-processor
  namespace: iot
spec:
  replicas: 1
  selector:
    matchLabels:
      app: edge-processor
  template:
    metadata:
      labels:
        app: edge-processor
    spec:
      containers:
        - name: processor
          image: myregistry/edge-processor:1.0
          resources:
            limits:
              cpu: "500m"
              memory: "256Mi"
            requests:
              cpu: "100m"
              memory: "128Mi"
          env:
            - name: MQTT_BROKER
              value: "mqtt://mosquitto:1883"
            - name: CLOUD_BROKER
              value: "mqtts://cloud-broker:8883"
          volumeMounts:
            - name: edge-data
              mountPath: /var/edge/data
      volumes:
        - name: edge-data
          hostPath:
            path: /var/edge/data
            type: DirectoryOrCreate
---
# Local MQTT broker as a Kubernetes service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mosquitto
  namespace: iot
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mosquitto
  template:
    metadata:
      labels:
        app: mosquitto
    spec:
      containers:
        - name: mosquitto
          image: eclipse-mosquitto:2
          ports:
            - containerPort: 1883
          volumeMounts:
            - name: config
              mountPath: /mosquitto/config
      volumes:
        - name: config
          configMap:
            name: mosquitto-config
---
apiVersion: v1
kind: Service
metadata:
  name: mosquitto
  namespace: iot
spec:
  selector:
    app: mosquitto
  ports:
    - port: 1883
      targetPort: 1883

Why K3s for IoT edge?
- Single binary, ~100 MB — runs on Raspberry Pi
- No cloud vendor lock-in
- Full Kubernetes API — use Helm, kubectl, GitOps
- Built-in SQLite instead of etcd (lighter)
- Automatic TLS; Traefik ingress included
Docker Compose on Edge
For simpler deployments where Kubernetes is overkill:
# docker-compose.edge.yml
version: "3.8"
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - mosquitto-data:/mosquitto/data
    restart: unless-stopped
  edge-processor:
    image: myregistry/edge-processor:1.0
    environment:
      - MQTT_BROKER=mqtt://mosquitto:1883
      - CLOUD_BROKER=mqtts://cloud.example.com:8883
      - STORE_PATH=/data/queue
    volumes:
      - edge-data:/data
    depends_on:
      - mosquitto
    restart: unless-stopped
  node-red:
    image: nodered/node-red:latest
    ports:
      - "1880:1880"
    volumes:
      - nodered-data:/data
    restart: unless-stopped
volumes:
  mosquitto-data:
  edge-data:
  nodered-data:

# Deploy on edge gateway
docker compose -f docker-compose.edge.yml up -d
# Update a single service
docker compose -f docker-compose.edge.yml pull edge-processor
docker compose -f docker-compose.edge.yml up -d edge-processor
# Check status
docker compose -f docker-compose.edge.yml ps

Edge-to-Cloud Sync Strategies
The edge and cloud need to stay in sync — the question is how much and how often.
Strategy Comparison
| Strategy | Data Freshness | Bandwidth | Complexity | Best For |
|---|---|---|---|---|
| Continuous streaming | Real-time | High | Low | Small deployments, critical data |
| Periodic batch upload | Minutes to hours | Low | Medium | Telemetry, non-critical data |
| Event-driven upload | On change | Medium | Medium | Alerts, state changes |
| Hybrid (stream + batch) | Mixed | Medium | High | Production systems |
Hybrid Sync Implementation
Most production systems use a hybrid approach — stream critical data, batch the rest:
import mqtt from 'mqtt';
import crypto from 'node:crypto';

type Priority = 'critical' | 'normal' | 'low';

interface SyncConfig {
  critical: { mode: 'stream' };                  // Real-time
  normal: { mode: 'batch'; intervalMs: number }; // Every 5 minutes
  low: { mode: 'batch'; intervalMs: number };    // Every 30 minutes
}

class HybridSync {
  private config: SyncConfig = {
    critical: { mode: 'stream' },
    normal: { mode: 'batch', intervalMs: 5 * 60_000 },
    low: { mode: 'batch', intervalMs: 30 * 60_000 },
  };
  private normalBatch: QueuedMessage[] = [];
  private lowBatch: QueuedMessage[] = [];
  private cloudClient: mqtt.MqttClient;

  constructor(cloudBrokerUrl: string) {
    this.cloudClient = mqtt.connect(cloudBrokerUrl);
    // Periodic batch flush
    setInterval(() => this.flushBatch('normal'), this.config.normal.intervalMs);
    setInterval(() => this.flushBatch('low'), this.config.low.intervalMs);
  }

  async sync(topic: string, payload: string, priority: Priority): Promise<void> {
    const message: QueuedMessage = { topic, payload, timestamp: Date.now(), retries: 0 };
    switch (priority) {
      case 'critical':
        // Stream immediately
        await this.publishNow(message);
        break;
      case 'normal':
        this.normalBatch.push(message);
        break;
      case 'low':
        this.lowBatch.push(message);
        break;
    }
  }

  private async publishNow(msg: QueuedMessage): Promise<void> {
    this.cloudClient.publish(msg.topic, msg.payload, { qos: 1 });
  }

  private flushBatch(priority: 'normal' | 'low'): void {
    const batch = priority === 'normal' ? this.normalBatch : this.lowBatch;
    if (batch.length === 0) return;
    // Send as a single batch message
    const batchPayload = JSON.stringify({
      batchId: crypto.randomUUID(),
      priority,
      count: batch.length,
      messages: batch,
    });
    this.cloudClient.publish(`batch/${priority}`, batchPayload, { qos: 1 });
    // Clear batch
    if (priority === 'normal') {
      this.normalBatch = [];
    } else {
      this.lowBatch = [];
    }
  }
}

// Usage
const sync = new HybridSync('mqtts://cloud-broker:8883');

// Critical alert — stream immediately
await sync.sync('alerts/factory/zone-1', alertPayload, 'critical');

// Normal telemetry — batch every 5 minutes
await sync.sync('telemetry/zone-1', telemetryPayload, 'normal');

// Environmental logs — batch every 30 minutes
await sync.sync('logs/zone-1', logPayload, 'low');

Edge ML Inference
One of the most powerful edge patterns is running pre-trained machine learning models directly on edge devices for real-time predictions without cloud dependency.
Why ML at the Edge?
| Factor | Cloud ML | Edge ML |
|---|---|---|
| Latency | 100-500ms | 5-50ms |
| Privacy | Data leaves premises | Data stays local |
| Cost | Pay per inference | One-time hardware cost |
| Connectivity | Required | Not required |
| Model size | Unlimited | Constrained (MB range) |
Common Edge ML Use Cases
- Anomaly detection: Predict equipment failure from vibration patterns
- Computer vision: Defect detection on manufacturing line
- Sound classification: Detect abnormal machine sounds
- Predictive maintenance: Estimate remaining useful life (RUL)
Edge Inference with TensorFlow Lite
# edge_inference.py — Run on Raspberry Pi or Jetson Nano
import json

import numpy as np
import paho.mqtt.client as mqtt
# TensorFlow Lite — optimized for edge devices
import tflite_runtime.interpreter as tflite

class EdgeAnomalyDetector:
    def __init__(self, model_path: str):
        # Load quantized model (typically 1-10 MB)
        self.interpreter = tflite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        # Sliding window of recent readings
        self.window_size = 60  # 60 seconds of data
        self.buffer = []

    def add_reading(self, reading: dict) -> dict | None:
        """Add a reading and return anomaly prediction if window is full."""
        self.buffer.append([
            reading['temperature'],
            reading['humidity'],
            reading['vibration'],
            reading['pressure'],
        ])
        if len(self.buffer) < self.window_size:
            return None  # Not enough data yet
        if len(self.buffer) > self.window_size:
            self.buffer.pop(0)  # Sliding window
        return self.predict()

    def predict(self) -> dict:
        """Run inference on the current window."""
        # Prepare input tensor
        input_data = np.array([self.buffer], dtype=np.float32)
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        # Run inference
        self.interpreter.invoke()
        # Get output
        output = self.interpreter.get_tensor(self.output_details[0]['index'])
        anomaly_score = float(output[0][0])
        return {
            'anomaly_score': anomaly_score,
            'is_anomaly': anomaly_score > 0.8,  # Threshold
            # Confidence = distance from the 0.5 decision boundary, scaled to [0, 1]
            'confidence': abs(anomaly_score - 0.5) * 2,
        }

# MQTT integration
detector = EdgeAnomalyDetector('models/anomaly_detector.tflite')
client = mqtt.Client()

def on_message(client, userdata, msg):
    reading = json.loads(msg.payload.decode())
    result = detector.add_reading(reading)
    if result and result['is_anomaly']:
        # Publish anomaly alert locally and to cloud
        alert = {
            'type': 'ml_anomaly',
            'deviceId': reading['deviceId'],
            'score': result['anomaly_score'],
            'confidence': result['confidence'],
            'timestamp': reading['timestamp'],
        }
        client.publish('alerts/anomaly', json.dumps(alert))
        print(f"⚠️ Anomaly detected: score={result['anomaly_score']:.3f}")

client.on_message = on_message
client.connect('localhost', 1883)
client.subscribe('sensors/+/data')
client.loop_forever()

Edge ML Model Update Flow
Models need periodic updates as you train on more data in the cloud:
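A minimal polling sketch of that flow, where the cloud publishes a version manifest and the edge pulls it (the manifest endpoint, paths, and field names here are hypothetical; Greengrass and Azure IoT Edge ship their own deployment mechanisms for this):

```typescript
interface ModelManifest {
  version: string; // e.g. "2024-06-01"
  url: string;     // Where the edge device downloads the .tflite artifact
  sha256: string;  // Integrity check before the model is staged
}

// Pure decision: does the edge need to download a new model?
function needsUpdate(currentVersion: string, manifest: ModelManifest): boolean {
  return manifest.version !== currentVersion;
}

// Poll the (hypothetical) cloud manifest endpoint on a timer
async function checkAndStage(currentVersion: string): Promise<void> {
  const res = await fetch('https://cloud.example.com/models/anomaly/latest');
  const manifest: ModelManifest = await res.json();
  if (!needsUpdate(currentVersion, manifest)) return;
  // 1. Download to a staging path — never overwrite the live model in place
  // 2. Verify sha256, then shadow-run against the live model (see Mistake 5 below)
  // 3. Atomically swap staging → live and record the new version
}
```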
Offline-First Design Patterns
Edge devices must assume the network will fail. Offline-first design treats connectivity as a bonus, not a requirement.
Principles
- Local-first processing — All critical decisions happen locally
- Idempotent operations — Safe to replay when reconnecting
- Conflict resolution — Handle stale data gracefully
- Graceful degradation — Reduce functionality, never crash
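The second principle deserves a concrete shape: if every message carries a stable ID, replaying a store-and-forward queue after reconnect can never double-count readings. A minimal consumer-side sketch (the ID scheme is an assumption; any stable, per-message key works):

```typescript
// Idempotent ingestion: duplicates from queue replays are detected and skipped
interface EdgeMessage {
  messageId: string; // e.g. `${deviceId}:${timestamp}` — stable across retries
  payload: string;
}

class IdempotentConsumer {
  private seen = new Set<string>();

  // Returns true only the first time a given messageId is processed
  handle(msg: EdgeMessage): boolean {
    if (this.seen.has(msg.messageId)) return false; // Duplicate replay — skip
    this.seen.add(msg.messageId);
    // ...persist / process msg.payload here...
    return true;
  }
}
```

In production the `seen` set would be bounded (e.g., keep only the last N IDs per device, or a time window) so memory doesn't grow forever.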
Offline-First Architecture
Connectivity State Machine
type ConnectivityState = 'online' | 'degraded' | 'offline';

class ConnectivityManager {
  private state: ConnectivityState = 'online';
  private failedPings = 0;
  private maxFailedPings = 3;
  private listeners: ((state: ConnectivityState) => void)[] = [];

  async checkConnectivity(): Promise<ConnectivityState> {
    try {
      const start = Date.now();
      await fetch('https://cloud-broker.example.com/health', {
        signal: AbortSignal.timeout(5000),
      });
      const latency = Date.now() - start;
      this.failedPings = 0;
      if (latency > 2000) {
        this.setState('degraded');
      } else {
        this.setState('online');
      }
    } catch {
      this.failedPings++;
      if (this.failedPings >= this.maxFailedPings) {
        this.setState('offline');
      } else {
        this.setState('degraded');
      }
    }
    return this.state;
  }

  private setState(newState: ConnectivityState): void {
    if (this.state !== newState) {
      console.log(`Connectivity: ${this.state} → ${newState}`);
      this.state = newState;
      this.listeners.forEach(fn => fn(newState));
    }
  }

  onStateChange(listener: (state: ConnectivityState) => void): void {
    this.listeners.push(listener);
  }

  getState(): ConnectivityState {
    return this.state;
  }
}

// Usage
const connectivity = new ConnectivityManager();

connectivity.onStateChange((state) => {
  switch (state) {
    case 'online':
      console.log('Full connectivity — streaming to cloud');
      sync.flushAllQueues();
      break;
    case 'degraded':
      console.log('Degraded — batch mode, critical only');
      sync.switchToBatchMode();
      break;
    case 'offline':
      console.log('Offline — local processing only');
      sync.pauseCloudSync();
      break;
  }
});

// Check every 30 seconds
setInterval(() => connectivity.checkConnectivity(), 30_000);

Data Conflict Resolution
When the edge device reconnects, its local data may conflict with cloud state (e.g., another operator changed a configuration):
interface VersionedData<T> {
  data: T;
  version: number;
  lastModified: number;
  source: 'edge' | 'cloud';
}

type ConflictStrategy = 'cloud-wins' | 'edge-wins' | 'latest-wins' | 'merge';

class ConflictResolver {
  resolve<T extends object>(
    local: VersionedData<T>,
    remote: VersionedData<T>,
    strategy: ConflictStrategy
  ): VersionedData<T> {
    // No conflict — versions match
    if (local.version === remote.version) return local;
    switch (strategy) {
      case 'cloud-wins':
        return remote;
      case 'edge-wins':
        return local;
      case 'latest-wins':
        return local.lastModified > remote.lastModified ? local : remote;
      case 'merge':
        // Custom merge logic — depends on data type
        return this.mergeData(local, remote);
      default:
        return remote; // Safe default
    }
  }

  private mergeData<T extends object>(
    local: VersionedData<T>,
    remote: VersionedData<T>
  ): VersionedData<T> {
    // For sensor configs: use cloud for thresholds, edge for calibration
    const merged = {
      ...remote.data, // Cloud settings take precedence
      ...local.data,  // Edge overrides (e.g., local calibration)
    };
    return {
      data: merged as T,
      version: Math.max(local.version, remote.version) + 1,
      lastModified: Date.now(),
      source: 'edge',
    };
  }
}

Common Beginner Mistakes
Mistake 1: Sending Everything to the Cloud
❌ 10,000 sensors → 172 GB/day raw data → cloud → $$$
✅ 10,000 sensors → edge filter → 5 GB/day summaries → cloud → $

Fix: Start with the question "what does the cloud actually need?" Usually it's aggregates, alerts, and hourly summaries — not every raw reading.
Mistake 2: No Offline Strategy
❌ Network drops → edge device throws errors → data lost
✅ Network drops → store-and-forward → auto-flush on reconnect

Fix: Treat network as unreliable from day one. Implement store-and-forward before you think you need it — you will.
Mistake 3: Over-Engineering the Edge
❌ Running Kubernetes + Kafka + PostgreSQL on a Raspberry Pi
✅ Running a simple Node.js process + SQLite + MQTT on a Raspberry Pi

Fix: Match your runtime to your hardware. A Pi has 1-8 GB RAM — Docker Compose is usually more than enough. Save Kubernetes for on-prem servers in the fog layer.
Mistake 4: Ignoring Edge Security
❌ Edge device runs as root, no TLS, default passwords, open ports
✅ Least privilege, mTLS, unique credentials per device, firewall rules

Fix: Edge devices are physically accessible — they need more security, not less. We'll cover IoT security in detail in a future post.
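As a concrete example of the "mTLS plus unique credentials" posture, a local Mosquitto broker can be locked down with a few directives (the certificate paths and ACL file location below are placeholders for your own setup):

```
# mosquitto.conf — TLS listener, mutual auth, no anonymous access
listener 8883
cafile /mosquitto/certs/ca.crt
certfile /mosquitto/certs/broker.crt
keyfile /mosquitto/certs/broker.key
require_certificate true        # mTLS: each device presents its own client cert
allow_anonymous false
acl_file /mosquitto/config/acl  # Restrict each device to its own topic subtree
```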
Mistake 5: No Model Validation at the Edge
❌ Push new ML model → immediately replace old one → broken predictions
✅ Push new ML model → shadow run alongside old → validate → swap

Fix: Always run new models in shadow mode first. Compare outputs with the existing model for a validation period before switching.
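Shadow mode is simple to sketch: score every input with both models, act only on the live model's output, and track how often the candidate would have decided differently. One possible shape (the agreement metric and threshold are illustrative):

```typescript
// Shadow-mode validation: the candidate model runs but never drives decisions
type Model = (input: number[]) => number; // returns an anomaly score in [0, 1]

class ShadowRunner {
  private disagreements = 0;
  private total = 0;

  constructor(
    private live: Model,
    private candidate: Model,
    private threshold = 0.8 // Alert threshold both models are judged against
  ) {}

  score(input: number[]): number {
    const liveScore = this.live(input);
    const shadowScore = this.candidate(input);
    this.total++;
    // Disagreement = the models land on opposite sides of the alert threshold
    if ((liveScore > this.threshold) !== (shadowScore > this.threshold)) {
      this.disagreements++;
    }
    return liveScore; // Only the live model's score is acted on
  }

  agreementRate(): number {
    return this.total === 0 ? 1 : 1 - this.disagreements / this.total;
  }
}
```

Promote the candidate only after the agreement rate (or a better offline metric, if you log labels) stays acceptable over a full validation window.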
Summary and Key Takeaways
✅ Edge computing reduces latency, bandwidth, and cloud dependency — process close to the source
✅ Edge, fog, and cloud are distinct layers — edge for filtering, fog for coordination, cloud for global analytics
✅ Data filtering at the edge saves 70-95% bandwidth — only forward significant changes and aggregates
✅ Store-and-forward is essential — buffer data locally when the network drops, flush on reconnect
✅ Threshold alerting at the edge enables sub-10ms response — critical for safety and equipment protection
✅ Match runtime to hardware — Docker Compose for simple deployments, K3s for Kubernetes teams, Greengrass/IoT Edge for cloud-vendor shops
✅ Edge ML inference runs in 5-50ms — anomaly detection, defect detection, predictive maintenance without cloud
✅ Design offline-first from day one — treat connectivity as a bonus, not a requirement
✅ Validate ML models in shadow mode — never hot-swap a model without comparison testing
What's Next
Now that you understand where processing happens — edge, fog, or cloud — it's time to go deep into the protocol that powers most IoT communication.
Next: IOT-5: Deep Dive: MQTT — The IoT Messaging Protocol — MQTT v3.1.1 vs v5.0, topic design patterns, QoS trade-offs in depth, broker clustering (EMQX, HiveMQ), authentication and authorization, and scaling MQTT for millions of connections.
Related Posts
- IoT Patterns & Strategies Roadmap — Complete 12-post series overview and learning paths
- IoT Fundamentals: Architecture & Protocols — Foundation: architecture layers, device types, connectivity
- IoT Communication Protocols: MQTT, CoAP, AMQP & WebSocket — Protocol deep dive with hands-on setup
- Docker & Kubernetes Learning Roadmap — Containerization fundamentals for edge deployments
- Server-Client Architecture Explained — Foundation for distributed system patterns
This is Part 4 of the IoT Patterns & Strategies series. Start from the beginning or jump to any topic that interests you.