IoT Data Pipeline: Ingestion, Processing & Storage

Welcome to IOT-7 in the IoT Patterns & Strategies Roadmap! So far, we've covered device management and firmware updates. Now we need to handle what devices actually produce: data. Lots of it.
A single temperature sensor might send 1 reading every 5 seconds. 10,000 sensors means 2,000 readings per second. 100,000 sensors means 20,000 per second. Add in vibration sensors, pressure sensors, GPS coordinates, images from edge cameras — and suddenly you're dealing with gigabytes flowing in from the field every minute.
But here's the real challenge: You can't just dump raw sensor data into a database. You need to:
- Ingest it reliably without losing messages
- Transform and enrich it in real-time
- Detect anomalies as data arrives
- Store it efficiently for both immediate queries and historical analysis
- Scale from hundreds of devices to millions
This post covers how production IoT systems solve this.
What You'll Learn
✅ Understand IoT data characteristics and why standard databases fail
✅ Design ingestion architectures (MQTT → message queue → processing)
✅ Choose between stream processing vs batch processing with clear trade-offs
✅ Master Lambda and Kappa architectures for IoT workloads
✅ Implement real-time stream processing with Kafka Streams
✅ Transform and enrich data in the pipeline
✅ Detect anomalies and quality issues as data flows
✅ Design storage strategies: hot data, warm data, cold archive
✅ Build a complete end-to-end pipeline with code examples
The IoT Data Challenge
IoT data is fundamentally different from traditional application data. Let's understand why standard approaches break.
Characteristics of IoT Data
Why Traditional Databases Fail
Traditional Database (e.g., PostgreSQL):
- ✗ Optimized for OLTP (frequent updates, point lookups)
- ✗ Row-oriented storage (bad for time-series)
- ✗ Indexes bloat with high-cardinality dimensions (device ID, timestamp)
- ✗ Slow aggregations over time ranges
- ✗ Needs VACUUM to recover space (can stall queries)
IoT Time-Series Database (e.g., InfluxDB):
- ✓ Optimized for OLAP (sequential scans, aggregations)
- ✓ Column-oriented storage (compress billions of monotonic timestamps)
- ✓ Smart indexes for time ranges (not individual rows)
- ✓ Efficient downsampling (1 query → seconds vs minutes)
- ✓ Built-in retention policies (automatic cleanup)
IoT Data Pipeline Architecture
A complete pipeline has 4 layers:
Layer 1: Ingestion (MQTT → Message Queue)
Devices publish to MQTT. A message broker ensures messages don't get lost:
MQTT (protocol) ← Devices (telemetry)
↓
MQTT Broker (Mosquitto, EMQX, HiveMQ) ← Stores messages temporarily
↓
Kafka / RabbitMQ ← Persistent message queue
↓
Stream Processors ← Multiple consumers can process the same messages
Layer 2: Processing (Stream Processing)
Real-time transformation, enrichment, and filtering:
Raw Event: {"deviceId":"temp-007","value":22.5,"ts":1673456789}
↓
Transform: Add location from device registry
↓
Enrich: Add threshold from device config
↓
Filtered: Only pass if value > threshold
↓
Output: {"deviceId":"temp-007","value":22.5,"location":"Berlin","threshold":25,"alert":false}
Layer 3: Storage (Hot/Warm/Cold)
Different access patterns need different storage:
Hot Data (Recent): TimescaleDB → Sub-second queries, dashboards
Warm Data (Historical): ClickHouse → Batch analytics, reports
Cold Data (Archive): S3/GCS → Compliance, long-term analysis
Layer 4: Consumption (APIs, Dashboards, Alerts)
Data flows to end users and systems.
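For example, a Grafana panel or a small API endpoint typically runs a downsampling query against the hot store. A minimal sketch against TimescaleDB (the metrics table and column names are assumptions, chosen to match the processor example later in this post):

import { Client } from 'pg';

// Average value per device over the last hour, bucketed into 1-minute points.
// TimescaleDB's time_bucket() does the downsampling server-side.
async function lastHourSeries(deviceId: string) {
  const pg = new Client({ connectionString: process.env.DATABASE_URL });
  await pg.connect();
  const { rows } = await pg.query(
    `SELECT time_bucket('1 minute', ts) AS bucket, avg(value) AS avg_value
     FROM metrics
     WHERE device_id = $1 AND ts > now() - interval '1 hour'
     GROUP BY bucket
     ORDER BY bucket`,
    [deviceId],
  );
  await pg.end();
  return rows; // [{ bucket, avg_value }, ...] ready for a dashboard or JSON API
}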
Stream Processing vs Batch Processing
The classic trade-off: Speed vs Simplicity
| Aspect | Stream Processing | Batch Processing |
|---|---|---|
| Latency | Milliseconds to seconds | Minutes to hours (often end-of-day) |
| Use Case | Real-time alerts, dashboards | Analytics, reports, ML |
| State | Hard (must manage) | Easy (stateless) |
| Tools | Kafka Streams, Flink, Spark Streaming | Spark, Hadoop, SQL |
| Cost | Continuous (24/7) | Variable (compute on demand) |
| Complexity | High (windowing, state) | Low (simple SQL) |
| Example | Alert if temp > 30°C now | Daily temperature report |
When to Use Each
Use Stream Processing if:
- Need alerts or actions within seconds
- Dashboard needs live data
- Anomaly detection must be real-time
- Example: "Alert when pressure spike detected"
Use Batch Processing if:
- Analysis can wait hours/days
- Generating reports or ML training data
- Simpler logic, less infrastructure
- Example: "Daily average temperature by location"
Use Both (Lambda Architecture) if:
- Need both real-time alerts AND accurate daily reports
- Can tolerate more infrastructure complexity
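To make the contrast concrete, here is what the batch side of the "daily average temperature by location" example above might look like: a nightly job that runs one aggregation query over stored data, with no streaming state or windows. This is a sketch; the metrics table and its columns are assumptions matching the rest of this post:

import { Client } from 'pg';

// Nightly batch job: daily average temperature by location for a given day.
async function dailyTemperatureReport(day: string) {
  const pg = new Client({ connectionString: process.env.DATABASE_URL });
  await pg.connect();
  const { rows } = await pg.query(
    `SELECT location,
            avg(value) AS avg_temp,
            min(value) AS min_temp,
            max(value) AS max_temp
     FROM metrics
     WHERE ts >= $1::date AND ts < $1::date + interval '1 day'
     GROUP BY location
     ORDER BY location`,
    [day],
  );
  await pg.end();
  return rows;
}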
Lambda vs Kappa Architectures
How to design pipelines that serve both speed and accuracy layers.
Lambda Architecture
Two parallel streams: Speed Layer (real-time, approximate) + Batch Layer (accurate, slow)
Pros:
- ✓ Fast approximate results + accurate results
- ✓ Can recover from stream processor failures
- ✓ Proven pattern at scale (Netflix, Twitter)
Cons:
- ✗ Double the infrastructure (stream + batch)
- ✗ Must keep both in sync
- ✗ Complex serving layer logic
Kappa Architecture
Single stream path. Use event log as replay source.
Pros:
- ✓ Simpler: one code path
- ✓ Easier to reason about
- ✓ Less infrastructure overhead
- ✓ Can recompute from scratch any time
Cons:
- ✗ Requires retaining full history in the event log (large Kafka storage)
- ✗ Recomputation takes time
- ✗ Stream processor must be deterministic
Lambda vs Kappa Decision Tree
Do you need accurate historical analytics?
├─ Yes
│ └─ Do you have spare infrastructure?
│ ├─ Yes → Lambda (speed + batch layer)
│ └─ No → Stream-only with replay capability (Kappa)
└─ No
└─ Kappa (simpler, pure streaming)
Recommendation for IoT:
- Start with Kappa if ingestion is reliable (Kafka); a replay sketch follows below
- Upgrade to Lambda if you need bucketing/aggregation that's expensive to recompute
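In practice, "replay capability" means that because Kafka keeps the log, a Kappa-style pipeline can rebuild its derived state by rewinding the consumer group to the start of the topic and letting the same processor run again. A sketch using the kafkajs admin client; the group and topic names match the examples below, and the reset must happen while the group's consumers are stopped:

import { Kafka } from 'kafkajs';

// Rewind the processing group to the earliest retained offset, then restart the
// processor: it recomputes everything from the raw event log.
async function replayFromBeginning() {
  const kafka = new Kafka({ clientId: 'iot-admin', brokers: ['kafka:9092'] });
  const admin = kafka.admin();
  await admin.connect();
  await admin.resetOffsets({
    groupId: 'iot-processing',
    topic: 'devices.telemetry',
    earliest: true, // seek to the oldest message still retained in the log
  });
  await admin.disconnect();
}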
Stream Processing with Kafka Streams
Kafka Streams is a client library (Java/Scala) for processing data in Kafka topics: no separate cluster to run, it lives inside your application. The examples below sketch the same consume → transform → produce pattern in Node.js using the kafkajs client.
Basic Topology: Transform + Filter
// Node.js consume → transform → produce pipeline using the kafkajs client
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'iot-processor',
brokers: ['kafka:9092'],
});
const consumer = kafka.consumer({ groupId: 'iot-processing' });
const producer = kafka.producer();
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: 'devices.telemetry' });
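// getDeviceMetadata() used below: a cached lookup so we don't hit the device
// registry for every event. Here the registry is stubbed with an in-memory map
// (values match the example event above); in production this would call your
// device-registry service or database.
const deviceRegistryStub = new Map([
['temp-007', { location: 'Berlin', threshold: 25 }],
]);
const metadataCache = new Map();
async function getDeviceMetadata(deviceId) {
const cached = metadataCache.get(deviceId);
if (cached && Date.now() - cached.at < 60_000) return cached.device; // 1-minute TTL
const device = deviceRegistryStub.get(deviceId); // stand-in for a real lookup
metadataCache.set(deviceId, { device, at: Date.now() });
return device;
}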
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
// Parse raw event
const event = JSON.parse(message.value.toString());
// Transform: lookup device metadata
const device = await getDeviceMetadata(event.deviceId);
// Enrich: add location and threshold
const enriched = {
...event,
location: device.location,
threshold: device.threshold,
timestamp: Date.now(),
};
// Filter: only alert if exceeds threshold
if (enriched.value > enriched.threshold) {
// Send to alerts topic
await producer.send({
topic: 'devices.alerts',
messages: [
{
key: enriched.deviceId,
value: JSON.stringify(enriched),
},
],
});
}
// Send all events (enriched) to storage
await producer.send({
topic: 'devices.enriched',
messages: [
{
key: enriched.deviceId,
value: JSON.stringify(enriched),
},
],
});
},
});
Stateful Processing: Windowing & Aggregation
Calculate 5-minute averages:
// Pseudo-code for a stateful tumbling-window aggregation with the kafkajs consumer
// (the real Kafka Streams library in Java/Scala provides this via windowedBy())
const windowState = new Map(); // windowKey -> { sum, count, min, max }
const WINDOW_SIZE = 5 * 60 * 1000; // 5 minutes
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value.toString());
const deviceId = event.deviceId;
// Create window key: device + time bucket
const windowStart = Math.floor(event.ts / WINDOW_SIZE) * WINDOW_SIZE;
const windowKey = `${deviceId}:${windowStart}`;
// Initialize window if new
if (!windowState.has(windowKey)) {
windowState.set(windowKey, {
sum: 0,
count: 0,
min: Infinity,
max: -Infinity,
});
}
// Update window
const window = windowState.get(windowKey);
window.sum += event.value;
window.count++;
window.min = Math.min(window.min, event.value);
window.max = Math.max(window.max, event.value);
// Emit aggregated result
const result = {
deviceId,
window: {
start: windowStart,
end: windowStart + WINDOW_SIZE,
},
stats: {
avg: window.sum / window.count,
min: window.min,
max: window.max,
count: window.count,
},
};
await producer.send({
topic: 'devices.aggregated',
messages: [{ key: deviceId, value: JSON.stringify(result) }],
});
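// NOTE: this emits a running (partial) aggregate on every event. A production job
// would usually emit once the window closes, and evict finished windows so the map
// does not grow forever. A simple inline eviction sketch (2x window grace period):
for (const key of windowState.keys()) {
const start = Number(key.split(':')[1]);
if (start < Date.now() - 2 * WINDOW_SIZE) windowState.delete(key);
}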
},
});
Data Transformation & Enrichment
Raw sensor data is often incomplete. Add context before storage.
Transformation Pipeline
interface RawEvent {
deviceId: string;
sensorId: string;
value: number;
ts: number;
}
interface EnrichedEvent extends RawEvent {
location: string;
deviceType: string;
zone: string;
unit: string;
threshold: number;
isAnomaly: boolean;
}
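// Hypothetical helpers assumed by enrichEvent() below. In-memory stubs keep the
// example self-contained; in production they would wrap your device registry and hot storage.
class DeviceNotFoundError extends Error {}
class SensorNotFoundError extends Error {}
const deviceRegistry = new Map<string, any>(); // deviceId -> device document (location, zone, sensors, ...)
const recentValues = new Map<string, { ts: number; value: number }[]>(); // "deviceId:sensorId" -> recent samples
async function getHistorical(deviceId: string, sensorId: string, windowMs: number): Promise<number[]> {
const cutoff = Date.now() - windowMs;
return (recentValues.get(`${deviceId}:${sensorId}`) ?? []).filter((p) => p.ts >= cutoff).map((p) => p.value);
}
function convertUnits(value: number, from: string, to: string): number {
if (from === 'F' && to === 'SI') return ((value - 32) * 5) / 9 + 273.15; // °F -> K
if (from === 'C' && to === 'SI') return value + 273.15; // °C -> K
return value; // already in the target unit (or no conversion defined)
}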
async function enrichEvent(raw: RawEvent): Promise<EnrichedEvent> {
// 1. Lookup device metadata
const device = await deviceRegistry.get(raw.deviceId);
if (!device) throw new DeviceNotFoundError(raw.deviceId);
// 2. Lookup sensor configuration
const sensor = device.sensors.find((s) => s.id === raw.sensorId);
if (!sensor) throw new SensorNotFoundError(raw.sensorId);
// 3. Validate value
if (raw.value < sensor.min || raw.value > sensor.max) {
// Out of range
console.warn(`Out of range: ${raw.deviceId}.${raw.sensorId} = ${raw.value}`);
}
// 4. Apply unit conversion (if needed)
const convertedValue = convertUnits(raw.value, sensor.unit, 'SI');
// 5. Detect anomaly
const historical = await getHistorical(raw.deviceId, raw.sensorId, 24 * 60 * 60 * 1000);
const isAnomaly = detectAnomaly(
convertedValue,
historical,
sensor.anomalyThreshold,
);
// 6. Enrich
return {
...raw,
value: convertedValue,
location: device.location,
deviceType: device.deviceType,
zone: device.zone,
unit: sensor.unit,
threshold: sensor.threshold,
isAnomaly,
};
}
function detectAnomaly(
value: number,
historical: number[],
threshold: number,
): boolean {
if (historical.length === 0) return false; // no baseline yet
const mean = historical.reduce((a, b) => a + b, 0) / historical.length;
const std = Math.sqrt(
historical.reduce((sq, n) => sq + (n - mean) ** 2, 0) / historical.length,
);
// Flag if more than `threshold` standard deviations from the mean (e.g. threshold = 3)
return Math.abs(value - mean) > threshold * std;
}
Data Quality & Anomaly Detection
Not all data is good data. Detect and quarantine problems early.
Quality Checks
interface DataQualityReport {
total: number;
valid: number;
invalid: number;
duplicate: number;
outOfRange: number;
anomaly: number;
issues: string[];
}
async function validateEvent(event: EnrichedEvent): Promise<DataQualityReport> {
const report: DataQualityReport = {
total: 1,
valid: 1,
invalid: 0,
duplicate: 0,
outOfRange: 0,
anomaly: 0,
issues: [],
};
// Check 1: Missing fields
if (!event.deviceId || !event.sensorId || event.value === undefined) {
report.valid = 0;
report.invalid = 1;
report.issues.push('Missing required fields');
return report;
}
// Check 2: Duplicate detection (seen recently?)
const recentKey = `${event.deviceId}:${event.sensorId}:${event.value}`;
const lastSeen = await cache.get(recentKey);
await cache.set(recentKey, Date.now()); // remember this event for the next duplicate check
if (lastSeen && Date.now() - lastSeen < 1000) {
report.duplicate = 1;
report.issues.push(`Duplicate within 1s of last event`);
}
// Check 3: Out of range
if (event.value < -50 || event.value > 150) {
// plausible physical range for a temperature sensor; adjust per sensor type
report.outOfRange = 1;
report.issues.push(`Out of valid range: ${event.value}`);
}
// Check 4: Anomaly (already detected during enrichment)
if (event.isAnomaly) {
report.anomaly = 1;
report.issues.push('Detected anomaly');
}
// Check 5: Schema validation
// (done at ingestion, but good to re-check)
return report;
}
// Route events based on quality
async function routeEvent(event: EnrichedEvent) {
const quality = await validateEvent(event);
if (quality.issues.length > 0) {
// Send to quarantine topic for investigation
await producer.send({
topic: 'devices.quarantine',
messages: [
{
key: event.deviceId,
value: JSON.stringify({ event, quality }),
},
],
});
return;
}
// Send to storage
await producer.send({
topic: 'devices.enriched',
messages: [{ key: event.deviceId, value: JSON.stringify(event) }],
});
}
Storage: Hot/Warm/Cold Tiers
The 90-10 rule: 90% of queries access recent data, 10% access historical.
Tier Strategy
Data Lifecycle
// Lifecycle management
const storageConfig = {
hot: {
database: 'timescaledb',
retention: 7 * 24 * 60 * 60 * 1000, // 7 days
compression: 'snappy',
},
warm: {
database: 'clickhouse',
retention: 365 * 24 * 60 * 60 * 1000, // 1 year
compression: 'gzip',
},
cold: {
storage: 's3',
bucket: 'iot-archive',
retention: null, // Indefinite
format: 'parquet',
},
};
// Every hour, migrate old data
setInterval(async () => {
const now = Date.now();
// Move 7-14 day old data from TimescaleDB to ClickHouse
const toDrop = now - 14 * 24 * 60 * 60 * 1000; // Older than 14 days
const toWarm = now - 7 * 24 * 60 * 60 * 1000; // Older than 7 days
const warmData = await timescaledb.query(`
SELECT * FROM events
WHERE ts >= ${toDrop} AND ts < ${toWarm}
`);
// Insert into ClickHouse
await clickhouse.insert('events', warmData);
// Delete from TimescaleDB
await timescaledb.query(`
DELETE FROM events WHERE ts < ${toWarm}
`);
// Archive data older than 1 year from ClickHouse to S3
const toArchive = now - 365 * 24 * 60 * 60 * 1000;
const archiveData = await clickhouse.query(`
SELECT * FROM events WHERE ts < ${toArchive}
`);
// Convert to Parquet and upload to S3
const parquetFile = await convertToParquet(archiveData);
await s3.upload(`events/${datePartition(toArchive)}.parquet`, parquetFile);
// Delete from ClickHouse
await clickhouse.query(`DELETE FROM events WHERE ts < ${toArchive}`);
}, 60 * 60 * 1000); // Every hour
Building a Complete IoT Pipeline
Here's how it all fits together: MQTT → Kafka → Processing → TimescaleDB → Grafana
Architecture Diagram
Docker Compose Setup
# docker-compose.yml for IoT pipeline
version: '3.8'
services:
  # MQTT Broker
  emqx:
    image: emqx/emqx:5.0
    ports:
      - '1883:1883' # MQTT
      - '8083:8083' # WebSocket
      - '18083:18083' # Dashboard
    environment:
      EMQX_LOADED_PLUGINS: 'emqx_management,emqx_recon'
  # Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  # TimescaleDB (PostgreSQL + TimescaleDB extension)
  timescaledb:
    image: timescale/timescaledb:latest-pg15
    ports:
      - '5432:5432'
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_DB: iot
  # Stream Processor (runs Node.js app)
  processor:
    build: .
    depends_on:
      - kafka
      - emqx
      - timescaledb
    environment:
      KAFKA_BROKERS: kafka:29092
      MQTT_URL: mqtt://emqx:1883
      DATABASE_URL: postgresql://postgres:password@timescaledb:5432/iot
  # Grafana for dashboards
  grafana:
    image: grafana/grafana:latest
    ports:
      - '3000:3000'
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    depends_on:
      - timescaledb
Stream Processor Application
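The processor below assumes two tables in TimescaleDB: a devices registry (queried for enrichment) and a metrics hypertable where readings land. A minimal setup sketch; the column names are assumptions chosen to match the processor's queries, and the retention policy enforces the 7-day hot window (drop it if you prefer the hand-rolled migration job shown earlier):

import { Client } from 'pg';

// One-time schema setup for the hot tier (sketch; adjust types and columns to your fleet).
async function initHotStorage() {
  const pg = new Client({ connectionString: process.env.DATABASE_URL });
  await pg.connect();
  await pg.query(`
    CREATE TABLE IF NOT EXISTS devices (
      device_id TEXT PRIMARY KEY,
      location  TEXT,
      zone      TEXT,
      threshold DOUBLE PRECISION
    );
    CREATE TABLE IF NOT EXISTS metrics (
      device_id  TEXT NOT NULL,
      value      DOUBLE PRECISION,
      location   TEXT,
      is_anomaly BOOLEAN,
      ts         TIMESTAMPTZ NOT NULL
    );
    SELECT create_hypertable('metrics', 'ts', if_not_exists => TRUE);
    SELECT add_retention_policy('metrics', INTERVAL '7 days', if_not_exists => TRUE);
  `);
  await pg.end();
}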
// processor.js - The glue that binds it all together
const mqtt = require('mqtt');
const { Kafka } = require('kafkajs');
const { Client: PG } = require('pg');
const kafka = new Kafka({
clientId: 'iot-processor',
brokers: process.env.KAFKA_BROKERS.split(','),
});
const consumer = kafka.consumer({ groupId: 'iot-processing' });
const producer = kafka.producer();
const pgClient = new PG({
connectionString: process.env.DATABASE_URL,
});
const mqttClient = mqtt.connect(process.env.MQTT_URL);
async function start() {
await consumer.connect();
await producer.connect();
await pgClient.connect();
// Subscribe to raw telemetry
await consumer.subscribe({ topic: 'devices.telemetry' });
console.log('🚀 IoT Stream Processor started');
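// Bridge MQTT -> Kafka: forward device telemetry from the MQTT broker into the
// 'devices.telemetry' topic so Kafka acts as the durable buffer. (Sketch; assumes
// devices publish JSON to topics like 'devices/<deviceId>/telemetry'.)
mqttClient.subscribe('devices/+/telemetry');
mqttClient.on('message', async (mqttTopic, payload) => {
try {
const deviceId = mqttTopic.split('/')[1];
await producer.send({
topic: 'devices.telemetry',
messages: [{ key: deviceId, value: payload.toString() }],
});
} catch (err) {
console.error('MQTT -> Kafka bridge error:', err);
}
});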
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const event = JSON.parse(message.value.toString());
// Transform: add timestamp
event.processedAt = Date.now();
// Enrich: add device metadata
const { rows } = await pgClient.query(
'SELECT * FROM devices WHERE device_id = $1',
[event.deviceId],
);
if (rows.length === 0) {
console.warn(`Device not found: ${event.deviceId}`);
return;
}
event.location = rows[0].location;
event.zone = rows[0].zone;
// Detect anomaly
event.isAnomaly = event.value > rows[0].threshold;
// Insert into TimescaleDB
await pgClient.query(
`INSERT INTO metrics (device_id, value, location, is_anomaly, ts)
VALUES ($1, $2, $3, $4, to_timestamp($5 / 1000.0))`,
[
event.deviceId,
event.value,
event.location,
event.isAnomaly,
event.ts,
],
);
// If anomaly, send alert
if (event.isAnomaly) {
await producer.send({
topic: 'alerts',
messages: [{ key: event.deviceId, value: JSON.stringify(event) }],
});
}
} catch (error) {
console.error('Error processing message:', error);
}
},
});
}
start().catch(console.error);
Summary: IoT Data Pipeline Best Practices
1. **Ingest from devices** → MQTT Broker (Mosquitto, EMQX)
2. **Buffer in queue** → Kafka (guarantees durability)
3. **Stream process** → Kafka Streams (transform, enrich, filter)
4. **Detect anomalies** → Real-time thresholds + statistical models
5. **Store hot data** → TimescaleDB (7 days, sub-100ms queries)
6. **Migrate warm data** → ClickHouse (analytical queries, 1 year)
7. **Archive cold data** → S3 Parquet (compliance, indefinite)
8. **Visualize** → Grafana dashboards
9. **Alert** → Automatic rule evaluation
10. **Iterate** → Monitor pipeline health, adjust thresholds
Checklist for Building Your Pipeline
- ✅ MQTT broker is resilient (cluster or HA setup)
- ✅ Kafka topics are partitioned by device ID (parallelism)
- ✅ Stream processor handles failures gracefully (exactly-once semantics)
- ✅ Data enrichment is cached (not hitting DB for every event)
- ✅ Anomaly detection has baselines, not just static thresholds (see the sketch after this checklist)
- ✅ Hot storage has TTL policies (auto-delete old data)
- ✅ Warm storage is queryable (SQL interface)
- ✅ Cold storage is cost-optimized (Parquet, compression)
- ✅ Monitoring covers pipeline lag (broker → processor → storage)
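One way to give anomaly detection a baseline instead of a fixed threshold: keep an exponentially weighted moving mean and variance per device, and flag values that drift several deviations away from that device's own recent behavior. A small self-contained sketch; the 3-sigma rule and the 0.05 smoothing factor are assumptions to tune:

// Per-device rolling baseline using an exponentially weighted mean/variance.
interface Baseline { mean: number; variance: number }

const baselines = new Map<string, Baseline>();
const ALPHA = 0.05; // smoothing factor: higher adapts faster, forgets sooner

function isAnomalous(deviceId: string, value: number, sigmas = 3): boolean {
  const b = baselines.get(deviceId);
  if (!b) {
    baselines.set(deviceId, { mean: value, variance: 0 });
    return false; // first observation: establish the baseline, never alert
  }
  const deviation = value - b.mean;
  const anomalous = b.variance > 0 && Math.abs(deviation) > sigmas * Math.sqrt(b.variance);
  // Update the baseline after the check so one outlier doesn't mask the next.
  b.mean += ALPHA * deviation;
  b.variance = (1 - ALPHA) * (b.variance + ALPHA * deviation * deviation);
  return anomalous;
}

In the stream-processing examples above, this state would live in the processor's state store rather than a process-local map.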
Recommended Reading
- Kafka: The Definitive Guide (Neha Narkhede et al.)
- Designing Data-Intensive Applications (Martin Kleppmann)
- TimescaleDB Documentation: Best practices for time-series
- ClickHouse Documentation: Analytics on IoT data
Series: IoT Patterns & Strategies Roadmap
Previous: Device Management & OTA Updates
Next: IoT Security: Device Identity, Encryption & Zero Trust