IoT Ingestion API Design Guide

When you connect thousands of devices to the internet, they all want to send data — temperature readings, GPS coordinates, heartbeat signals, error logs. The question isn't whether you need an ingestion API; it's whether yours can handle what's coming.
An ingestion API is the entry point for all device-generated data. Unlike a typical REST API where a user clicks a button and waits for a response, ingestion APIs deal with machines that send data continuously, often in bursts, sometimes unreliably, and always at scale.
This guide walks through how to design an ingestion API that can handle real-world IoT workloads without falling over.
Time commitment: 2-3 hours
Prerequisites: Understanding of REST APIs, HTTP, and basic message queue concepts
What You'll Learn
✅ What makes IoT ingestion different from typical APIs
✅ How to design ingestion endpoints for high-throughput device data
✅ Batching strategies to reduce network overhead
✅ Payload validation and schema enforcement for device telemetry
✅ Backpressure and flow control to protect your backend
✅ Storage strategies — time-series databases, message queues, and data lakes
✅ Authentication and device identity management
✅ Practical implementation with Python (FastAPI) and MQTT
1. What Is an Ingestion API?
An ingestion API receives, validates, and routes incoming data from external sources into your system. In IoT, those sources are devices — sensors, actuators, gateways, and edge nodes.
How It Differs from a Typical REST API
| Characteristic | Typical REST API | IoT Ingestion API |
|---|---|---|
| Client type | Humans (via browser/app) | Machines (sensors, devices) |
| Traffic pattern | Request-response, sporadic | Continuous streams, bursty |
| Payload size | Varies (KB to MB) | Small but frequent (bytes to KB) |
| Throughput | Hundreds to thousands of RPS | Tens of thousands to millions of RPS |
| Latency requirement | < 500ms response | Acknowledge fast, process later |
| Data format | JSON | JSON, CBOR, Protobuf, binary |
| Connection | Short-lived HTTP | Long-lived (MQTT, WebSocket) or HTTP |
| Error handling | Return error to user | Buffer, retry, never lose data |
The fundamental difference: a REST API serves users who expect immediate, meaningful responses. An ingestion API accepts data from machines that just need a quick "got it" — the real processing happens asynchronously.
2. Ingestion Patterns
There are three main patterns for how devices send data to your ingestion API.
Pattern 1: HTTP Push (Request-Response)
The simplest approach. Devices send HTTP POST requests with telemetry data.
```python
# FastAPI ingestion endpoint
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
import uuid

app = FastAPI()

class TelemetryPayload(BaseModel):
    device_id: str = Field(..., min_length=1, max_length=64)
    timestamp: datetime
    metrics: dict
    metadata: Optional[dict] = None

@app.post("/api/v1/ingest", status_code=202)
async def ingest_telemetry(
    payload: TelemetryPayload,
    x_device_token: str = Header(...),
):
    # Validate device identity
    if not await verify_device_token(payload.device_id, x_device_token):
        raise HTTPException(status_code=401, detail="Invalid device token")
    # Enqueue for async processing — respond immediately
    message_id = str(uuid.uuid4())
    await enqueue_message(message_id, payload.model_dump())
    return {"status": "accepted", "message_id": message_id}
```

Key design decisions:
- HTTP 202 Accepted — not 200 OK. The data is accepted for processing, not processed yet.
- Async processing — validate, enqueue, respond. Don't do heavy processing in the request path.
- Device token in header — separate from the payload for clean separation of concerns.
When to use: Low to medium throughput (< 10K messages/sec), devices with reliable internet, simple deployment requirements.
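On the device side, this pattern is just an HTTP POST with the token in a header. A minimal sketch of how a device client might assemble one request — the base URL, the `build_telemetry_request` helper, and the `X-Device-Token` header name are illustrative, not prescribed by the API above:

```python
import json
from datetime import datetime, timezone

# Hypothetical device-side helper: assemble URL, headers, and body for
# one telemetry POST. A real device would hand these to its HTTP stack.
def build_telemetry_request(base_url: str, device_id: str, token: str, metrics: dict):
    url = f"{base_url}/api/v1/ingest"
    headers = {
        "Content-Type": "application/json",
        "X-Device-Token": token,  # pairs with the x_device_token header parameter
    }
    body = json.dumps({
        "device_id": device_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "metrics": metrics,
    })
    return url, headers, body

url, headers, body = build_telemetry_request(
    "https://ingest.example.com", "sensor-001", "secret-token",
    {"temperature": 23.5},
)
```

Keeping the request-building logic in one small function makes it easy to retry the same payload on failure without re-reading the sensor.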
Pattern 2: MQTT Publish-Subscribe
MQTT is the standard protocol for IoT. Devices publish messages to topics, and your ingestion service subscribes.
```python
# MQTT ingestion service
import asyncio
import json
from datetime import datetime

from aiomqtt import Client

async def mqtt_ingestion():
    async with Client("mqtt-broker.internal", port=1883) as client:
        await client.subscribe("devices/+/telemetry")
        async for message in client.messages:
            topic_parts = message.topic.value.split("/")
            device_id = topic_parts[1]
            try:
                payload = json.loads(message.payload.decode())
                await process_telemetry(device_id, payload)
            except json.JSONDecodeError:
                await handle_malformed_message(device_id, message.payload)

async def process_telemetry(device_id: str, payload: dict):
    # Validate schema
    validated = validate_telemetry_schema(payload)
    # Enrich with server-side timestamp
    validated["server_received_at"] = datetime.utcnow().isoformat()
    validated["device_id"] = device_id
    # Forward to message queue for downstream processing
    await publish_to_kafka("telemetry-raw", validated)
```

MQTT advantages for IoT:
- Lightweight — minimal packet overhead, designed for constrained devices
- QoS levels — QoS 0 (at most once), QoS 1 (at least once), QoS 2 (exactly once)
- Last Will and Testament — broker notifies when a device disconnects unexpectedly
- Retained messages — new subscribers get the last message on a topic immediately
- Persistent sessions — broker queues messages while a device is offline
When to use: High throughput, constrained devices, unreliable networks, need for bidirectional communication.
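On the publishing side, the device must pick a topic and a QoS level per message. A sketch of one possible policy, matching the `devices/<id>/<type>` topic scheme used above — the message type names and the `publish_params` helper are illustrative assumptions:

```python
# Hypothetical device-side publish policy: alarms and events use QoS 1
# (at least once) so they survive transient disconnects; routine
# telemetry uses QoS 0 to save bandwidth on constrained links.
def publish_params(device_id: str, message_type: str) -> tuple[str, int]:
    topic = f"devices/{device_id}/{message_type}"
    qos = 1 if message_type in ("alarm", "event") else 0
    return topic, qos

topic, qos = publish_params("sensor-001", "telemetry")
```

The returned pair would be passed straight to the MQTT client's publish call.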
Pattern 3: Batch Upload
Devices collect data locally and upload in batches — useful for devices that go offline frequently.
```python
from fastapi import Header, HTTPException
from pydantic import BaseModel, Field
from typing import List
from datetime import datetime

class TelemetryReading(BaseModel):
    timestamp: datetime
    metrics: dict

class BatchPayload(BaseModel):
    device_id: str
    batch_id: str
    readings: List[TelemetryReading] = Field(..., max_length=1000)

@app.post("/api/v1/ingest/batch", status_code=202)
async def ingest_batch(
    payload: BatchPayload,
    x_device_token: str = Header(...),
):
    if not await verify_device_token(payload.device_id, x_device_token):
        raise HTTPException(status_code=401, detail="Invalid device token")
    # Check for duplicate batch
    if await is_duplicate_batch(payload.batch_id):
        return {"status": "duplicate", "batch_id": payload.batch_id}
    # Enqueue entire batch
    await enqueue_batch(payload.batch_id, payload.model_dump())
    return {
        "status": "accepted",
        "batch_id": payload.batch_id,
        "readings_count": len(payload.readings),
    }
```

Key design decisions:
- batch_id for idempotency — devices retry on failure. Without deduplication, you get duplicates.
- Max batch size (1000) — prevents a single device from overwhelming the endpoint.
- 202 Accepted — same as single ingestion. Process the batch asynchronously.
When to use: Devices with intermittent connectivity, edge computing scenarios, reduced network overhead.
3. Payload Design and Validation
Device telemetry data needs a consistent structure. Without schema enforcement, your downstream pipeline becomes a mess.
Standard Telemetry Schema
```json
{
  "device_id": "sensor-factory-floor-001",
  "timestamp": "2026-03-27T10:15:30.123Z",
  "metrics": {
    "temperature": 23.5,
    "humidity": 67.2,
    "pressure": 1013.25
  },
  "metadata": {
    "firmware_version": "2.1.0",
    "battery_level": 85,
    "signal_strength": -67
  }
}
```

Validation Layers
Validation in an ingestion API happens in layers — each layer catches different problems:
Layer 1 — Protocol validation: Is the content type correct? Is the body parseable?
Layer 2 — Schema validation: Does the payload have required fields? Are types correct?
```python
from pydantic import BaseModel, Field, field_validator
from datetime import datetime, timedelta

class TelemetryPayload(BaseModel):
    device_id: str = Field(..., pattern=r"^[a-zA-Z0-9\-_]{1,64}$")
    timestamp: datetime
    metrics: dict = Field(..., min_length=1)
    metadata: dict = {}

    @field_validator("timestamp")
    @classmethod
    def timestamp_not_in_future(cls, v):
        if v > datetime.utcnow() + timedelta(minutes=5):
            raise ValueError("Timestamp cannot be more than 5 minutes in the future")
        return v

    @field_validator("timestamp")
    @classmethod
    def timestamp_not_too_old(cls, v):
        if v < datetime.utcnow() - timedelta(days=7):
            raise ValueError("Timestamp cannot be more than 7 days old")
        return v
```

Layer 3 — Semantic validation: Are the values reasonable? A temperature of 9999°C is syntactically valid but semantically wrong.
```python
# Define expected ranges per metric type
METRIC_RANGES = {
    "temperature": (-50.0, 150.0),
    "humidity": (0.0, 100.0),
    "pressure": (800.0, 1200.0),
    "battery_level": (0, 100),
    "signal_strength": (-120, 0),
}

def validate_metric_ranges(metrics: dict) -> list[str]:
    warnings = []
    for key, value in metrics.items():
        if key in METRIC_RANGES:
            min_val, max_val = METRIC_RANGES[key]
            if not (min_val <= value <= max_val):
                warnings.append(
                    f"{key}={value} outside expected range [{min_val}, {max_val}]"
                )
    return warnings
```

Layer 4 — Business rules: Is this device registered? Is it sending data from the expected location? Has the firmware been recalled?
Important: Layers 1-2 should reject immediately (HTTP 400). Layers 3-4 should accept the data but flag it — route to a dead letter queue for investigation. You never want to lose data from a device in the field because of a validation bug in your API.
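Putting the layers together, the request path makes a three-way decision: reject, accept-and-flag, or accept. A sketch of that routing logic — the decision constants and the `route_payload` helper are illustrative, with a trimmed-down range table:

```python
# Hypothetical routing: combine the validation layers into one decision.
# Layers 1-2 (syntax/schema) reject outright; layer 3 (semantics) flags
# the payload for the dead letter queue but still accepts it.
REJECT, ACCEPT_FLAGGED, ACCEPT = "reject", "accept_flagged", "accept"

METRIC_RANGES = {"temperature": (-50.0, 150.0), "humidity": (0.0, 100.0)}

def route_payload(payload: dict) -> tuple[str, list[str]]:
    # Layer 2: schema checks - required fields and types
    if not isinstance(payload.get("device_id"), str) or "metrics" not in payload:
        return REJECT, ["missing device_id or metrics"]
    # Layer 3: semantic range checks - warn, never reject
    warnings = []
    for key, value in payload["metrics"].items():
        lo, hi = METRIC_RANGES.get(key, (float("-inf"), float("inf")))
        if not (lo <= value <= hi):
            warnings.append(f"{key}={value} outside [{lo}, {hi}]")
    return (ACCEPT_FLAGGED if warnings else ACCEPT), warnings
```

An `accept_flagged` result would enqueue the message normally while also routing a copy (with its warnings) to the dead letter queue for investigation.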
4. Backpressure and Flow Control
The hardest problem in ingestion: what happens when data arrives faster than you can process it?
The Problem
10,000 messages/sec in, 5,000 messages/sec out. Without backpressure, the queue grows without bound until something crashes.
Strategy 1: Rate Limiting per Device
```python
from fastapi import HTTPException
import time

# In-memory rate limiter (use Redis in production)
device_request_counts: dict[str, list[float]] = {}
RATE_LIMIT = 10  # max requests per second per device
WINDOW = 1.0     # 1 second window

async def check_rate_limit(device_id: str):
    now = time.time()
    if device_id not in device_request_counts:
        device_request_counts[device_id] = []
    # Remove expired entries
    device_request_counts[device_id] = [
        ts for ts in device_request_counts[device_id]
        if now - ts < WINDOW
    ]
    if len(device_request_counts[device_id]) >= RATE_LIMIT:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": "1"},
        )
    device_request_counts[device_id].append(now)
```

Strategy 2: Load Shedding
When the system is overwhelmed, intentionally drop lower-priority traffic to protect critical data.
```python
class LoadShedder:
    def __init__(self, max_queue_depth: int = 10000):
        self.max_queue_depth = max_queue_depth
        self.current_depth = 0

    async def should_accept(self, priority: str) -> bool:
        utilization = self.current_depth / self.max_queue_depth
        if utilization < 0.7:
            return True  # Accept everything
        if utilization < 0.9:
            # Only accept medium and high priority
            return priority in ("high", "medium")
        # Critical mode — only accept high priority
        return priority == "high"
```

Priority levels for IoT data:
- High: Alarms, safety events, device failures
- Medium: Regular telemetry, status updates
- Low: Diagnostic logs, debug data, heartbeats
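The load shedder needs a priority label on every message. A sketch of a classifier implementing the tiers above — the message type strings are illustrative assumptions, not a fixed taxonomy:

```python
# Hypothetical classifier for the priority tiers: alarms and safety
# events are high, diagnostics and heartbeats low, everything else
# (regular telemetry, status updates) medium.
HIGH_TYPES = {"alarm", "safety_event", "device_failure"}
LOW_TYPES = {"diagnostic_log", "debug", "heartbeat"}

def classify_priority(message_type: str) -> str:
    if message_type in HIGH_TYPES:
        return "high"
    if message_type in LOW_TYPES:
        return "low"
    return "medium"
```

The result would be passed straight to `should_accept` before enqueueing.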
Strategy 3: Buffering at the Edge
Push the buffer closer to the source. Edge gateways collect and batch data before sending to the ingestion API.
This reduces the number of connections to your API by orders of magnitude. Instead of 10,000 sensors connecting directly, 100 gateways each aggregate 100 sensors and send batched requests every 30 seconds.
5. Authentication and Device Identity
Every device that sends data must prove it is who it claims to be. Without this, anyone can inject fake data into your system.
Device Authentication Strategies
API Keys (Simple)
```python
# Device sends API key in header
@app.post("/api/v1/ingest")
async def ingest(
    payload: TelemetryPayload,
    x_api_key: str = Header(...),
):
    device = await lookup_device_by_api_key(x_api_key)
    if not device:
        raise HTTPException(status_code=401, detail="Invalid API key")
    if device.device_id != payload.device_id:
        raise HTTPException(status_code=403, detail="Device ID mismatch")
    # Proceed with ingestion
    ...
```

Mutual TLS (mTLS) — Strong
Each device has a client certificate. The TLS handshake verifies both server and client identity.
```nginx
# Nginx configuration for mTLS
server {
    listen 8883 ssl;
    ssl_certificate     /etc/nginx/certs/server.crt;
    ssl_certificate_key /etc/nginx/certs/server.key;

    # Require client certificate
    ssl_client_certificate /etc/nginx/certs/ca.crt;
    ssl_verify_client on;

    location /api/v1/ingest {
        # Pass client cert info to backend
        proxy_set_header X-Client-Cert-DN $ssl_client_s_dn;
        proxy_set_header X-Client-Cert-Serial $ssl_client_serial;
        proxy_pass http://ingestion-api:8000;
    }
}
```

JWT with Device Provisioning
Devices obtain short-lived tokens through a provisioning flow, then use those tokens for ingestion.
Which to Choose?
| Method | Security | Device Complexity | Management Overhead |
|---|---|---|---|
| API Key | Low | Very Low | Medium (key rotation) |
| mTLS | High | Medium (cert management) | High (PKI infrastructure) |
| JWT + Provisioning | Medium-High | Low-Medium | Medium |
Recommendation: Start with API keys for development and prototyping. Move to mTLS or JWT for production deployments with > 1,000 devices.
6. Storage Strategies
Once data passes through the ingestion API and message queue, it needs to land somewhere. The choice depends on your query patterns.
Time-Series Database (InfluxDB, TimescaleDB)
Best for: real-time monitoring, dashboards, alerting.
```sql
-- TimescaleDB: Create a hypertable for telemetry
CREATE TABLE telemetry (
    time        TIMESTAMPTZ NOT NULL,
    device_id   TEXT NOT NULL,
    metric_name TEXT NOT NULL,
    value       DOUBLE PRECISION,
    metadata    JSONB
);
SELECT create_hypertable('telemetry', 'time');

-- Automatic data retention: drop data older than 90 days
SELECT add_retention_policy('telemetry', INTERVAL '90 days');

-- Continuous aggregates for dashboards
CREATE MATERIALIZED VIEW telemetry_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    device_id,
    metric_name,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    COUNT(*) AS sample_count
FROM telemetry
GROUP BY bucket, device_id, metric_name;
```

Message Queue → Stream Processing
Best for: event-driven architectures, real-time analytics, complex event processing.
```python
# Kafka consumer for stream processing
import json

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "telemetry-processor",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["telemetry-raw"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    telemetry = json.loads(msg.value())
    # Real-time anomaly detection
    if is_anomalous(telemetry):
        publish_alert(telemetry)
    # Write to time-series DB
    write_to_timescaledb(telemetry)
    # Archive to data lake
    write_to_s3_parquet(telemetry)
```

Data Lake (S3 + Parquet)
Best for: historical analysis, machine learning, long-term storage.
```python
import pyarrow as pa
import pyarrow.parquet as pq

# Partition by date and device for efficient queries.
# Note: each record dict must carry the partition columns
# ("date" and "device_id") for the partitioned write to work.
def write_to_data_lake(records: list[dict], base_path: str):
    table = pa.Table.from_pylist(records)
    # Partitioned write
    pq.write_to_dataset(
        table,
        root_path=f"{base_path}/telemetry",
        partition_cols=["date", "device_id"],
    )
```

Multi-Tier Storage Architecture
Most production IoT systems use all three:
- Hot tier (Time-Series DB): Last 7-30 days. Fast queries for dashboards and monitoring.
- Cold tier (Data Lake): All historical data. Slow but cheap. Used for ML training and compliance.
- Downsampling: After the hot retention period, aggregate raw data (1-second readings → 1-minute averages) before archiving.
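The downsampling step can be sketched in a few lines: collapse raw 1-second readings into per-minute averages before archiving. The `downsample_minute_avg` helper and its input shape are illustrative; a production job would typically do this in SQL (e.g. a continuous aggregate) or a batch framework:

```python
from collections import defaultdict

# Sketch: collapse raw readings into per-minute averages before
# archiving. Input rows are (epoch_seconds, device_id, value) tuples.
def downsample_minute_avg(rows: list[tuple[int, str, float]]) -> dict:
    buckets: dict[tuple[int, str], list[float]] = defaultdict(list)
    for ts, device_id, value in rows:
        minute = ts - (ts % 60)  # truncate timestamp to the minute boundary
        buckets[(minute, device_id)].append(value)
    # One averaged value per (minute, device) bucket
    return {key: sum(vals) / len(vals) for key, vals in buckets.items()}
```

Sixty raw points per minute become one, cutting cold-tier volume by roughly 60x for this cadence.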
7. End-to-End Implementation
Let's put it all together with a production-ready ingestion service.
Project Structure
```
iot-ingestion/
├── app/
│   ├── main.py           # FastAPI application
│   ├── models.py         # Pydantic models
│   ├── auth.py           # Device authentication
│   ├── rate_limiter.py   # Rate limiting
│   ├── queue.py          # Message queue producer
│   └── health.py         # Health check endpoints
├── docker-compose.yml
└── requirements.txt
```

Core Ingestion Service
```python
# app/main.py
from fastapi import FastAPI, HTTPException, Header
from contextlib import asynccontextmanager

# validate_metric_ranges is the range checker from section 3,
# kept alongside the Pydantic models here
from app.models import TelemetryPayload, BatchPayload, validate_metric_ranges
from app.auth import verify_device
from app.rate_limiter import RateLimiter
from app.queue import QueueProducer

producer: QueueProducer | None = None
rate_limiter: RateLimiter | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global producer, rate_limiter
    producer = QueueProducer(brokers="kafka:9092")
    await producer.start()
    rate_limiter = RateLimiter(max_requests=10, window_seconds=1)
    yield
    await producer.stop()

app = FastAPI(title="IoT Ingestion API", lifespan=lifespan)

@app.post("/api/v1/ingest", status_code=202)
async def ingest(
    payload: TelemetryPayload,
    x_device_token: str = Header(...),
):
    # Authenticate
    device = await verify_device(payload.device_id, x_device_token)
    if not device:
        raise HTTPException(status_code=401, detail="Invalid device")
    # Rate limit
    if not await rate_limiter.allow(payload.device_id):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    # Validate metric ranges (warn, don't reject)
    warnings = validate_metric_ranges(payload.metrics)
    # Enqueue
    await producer.send(
        topic="telemetry-raw",
        key=payload.device_id,
        value=payload.model_dump_json(),
    )
    return {
        "status": "accepted",
        "warnings": warnings if warnings else None,
    }

@app.post("/api/v1/ingest/batch", status_code=202)
async def ingest_batch(
    payload: BatchPayload,
    x_device_token: str = Header(...),
):
    device = await verify_device(payload.device_id, x_device_token)
    if not device:
        raise HTTPException(status_code=401, detail="Invalid device")
    if not await rate_limiter.allow(payload.device_id):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    # Send each reading as a separate message for parallel processing;
    # the Kafka message key carries the device_id
    for reading in payload.readings:
        await producer.send(
            topic="telemetry-raw",
            key=payload.device_id,
            value=reading.model_dump_json(),
        )
    return {
        "status": "accepted",
        "readings_count": len(payload.readings),
    }
```

Health Check
Production ingestion APIs need health checks that go beyond "is the process running":
```python
# app/health.py
from fastapi import APIRouter

# Import the module, not the names, so we see the instances created at
# startup in app.main rather than their initial None values.
import app.main as main

router = APIRouter()

@router.get("/health/ready")
async def readiness():
    """Can this instance accept traffic?"""
    checks = {
        "kafka": main.producer is not None and await main.producer.is_connected(),
        "rate_limiter": main.rate_limiter is not None,
    }
    all_healthy = all(checks.values())
    return {
        "status": "ready" if all_healthy else "not_ready",
        "checks": checks,
    }

@router.get("/health/live")
async def liveness():
    """Is this process alive?"""
    return {"status": "alive"}
```

- /health/live — Used by orchestrators (Kubernetes) to know if the process is stuck. If it fails, restart the pod.
- /health/ready — Used by load balancers to know if this instance can serve traffic. If Kafka is down, stop sending requests to this instance.
8. Monitoring and Observability
You can't manage what you can't measure. An ingestion API needs thorough monitoring because failures are silent — devices don't complain to support.
Key Metrics to Track
```python
from prometheus_client import Counter, Histogram, Gauge

# Ingestion metrics
messages_received = Counter(
    "ingestion_messages_total",
    "Total messages received",
    ["device_type", "status"],
)

ingestion_latency = Histogram(
    "ingestion_latency_seconds",
    "Time to accept and enqueue a message",
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25],
)

queue_depth = Gauge(
    "ingestion_queue_depth",
    "Current message queue depth",
)

active_devices = Gauge(
    "ingestion_active_devices",
    "Number of devices that sent data in the last 5 minutes",
)
```

Dashboard Essentials
| Metric | Alert Threshold | Why It Matters |
|---|---|---|
| Messages/sec | Drop > 30% in 5 min | Devices may have lost connectivity |
| p99 latency | > 100ms | Ingestion is too slow, risk of timeouts |
| Queue depth | > 100K messages | Processing can't keep up with ingestion |
| Error rate (4xx) | > 5% | Many devices failing authentication |
| Error rate (5xx) | > 0.1% | Your service is failing |
| Active devices | Drop > 10% in 15 min | Network outage or firmware bug |
Device Silence Detection
One of the most important IoT-specific alerts: detecting when a device stops sending data.
```python
from datetime import datetime, timedelta

# Track last-seen time per device
device_last_seen: dict[str, datetime] = {}

async def update_last_seen(device_id: str):
    device_last_seen[device_id] = datetime.utcnow()

async def detect_silent_devices(
    expected_interval_seconds: int = 60,
    silence_multiplier: int = 3,
):
    """Flag devices that haven't reported in 3x their expected interval."""
    threshold = timedelta(seconds=expected_interval_seconds * silence_multiplier)
    now = datetime.utcnow()
    silent = [
        device_id
        for device_id, last_seen in device_last_seen.items()
        if now - last_seen > threshold
    ]
    if silent:
        await publish_alert({
            "type": "device_silence",
            "devices": silent,
            "threshold_seconds": threshold.total_seconds(),
        })
```

9. Scaling Considerations
Horizontal Scaling
The ingestion layer is stateless — scale horizontally by adding more pods behind a load balancer. The message queue decouples ingestion from processing, so each can scale independently.
Key Scaling Numbers
| Scale | Devices | Messages/sec | Architecture |
|---|---|---|---|
| Small | < 1,000 | < 1,000 | Single instance + PostgreSQL |
| Medium | 1K - 100K | 1K - 50K | 3-5 instances + Kafka + TimescaleDB |
| Large | 100K - 1M | 50K - 500K | Auto-scaling cluster + Kafka + multi-tier storage |
| Massive | > 1M | > 500K | Regional ingestion + global aggregation |
Protocol Selection by Scale
- < 10K devices: HTTP REST is fine. Simple, well-understood, easy to debug.
- 10K - 100K devices: MQTT or HTTP with aggressive batching.
- > 100K devices: MQTT with QoS 1, edge gateways for aggregation, regional ingestion endpoints.
10. Common Pitfalls
Pitfall 1: Synchronous Processing
```python
# ❌ Don't do this — processing blocks the response
@app.post("/api/v1/ingest")
async def ingest(payload: TelemetryPayload):
    await write_to_database(payload)      # Slow!
    await run_anomaly_detection(payload)  # Slower!
    await update_dashboard(payload)       # Slowest!
    return {"status": "ok"}

# ✅ Do this — accept fast, process later
@app.post("/api/v1/ingest", status_code=202)
async def ingest(payload: TelemetryPayload):
    await enqueue(payload)  # Fast!
    return {"status": "accepted"}
```

Pitfall 2: No Idempotency
Devices retry. Networks hiccup. If your ingestion API doesn't handle duplicate messages, your data is dirty.
```python
# ✅ Use a deduplication key
@app.post("/api/v1/ingest", status_code=202)
async def ingest(
    payload: TelemetryPayload,
    x_idempotency_key: str = Header(None),
):
    # Generate key from device_id + timestamp if not provided
    dedup_key = x_idempotency_key or f"{payload.device_id}:{payload.timestamp.isoformat()}"
    if await is_duplicate(dedup_key):
        return {"status": "duplicate"}
    await enqueue(payload)
    await mark_as_seen(dedup_key, ttl_seconds=3600)
    return {"status": "accepted"}
```

Pitfall 3: Trusting Device Timestamps
Device clocks drift. Sometimes badly. Always record a server-side timestamp and use it as the authoritative time for ordering.
```python
enriched_payload = {
    **payload.model_dump(),
    "server_received_at": datetime.utcnow().isoformat(),
    "time_drift_seconds": (
        datetime.utcnow() - payload.timestamp
    ).total_seconds(),
}
```

Pitfall 4: No Dead Letter Queue
When a message fails processing, where does it go? If the answer is "it disappears," you have a data loss problem.
```python
async def process_message(message):
    try:
        validated = validate(message)
        await store(validated)
    except ValidationError as e:
        # Don't lose it — send to DLQ for investigation
        await send_to_dlq(message, reason=str(e))
    except Exception as e:
        # Unexpected error — DLQ with full context
        await send_to_dlq(message, reason=str(e), retryable=True)
```

Summary
Designing an ingestion API for IoT is fundamentally different from building a typical web API. The key principles:
- Accept fast, process later — always return 202, enqueue for async processing
- Design for failure — devices retry, networks fail, clocks drift. Handle all of it
- Layer your validation — reject bad syntax immediately, flag bad semantics for review
- Implement backpressure — rate limiting, load shedding, and edge buffering
- Authenticate every device — API keys for dev, mTLS or JWT for production
- Use multi-tier storage — hot data in time-series DB, cold data in data lake
- Monitor the silence — in IoT, the absence of data is often the most important signal
The architecture pattern that works at every scale:
```
Devices → Ingestion API → Message Queue → Stream Processor → Storage Tiers
```

Start simple with HTTP and a single database. Add MQTT, Kafka, and multi-tier storage as your device fleet grows. The ingestion API pattern stays the same — only the infrastructure behind it scales.
Further Reading
- REST API Complete Guide — fundamentals of API design
- API Gateway Complete Guide — managing API traffic at scale
- HTTP Protocol Guide — understanding the protocol layer