IoT Data Pipeline: Ingestion, Processing & Storage

Tags: iot, data pipeline, kafka, stream processing, backend

Welcome to IOT-7 in the IoT Patterns & Strategies Roadmap! So far, we've covered device management and firmware updates. Now we need to handle what devices actually produce: data. Lots of it.

A single temperature sensor might send 1 reading every 5 seconds. 10,000 sensors means 2,000 readings per second. 100,000 sensors means 20,000 per second. Add in vibration sensors, pressure sensors, GPS coordinates, images from edge cameras — and suddenly you're dealing with gigabytes flowing in from the field every minute.
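
To put rough numbers on this, here is a back-of-the-envelope sketch that estimates message rate and raw bandwidth for a fleet. The 200-byte average payload is an assumption chosen for illustration, not a measured figure.

```typescript
// Back-of-the-envelope ingest estimate for a fleet of sensors.
// payloadBytes is an ASSUMED average message size (JSON plus protocol overhead).
interface FleetEstimate {
  readingsPerSecond: number;
  megabytesPerMinute: number;
}

function estimateIngest(
  sensorCount: number,
  secondsBetweenReadings: number,
  payloadBytes: number,
): FleetEstimate {
  const readingsPerSecond = sensorCount / secondsBetweenReadings;
  const bytesPerMinute = readingsPerSecond * payloadBytes * 60;
  return { readingsPerSecond, megabytesPerMinute: bytesPerMinute / 1_000_000 };
}

const smallFleet = estimateIngest(10_000, 5, 200);
const largeFleet = estimateIngest(100_000, 5, 200);
console.log(smallFleet, largeFleet);
```

Under these assumptions, scalar telemetry from 100,000 sensors comes to about 240 MB per minute. It is the heavier payloads, such as camera images, that push a fleet into gigabytes.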

But here's the real challenge: You can't just dump raw sensor data into a database. You need to:

  • Ingest it reliably without losing messages
  • Transform and enrich it in real-time
  • Detect anomalies as data arrives
  • Store it efficiently for both immediate queries and historical analysis
  • Scale from hundreds of devices to millions

This post covers how production IoT systems solve this.

What You'll Learn

✅ Understand IoT data characteristics and why standard databases fail
✅ Design ingestion architectures (MQTT → message queue → processing)
✅ Choose between stream processing vs batch processing with clear trade-offs
✅ Master Lambda and Kappa architectures for IoT workloads
✅ Implement real-time stream processing with Kafka Streams
✅ Transform and enrich data in the pipeline
✅ Detect anomalies and quality issues as data flows
✅ Design storage strategies: hot data, warm data, cold archive
✅ Build a complete end-to-end pipeline with code examples


The IoT Data Challenge

IoT data is fundamentally different from traditional application data. Let's understand why standard approaches break.

Characteristics of IoT Data

Why Traditional Databases Fail

Traditional Database (e.g., PostgreSQL):
- ✗ Optimized for OLTP (frequent updates, point lookups)
- ✗ Row-oriented storage (bad for time-series)
- ✗ Indexes bloat with high-cardinality dimensions (device ID, timestamp)
- ✗ Slow aggregations over time ranges
- ✗ Needs VACUUM to reclaim space after deletes (extra I/O; VACUUM FULL locks the table)
 
IoT Time-Series Database (e.g., InfluxDB):
- ✓ Optimized for OLAP (sequential scans, aggregations)
- ✓ Column-oriented storage (compress billions of monotonic timestamps)
- ✓ Smart indexes for time ranges (not individual rows)
- ✓ Efficient downsampling (1 query → seconds vs minutes)
- ✓ Built-in retention policies (automatic cleanup)
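
One reason regularly spaced, monotonic timestamps compress so well is that their differences are tiny and repetitive. The sketch below illustrates the idea with delta-of-delta encoding. It is a simplified illustration of the principle, not the on-disk format of InfluxDB or any other engine.

```typescript
// Delta-of-delta encoding: store the first timestamp, then the first delta,
// then only the change in delta between consecutive samples.
function encodeDeltaOfDelta(timestamps: number[]): number[] {
  const out: number[] = [];
  let prev = 0;
  let prevDelta = 0;
  timestamps.forEach((ts, i) => {
    if (i === 0) {
      out.push(ts); // first value is stored verbatim
    } else {
      const delta = ts - prev;
      out.push(delta - prevDelta); // at i === 1 this is the first delta itself
      prevDelta = delta;
    }
    prev = ts;
  });
  return out;
}

function decodeDeltaOfDelta(encoded: number[]): number[] {
  const out: number[] = [];
  let prev = 0;
  let prevDelta = 0;
  encoded.forEach((v, i) => {
    if (i === 0) {
      prev = v;
    } else {
      prevDelta += v;
      prev += prevDelta;
    }
    out.push(prev);
  });
  return out;
}

// A sensor reporting every 5 seconds (epoch ms), with one reading arriving 1 s late.
const sampleTimestamps = [
  1673456789000, 1673456794000, 1673456799000, 1673456805000, 1673456810000,
];
const encodedTimestamps = encodeDeltaOfDelta(sampleTimestamps);
console.log(encodedTimestamps);
```

After the first two entries, a perfectly regular sensor produces only zeros, which a general-purpose compressor or a bit-packing scheme can shrink to almost nothing.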

IoT Data Pipeline Architecture

A complete pipeline has 4 layers:

Layer 1: Ingestion (MQTT → Message Queue)

Devices publish to MQTT. A message broker ensures messages don't get lost:

Devices (telemetry)
    ↓  MQTT (protocol)
MQTT Broker (Mosquitto, EMQX, HiveMQ)   ← Stores messages temporarily
    ↓
Kafka / RabbitMQ                        ← Persistent message queue
    ↓
Stream Processors                       ← Multiple consumers can process same messages
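
The hand-off from the MQTT broker to the message queue mostly comes down to mapping an MQTT topic and payload onto a keyed record. Here is a minimal sketch of that mapping. The topic layout `devices/<deviceId>/telemetry` and the names `KafkaRecord` and `toKafkaRecord` are assumptions for illustration.

```typescript
// Shape of a record handed to a Kafka producer (the key decides the partition).
interface KafkaRecord {
  topic: string;
  key: string;
  value: string;
}

// ASSUMED MQTT topic layout: devices/<deviceId>/telemetry
function toKafkaRecord(mqttTopic: string, payload: string): KafkaRecord | null {
  const parts = mqttTopic.split('/');
  const [root, deviceId, leaf] = parts;
  if (parts.length !== 3 || root !== 'devices' || leaf !== 'telemetry' || !deviceId) {
    return null; // not a telemetry topic; ignore it
  }
  // Keying by device ID keeps each device's readings in order on one partition.
  return { topic: 'devices.telemetry', key: deviceId, value: payload };
}

const bridged = toKafkaRecord(
  'devices/temp-007/telemetry',
  '{"value":22.5,"ts":1673456789000}',
);
console.log(bridged);
```

Using the device ID as the key is a design choice: it preserves per-device ordering, at the cost of uneven partitions if a few devices are much chattier than the rest.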

Layer 2: Processing (Stream Processing)

Real-time transformation, enrichment, and filtering:

Raw Event: {"deviceId":"temp-007","value":22.5,"ts":1673456789000}

Transform: Add location from device registry

Enrich:    Add threshold from device config

Flag:      Set alert = value > threshold (every event still passes through)

Output: {"deviceId":"temp-007","value":22.5,"location":"Berlin","threshold":25,"alert":false}
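
The steps above can be sketched as a single pure function. The in-memory `deviceInfoById` map stands in for the device registry and device config, and all names here are hypothetical.

```typescript
interface RawTelemetry {
  deviceId: string;
  value: number;
  ts: number;
}

interface DeviceInfo {
  location: string;
  threshold: number;
}

interface ProcessedTelemetry {
  deviceId: string;
  value: number;
  location: string;
  threshold: number;
  alert: boolean;
}

// Stand-in for the device registry and device config lookups (ASSUMED data).
const deviceInfoById = new Map<string, DeviceInfo>([
  ['temp-007', { location: 'Berlin', threshold: 25 }],
]);

function processTelemetry(raw: RawTelemetry): ProcessedTelemetry | null {
  const info = deviceInfoById.get(raw.deviceId);
  if (!info) return null; // unknown device: the caller decides whether to drop or quarantine
  return {
    deviceId: raw.deviceId,
    value: raw.value,
    location: info.location, // from the device registry
    threshold: info.threshold, // from the device config
    alert: raw.value > info.threshold,
  };
}

const processed = processTelemetry({ deviceId: 'temp-007', value: 22.5, ts: 1673456789000 });
console.log(JSON.stringify(processed));
```

Keeping this stage a pure function of the event plus looked-up metadata makes it easy to unit test and safe to replay.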

Layer 3: Storage (Hot/Warm/Cold)

Different access patterns need different storage:

Hot Data (Recent):      TimescaleDB  → Sub-second queries, dashboards
Warm Data (Historical): ClickHouse   → Batch analytics, reports
Cold Data (Archive):    S3/GCS       → Compliance, long-term analysis
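
A query router can pick the tier from the age of the data being requested. The sketch below assumes the 7-day and 1-year boundaries that this post uses for hot and warm retention. The function name is hypothetical.

```typescript
type StorageTier = 'hot' | 'warm' | 'cold';

const DAY_MS = 24 * 60 * 60 * 1000;
const HOT_MAX_AGE_MS = 7 * DAY_MS; // recent data: TimescaleDB
const WARM_MAX_AGE_MS = 365 * DAY_MS; // historical data: ClickHouse

// Returns the tier that should hold an event with the given timestamp (epoch ms).
function tierForTimestamp(eventTsMs: number, nowMs: number): StorageTier {
  const age = nowMs - eventTsMs;
  if (age < HOT_MAX_AGE_MS) return 'hot';
  if (age < WARM_MAX_AGE_MS) return 'warm';
  return 'cold';
}

const nowForDemo = Date.UTC(2024, 0, 31);
console.log(tierForTimestamp(nowForDemo - 2 * DAY_MS, nowForDemo));
console.log(tierForTimestamp(nowForDemo - 30 * DAY_MS, nowForDemo));
console.log(tierForTimestamp(nowForDemo - 400 * DAY_MS, nowForDemo));
```

A query spanning a boundary would need to read from two tiers and merge the results, which is one reason to keep the schema identical across them.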

Layer 4: Consumption (APIs, Dashboards, Alerts)

Data flows to end users and systems.


Stream Processing vs Batch Processing

The classic trade-off: Speed vs Simplicity

| Aspect | Stream Processing | Batch Processing |
|---|---|---|
| Latency | Seconds | Hours (end-of-day) |
| Use Case | Real-time alerts, dashboards | Analytics, reports, ML |
| State | Hard (must manage) | Easy (stateless) |
| Tools | Kafka Streams, Flink, Spark Streaming | Spark, Hadoop, SQL |
| Cost | Continuous (24/7) | Variable (compute on demand) |
| Complexity | High (windowing, state) | Low (simple SQL) |
| Example | Alert if temp > 30°C now | Daily temperature report |

When to Use Each

Use Stream Processing if:

  • Need alerts or actions within seconds
  • Dashboard needs live data
  • Anomaly detection must be real-time
  • Example: "Alert when pressure spike detected"

Use Batch Processing if:

  • Analysis can wait hours/days
  • Generating reports or ML training data
  • Simpler logic, less infrastructure
  • Example: "Daily average temperature by location"

Use Both (Lambda Architecture) if:

  • Need both real-time alerts AND accurate daily reports
  • Can tolerate more infrastructure complexity

Lambda vs Kappa Architectures

How to design pipelines that serve both speed and accuracy layers.

Lambda Architecture

Two parallel paths over the same data: a speed layer (real-time, approximate) and a batch layer (accurate, slow), merged by a serving layer at query time.

Pros:

  • ✓ Fast approximate results + accurate results
  • ✓ Can recover from stream processor failures
  • ✓ Proven pattern at scale (Netflix, Twitter)

Cons:

  • ✗ Double the infrastructure (stream + batch)
  • ✗ Must keep both in sync
  • ✗ Complex serving layer logic
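
To make the serving-layer concern concrete, here is a minimal sketch of the merge it has to perform. It assumes both layers expose per-device event counts and that the speed view only covers events after the batch cutoff. The names are hypothetical.

```typescript
// Per-device event counts produced by each layer.
type CountView = Map<string, number>;

// Serving-layer merge: the batch view is authoritative up to its cutoff;
// the speed view contributes only events that arrived after that cutoff.
function mergeViews(batchView: CountView, speedView: CountView): CountView {
  const merged = new Map(batchView);
  speedView.forEach((recentCount, deviceId) => {
    merged.set(deviceId, (merged.get(deviceId) ?? 0) + recentCount);
  });
  return merged;
}

const batchCounts: CountView = new Map([['temp-007', 17000]]);
const speedCounts: CountView = new Map([
  ['temp-007', 120],
  ['temp-008', 40],
]);
const servedCounts = mergeViews(batchCounts, speedCounts);
console.log(servedCounts.get('temp-007'), servedCounts.get('temp-008'));
```

The hard part in practice is not the addition but the cutoff: if the speed view overlaps the batch view, events are double counted, which is the "keep both in sync" problem.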

Kappa Architecture

Single stream path. Use event log as replay source.

Pros:

  • ✓ Simpler: one code path
  • ✓ Easier to reason about
  • ✓ Less infrastructure overhead
  • ✓ Can recompute from scratch any time

Cons:

  • ✗ Requires long log retention (large Kafka logs)
  • ✗ Recomputation takes time
  • ✗ Stream processor must be deterministic
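
The replay idea and the determinism requirement can be shown with a toy example. The array below stands in for a retained Kafka topic, and the reducer is a hypothetical stand-in for real processing logic.

```typescript
interface LoggedReading {
  deviceId: string;
  value: number;
}

interface DeviceStats {
  count: number;
  max: number;
}

// Deterministic reducer: same log in, same state out. No clocks, no randomness.
function applyReading(state: Map<string, DeviceStats>, r: LoggedReading): void {
  const current = state.get(r.deviceId) ?? { count: 0, max: -Infinity };
  state.set(r.deviceId, {
    count: current.count + 1,
    max: Math.max(current.max, r.value),
  });
}

// Rebuild state from the start of the log, as a Kappa pipeline would after a logic change.
function replay(log: LoggedReading[]): Map<string, DeviceStats> {
  const state = new Map<string, DeviceStats>();
  log.forEach((r) => applyReading(state, r));
  return state;
}

const eventLog: LoggedReading[] = [
  { deviceId: 'temp-007', value: 22.5 },
  { deviceId: 'temp-007', value: 26.1 },
  { deviceId: 'temp-008', value: 19.0 },
];
const firstRun = replay(eventLog);
const secondRun = replay(eventLog);
console.log(firstRun.get('temp-007'), secondRun.get('temp-007'));
```

If `applyReading` called `Date.now()` or looked up mutable external data, the two runs could diverge, and a replay would no longer reproduce the original results.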

Lambda vs Kappa Decision Tree

Do you need accurate historical analytics?
├─ Yes
│  └─ Do you have spare infrastructure?
│     ├─ Yes → Lambda (speed + batch layer)
│     └─ No  → Stream-only with replay capability (Kappa)
└─ No
   └─ Kappa (simpler, pure streaming)

Recommendation for IoT:

  • Start with Kappa if ingestion is reliable (Kafka)
  • Upgrade to Lambda if you need bucketing/aggregation that's expensive to recompute

Stream Processing with Kafka Streams

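Kafka Streams is a Java library for processing data in Kafka topics. It needs no separate cluster; it runs inside your application. Because Kafka Streams itself only runs on the JVM, the Node.js examples below reproduce the same consume, transform, produce pattern with the KafkaJS client.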

Basic Topology: Transform + Filter

// Node.js consume-transform-produce loop with the KafkaJS client
// (mirrors a Kafka Streams topology; Kafka Streams itself is JVM-only)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'iot-processor',
  brokers: ['kafka:9092'],
});

const consumer = kafka.consumer({ groupId: 'iot-processing' });
const producer = kafka.producer();

async function main() {
  await consumer.connect();
  await producer.connect();

  await consumer.subscribe({ topic: 'devices.telemetry' });

  await consumer.run({
    eachMessage: async ({ message }) => {
      // Parse raw event
      const event = JSON.parse(message.value.toString());

      // Transform: lookup device metadata
      // (getDeviceMetadata is an application-specific helper, not shown here)
      const device = await getDeviceMetadata(event.deviceId);

      // Enrich: add location and threshold
      const enriched = {
        ...event,
        location: device.location,
        threshold: device.threshold,
        timestamp: Date.now(), // processing time; event.ts keeps the device time
      };

      const record = { key: enriched.deviceId, value: JSON.stringify(enriched) };

      // Send all events (enriched) to storage
      await producer.send({ topic: 'devices.enriched', messages: [record] });

      // Filter: only alert if exceeds threshold
      if (enriched.value > enriched.threshold) {
        await producer.send({ topic: 'devices.alerts', messages: [record] });
      }
    },
  });
}

main().catch(console.error);

Stateful Processing: Windowing & Aggregation

Calculate 5-minute averages:

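// Simplified windowing in Node.js, reusing the consumer and producer from above.
// In Java/Scala, the Kafka Streams library provides windowing with fault-tolerant
// state stores; the in-memory Map used here is lost on restart.
 
interface WindowData {
  sum: number;
  count: number;
  min: number;
  max: number;
}
 
const windowState = new Map<string, WindowData>();
 
// 5 minutes in ms (event.ts is assumed to be epoch milliseconds)
const WINDOW_SIZE = 5 * 60 * 1000;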
 
await consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value.toString());
    const deviceId = event.deviceId;
 
    // Create window key: device + time bucket
    const windowStart = Math.floor(event.ts / WINDOW_SIZE) * WINDOW_SIZE;
    const windowKey = `${deviceId}:${windowStart}`;
 
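    // Initialize window if new
    if (!windowState.has(windowKey)) {
      // Evict this device's window from two periods ago so state stays bounded
      // (this tolerates roughly one window of late-arriving events)
      windowState.delete(`${deviceId}:${windowStart - 2 * WINDOW_SIZE}`);
      windowState.set(windowKey, {
        sum: 0,
        count: 0,
        min: Infinity,
        max: -Infinity,
      });
    }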
 
    // Update window
    const window = windowState.get(windowKey);
    window.sum += event.value;
    window.count++;
    window.min = Math.min(window.min, event.value);
    window.max = Math.max(window.max, event.value);
 
    // Emit aggregated result
    const result = {
      deviceId,
      window: {
        start: windowStart,
        end: windowStart + WINDOW_SIZE,
      },
      stats: {
        avg: window.sum / window.count,
        min: window.min,
        max: window.max,
        count: window.count,
      },
    };
 
    await producer.send({
      topic: 'devices.aggregated',
      messages: [{ key: deviceId, value: JSON.stringify(result) }],
    });
  },
});
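
The loop above emits a running aggregate on every event. An alternative is to emit each window once, when a later event shows it has finished. Here is a minimal sketch of that variant; it uses each device's newest event time as a crude watermark, which is a simplification, and the class name is hypothetical.

```typescript
interface WindowAggregate {
  deviceId: string;
  windowStart: number;
  sum: number;
  count: number;
}

interface ClosedWindow {
  deviceId: string;
  windowStart: number;
  avg: number;
  count: number;
}

class TumblingAverager {
  private openWindows = new Map<string, WindowAggregate>();
  private windowMs: number;

  constructor(windowMs: number) {
    this.windowMs = windowMs;
  }

  // Adds one reading (tsMs = event time in epoch ms) and returns any windows
  // of this device that the new reading shows to be finished.
  add(deviceId: string, value: number, tsMs: number): ClosedWindow[] {
    const windowStart = Math.floor(tsMs / this.windowMs) * this.windowMs;
    const finished: ClosedWindow[] = [];

    // Close older windows of the same device; this event acts as the watermark.
    this.openWindows.forEach((agg, key) => {
      if (agg.deviceId === deviceId && agg.windowStart < windowStart) {
        finished.push({
          deviceId,
          windowStart: agg.windowStart,
          avg: agg.sum / agg.count,
          count: agg.count,
        });
        this.openWindows.delete(key);
      }
    });

    const key = `${deviceId}:${windowStart}`;
    const agg = this.openWindows.get(key) ?? { deviceId, windowStart, sum: 0, count: 0 };
    agg.sum += value;
    agg.count += 1;
    this.openWindows.set(key, agg);
    return finished;
  }
}

const averager = new TumblingAverager(5 * 60 * 1000);
const afterFirst = averager.add('temp-007', 20, 0);
const afterSecond = averager.add('temp-007', 24, 60_000);
const afterThird = averager.add('temp-007', 30, 300_000); // falls into the next window
console.log(afterThird);
```

Emitting once per window cuts downstream write volume, but a device that goes silent never closes its last window, so a real pipeline would also need a timer-based flush.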

Data Transformation & Enrichment

Raw sensor data is often incomplete. Add context before storage.

Transformation Pipeline

interface RawEvent {
  deviceId: string;
  sensorId: string;
  value: number;
  ts: number;
}
 
interface EnrichedEvent extends RawEvent {
  location: string;
  deviceType: string;
  zone: string;
  unit: string;
  threshold: number;
  isAnomaly: boolean;
}
 
async function enrichEvent(raw: RawEvent): Promise<EnrichedEvent> {
  // 1. Lookup device metadata
  const device = await deviceRegistry.get(raw.deviceId);
  if (!device) throw new DeviceNotFoundError(raw.deviceId);
 
  // 2. Lookup sensor configuration
  const sensor = device.sensors.find((s) => s.id === raw.sensorId);
  if (!sensor) throw new SensorNotFoundError(raw.sensorId);
 
  // 3. Validate value
  if (raw.value < sensor.min || raw.value > sensor.max) {
    // Out of range
    console.warn(`Out of range: ${raw.deviceId}.${raw.sensorId} = ${raw.value}`);
  }
 
  // 4. Apply unit conversion (if needed)
  const convertedValue = convertUnits(raw.value, sensor.unit, 'SI');
 
  // 5. Detect anomaly
  const historical = await getHistorical(raw.deviceId, raw.sensorId, 24 * 60 * 60 * 1000);
  const isAnomaly = detectAnomaly(
    convertedValue,
    historical,
    sensor.anomalyThreshold,
  );
 
  // 6. Enrich
  return {
    ...raw,
    value: convertedValue,
    location: device.location,
    deviceType: device.deviceType,
    zone: device.zone,
    unit: sensor.unit,
    threshold: sensor.threshold,
    isAnomaly,
  };
}
 
function detectAnomaly(
  value: number,
  historical: number[],
  threshold: number,
): boolean {
  // Without a baseline there is nothing to compare against
  if (historical.length < 2) return false;
 
  const mean = historical.reduce((a, b) => a + b, 0) / historical.length;
  const std = Math.sqrt(
    historical.reduce((sq, n) => sq + (n - mean) ** 2, 0) / historical.length,
  );
 
  // Flag if more than `threshold` standard deviations from the mean (e.g. 3)
  return Math.abs(value - mean) > threshold * std;
}
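
Fetching 24 hours of history for every event gets expensive at high rates. One alternative, swapped in here for illustration and not part of the pipeline above, is to keep running statistics per sensor with Welford's online algorithm. The class name and the `minSamples` default are assumptions.

```typescript
// Welford's online algorithm: running mean and variance in O(1) memory.
class RunningStats {
  private n = 0;
  private mean = 0;
  private m2 = 0; // sum of squared deviations from the current mean

  push(x: number): void {
    this.n += 1;
    const delta = x - this.mean;
    this.mean += delta / this.n;
    this.m2 += delta * (x - this.mean);
  }

  // Population standard deviation (divide by N).
  stdDev(): number {
    return this.n > 0 ? Math.sqrt(this.m2 / this.n) : 0;
  }

  // True when x lies more than `sigmas` standard deviations from the mean.
  // Returns false until enough samples exist to form a baseline.
  isAnomaly(x: number, sigmas: number, minSamples = 30): boolean {
    if (this.n < minSamples) return false;
    return Math.abs(x - this.mean) > sigmas * this.stdDev();
  }
}

const baseline = new RunningStats();
for (let i = 0; i < 40; i++) {
  baseline.push(i % 2 === 0 ? 20 : 22); // mean 21, standard deviation 1
}
console.log(baseline.isAnomaly(21.5, 3), baseline.isAnomaly(30, 3));
```

Unlike the 24-hour lookup, this baseline never forgets old data. A production version would likely decay or periodically reset the statistics so that seasonal drift does not mask anomalies.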

Data Quality & Anomaly Detection

Not all data is good data. Detect and quarantine problems early.

Quality Checks

interface DataQualityReport {
  total: number;
  valid: number;
  invalid: number;
  duplicate: number;
  outOfRange: number;
  anomaly: number;
  issues: string[];
}
 
async function validateEvent(event: EnrichedEvent): Promise<DataQualityReport> {
  const report: DataQualityReport = {
    total: 1,
    valid: 1,
    invalid: 0,
    duplicate: 0,
    outOfRange: 0,
    anomaly: 0,
    issues: [],
  };
 
  // Check 1: Missing fields
  if (!event.deviceId || !event.sensorId || event.value === undefined) {
    report.valid = 0;
    report.invalid = 1;
    report.issues.push('Missing required fields');
    return report;
  }
 
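  // Check 2: Duplicate detection (same device, sensor and event time seen recently?)
  // `cache` is assumed to be a key-value store with async get/set, not shown here
  const recentKey = `${event.deviceId}:${event.sensorId}:${event.ts}`;
  const lastSeen = await cache.get(recentKey);
  if (lastSeen && Date.now() - lastSeen < 1000) {
    report.duplicate = 1;
    report.issues.push(`Duplicate within 1s of last event`);
  }
  // Record this event so that a redelivery is caught next time
  await cache.set(recentKey, Date.now());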
 
  // Check 3: Out of range
  if (event.value < -50 || event.value > 150) {
    // Temperature sensor
    report.outOfRange = 1;
    report.issues.push(`Out of valid range: ${event.value}`);
  }
 
  // Check 4: Anomaly (already detected during enrichment)
  if (event.isAnomaly) {
    report.anomaly = 1;
    report.issues.push('Detected anomaly');
  }
 
  // Check 5: Schema validation
  // (done at ingestion, but good to re-check)
 
  return report;
}
 
// Route events based on quality
async function routeEvent(event: EnrichedEvent) {
  const quality = await validateEvent(event);
 
  if (quality.issues.length > 0) {
    // Send to quarantine topic for investigation
    await producer.send({
      topic: 'devices.quarantine',
      messages: [
        {
          key: event.deviceId,
          value: JSON.stringify({ event, quality }),
        },
      ],
    });
    return;
  }
 
  // Send to storage
  await producer.send({
    topic: 'devices.enriched',
    messages: [{ key: event.deviceId, value: JSON.stringify(event) }],
  });
}
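
The duplicate check depends on a "seen recently" store. Here is a minimal in-memory sketch of one, with expiry so it does not grow forever. The names are hypothetical, and a shared store would be needed once more than one processor instance is running.

```typescript
// In-memory "seen recently" set with expiry (single-process only).
class RecentKeys {
  private seenAt = new Map<string, number>();
  private ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  // Returns true if the key was already seen within the TTL; records it otherwise.
  checkAndRemember(key: string, nowMs: number): boolean {
    const last = this.seenAt.get(key);
    if (last !== undefined && nowMs - last < this.ttlMs) return true;
    this.seenAt.set(key, nowMs);
    return false;
  }

  // Drops expired entries; call periodically to bound memory.
  sweep(nowMs: number): void {
    this.seenAt.forEach((ts, key) => {
      if (nowMs - ts >= this.ttlMs) this.seenAt.delete(key);
    });
  }

  size(): number {
    return this.seenAt.size;
  }
}

// Identity of a reading: device, sensor and the device-side event time.
const dedupKey = (deviceId: string, sensorId: string, ts: number): string =>
  `${deviceId}:${sensorId}:${ts}`;

const recent = new RecentKeys(1000);
const readingKey = dedupKey('temp-007', 't1', 1673456789000);
const firstSeen = recent.checkAndRemember(readingKey, 0);
const redelivered = recent.checkAndRemember(readingKey, 500);
console.log(firstSeen, redelivered);
```

Passing the current time in as a parameter, rather than calling `Date.now()` inside, keeps the class deterministic and easy to test.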

Storage: Hot/Warm/Cold Tiers

A useful rule of thumb (the 90-10 rule): roughly 90% of queries touch recent data, and only about 10% touch historical data.

Tier Strategy

Data Lifecycle

// Lifecycle management
// (timescaledb, clickhouse, s3, convertToParquet and datePartition are
// application-specific clients and helpers, not shown here)
const DAY_MS = 24 * 60 * 60 * 1000;
 
const storageConfig = {
  hot: {
    database: 'timescaledb',
    retention: 7 * DAY_MS, // 7 days
    compression: 'native', // TimescaleDB's built-in columnar compression
  },
  warm: {
    database: 'clickhouse',
    retention: 365 * DAY_MS, // 1 year
    compression: 'zstd',
  },
  cold: {
    storage: 's3',
    bucket: 'iot-archive',
    retention: null, // Indefinite
    format: 'parquet',
  },
};
 
// Every hour, move data older than the hot retention from TimescaleDB to ClickHouse.
// Assumes ts is stored as epoch milliseconds in both stores.
async function migrateHotToWarm() {
  const toWarm = Date.now() - storageConfig.hot.retention; // Older than 7 days
 
  const warmData = await timescaledb.query(`
    SELECT * FROM events WHERE ts < ${toWarm}
  `);
 
  // Insert into ClickHouse first, then delete the same range from TimescaleDB
  await clickhouse.insert('events', warmData);
  await timescaledb.query(`
    DELETE FROM events WHERE ts < ${toWarm}
  `);
}
 
// Every week, archive data older than the warm retention from ClickHouse to S3
async function archiveWarmToCold() {
  const toArchive = Date.now() - storageConfig.warm.retention;
  const archiveData = await clickhouse.query(`
    SELECT * FROM events WHERE ts < ${toArchive}
  `);
 
  // Convert to Parquet and upload to S3
  const parquetFile = await convertToParquet(archiveData);
  await s3.upload(`events/${datePartition(toArchive)}.parquet`, parquetFile);
 
  // Delete from ClickHouse only after the upload has succeeded
  await clickhouse.query(`DELETE FROM events WHERE ts < ${toArchive}`);
}
 
setInterval(() => migrateHotToWarm().catch(console.error), 60 * 60 * 1000); // Every hour
setInterval(() => archiveWarmToCold().catch(console.error), 7 * DAY_MS); // Every week
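
The archive step relies on a `datePartition` helper that is not shown. One plausible shape for it, offered as an assumption and not as the original helper, is a UTC date rendered as Hive-style path segments, which many query engines can use for partition pruning.

```typescript
// Hypothetical datePartition-style helper: UTC date as Hive-style path segments.
function datePartitionPath(tsMs: number): string {
  const pad = (n: number): string => (n < 10 ? `0${n}` : String(n));
  const d = new Date(tsMs);
  const year = d.getUTCFullYear();
  const month = pad(d.getUTCMonth() + 1); // getUTCMonth() is zero-based
  const day = pad(d.getUTCDate());
  return `year=${year}/month=${month}/day=${day}`;
}

const archiveKey = `events/${datePartitionPath(Date.UTC(2024, 0, 15, 13, 30))}.parquet`;
console.log(archiveKey);
```

Using UTC avoids a file landing in a different partition depending on the time zone of the machine that ran the job.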

Building a Complete IoT Pipeline

Here's how it all fits together: MQTT → Kafka → Processing → TimescaleDB → Grafana

Architecture Diagram

Docker Compose Setup

# docker-compose.yml for IoT pipeline
version: '3.8'
 
services:
  # MQTT Broker
  emqx:
    image: emqx/emqx:5.0
    ports:
      - '1883:1883' # MQTT
      - '8083:8083' # WebSocket
      - '18083:18083' # Dashboard
    environment:
      EMQX_LOADED_PLUGINS: 'emqx_management,emqx_recon'
 
  # Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
 
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
 
  # TimescaleDB (PostgreSQL + TimescaleDB extension)
  timescaledb:
    image: timescale/timescaledb:latest-pg15
    ports:
      - '5432:5432'
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_DB: iot
 
  # Stream Processor (runs Node.js app)
  processor:
    build: .
    depends_on:
      - kafka
      - emqx
      - timescaledb
    environment:
      KAFKA_BROKERS: kafka:29092
      MQTT_URL: mqtt://emqx:1883
      DATABASE_URL: postgresql://postgres:password@timescaledb:5432/iot
 
  # Grafana for dashboards
  grafana:
    image: grafana/grafana:latest
    ports:
      - '3000:3000'
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    depends_on:
      - timescaledb

Stream Processor Application

// processor.js - The glue that binds it all together
const mqtt = require('mqtt');
const { Kafka } = require('kafkajs');
const { Client: PG } = require('pg');
 
const kafka = new Kafka({
  clientId: 'iot-processor',
  brokers: process.env.KAFKA_BROKERS.split(','),
});
 
const consumer = kafka.consumer({ groupId: 'iot-processing' });
const producer = kafka.producer();
 
const pgClient = new PG({
  connectionString: process.env.DATABASE_URL,
});
 
const mqttClient = mqtt.connect(process.env.MQTT_URL);
 
async function start() {
  await consumer.connect();
  await producer.connect();
  await pgClient.connect();
 
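  // Bridge: forward MQTT telemetry into Kafka
  // (the topic layout devices/<deviceId>/telemetry is an assumption; adjust to your devices)
  mqttClient.subscribe('devices/+/telemetry');
  mqttClient.on('message', (topic, payload) => {
    const deviceId = topic.split('/')[1];
    producer
      .send({ topic: 'devices.telemetry', messages: [{ key: deviceId, value: payload }] })
      .catch(console.error);
  });
 
  // Subscribe to raw telemetry
  await consumer.subscribe({ topic: 'devices.telemetry' });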
 
  console.log('🚀 IoT Stream Processor started');
 
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      try {
        const event = JSON.parse(message.value.toString());
 
        // Transform: add timestamp
        event.processedAt = Date.now();
 
        // Enrich: add device metadata
        const { rows } = await pgClient.query(
          'SELECT * FROM devices WHERE device_id = $1',
          [event.deviceId],
        );
        if (rows.length === 0) {
          console.warn(`Device not found: ${event.deviceId}`);
          return;
        }
 
        event.location = rows[0].location;
        event.zone = rows[0].zone;
 
        // Detect anomaly
        event.isAnomaly = event.value > rows[0].threshold;
 
        // Insert into TimescaleDB
        await pgClient.query(
          `INSERT INTO metrics (device_id, value, location, is_anomaly, ts)
           VALUES ($1, $2, $3, $4, to_timestamp($5 / 1000.0))`,
          [
            event.deviceId,
            event.value,
            event.location,
            event.isAnomaly,
            event.ts,
          ],
        );
 
        // If anomaly, send alert
        if (event.isAnomaly) {
          await producer.send({
            topic: 'alerts',
            messages: [{ key: event.deviceId, value: JSON.stringify(event) }],
          });
        }
      } catch (error) {
        console.error('Error processing message:', error);
      }
    },
  });
}
 
start().catch(console.error);
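
The processor above issues one INSERT per event, which can become the bottleneck at a few thousand events per second. A common mitigation is to buffer rows and write them in one statement. The sketch below only builds the parameterized SQL for the `metrics` table used above. The function and type names are hypothetical.

```typescript
interface MetricRow {
  deviceId: string;
  value: number;
  location: string;
  isAnomaly: boolean;
  ts: number; // epoch milliseconds
}

interface BatchInsert {
  text: string;
  values: Array<string | number | boolean>;
}

// Builds one multi-row INSERT with numbered placeholders ($1, $2, ...).
function buildBatchInsert(rows: MetricRow[]): BatchInsert {
  const values: Array<string | number | boolean> = [];
  const tuples = rows.map((row, i) => {
    const b = i * 5; // five placeholders per row
    values.push(row.deviceId, row.value, row.location, row.isAnomaly, row.ts);
    return `($${b + 1}, $${b + 2}, $${b + 3}, $${b + 4}, to_timestamp($${b + 5} / 1000.0))`;
  });
  return {
    text:
      'INSERT INTO metrics (device_id, value, location, is_anomaly, ts) VALUES ' +
      tuples.join(', '),
    values,
  };
}

const batch = buildBatchInsert([
  { deviceId: 'temp-007', value: 22.5, location: 'Berlin', isAnomaly: false, ts: 1673456789000 },
  { deviceId: 'temp-008', value: 31.2, location: 'Berlin', isAnomaly: true, ts: 1673456790000 },
]);
console.log(batch.text);
```

The result can be passed to the `pg` client as `pgClient.query(batch.text, batch.values)`. Batching trades a little latency for throughput, so the buffer should also flush on a timer, not only when it fills up.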

Summary: IoT Data Pipeline Best Practices

1. **Ingest from devices** → MQTT Broker (Mosquitto, EMQX)
2. **Buffer in queue** → Kafka (guarantees durability)
3. **Stream process** → Kafka Streams (transform, enrich, filter)
4. **Detect anomalies** → Real-time thresholds + statistical models
5. **Store hot data** → TimescaleDB (7 days, sub-100ms queries)
6. **Migrate warm data** → ClickHouse (analytical queries, 1 year)
7. **Archive cold data** → S3 Parquet (compliance, indefinite)
8. **Visualize** → Grafana dashboards
9. **Alert** → Automatic rule evaluation
10. **Iterate** → Monitor pipeline health, adjust thresholds

Checklist for Building Your Pipeline

  • ✅ MQTT broker is resilient (cluster or HA setup)
  • ✅ Kafka topics are partitioned by device ID (parallelism)
  • ✅ Stream processor handles failures gracefully (exactly-once semantics)
  • ✅ Data enrichment is cached (not hitting DB for every event)
  • ✅ Anomaly detection has baselines (not just static thresholds)
  • ✅ Hot storage has TTL policies (auto-delete old data)
  • ✅ Warm storage is queryable (SQL interface)
  • ✅ Cold storage is cost-optimized (Parquet, compression)
  • ✅ Monitoring covers pipeline lag (broker → processor → storage)
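
For the last item, consumer lag is the simplest pipeline health signal to compute. Here is a minimal sketch, assuming the end offsets and committed offsets have already been fetched (for example through a Kafka admin client). The alert rule and all names are hypothetical.

```typescript
interface PartitionOffsets {
  partition: number;
  endOffset: number; // next offset the broker will assign
  committedOffset: number; // next offset the consumer group will read
}

// Lag per partition: messages written but not yet processed.
function partitionLag(p: PartitionOffsets): number {
  return Math.max(0, p.endOffset - p.committedOffset);
}

function totalLag(offsets: PartitionOffsets[]): number {
  return offsets.reduce((sum, p) => sum + partitionLag(p), 0);
}

// Hypothetical alert rule: the backlog exceeds what the processor
// can clear in `maxDelaySeconds` at its usual throughput.
function isLagging(
  offsets: PartitionOffsets[],
  eventsPerSecond: number,
  maxDelaySeconds: number,
): boolean {
  return totalLag(offsets) > eventsPerSecond * maxDelaySeconds;
}

const lagSnapshot: PartitionOffsets[] = [
  { partition: 0, endOffset: 120000, committedOffset: 119500 },
  { partition: 1, endOffset: 98000, committedOffset: 98000 },
  { partition: 2, endOffset: 101000, committedOffset: 60000 },
];
console.log(totalLag(lagSnapshot), isLagging(lagSnapshot, 2000, 10));
```

Looking at lag per partition, not only the total, helps spot a single hot partition, which often points to a skewed device ID key.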


Series: IoT Patterns & Strategies Roadmap
Previous: Device Management & OTA Updates
Next: IoT Security: Device Identity, Encryption & Zero Trust
