
IoT Edge Computing & Fog Architecture


Welcome to Phase 3 of the IoT Patterns & Strategies Roadmap! In Phase 1, we explored IoT architecture layers and device types. In Phase 2, we dug into protocols — MQTT, CoAP, AMQP, and WebSocket. Now the question is: where does the processing happen?

Sending every sensor reading from 100,000 devices straight to the cloud sounds simple until you see the bandwidth bill, the latency numbers, and what happens when the internet goes down. Edge computing flips the model — process data close to where it's generated, send only what the cloud actually needs.

By the end of this post, you'll understand when to use edge, fog, or cloud processing, how to implement common edge patterns, and how to keep your system running when connectivity drops.

What You'll Learn

✅ Understand why edge computing is critical for IoT (latency, bandwidth, resilience)
✅ Compare edge vs fog vs cloud architectures with clear trade-offs
✅ Implement data filtering, aggregation, and store-and-forward at the edge
✅ Evaluate edge runtimes: AWS Greengrass, Azure IoT Edge, K3s, Docker
✅ Design fog computing layers for hierarchical IoT systems
✅ Build offline-first IoT applications that sync when connectivity returns
✅ Run ML inference at the edge for real-time decision making


Why Edge Computing Matters for IoT

The Cloud-Only Problem

Consider a factory with 10,000 sensors, each sending readings every second:

10,000 sensors × 1 reading/sec × 200 bytes = 2 MB/sec = 172 GB/day

Now multiply that by network costs, cloud compute costs, and the 50-200ms round-trip latency to the cloud. A vibration sensor detecting equipment failure needs a response in under 10ms — not 200ms after a round trip to us-east-1.
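The arithmetic above is worth sanity-checking in code, since these back-of-envelope numbers drive the whole architecture decision:

```typescript
// Back-of-envelope bandwidth check for the factory above
const sensors = 10_000;
const readingsPerSec = 1;
const bytesPerReading = 200;

const bytesPerSec = sensors * readingsPerSec * bytesPerReading; // 2,000,000 B/s = 2 MB/s
const gbPerDay = (bytesPerSec * 86_400) / 1e9;                  // 86,400 seconds per day

console.log(`${(bytesPerSec / 1e6).toFixed(1)} MB/sec, ${gbPerDay.toFixed(1)} GB/day`);
// → 2.0 MB/sec, 172.8 GB/day
```

And that's one factory. A fleet of ten is over 1.7 TB/day of raw telemetry before you've answered a single question with it.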

Three Reasons to Process at the Edge

1. Latency — Some decisions can't wait for the cloud.

| Scenario | Required Latency | Cloud Feasible? |
|---|---|---|
| Predictive maintenance alert | < 100ms | Maybe |
| Robotic arm collision avoidance | < 10ms | No |
| Smart traffic light adjustment | < 50ms | No |
| Environmental monitoring report | < 5 min | Yes |
| Monthly energy consumption summary | Hours | Yes |

2. Bandwidth — Raw sensor data is expensive to transmit.

3. Resilience — The internet will go down. Your factory shouldn't stop.

Edge processing ensures critical operations continue during network outages. Decisions that affect safety, quality control, and real-time adjustments happen locally — the cloud gets a summary when connectivity returns.


Edge vs Fog vs Cloud

These three terms get used interchangeably, but they represent distinct layers with different responsibilities:

Comparison Table

| Aspect | Edge | Fog | Cloud |
|---|---|---|---|
| Location | On or next to device | Local network (gateway, on-prem server) | Remote data center |
| Latency | < 10ms | 10-100ms | 50-500ms |
| Compute power | Limited (ARM, microcontroller) | Moderate (x86 server, GPU) | Unlimited (auto-scale) |
| Storage | GB range | TB range | PB range |
| Connectivity | Can work offline | Needs local network | Needs internet |
| Example hardware | Raspberry Pi, Jetson Nano, ESP32 | On-prem server, industrial PC | AWS, Azure, GCP |
| Processing type | Filtering, thresholds, simple ML | Aggregation, regional analytics, model inference | Training, global analytics, historical queries |
| Cost | Low per device, scales linearly | Moderate (shared infrastructure) | Pay-per-use, can spike |

When to Use Each Layer
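As a rough rule of thumb, the table above condenses into a placement decision based on latency, offline requirements, and data needs. The function below is an illustrative sketch, not a hard rule; the thresholds come straight from the latency rows of the comparison table:

```typescript
type Layer = 'edge' | 'fog' | 'cloud';

interface WorkloadRequirements {
  maxLatencyMs: number;         // Worst acceptable decision latency
  worksOffline: boolean;        // Must keep running without internet?
  needsHistoricalData: boolean; // Requires long-term storage or model training?
}

// Rough placement heuristic based on the comparison table
function chooseLayer(req: WorkloadRequirements): Layer {
  if (req.needsHistoricalData && !req.worksOffline) return 'cloud';
  if (req.maxLatencyMs < 10 || req.worksOffline) return 'edge';
  if (req.maxLatencyMs < 100) return 'fog';
  return 'cloud';
}

// Robotic arm collision avoidance: sub-10ms, must survive outages
chooseLayer({ maxLatencyMs: 5, worksOffline: true, needsHistoricalData: false }); // → 'edge'
```

In practice most systems use all three layers at once; the question is where each *workload* lands, not which layer "wins".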


Edge Computing Patterns

Pattern 1: Data Filtering

The simplest edge pattern — drop data you don't need before it leaves the device.

// Edge data filter — runs on gateway device
interface SensorReading {
  deviceId: string;
  timestamp: number;
  temperature: number;
  humidity: number;
  vibration: number;
}
 
class EdgeDataFilter {
  private lastSent: Map<string, SensorReading> = new Map();
 
  // Only forward readings that changed significantly
  shouldForward(reading: SensorReading): boolean {
    const last = this.lastSent.get(reading.deviceId);
 
    if (!last) {
      this.lastSent.set(reading.deviceId, reading);
      return true; // Always forward first reading
    }
 
    const tempDelta = Math.abs(reading.temperature - last.temperature);
    const humidDelta = Math.abs(reading.humidity - last.humidity);
    const vibDelta = Math.abs(reading.vibration - last.vibration);
 
    // Forward only if change exceeds thresholds
    if (tempDelta > 0.5 || humidDelta > 2.0 || vibDelta > 10.0) {
      this.lastSent.set(reading.deviceId, reading);
      return true;
    }
 
    return false; // Skip — not significant enough
  }
}
 
// Usage on edge gateway
const filter = new EdgeDataFilter();
const readings = getIncomingSensorReadings(); // From MQTT subscription
 
for (const reading of readings) {
  if (filter.shouldForward(reading)) {
    publishToCloud(reading); // Forward to cloud MQTT broker
  }
  // Silently drop insignificant readings
}

Result: If temperature fluctuates ±0.1°C, you send one reading instead of hundreds. Typical reduction: 70-95% fewer messages.
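To see the reduction concretely, here's a standalone sketch of the same deadband idea, driven by a synthetic signal that wobbles ±0.1°C around 20°C. That's well inside the 0.5°C threshold, so only the first reading gets through:

```typescript
// Minimal deadband filter: forward only when the change since the
// last *forwarded* value exceeds the threshold
function deadbandFilter(values: number[], threshold: number): number[] {
  const forwarded: number[] = [];
  let lastSent: number | undefined;
  for (const v of values) {
    if (lastSent === undefined || Math.abs(v - lastSent) > threshold) {
      forwarded.push(v);
      lastSent = v;
    }
  }
  return forwarded;
}

// 1,000 readings wobbling ±0.1°C around 20°C
const readings = Array.from({ length: 1000 }, (_, i) => 20 + 0.1 * Math.sin(i));
const sent = deadbandFilter(readings, 0.5);
console.log(`${sent.length} of ${readings.length} forwarded`); // → 1 of 1000
```

Note the comparison is against the last *forwarded* value, not the previous reading: a slow drift eventually crosses the threshold and gets reported, instead of being dropped forever one small step at a time.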

Pattern 2: Edge Aggregation

Instead of sending every raw reading, compute summaries locally:

interface AggregatedReading {
  deviceId: string;
  windowStart: number;
  windowEnd: number;
  temperature: {
    min: number;
    max: number;
    avg: number;
    count: number;
  };
  humidity: {
    min: number;
    max: number;
    avg: number;
    count: number;
  };
}
 
class EdgeAggregator {
  private windows: Map<string, SensorReading[]> = new Map();
  private windowDuration = 60_000; // 1 minute windows
 
  addReading(reading: SensorReading): void {
    const key = reading.deviceId;
    if (!this.windows.has(key)) {
      this.windows.set(key, []);
    }
    this.windows.get(key)!.push(reading);
  }
 
  flush(deviceId: string): AggregatedReading | null {
    const readings = this.windows.get(deviceId);
    if (!readings || readings.length === 0) return null;
 
    const temps = readings.map(r => r.temperature);
    const humids = readings.map(r => r.humidity);
 
    const result: AggregatedReading = {
      deviceId,
      windowStart: readings[0].timestamp,
      windowEnd: readings[readings.length - 1].timestamp,
      temperature: {
        min: Math.min(...temps),
        max: Math.max(...temps),
        avg: temps.reduce((a, b) => a + b, 0) / temps.length,
        count: temps.length,
      },
      humidity: {
        min: Math.min(...humids),
        max: Math.max(...humids),
        avg: humids.reduce((a, b) => a + b, 0) / humids.length,
        count: humids.length,
      },
    };
 
    this.windows.set(deviceId, []); // Reset window
    return result;
  }

  // Needed by the flush loop below
  getAllDeviceIds(): string[] {
    return [...this.windows.keys()];
  }
}
 
// Every 60 seconds, flush aggregates to cloud
const aggregator = new EdgeAggregator();
setInterval(() => {
  for (const deviceId of aggregator.getAllDeviceIds()) {
    const summary = aggregator.flush(deviceId);
    if (summary) {
      publishToCloud(summary); // 1 message instead of 60
    }
  }
}, 60_000);

60 raw readings per minute → 1 aggregated message. The cloud gets min/max/avg — enough for dashboards and trend analysis.

Pattern 3: Threshold Alerting

React to critical conditions instantly at the edge without waiting for the cloud:

interface Alert {
  deviceId: string;
  type: 'warning' | 'critical';
  metric: string;
  value: number;
  threshold: number;
  timestamp: number;
  message: string;
}
 
interface ThresholdRule {
  metric: string;
  warningThreshold: number;
  criticalThreshold: number;
  direction: 'above' | 'below';
}
 
class EdgeAlertEngine {
  private rules: ThresholdRule[] = [
    { metric: 'temperature', warningThreshold: 35, criticalThreshold: 45, direction: 'above' },
    { metric: 'temperature', warningThreshold: 5, criticalThreshold: 0, direction: 'below' },
    { metric: 'vibration', warningThreshold: 50, criticalThreshold: 80, direction: 'above' },
    { metric: 'humidity', warningThreshold: 80, criticalThreshold: 95, direction: 'above' },
  ];
 
  // Debounce: don't spam alerts
  private lastAlert: Map<string, number> = new Map();
  private cooldownMs = 30_000; // 30 second cooldown per device+metric
 
  evaluate(reading: SensorReading): Alert[] {
    const alerts: Alert[] = [];
 
    for (const rule of this.rules) {
      const value = reading[rule.metric as keyof SensorReading] as number;
      if (typeof value !== 'number') continue;
 
      const key = `${reading.deviceId}:${rule.metric}:${rule.direction}`;
      const lastTime = this.lastAlert.get(key) || 0;
 
      if (Date.now() - lastTime < this.cooldownMs) continue;
 
      let triggered = false;
      let type: 'warning' | 'critical' = 'warning';
      let threshold = rule.warningThreshold;
 
      if (rule.direction === 'above') {
        if (value >= rule.criticalThreshold) {
          triggered = true;
          type = 'critical';
          threshold = rule.criticalThreshold;
        } else if (value >= rule.warningThreshold) {
          triggered = true;
          type = 'warning';
        }
      } else {
        if (value <= rule.criticalThreshold) {
          triggered = true;
          type = 'critical';
          threshold = rule.criticalThreshold;
        } else if (value <= rule.warningThreshold) {
          triggered = true;
          type = 'warning';
        }
      }
 
      if (triggered) {
        this.lastAlert.set(key, Date.now());
        alerts.push({
          deviceId: reading.deviceId,
          type,
          metric: rule.metric,
          value,
          threshold,
          timestamp: Date.now(),
          message: `${rule.metric} ${rule.direction} ${threshold}: ${value}`,
        });
      }
    }
 
    return alerts;
  }
}
 
// On edge gateway
const alertEngine = new EdgeAlertEngine();
 
function processReading(reading: SensorReading) {
  const alerts = alertEngine.evaluate(reading);
 
  for (const alert of alerts) {
    // React locally — no cloud needed
    if (alert.type === 'critical') {
      triggerLocalBuzzer();
      activateEmergencyShutdown(alert.deviceId);
    }
 
    // Also forward alert to cloud for logging
    publishToCloud(alert);
  }
}

Key benefit: The buzzer sounds in < 5ms instead of waiting 200ms+ for a cloud round trip.

Pattern 4: Store-and-Forward

Buffer data locally when the network is down, flush when it recovers:

import * as fs from 'fs';
import * as path from 'path';
 
interface QueuedMessage {
  topic: string;
  payload: string;
  timestamp: number;
  retries: number;
}
 
class StoreAndForward {
  private queue: QueuedMessage[] = [];
  private maxQueueSize = 10_000;
  private persistPath: string;
  private isOnline = true;
 
  constructor(persistDir: string) {
    this.persistPath = path.join(persistDir, 'message-queue.json');
    this.loadFromDisk(); // Recover after restart
  }
 
  async send(topic: string, payload: string): Promise<void> {
    if (this.isOnline) {
      try {
        await this.publishToCloud(topic, payload);
        return; // Sent successfully
      } catch {
        this.isOnline = false;
        console.log('Network down — switching to store-and-forward');
      }
    }
 
    // Store locally
    this.enqueue({ topic, payload, timestamp: Date.now(), retries: 0 });
  }
 
  private enqueue(msg: QueuedMessage): void {
    if (this.queue.length >= this.maxQueueSize) {
      // Drop the oldest message (FIFO eviction)
      this.queue.shift();
    }
    this.queue.push(msg);
    this.saveToDisk(); // Survive device restarts
  }
 
  async flush(): Promise<void> {
    if (this.queue.length === 0) return;
 
    console.log(`Flushing ${this.queue.length} queued messages...`);
    const failed: QueuedMessage[] = [];
 
    for (const msg of this.queue) {
      try {
        await this.publishToCloud(msg.topic, msg.payload);
      } catch {
        if (msg.retries < 3) {
          msg.retries++;
          failed.push(msg);
        } else {
          console.log(`Dropping message after 3 retries: ${msg.topic}`);
        }
      }
    }
 
    this.queue = failed;
    this.saveToDisk();
 
    if (failed.length === 0) {
      this.isOnline = true;
      console.log('All queued messages flushed successfully');
    }
  }
 
  private saveToDisk(): void {
    fs.writeFileSync(this.persistPath, JSON.stringify(this.queue));
  }
 
  private loadFromDisk(): void {
    try {
      const data = fs.readFileSync(this.persistPath, 'utf-8');
      this.queue = JSON.parse(data);
      console.log(`Loaded ${this.queue.length} messages from disk`);
    } catch {
      this.queue = [];
    }
  }
 
  private async publishToCloud(topic: string, payload: string): Promise<void> {
    // Actual MQTT/HTTP publish to cloud broker
    throw new Error('Not implemented — use your MQTT client here');
  }
}
 
// Usage
const saf = new StoreAndForward('/var/edge/data');
 
// Normal operation — sends or queues automatically
await saf.send('factory/zone-1/temperature', JSON.stringify(reading));
 
// Periodic connectivity check and flush
setInterval(async () => {
  if (await checkConnectivity()) {
    await saf.flush();
  }
}, 30_000); // Check every 30 seconds

Fog Computing Architecture

Fog computing sits between edge and cloud — typically on-premise servers or gateways that aggregate data from multiple edge devices. Think of it as a regional processing hub.

When You Need Fog

Edge devices are constrained — a Raspberry Pi can't run complex ML models or correlate data from 500 sensors. But sending everything to the cloud is wasteful. The fog layer handles:

  • Cross-device correlation: "Is the temperature spike on sensor A related to the vibration increase on sensor B?"
  • Regional ML inference: Run pre-trained models on an on-prem GPU server
  • Local dashboards: Operators on the factory floor need real-time views without internet dependency
  • Protocol translation: Bridge MQTT, CoAP, and Modbus devices into a unified data stream

Fog Node Implementation

A fog node typically runs as a service on an on-premise server, collecting from multiple edge devices:

import mqtt from 'mqtt';
 
interface ZoneData {
  zone: string;
  devices: Map<string, SensorReading[]>;
  lastAggregation: number;
}
 
class FogNode {
  private zones: Map<string, ZoneData> = new Map();
  private edgeBroker: mqtt.MqttClient;
  private cloudBroker: mqtt.MqttClient;
 
  constructor(edgeBrokerUrl: string, cloudBrokerUrl: string) {
    // Connect to local MQTT broker (edge devices publish here)
    this.edgeBroker = mqtt.connect(edgeBrokerUrl);
    // Connect to cloud MQTT broker (fog publishes aggregates here)
    this.cloudBroker = mqtt.connect(cloudBrokerUrl);
 
    this.setupSubscriptions();
    this.startAggregationLoop();
  }
 
  private setupSubscriptions(): void {
    // Subscribe to all zone data from edge devices
    this.edgeBroker.subscribe('factory/+/+/data');
 
    this.edgeBroker.on('message', (topic, message) => {
      // topic: factory/{zone}/{device}/data
      const parts = topic.split('/');
      const zone = parts[1];
      const deviceId = parts[2];
      const reading: SensorReading = JSON.parse(message.toString());
 
      this.ingestReading(zone, deviceId, reading);
    });
  }
 
  private ingestReading(zone: string, deviceId: string, reading: SensorReading): void {
    if (!this.zones.has(zone)) {
      this.zones.set(zone, {
        zone,
        devices: new Map(),
        lastAggregation: Date.now(),
      });
    }
 
    const zoneData = this.zones.get(zone)!;
    if (!zoneData.devices.has(deviceId)) {
      zoneData.devices.set(deviceId, []);
    }
    zoneData.devices.get(deviceId)!.push(reading);
 
    // Run cross-device anomaly detection
    this.detectCrossDeviceAnomalies(zone);
  }
 
  private detectCrossDeviceAnomalies(zone: string): void {
    const zoneData = this.zones.get(zone);
    if (!zoneData) return;
 
    // Example: correlate temperature and vibration across devices
    const latestReadings: SensorReading[] = [];
    for (const [, readings] of zoneData.devices) {
      if (readings.length > 0) {
        latestReadings.push(readings[readings.length - 1]);
      }
    }
 
    if (latestReadings.length === 0) return; // Avoid NaN before any data arrives

    const avgTemp = latestReadings.reduce((sum, r) => sum + r.temperature, 0)
                    / latestReadings.length;
    const maxVibration = Math.max(...latestReadings.map(r => r.vibration));
 
    // Zone-level alert: high temp + high vibration = potential equipment failure
    if (avgTemp > 40 && maxVibration > 60) {
      this.cloudBroker.publish(
        `alerts/factory/${zone}/anomaly`,
        JSON.stringify({
          type: 'cross_device_anomaly',
          zone,
          avgTemp,
          maxVibration,
          timestamp: Date.now(),
          message: `Zone ${zone}: high temp (${avgTemp.toFixed(1)}°C) + vibration (${maxVibration})`,
        })
      );
    }
  }
 
  private startAggregationLoop(): void {
    // Every 5 minutes, aggregate and send to cloud
    setInterval(() => {
      for (const [zone, zoneData] of this.zones) {
        const summary = this.aggregateZone(zone, zoneData);
 
        this.cloudBroker.publish(
          `factory/${zone}/summary`,
          JSON.stringify(summary)
        );
 
        // Reset device readings after aggregation
        for (const [deviceId] of zoneData.devices) {
          zoneData.devices.set(deviceId, []);
        }
        zoneData.lastAggregation = Date.now();
      }
    }, 5 * 60_000);
  }
 
  private aggregateZone(zone: string, data: ZoneData) {
    const deviceSummaries: Record<string, any> = {};
 
    for (const [deviceId, readings] of data.devices) {
      if (readings.length === 0) continue;
 
      const temps = readings.map(r => r.temperature);
      deviceSummaries[deviceId] = {
        readingCount: readings.length,
        temperature: {
          min: Math.min(...temps),
          max: Math.max(...temps),
          avg: temps.reduce((a, b) => a + b, 0) / temps.length,
        },
      };
    }
 
    return {
      zone,
      windowStart: data.lastAggregation,
      windowEnd: Date.now(),
      deviceCount: data.devices.size,
      devices: deviceSummaries,
    };
  }
}
 
// Start fog node
const fog = new FogNode(
  'mqtt://local-broker:1883',   // Edge devices publish here
  'mqtt://cloud-broker:8883'    // Fog publishes aggregates here
);

Edge Runtime Options

When you decide to run code at the edge, you need a runtime — a way to deploy, manage, and update software on edge devices. Here's how the main options compare:

Comparison Table

| Feature | AWS Greengrass | Azure IoT Edge | K3s | Docker on Edge |
|---|---|---|---|---|
| Provider | AWS | Microsoft | Rancher/SUSE | Docker Inc |
| Cloud integration | Deep AWS (IoT Core, Lambda, S3) | Deep Azure (IoT Hub, Functions) | Any (cloud-agnostic) | Any |
| Programming model | Lambda functions, containers | Modules (Docker containers) | Pods (Kubernetes) | Containers |
| Min hardware | 1 GHz ARM, 128 MB RAM | 1 GHz x86/ARM, 256 MB RAM | 512 MB RAM, 1 CPU | 512 MB RAM |
| OTA updates | Built-in (Greengrass deployments) | Built-in (IoT Hub deployments) | Helm charts, kubectl | docker compose pull |
| Local messaging | Local MQTT broker + IPC | Route messages between modules | Any (install your own) | Any |
| Offline support | Yes (local Lambda execution) | Yes (modules run locally) | Yes (pods run locally) | Yes |
| Device management | AWS IoT Device Shadow | Azure Device Twin | Manual or Rancher | Manual |
| Cost | Free tier + pay for cloud services | Free tier + pay for cloud services | Free (open source) | Free (open source) |
| Best for | AWS-heavy shops | Azure-heavy shops | Cloud-agnostic, Kubernetes teams | Simple deployments, prototypes |

AWS IoT Greengrass

Greengrass lets you run Lambda functions and Docker containers on edge devices, with seamless integration to AWS IoT Core:

# greengrass-deployment.yaml
# Greengrass V2 component deployment
RecipeFormatVersion: "2020-01-25"
ComponentName: com.factory.temperature-filter
ComponentVersion: "1.0.0"
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      run: |
        python3 {artifacts:path}/filter.py
    Artifacts:
      - URI: s3://my-bucket/components/filter.py
ComponentDependencies:
  aws.greengrass.Nucleus:
    VersionRequirement: ">=2.0.0"
# filter.py — Greengrass V2 component (uses the V2 IPC client, which
# provides publish_to_iot_core / subscribe_to_topic convenience methods)
import json
import awsiot.greengrasscoreipc.clientv2 as clientv2
 
ipc_client = clientv2.GreengrassCoreIPCClientV2()
 
def on_message(event):
    message = json.loads(event.message.payload.decode())
    temperature = message.get('temperature', 0)
 
    # Only forward significant readings to cloud
    if temperature > 35 or temperature < 5:
        ipc_client.publish_to_iot_core(
            topic_name=f"factory/alerts/{message['deviceId']}",
            qos='AT_LEAST_ONCE',
            payload=json.dumps({
                'alert': 'temperature_anomaly',
                'value': temperature,
                'deviceId': message['deviceId'],
            })
        )
 
# Subscribe to local MQTT topic
ipc_client.subscribe_to_topic(
    topic='factory/+/temperature',
    on_stream_event=on_message,
)

Azure IoT Edge

Azure IoT Edge runs containerized modules on edge devices, managed through Azure IoT Hub:

// deployment.json — Azure IoT Edge deployment manifest
{
  "modulesContent": {
    "$edgeAgent": {
      "properties.desired": {
        "modules": {
          "temperatureFilter": {
            "type": "docker",
            "settings": {
              "image": "myregistry.azurecr.io/temp-filter:1.0",
              "createOptions": "{\"HostConfig\":{\"Privileged\":true}}"
            },
            "status": "running",
            "restartPolicy": "always"
          }
        }
      }
    },
    "$edgeHub": {
      "properties.desired": {
        "routes": {
          "filterToCloud": "FROM /messages/modules/temperatureFilter/outputs/alerts INTO $upstream",
          "sensorToFilter": "FROM /messages/modules/sensor/* INTO BrokeredEndpoint(\"/modules/temperatureFilter/inputs/data\")"
        }
      }
    }
  }
}

K3s — Lightweight Kubernetes at the Edge

K3s is a CNCF-certified Kubernetes distribution built for edge and IoT. It strips Kubernetes down to a single binary under 100 MB:

# Install K3s on an edge device (single command)
curl -sfL https://get.k3s.io | sh -
 
# Check it's running
sudo k3s kubectl get nodes
# NAME          STATUS   ROLES                  AGE   VERSION
# edge-gw-01   Ready    control-plane,master   30s   v1.28.4+k3s1
# edge-processor.yaml — Deploy edge processing as a Kubernetes pod
apiVersion: apps/v1
kind: Deployment
metadata:
  name: edge-data-processor
  namespace: iot
spec:
  replicas: 1
  selector:
    matchLabels:
      app: edge-processor
  template:
    metadata:
      labels:
        app: edge-processor
    spec:
      containers:
      - name: processor
        image: myregistry/edge-processor:1.0
        resources:
          limits:
            cpu: "500m"
            memory: "256Mi"
          requests:
            cpu: "100m"
            memory: "128Mi"
        env:
        - name: MQTT_BROKER
          value: "mqtt://mosquitto:1883"
        - name: CLOUD_BROKER
          value: "mqtts://cloud-broker:8883"
        volumeMounts:
        - name: edge-data
          mountPath: /var/edge/data
      volumes:
      - name: edge-data
        hostPath:
          path: /var/edge/data
          type: DirectoryOrCreate
---
# Local MQTT broker as a Kubernetes service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mosquitto
  namespace: iot
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mosquitto
  template:
    metadata:
      labels:
        app: mosquitto
    spec:
      containers:
      - name: mosquitto
        image: eclipse-mosquitto:2
        ports:
        - containerPort: 1883
        volumeMounts:
        - name: config
          mountPath: /mosquitto/config
      volumes:
      - name: config
        configMap:
          name: mosquitto-config
---
apiVersion: v1
kind: Service
metadata:
  name: mosquitto
  namespace: iot
spec:
  selector:
    app: mosquitto
  ports:
  - port: 1883
    targetPort: 1883

Why K3s for IoT edge?

  • Single binary, ~100 MB — runs on Raspberry Pi
  • No cloud vendor lock-in
  • Full Kubernetes API — use Helm, kubectl, GitOps
  • Built-in SQLite instead of etcd (lighter)
  • Automatic TLS, Traefik ingress included

Docker Compose on Edge

For simpler deployments where Kubernetes is overkill:

# docker-compose.edge.yml
version: "3.8"
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - mosquitto-data:/mosquitto/data
    restart: unless-stopped
 
  edge-processor:
    image: myregistry/edge-processor:1.0
    environment:
      - MQTT_BROKER=mqtt://mosquitto:1883
      - CLOUD_BROKER=mqtts://cloud.example.com:8883
      - STORE_PATH=/data/queue
    volumes:
      - edge-data:/data
    depends_on:
      - mosquitto
    restart: unless-stopped
 
  node-red:
    image: nodered/node-red:latest
    ports:
      - "1880:1880"
    volumes:
      - nodered-data:/data
    restart: unless-stopped
 
volumes:
  mosquitto-data:
  edge-data:
  nodered-data:
# Deploy on edge gateway
docker compose -f docker-compose.edge.yml up -d
 
# Update a single service
docker compose -f docker-compose.edge.yml pull edge-processor
docker compose -f docker-compose.edge.yml up -d edge-processor
 
# Check status
docker compose -f docker-compose.edge.yml ps

Edge-to-Cloud Sync Strategies

The edge and cloud need to stay in sync — the question is how much and how often.

Strategy Comparison

| Strategy | Data Freshness | Bandwidth | Complexity | Best For |
|---|---|---|---|---|
| Continuous streaming | Real-time | High | Low | Small deployments, critical data |
| Periodic batch upload | Minutes to hours | Low | Medium | Telemetry, non-critical data |
| Event-driven upload | On change | Medium | Medium | Alerts, state changes |
| Hybrid (stream + batch) | Mixed | Medium | High | Production systems |

Hybrid Sync Implementation

Most production systems use a hybrid approach — stream critical data, batch the rest:

import mqtt from 'mqtt';
import crypto from 'crypto'; // For crypto.randomUUID()
 
type Priority = 'critical' | 'normal' | 'low';
 
interface SyncConfig {
  critical: { mode: 'stream' };      // Real-time
  normal: { mode: 'batch'; intervalMs: number };  // Every 5 minutes
  low: { mode: 'batch'; intervalMs: number };     // Every 30 minutes
}
 
class HybridSync {
  private config: SyncConfig = {
    critical: { mode: 'stream' },
    normal: { mode: 'batch', intervalMs: 5 * 60_000 },
    low: { mode: 'batch', intervalMs: 30 * 60_000 },
  };
 
  private normalBatch: QueuedMessage[] = [];
  private lowBatch: QueuedMessage[] = [];
  private cloudClient: mqtt.MqttClient;
 
  constructor(cloudBrokerUrl: string) {
    this.cloudClient = mqtt.connect(cloudBrokerUrl);
 
    // Periodic batch flush
    setInterval(() => this.flushBatch('normal'), this.config.normal.intervalMs);
    setInterval(() => this.flushBatch('low'), this.config.low.intervalMs);
  }
 
  async sync(topic: string, payload: string, priority: Priority): Promise<void> {
    const message: QueuedMessage = { topic, payload, timestamp: Date.now(), retries: 0 };
 
    switch (priority) {
      case 'critical':
        // Stream immediately
        await this.publishNow(message);
        break;
      case 'normal':
        this.normalBatch.push(message);
        break;
      case 'low':
        this.lowBatch.push(message);
        break;
    }
  }
 
  private async publishNow(msg: QueuedMessage): Promise<void> {
    this.cloudClient.publish(msg.topic, msg.payload, { qos: 1 });
  }
 
  private flushBatch(priority: 'normal' | 'low'): void {
    const batch = priority === 'normal' ? this.normalBatch : this.lowBatch;
    if (batch.length === 0) return;
 
    // Send as a single compressed batch
    const batchPayload = JSON.stringify({
      batchId: crypto.randomUUID(),
      priority,
      count: batch.length,
      messages: batch,
    });
 
    this.cloudClient.publish(`batch/${priority}`, batchPayload, { qos: 1 });
 
    // Clear batch
    if (priority === 'normal') {
      this.normalBatch = [];
    } else {
      this.lowBatch = [];
    }
  }
}
 
// Usage
const sync = new HybridSync('mqtts://cloud-broker:8883');
 
// Critical alert — stream immediately
await sync.sync('alerts/factory/zone-1', alertPayload, 'critical');
 
// Normal telemetry — batch every 5 minutes
await sync.sync('telemetry/zone-1', telemetryPayload, 'normal');
 
// Environmental logs — batch every 30 minutes
await sync.sync('logs/zone-1', logPayload, 'low');

Edge ML Inference

One of the most powerful edge patterns is running pre-trained machine learning models directly on edge devices, producing real-time predictions with no cloud dependency.

Why ML at the Edge?

| Factor | Cloud ML | Edge ML |
|---|---|---|
| Latency | 100-500ms | 5-50ms |
| Privacy | Data leaves premises | Data stays local |
| Cost | Pay per inference | One-time hardware cost |
| Connectivity | Required | Not required |
| Model size | Unlimited | Constrained (MB range) |

Common Edge ML Use Cases

  • Anomaly detection: Predict equipment failure from vibration patterns
  • Computer vision: Defect detection on manufacturing line
  • Sound classification: Detect abnormal machine sounds
  • Predictive maintenance: Estimate remaining useful life (RUL)

Edge Inference with TensorFlow Lite

# edge_inference.py — Run on Raspberry Pi or Jetson Nano
import numpy as np
import json
import paho.mqtt.client as mqtt
 
# TensorFlow Lite — optimized for edge devices
import tflite_runtime.interpreter as tflite
 
class EdgeAnomalyDetector:
    def __init__(self, model_path: str):
        # Load quantized model (typically 1-10 MB)
        self.interpreter = tflite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
 
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
 
        # Sliding window of recent readings
        self.window_size = 60  # 60 seconds of data
        self.buffer = []
 
    def add_reading(self, reading: dict) -> dict | None:
        """Add a reading and return anomaly prediction if window is full."""
        self.buffer.append([
            reading['temperature'],
            reading['humidity'],
            reading['vibration'],
            reading['pressure'],
        ])
 
        if len(self.buffer) < self.window_size:
            return None  # Not enough data yet
 
        if len(self.buffer) > self.window_size:
            self.buffer.pop(0)  # Sliding window
 
        return self.predict()
 
    def predict(self) -> dict:
        """Run inference on the current window."""
        # Prepare input tensor
        input_data = np.array([self.buffer], dtype=np.float32)
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
 
        # Run inference
        self.interpreter.invoke()
 
        # Get output
        output = self.interpreter.get_tensor(self.output_details[0]['index'])
        anomaly_score = float(output[0][0])
 
        return {
            'anomaly_score': anomaly_score,
            'is_anomaly': anomaly_score > 0.8,  # Threshold
            'confidence': abs(anomaly_score - 0.5) * 2,  # 0 at the decision boundary, 1 at the extremes
        }
 
 
# MQTT integration
detector = EdgeAnomalyDetector('models/anomaly_detector.tflite')
client = mqtt.Client()
 
def on_message(client, userdata, msg):
    reading = json.loads(msg.payload.decode())
    result = detector.add_reading(reading)
 
    if result and result['is_anomaly']:
        # Publish anomaly alert locally and to cloud
        alert = {
            'type': 'ml_anomaly',
            'deviceId': reading['deviceId'],
            'score': result['anomaly_score'],
            'confidence': result['confidence'],
            'timestamp': reading['timestamp'],
        }
        client.publish('alerts/anomaly', json.dumps(alert))
        print(f"⚠️ Anomaly detected: score={result['anomaly_score']:.3f}")
 
client.on_message = on_message
client.connect('localhost', 1883)
client.subscribe('sensors/+/data')
client.loop_forever()

Edge ML Model Update Flow

Models need periodic updates as you train on more data in the cloud:
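The update loop itself is straightforward to sketch. Assume a hypothetical manifest endpoint that reports the latest model version and a download URL (both the endpoint and the `ModelManifest` shape are illustrative assumptions); the edge device polls it, downloads the new model to a temp file, and swaps it in atomically:

```typescript
import * as fs from 'fs';

interface ModelManifest {
  version: string; // e.g. "1.4.2" — hypothetical manifest format
  url: string;     // Where to download the model file from
}

// Numeric semver comparison: is `latest` newer than `current`?
function isNewer(current: string, latest: string): boolean {
  const a = current.split('.').map(Number);
  const b = latest.split('.').map(Number);
  for (let i = 0; i < 3; i++) {
    if ((b[i] ?? 0) !== (a[i] ?? 0)) return (b[i] ?? 0) > (a[i] ?? 0);
  }
  return false;
}

async function updateModel(currentVersion: string, manifestUrl: string, modelPath: string): Promise<string> {
  const manifest: ModelManifest = await (await fetch(manifestUrl)).json();
  if (!isNewer(currentVersion, manifest.version)) return currentVersion;

  // Download to a temp file, then rename: the rename is atomic, so the
  // inference process never sees a half-written model file
  const bytes = Buffer.from(await (await fetch(manifest.url)).arrayBuffer());
  fs.writeFileSync(`${modelPath}.tmp`, bytes);
  fs.renameSync(`${modelPath}.tmp`, modelPath);
  return manifest.version; // Caller reloads the interpreter with the new file
}
```

The key design point is the atomic swap: the inference process keeps serving the old model until the new file is fully on disk, then reloads on its next cycle.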


Offline-First Design Patterns

Edge devices must assume the network will fail. Offline-first design treats connectivity as a bonus, not a requirement.

Principles

  1. Local-first processing — All critical decisions happen locally
  2. Idempotent operations — Safe to replay when reconnecting
  3. Conflict resolution — Handle stale data gracefully
  4. Graceful degradation — Reduce functionality, never crash
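Principle 2 deserves a concrete shape: if every message carries a stable ID assigned at creation time, the receiving side can replay a reconnecting device's entire queue safely, because duplicates are detected and skipped. A minimal sketch (the `messageId` field is an assumption here, not part of the earlier interfaces):

```typescript
interface EdgeMessage {
  messageId: string; // Stable ID assigned once, when the message is created
  topic: string;
  payload: string;
}

class IdempotentIngest {
  private seen = new Set<string>();
  private applied: EdgeMessage[] = [];

  // Safe to call any number of times with the same message
  ingest(msg: EdgeMessage): boolean {
    if (this.seen.has(msg.messageId)) return false; // Duplicate — skip
    this.seen.add(msg.messageId);
    this.applied.push(msg);
    return true;
  }

  count(): number {
    return this.applied.length;
  }
}

const ingest = new IdempotentIngest();
const msg = { messageId: 'a1', topic: 'telemetry/zone-1', payload: '{"t":21.5}' };
ingest.ingest(msg); // true — applied
ingest.ingest(msg); // false — replay detected, skipped
console.log(ingest.count()); // → 1
```

A production version would bound the `seen` set (e.g. by expiring IDs older than the maximum replay window) so it doesn't grow forever.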

Offline-First Architecture

Two building blocks make these principles concrete: a connectivity state machine that tracks network health, and a conflict resolver for when local and cloud state diverge.

Connectivity State Machine

type ConnectivityState = 'online' | 'degraded' | 'offline';
 
class ConnectivityManager {
  private state: ConnectivityState = 'online';
  private failedPings = 0;
  private maxFailedPings = 3;
  private listeners: ((state: ConnectivityState) => void)[] = [];
 
  async checkConnectivity(): Promise<ConnectivityState> {
    try {
      const start = Date.now();
      await fetch('https://cloud-broker.example.com/health', {
        signal: AbortSignal.timeout(5000),
      });
      const latency = Date.now() - start;
 
      this.failedPings = 0;
 
      if (latency > 2000) {
        this.setState('degraded');
      } else {
        this.setState('online');
      }
    } catch {
      this.failedPings++;
 
      if (this.failedPings >= this.maxFailedPings) {
        this.setState('offline');
      } else {
        this.setState('degraded');
      }
    }
 
    return this.state;
  }
 
  private setState(newState: ConnectivityState): void {
    if (this.state !== newState) {
      console.log(`Connectivity: ${this.state} → ${newState}`);
      this.state = newState;
      this.listeners.forEach(fn => fn(newState));
    }
  }
 
  onStateChange(listener: (state: ConnectivityState) => void): void {
    this.listeners.push(listener);
  }
 
  getState(): ConnectivityState {
    return this.state;
  }
}
 
// Usage — assumes a `sync` object wrapping the store-and-forward queue
const connectivity = new ConnectivityManager();
 
connectivity.onStateChange((state) => {
  switch (state) {
    case 'online':
      console.log('Full connectivity — streaming to cloud');
      sync.flushAllQueues();
      break;
    case 'degraded':
      console.log('Degraded — batch mode, critical only');
      sync.switchToBatchMode();
      break;
    case 'offline':
      console.log('Offline — local processing only');
      sync.pauseCloudSync();
      break;
  }
});
 
// Check every 30 seconds
setInterval(() => connectivity.checkConnectivity(), 30_000);

Data Conflict Resolution

When the edge device reconnects, its local data may conflict with cloud state (e.g., another operator changed a configuration):

interface VersionedData<T> {
  data: T;
  version: number;
  lastModified: number;
  source: 'edge' | 'cloud';
}
 
type ConflictStrategy = 'cloud-wins' | 'edge-wins' | 'latest-wins' | 'merge';
 
class ConflictResolver {
  resolve<T>(
    local: VersionedData<T>,
    remote: VersionedData<T>,
    strategy: ConflictStrategy
  ): VersionedData<T> {
    // No conflict — versions match
    if (local.version === remote.version) return local;
 
    switch (strategy) {
      case 'cloud-wins':
        return remote;
 
      case 'edge-wins':
        return local;
 
      case 'latest-wins':
        return local.lastModified > remote.lastModified ? local : remote;
 
      case 'merge':
        // Custom merge logic — depends on data type
        return this.mergeData(local, remote);
 
      default:
        return remote; // Safe default
    }
  }
 
  private mergeData<T>(
    local: VersionedData<T>,
    remote: VersionedData<T>
  ): VersionedData<T> {
    // For sensor configs: use cloud for thresholds, edge for calibration
    const merged = {
      ...remote.data,   // Cloud settings as the base
      ...local.data,    // Edge values override shared keys (later spread wins)
    };
 
    return {
      data: merged as T,
      version: Math.max(local.version, remote.version) + 1,
      lastModified: Date.now(),
      source: 'edge',
    };
  }
}

Common Beginner Mistakes

Mistake 1: Sending Everything to the Cloud

❌ 10,000 sensors → 172 GB/day raw data → cloud → $$$
 
✅ 10,000 sensors → edge filter → 5 GB/day summaries → cloud → $

Fix: Start with the question "what does the cloud actually need?" Usually it's aggregates, alerts, and hourly summaries — not every raw reading.
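As a sketch of that fix, a window of raw readings can be collapsed into a single summary record before upload — the field names here are illustrative:

```python
from statistics import mean

def summarize(readings: list[dict]) -> dict:
    """Reduce raw readings ({'value', 'timestamp'}) to one summary record."""
    values = [r['value'] for r in readings]
    return {
        'count': len(values),
        'min': min(values),
        'max': max(values),
        'mean': round(mean(values), 3),
        'start': readings[0]['timestamp'],
        'end': readings[-1]['timestamp'],
    }
```

One summary per hour per sensor replaces 3,600 raw readings — which is where the order-of-magnitude bandwidth savings come from.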

Mistake 2: No Offline Strategy

❌ Network drops → edge device throws errors → data lost
 
✅ Network drops → store-and-forward → auto-flush on reconnect

Fix: Treat the network as unreliable from day one. Implement store-and-forward before you think you need it — you will.

Mistake 3: Over-Engineering the Edge

❌ Running Kubernetes + Kafka + PostgreSQL on a Raspberry Pi
 
✅ Running a simple Node.js process + SQLite + MQTT on a Raspberry Pi

Fix: Match your runtime to your hardware. A Pi has 1-8 GB RAM — Docker Compose is usually more than enough. Save Kubernetes for on-prem servers in the fog layer.

Mistake 4: Ignoring Edge Security

❌ Edge device runs as root, no TLS, default passwords, open ports
 
✅ Least privilege, mTLS, unique credentials per device, firewall rules

Fix: Edge devices are physically accessible — they need more security, not less. We'll cover IoT security in detail in a future post.

Mistake 5: No Model Validation at the Edge

❌ Push new ML model → immediately replace old one → broken predictions
 
✅ Push new ML model → shadow run alongside old → validate → swap

Fix: Always run new models in shadow mode first. Compare outputs with the existing model for a validation period before switching.


Summary and Key Takeaways

- Edge computing reduces latency, bandwidth, and cloud dependency — process close to the source
- Edge, fog, and cloud are distinct layers — edge for filtering, fog for coordination, cloud for global analytics
- Data filtering at the edge saves 70-95% bandwidth — only forward significant changes and aggregates
- Store-and-forward is essential — buffer data locally when the network drops, flush on reconnect
- Threshold alerting at the edge enables sub-10ms response — critical for safety and equipment protection
- Match runtime to hardware — Docker Compose for simple deployments, K3s for Kubernetes teams, Greengrass/IoT Edge for cloud-vendor shops
- Edge ML inference runs in 5-50ms — anomaly detection, defect detection, predictive maintenance without cloud
- Design offline-first from day one — treat connectivity as a bonus, not a requirement
- Validate ML models in shadow mode — never hot-swap a model without comparison testing


What's Next

Now that you understand where processing happens — edge, fog, or cloud — it's time to go deep into the protocol that powers most IoT communication.

Next: IOT-5: Deep Dive: MQTT — The IoT Messaging Protocol — MQTT v3.1.1 vs v5.0, topic design patterns, QoS trade-offs in depth, broker clustering (EMQX, HiveMQ), authentication and authorization, and scaling MQTT for millions of connections.



This is Part 4 of the IoT Patterns & Strategies series. Start from the beginning or jump to any topic that interests you.
