2025-11-09
IoT Lojistik Uygulamaları için Mesajlaşma Protokolü Seçimi: MQTT, AMQP, ZeroMQ, CoAP ve DDS Karşılaştırması
IoT lojistik uygulamaları için mesajlaşma protokollerinin kapsamlı teknik karşılaştırması. Filo takibi, soğuk zincir izleme ve gerçek zamanlı cihaz iletişimi için MQTT, AMQP, ZeroMQ, CoAP ve DDS'yi ne zaman kullanacağını öğren.
Özet
IoT sistemleriyle çalışırken öğrendiğim en önemli şeylerden biri, protokol seçiminin sistem performansını, güvenilirliğini ve operasyonel maliyetleri önemli ölçüde etkilediğidir. Bu rehberde beş mesajlaşma protokolünü - MQTT, AMQP, ZeroMQ, CoAP ve DDS - filo takibi, soğuk zincir izleme ve gerçek zamanlı cihaz iletişimi senaryolarından örneklerle karşılaştırıyorum. Çalışan kod örnekleri, gerçekçi performans metrikleri ve senin spesifik gereksinimlerine uygun protokolü seçmene yardımcı olacak karar çerçeveleri bulacaksın.
Protokol Seçimi Sorunu
Lojistik için IoT çözümleri geliştirirken, birbirine bağlı birçok teknik karar alman gerekiyor. Yüzlerceden milyonlarca eşzamanlı cihaz bağlantısını yönetmen lazım. Cihazların güvenilmeyen cellular ağlarda, sınırlı bandwidth ile çalışıyor. Bazı veriler garantili teslimat gerektiriyor (ilaç soğuk zincirindeki sıcaklık okumaları), diğerleri ara sıra kayıp tolerans gösterebiliyor (sık GPS güncellemeleri). Gerçek zamanlı takip için düşük latency’ye ihtiyacın var ama compliance için güvenilirliğe de.
Bu sadece bir protokol seçmek değil - teknik özellikleri senin spesifik kısıtlamalarınla eşleştirmek.
Protokol Genel Bakış ve Özellikleri
Spesifik kullanım senaryolarına girmeden önce, üst seviye bir karşılaştırmayla başlayayım:
| Protokol | Mimari | Transport | Mesajlaşma Modeli | QoS Seviyeleri | En İyi Kullanım |
|---|---|---|---|---|---|
| MQTT | Broker-based | TCP | Pub/Sub | 3 (0,1,2) | Genel IoT, güvenilmeyen ağlar |
| AMQP | Broker-based | TCP | Çoklu | 3 (0,1,2) | Enterprise entegrasyon, karmaşık routing |
| ZeroMQ | Broker-less | Çoklu | Çoklu | Yok (app layer) | Yüksek performans, düşük latency lokal |
| CoAP | Peer-to-peer | UDP | Request/Response | Opsiyonel | Kısıtlı cihazlar, düşük güç |
| DDS | Broker-less | Çoklu | Pub/Sub | Detaylı | Gerçek zamanlı kritik sistemler |
Performans Karakteristikleri
Latency (en düşükten en yükseğe):
- CoAP: Yüzlerce microsaniye
- ZeroMQ: Microsaniye - düşük millisaniye
- DDS: Düşük millisaniye
- MQTT: 10-50ms tipik
- AMQP: MQTT’den yüksek
Throughput (en yüksekten en düşüğe):
- CoAP: ~2x MQTT QoS 0 throughput
- ZeroMQ: Broker-less için son derece yüksek
- MQTT QoS 0: Yüksek throughput
- MQTT QoS 1: ~QoS 0’ın %50’si (2x overhead)
- MQTT QoS 2: ~QoS 0’ın %25’i (4x overhead)
MQTT: Filo Takibi İçin Varsayılan Seçim
MQTT, IoT uygulamaları için fiili standart haline geldi ve bunun iyi nedenleri var. Filo takip sistemleri geliştirirken öğrendiklerimi paylaşayım.
Filo Takibi Implementasyonu
10,000+ araç GPS koordinatları, hız, yakıt seviyesi ve diagnostics gönderiyor. MQTT’nin pub/sub modeli doğal olarak uyuyor. Anahtar nokta, her veri tipi için doğru QoS seviyesini kullanmak.
Topic Yapısı ve QoS Stratejisi
import mqtt from 'mqtt';
interface GpsData {
lat: number;
lon: number;
timestamp: string;
}
interface BehaviorData {
speed: number;
acceleration: number;
harshBraking: boolean;
}
interface AlertData {
type: string;
severity: string;
location: { lat: number; lon: number };
}
function setupFleetTracker(vehicleId: string, brokerHost: string): mqtt.MqttClient {
const client = mqtt.connect(`mqtts://${brokerHost}:8883`, {
clientId: `vehicle-${vehicleId}`,
// Last Will and Testament for connection monitoring
will: {
topic: `fleet/${vehicleId}/status`,
payload: Buffer.from('offline'),
qos: 1,
retain: true
},
// TLS options for production
// ca: fs.readFileSync('ca.crt'),
// cert: fs.readFileSync('client.crt'),
// key: fs.readFileSync('client.key')
});
client.on('connect', () => {
// Publish online status immediately after connection
client.publish(
`fleet/${vehicleId}/status`,
'online',
{ qos: 1, retain: true }
);
});
return client;
}
function publishTelemetry(client: mqtt.MqttClient, vehicleId: string) {
// GPS updates - frequent, tolerate occasional loss
const gpsData: GpsData = {
lat: 52.520,
lon: 13.405,
timestamp: new Date().toISOString()
};
client.publish(
`fleet/eu-central/${vehicleId}/gps`,
JSON.stringify(gpsData),
{ qos: 0 } // Fire and forget
);
// Driver behavior - important for analytics
const behaviorData: BehaviorData = {
speed: 85,
acceleration: 2.5,
harshBraking: false
};
client.publish(
`fleet/eu-central/${vehicleId}/behavior`,
JSON.stringify(behaviorData),
{ qos: 1 } // At least once delivery
);
// Critical alerts - must not duplicate
if (detectAccident()) {
const alertData: AlertData = {
type: 'accident',
severity: 'high',
location: { lat: 52.520, lon: 13.405 }
};
client.publish(
`fleet/eu-central/${vehicleId}/alert`,
JSON.stringify(alertData),
{ qos: 2 } // Exactly once delivery
);
}
}
Bu QoS stratejisi önemli bir fark yaratıyor. Üzerinde çalıştığım sistemlerde, sık GPS güncellemeleri için QoS 0 kullanmak, genel QoS 1’e göre network trafiğini yaklaşık %50 azalttı. Kritik alarmlar için QoS 2 ise tekrarlayan acil durum yanıtlarını önledi.
Wildcards ile Fan-In Pattern
Subscriber tarafında, MQTT wildcards güçlü aggregation pattern’leri sağlıyor:
function setupRegionalMonitor(region: string, brokerHost: string) {
const client = mqtt.connect(`mqtts://${brokerHost}:8883`, {
clientId: `monitor-${region}`
});
client.on('connect', () => {
// Subscribe to all vehicles in region
client.subscribe(`fleet/${region}/+/gps`, { qos: 0 });
// Subscribe to all critical alerts
client.subscribe('fleet/+/+/alert', { qos: 2 });
});
client.on('message', (topic: string, payload: Buffer) => {
// Parse topic to extract vehicle_id and data_type
const parts = topic.split('/');
const vehicleId = parts[2];
const dataType = parts[3];
processVehicleData(vehicleId, dataType, payload);
});
}
GPS Verilerini PostgreSQL/PostGIS ile Saklama
Filo takip uygulamaları için, PostGIS extension’ı ile PostgreSQL güçlü geospatial özellikler sağlıyor. Araç konumlarını kalıcı hale getirmek ve sorgulamak için:
import { Pool } from 'pg';
interface VehicleLocation {
vehicleId: string;
lat: number;
lon: number;
speed: number;
timestamp: Date;
}
class FleetDatabase {
private pool: Pool;
constructor(connectionString: string) {
this.pool = new Pool({ connectionString });
}
async initialize() {
const client = await this.pool.connect();
try {
// Enable PostGIS extension
await client.query('CREATE EXTENSION IF NOT EXISTS postgis;');
// Create vehicle_locations table with geospatial column
await client.query(`
CREATE TABLE IF NOT EXISTS vehicle_locations (
id BIGSERIAL PRIMARY KEY,
vehicle_id VARCHAR(50) NOT NULL,
location GEOGRAPHY(POINT, 4326) NOT NULL,
speed NUMERIC(5,2),
recorded_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Index for spatial queries
CREATE INDEX IF NOT EXISTS idx_vehicle_locations_geog
ON vehicle_locations USING GIST(location);
-- Index for time-series queries
CREATE INDEX IF NOT EXISTS idx_vehicle_locations_time
ON vehicle_locations(recorded_at DESC);
-- Index for vehicle-specific queries
CREATE INDEX IF NOT EXISTS idx_vehicle_locations_vehicle
ON vehicle_locations(vehicle_id, recorded_at DESC);
`);
} finally {
client.release();
}
}
async storeLocation(location: VehicleLocation): Promise<void> {
await this.pool.query(
`INSERT INTO vehicle_locations (vehicle_id, location, speed, recorded_at)
VALUES ($1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4, $5)`,
[location.vehicleId, location.lon, location.lat, location.speed, location.timestamp]
);
}
// Find vehicles within radius (geofencing)
async findVehiclesNearLocation(
lat: number,
lon: number,
radiusMeters: number
): Promise<any[]> {
const result = await this.pool.query(
`SELECT
vehicle_id,
ST_X(location::geometry) as lon,
ST_Y(location::geometry) as lat,
speed,
ST_Distance(location, ST_SetSRID(ST_MakePoint($2, $1), 4326)) as distance_meters,
recorded_at
FROM vehicle_locations
WHERE ST_DWithin(
location,
ST_SetSRID(ST_MakePoint($2, $1), 4326),
$3
)
ORDER BY recorded_at DESC`,
[lat, lon, radiusMeters]
);
return result.rows;
}
// Calculate route distance for a vehicle
async calculateRouteDistance(vehicleId: string, startTime: Date, endTime: Date): Promise<number> {
const result = await this.pool.query(
`WITH ordered_locations AS (
SELECT location::geometry as geom
FROM vehicle_locations
WHERE vehicle_id = $1
AND recorded_at BETWEEN $2 AND $3
ORDER BY recorded_at
)
SELECT ST_Length(
ST_MakeLine(geom ORDER BY geom)::geography
) / 1000 as distance_km
FROM ordered_locations`,
[vehicleId, startTime, endTime]
);
return result.rows[0]?.distance_km || 0;
}
}
// Integration with MQTT subscriber
function setupFleetDatabaseIntegration(brokerHost: string, dbConnectionString: string) {
const db = new FleetDatabase(dbConnectionString);
await db.initialize();
const client = mqtt.connect(`mqtts://${brokerHost}:8883`, {
clientId: 'fleet-db-writer'
});
client.on('connect', () => {
client.subscribe('fleet/+/+/gps', { qos: 1 });
});
client.on('message', async (topic: string, payload: Buffer) => {
const parts = topic.split('/');
const vehicleId = parts[2];
const data = JSON.parse(payload.toString());
await db.storeLocation({
vehicleId,
lat: data.lat,
lon: data.lon,
speed: data.speed || 0,
timestamp: new Date(data.timestamp)
});
});
}
Bu kurulum şunları sağlıyor:
- Geospatial indexing - hızlı yakınlık sorguları için PostGIS ile
- Geofencing sorguları - bir yarıçap içindeki araçları bul
- Rota analizi - toplam gidilen mesafeyi hesapla
- Time-series optimizasyonu - bölümlenmiş indekslerle
GEOGRAPHY tipi, doğru mesafe hesaplamaları için Dünya’nın eğriliğini otomatik olarak işliyor. 30 saniyede bir rapor veren 10,000 araçlık bir filo için, bu günde ~29M kayıt oluşturuyor. Verimli historical veri yönetimi için PostgreSQL table partitioning kullan.
AWS IoT Core Entegrasyonu
Birkaç bin cihazın ötesine geçtiğinde, managed servisler operasyonları önemli ölçüde basitleştiriyor. AWS IoT Core’un Rules Engine’i ile yüksek hacimli telemetry routing’i şöyle:
-- Tüm telemetry'yi stream processing için Kinesis'e yönlendir
SELECT * FROM 'fleet/+/+/telemetry'
-- Soğuk zincir izleme için sıcaklık ihlallerini filtrele
SELECT * FROM 'coldchain/+/temperature'
WHERE temperature < 2 OR temperature > 8
Rules Engine, özel infrastructure gerekmeden milyonlarca mesajı işliyor, SQL sorgularına göre Kinesis, Lambda, SNS veya diğer AWS servislerine yönlendiriyor.
MQTT Broker Seçimi
Doğru broker’ı seçmek, senin scale’ine ve operasyonel modeline bağlı:
Mosquitto edge gateway’ler ve development için iyi çalışıyor. Single-threaded ve tipik hardware’da pratik olarak yaklaşık 100K connection ile sınırlı (teorik maksimum daha yüksek olsa da), ama en düşük resource footprint’e sahip. Cloud broker’lara bridge etmeden önce lokal MQTT aggregation için Raspberry Pi gateway’lere deploy ettim.
EMQX büyük scale’de mükemmel. Testlerde, EMQX 23-node cluster’da 100M+ eşzamanlı connection işledi. Masterless clustering ve horizontal scaling, multi-milyon cihaz deployment’ları için uygun. Open source edition temel functionality sağlıyor, enterprise özellikler commercial olarak mevcut.
HiveMQ enterprise güvenilirliğine odaklanıyor. 200M connection ile 37 dakikalık ramp-up benchmark’ı, mission-critical uygulamalar için olgunluğu gösteriyor. BMW’nin connected car platformu, HiveMQ kullanarak unlock süresini 30 saniyeden 1 saniyenin altına indirdi. Trade-off: sadece commercial licensing.
AWS IoT Core infrastructure yönetimini tamamen ortadan kaldırıyor. Serverless, cihaz filosunu otomatik olarak yönetecek şekilde scale oluyor. Pay-as-you-go fiyatlandırma değişken workload’lar için iyi ama çok yüksek hacimlerde self-hosted maliyetleri aşabiliyor.
Soğuk Zincir İzleme: Multi-Protocol Mimari
İlaç soğuk zinciri izleme, birden fazla protokolün birlikte etkili çalıştığı senaryoları gösteriyor. Sıcaklık 2-8°C arasında kalmalı, ihlaller için anında alarm gerekiyor.
Protokol Stack
Edge’de, BLE sensörler ile gateway’ler arasında düşük güç tüketimli lokal iletişim sağlıyor. Gateway okumaları aggregate ediyor ve MQTT’ye publish ediyor. Sıcaklık okumaları için QoS 1 teslimatı garanti ediyor, threshold ihlalleri için QoS 2 tekrarlayan alarmları önlüyor (gereksiz acil prosedürleri tetikleyebilir).
İşte pratik bir gateway implementasyonu:
import noble from '@abandonware/noble';
import mqtt from 'mqtt';
import { Pool } from 'pg';
interface TemperatureReading {
temperature: number;
sensor: string;
timestamp: Date;
}
class ColdChainGateway {
private mqttClient: mqtt.MqttClient;
private dbPool: Pool;
private sensorMacs: string[];
private readonly thresholdMin = 2.0;
private readonly thresholdMax = 8.0;
private readonly tempCharUuid = '2a1c'; // Temperature Measurement UUID
constructor(mqttBroker: string, dbConnectionString: string, sensorMacs: string[]) {
this.mqttClient = mqtt.connect(`mqtts://${mqttBroker}:8883`);
this.dbPool = new Pool({ connectionString: dbConnectionString });
this.sensorMacs = sensorMacs;
}
async initialize() {
// Initialize temperature_readings table
await this.dbPool.query(`
CREATE TABLE IF NOT EXISTS temperature_readings (
id BIGSERIAL PRIMARY KEY,
sensor_mac VARCHAR(17) NOT NULL,
temperature NUMERIC(5,2) NOT NULL,
recorded_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Index for time-series queries
CREATE INDEX IF NOT EXISTS idx_temp_readings_time
ON temperature_readings(recorded_at DESC);
-- Index for sensor-specific queries
CREATE INDEX IF NOT EXISTS idx_temp_readings_sensor
ON temperature_readings(sensor_mac, recorded_at DESC);
-- Hypertable for TimescaleDB (if using TimescaleDB extension)
-- SELECT create_hypertable('temperature_readings', 'recorded_at', if_not_exists => TRUE);
`);
}
async readSensor(macAddress: string): Promise<number> {
return new Promise((resolve, reject) => {
noble.on('discover', async (peripheral) => {
if (peripheral.address.toLowerCase() === macAddress.toLowerCase()) {
await peripheral.connectAsync();
const { characteristics } = await peripheral.discoverSomeServicesAndCharacteristicsAsync(
[],
[this.tempCharUuid]
);
const tempChar = characteristics[0];
const data = await tempChar.readAsync();
// Parse temperature (depends on sensor format)
const temperature = data.readInt16LE(0) / 100.0;
await peripheral.disconnectAsync();
resolve(temperature);
}
});
noble.startScanning([], false);
setTimeout(() => {
noble.stopScanning();
reject(new Error('Sensor not found'));
}, 10000);
});
}
async storeReading(reading: TemperatureReading): Promise<void> {
await this.dbPool.query(
`INSERT INTO temperature_readings (sensor_mac, temperature, recorded_at)
VALUES ($1, $2, $3)`,
[reading.sensor, reading.temperature, reading.timestamp]
);
}
async monitorSensors(): Promise<void> {
setInterval(async () => {
for (const mac of this.sensorMacs) {
try {
const temp = await this.readSensor(mac);
const reading: TemperatureReading = {
temperature: temp,
sensor: mac,
timestamp: new Date()
};
// Store in PostgreSQL
await this.storeReading(reading);
// Publish temperature reading (QoS 1)
this.mqttClient.publish(
`coldchain/${mac}/temperature`,
JSON.stringify(reading),
{ qos: 1 }
);
// Check threshold violation (QoS 2 for alerts)
if (temp < this.thresholdMin || temp > this.thresholdMax) {
this.mqttClient.publish(
`coldchain/${mac}/alert`,
JSON.stringify({
temperature: temp,
threshold: `${this.thresholdMin}-${this.thresholdMax}`,
severity: 'critical',
timestamp: reading.timestamp
}),
{ qos: 2 } // Exactly once - prevent duplicate emergency responses
);
}
} catch (error) {
console.error(`Error reading sensor ${mac}:`, error);
}
}
}, 60000); // Read every minute
}
// Query recent violations for compliance reporting
async getViolations(hours: number = 24): Promise<any[]> {
const result = await this.dbPool.query(
`SELECT sensor_mac, temperature, recorded_at
FROM temperature_readings
WHERE recorded_at > NOW() - INTERVAL '${hours} hours'
AND (temperature < $1 OR temperature > $2)
ORDER BY recorded_at DESC`,
[this.thresholdMin, this.thresholdMax]
);
return result.rows;
}
}
AMQP: Dağıtım Merkezleri İçin Karmaşık Routing
AMQP, MQTT’nin basit pub/sub’ının verimli şekilde yapamadığı sofistike routing pattern’leri gerektirdiğinde parlıyor. Birden fazla event tipi olan dağıtım merkezleri, AMQP’nin exchange type’larından faydalanıyor.
Exchange Type’ları Pratikte
import amqp from 'amqplib';
interface PackageEvent {
packageId: string;
status: string;
location: string;
timestamp: Date;
}
async function setupLogisticsRouting(): Promise<amqp.Channel> {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Topic exchange for flexible routing patterns
await channel.assertExchange('logistics', 'topic', { durable: true });
// Direct exchange for targeted delivery
await channel.assertExchange('warehouse', 'direct', { durable: true });
// Fanout for critical broadcasts
await channel.assertExchange('emergency', 'fanout', { durable: true });
return channel;
}
async function publishPackageEvent(
channel: amqp.Channel,
region: string,
warehouse: string,
eventType: string,
data: PackageEvent
): Promise<void> {
// Topic routing: logistics.{region}.{warehouse}.{event}
const routingKey = `logistics.${region}.${warehouse}.${eventType}`;
channel.publish(
'logistics',
routingKey,
Buffer.from(JSON.stringify(data)),
{
persistent: true,
contentType: 'application/json'
}
);
}
async function subscribeToRegionalEvents(
channel: amqp.Channel,
region: string
): Promise<void> {
// Create exclusive queue for this consumer
const { queue } = await channel.assertQueue('', { exclusive: true });
// Subscribe to all events in region
await channel.bindQueue(
queue,
'logistics',
`logistics.${region}.#` // # matches zero or more segments
);
// Subscribe to all package arrivals globally
await channel.bindQueue(
queue,
'logistics',
'logistics.*.*.package.arrived' // * matches exactly one segment
);
channel.consume(queue, (msg) => {
if (msg) {
const event = JSON.parse(msg.content.toString());
processLogisticsEvent(msg.fields.routingKey, event);
channel.ack(msg);
}
});
}
Topic exchange, subscriber’ların application layer’da filtreleme yapmadan tam olarak ihtiyaç duydukları event’leri almasını sağlıyor. Regional operasyon merkezi logistics.europe.#’ye subscribe olup tüm Avrupa event’lerini alabilirken, paket takip sistemi dünya çapında paket event’leri için logistics.*.*.package.*’e subscribe oluyor.
Güvenilirlik İçin Dead Letter Exchange
Zor yoldan öğrendiğim bir ders: başarısız mesaj işleme için her zaman dead letter exchange yapılandır:
async function setupReliableQueue(
channel: amqp.Channel,
queueName: string
): Promise<void> {
// Dead letter exchange for failed messages
await channel.assertExchange('dlx', 'direct', { durable: true });
await channel.assertQueue('failed_messages', { durable: true });
await channel.bindQueue('failed_messages', 'dlx', 'failed');
// Main queue with dead letter configuration
await channel.assertQueue(queueName, {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'failed',
'x-message-ttl': 3600000, // 1 hour TTL
'x-max-length': 100000 // Prevent unbounded growth
}
});
}
Retry’lardan sonra mesaj işleme başarısız olduğunda, mesajlar otomatik olarak dead letter queue’ya yönleniyor. Bu, broker memory’yi tüketebilecek queue buildup’ı önlüyor - DLX yapılandırmayı öğrenmeden önce production outage’lere neden olan bir sorun.
ZeroMQ: Yüksek Performanslı Edge Processing
ZeroMQ’nun broker-less mimarisi, MQTT’nin millisaniyelerinin yeterli olmadığı edge processing senaryoları için microsaniye seviyesinde latency sağlıyor.
Load Balancing İçin Pipeline Pattern
import * as zmq from 'zeromq';
interface TelemetryMessage {
vehicleId: string;
telemetry: any;
timestamp: number;
}
// Vehicle gateway - PUSH socket
async function vehicleTelemetryGateway() {
const sender = new zmq.Push();
await sender.bind('tcp://*:5557');
setInterval(async () => {
const telemetry = collectVehicleTelemetry();
const message: TelemetryMessage = {
vehicleId: 'truck-1234',
telemetry,
timestamp: Date.now()
};
await sender.send(JSON.stringify(message));
}, 100); // 10 messages/second
}
// Processing workers - PULL socket
async function telemetryProcessor(workerId: string) {
const receiver = new zmq.Pull();
await receiver.connect('tcp://gateway:5557');
for await (const [msg] of receiver) {
const message: TelemetryMessage = JSON.parse(msg.toString());
// Automatic load balancing across workers
processTelemetry(message);
console.log(`Worker ${workerId} processed ${message.vehicleId}`);
}
}
PUSH/PULL pattern, broker konfigürasyonu olmadan otomatik load balancing sağlıyor. Mesajlar mevcut worker’lara round-robin dağılıyor. Saniyede binlerce mesaj işleyen edge senaryolarında, bu basitlik değerli hale geliyor.
Topic Filtreleme ile Pub/Sub
// Publisher
async function telemetryPublisher() {
const pub = new zmq.Publisher();
await pub.bind('tcp://*:5556');
setInterval(async () => {
const region = 'europe';
const data = getRegionalTelemetry(region);
// Topic prefix for filtering
await pub.send([region, JSON.stringify(data)]);
}, 1000);
}
// Subscriber with topic filtering
async function regionalSubscriber(region: string) {
const sub = new zmq.Subscriber();
await sub.connect('tcp://publisher:5556');
// Subscribe only to specific region
sub.subscribe(region);
for await (const [topic, message] of sub) {
const data = JSON.parse(message.toString());
processData(data);
}
}
ZeroMQ ile trade-off: delivery guarantee’leri kendin implement etmen gerekiyor. Otomatik retry veya persistence yok. Cihazların veriyi hızlıca yeniden oluşturabildiği lokal edge processing için bu iyi çalışıyor. Kritik uzun mesafe iletişim için, MQTT’nin built-in QoS’u daha uygun hale geliyor.
CoAP: Kısıtlı Cihaz İletişimi
CoAP’ın UDP tabanlı tasarımı ve 4-byte header’ı, her byte’ın ve her watt’ın önemli olduğu pil ile çalışan asset tracker’lar için ideal.
CoAP ile RESTful IoT
import coap from 'coap';
interface LocationData {
lat: number;
lon: number;
timestamp: string;
}
// CoAP Client - GET request
async function coapAssetTracker(): Promise<void> {
const req = coap.request({
host: 'tracker.example.com',
pathname: '/location',
method: 'GET'
});
req.on('response', (res) => {
const location = JSON.parse(res.payload.toString());
console.log('Asset location:', location);
});
req.on('error', (err) => {
console.error('Request failed:', err);
});
req.end();
}
// CoAP Server - handle GET requests
function coapServer() {
const server = coap.createServer();
server.on('request', (req, res) => {
if (req.url === '/location' && req.method === 'GET') {
const locationData: LocationData = getCurrentLocation();
res.setOption('Content-Format', 'application/json');
res.code = '2.05'; // Content
res.end(JSON.stringify(locationData));
} else {
res.code = '4.04'; // Not Found
res.end();
}
});
server.listen(5683, () => {
console.log('CoAP server listening on port 5683');
});
}
CoAP’ın RESTful tasarımı, HTTP API’leriyle çalıştıysan tanıdık geliyor. Fark: dramatik şekilde daha düşük overhead. Pil ile çalışan deployment’larda, CoAP pil ömrünü TCP üzerinden MQTT’ye göre 2-3x uzatabilir, TCP’nin güvenilirlik garantilerini kaybetsen de.
Confirmable vs. Non-Confirmable Mesajlar
// Non-confirmable (NON) - fire and forget, lowest latency
const requestNon = coap.request({
host: 'tracker.example.com',
pathname: '/telemetry',
method: 'POST',
confirmable: false, // NON type
options: {
'Content-Format': 'text/plain'
}
});
requestNon.write(sensorData);
requestNon.end();
// Confirmable (CON) - requires ACK, reliable delivery
const requestCon = coap.request({
host: 'tracker.example.com',
pathname: '/alert',
method: 'POST',
confirmable: true, // CON type
options: {
'Content-Format': 'application/json'
}
});
requestCon.on('response', (res) => {
console.log('Alert acknowledged:', res.code);
});
requestCon.write(JSON.stringify(alertData));
requestCon.end();
Ara sıra kayıp kabul edilebilir sık telemetry için NON kullan. Acknowledgment gerektiren kritik event’ler için CON. Bu, MQTT’nin QoS 0 vs QoS 1 trade-off’unu yansıtıyor, ama UDP layer’da daha düşük overhead ile.
DDS: Gerçek Zamanlı Otonom Sistemler
DDS (Data Distribution Service) farklı bir kategoride çalışıyor - microsaniye timing’in önemli olduğu deterministik, gerçek zamanlı sistemler. Sensör füzyonunun öngörülebilir latency gerektirdiği otonom araç projelerinde gördüm.
DDS, ZeroMQ gibi broker-less ama topic seviyesinde sofistike QoS policy’leri ekliyor. Mesaj-merkezli değil veri-merkezli, sistemi dağıtık bir veritabanı olarak ele alıyor.
DDS Quality of Service
DDS 22 QoS policy sunuyor, kulağa bunaltıcı gelse de detaylı kontrol sağlıyor:
- Reliability: Reliable (garantili teslimat) veya Best Effort (hızlı, güvenilmez)
- Durability: Transient (geç katılanlar için) veya Volatile (sadece güncel veri)
- Deadline: Periyodik veri belirtilen aralıkta gelmeli
- Lifespan: Otomatik veri expiration
- History: Son N sample’ı tut vs. tüm sample’ları
- Ownership: Exclusive veya shared veri sahipliği
İşte DDS pattern’lerini gösteren konseptüel bir örnek (gerçek Fast DDS Python API’si farklılık gösterebilir):
# Not: Bu DDS pattern'lerini gösteren konseptüel koddur.
# Gerçek Fast DDS Python bindings API'si farklılık gösterebilir.
import fastdds
from sensor_msgs.msg import Temperature
# DDS Participant (MQTT'deki client gibi)
participant = fastdds.DomainParticipant(domain_id=0)
# Sıcaklık verisi için publisher
temperature_topic = fastdds.Topic(
participant,
"temperature",
Temperature
)
publisher = fastdds.Publisher(
participant,
qos_profile={
'reliability': 'RELIABLE',
'durability': 'TRANSIENT_LOCAL',
'deadline': 1000 # millisaniye
}
)
# Sıcaklık okuması publish et
temp_msg = Temperature(temperature=22.5)
publisher.publish(temp_msg)
Deadline QoS özellikle otonom araçlar için kullanışlı. Sensör verisi deadline içinde gelmezse, subscriber verinin eski olduğunu bilir ve uygun aksiyonu alabilir (örneğin, acil duruş).
DDS, MQTT’nin broker’ının kabul edilemez latency eklediği senaryolarda mükemmel. RTI Connext yakın zamanda otomotiv güvenlik sertifikasyonu aldı (Kasım 2024), DDS’yi güvenlik-kritik uygulamalar için doğruluyor. Trade-off: karmaşıklık ve lisans maliyetleri (Fast DDS ve Cyclone DDS gibi açık kaynak implementasyonlar mevcut olsa da).
Karar Çerçevesi: Protokolünü Seç
Benim için işe yarayan framework:
MQTT’yi Ne Zaman Seç:
- Cihazlar güvenilmeyen ağlarda çalışıyor (cellular, WiFi)
- Application layer implementasyon olmadan teslimat garantisi gerekiyor
- Basit pub/sub senaryona uyuyor
- Kaynak kısıtlı cihazlar (bandwidth, pil)
- Hızlı production süresi öncelik
- Örnek: Filo takibi, sensör izleme, mobil uygulamalar
AMQP’yi Ne Zaman Seç:
- Karmaşık routing pattern’leri gerekli (topic exchange, header-based routing)
- Enterprise sistem entegrasyonu esansiyel
- Message queuing ile persistence gerekiyor
- Tek sistemde birden fazla mesajlaşma modeli
- Örnek: Dağıtım merkezleri, multi-tier lojistik, legacy entegrasyon
ZeroMQ’yu Ne Zaman Seç:
- Microsaniye latency kritik
- Broker-less mimari tercih ediliyor (SPOF yok)
- Yüksek throughput lokal processing
- Delivery guarantee’leri application layer’da implement edebilirsin
- Örnek: Edge analytics, gerçek zamanlı kontrol sistemleri, HFT benzeri senaryolar
CoAP’ı Ne Zaman Seç:
- Son derece kısıtlı cihazlar (pil, memory, CPU)
- RESTful paradigma doğal uyuyor
- UDP paket kaybı kabul edilebilir
- Minimal bandwidth mevcut
- Örnek: Pil ile çalışan tracker’lar, çevre sensörleri, giyilebilir cihazlar
DDS’yi Ne Zaman Seç:
- Deterministik gerçek zamanlı davranış gerekli
- Güvenlik-kritik uygulamalar
- Karmaşık veri-merkezli sistemler
- Otonom sistem entegrasyonu
- Yüksek lisans maliyetlerini justify edebilirsin
- Örnek: Otonom araçlar, endüstriyel otomasyon, havacılık
Karşılaştığım Yaygın Hatalar
1. MQTT’de Topic Explosion
Cihaz başına metric başına unique topic oluşturmak (fleet-region-vehicle-metric-001, fleet-region-vehicle-metric-002, vb.) milyonlarca topic’e yol açıyor. Bu aşırı broker memory tüketiyor ve topic matching’i önemli ölçüde yavaşlatıyordu.
Çözüm: Wildcards ile hiyerarşik topic’ler kullan: fleet/{region}/{vehicle-id}/{metric}
Bir deployment’ta, topic’leri yeniden yapılandırmak broker memory kullanımını yaklaşık %60 azalttı.
2. Yanlış QoS Seçimi
“Güvenli olmak için” tüm mesajlar için QoS 2 kullanmak network overhead’ı 4x artırdı ve throughput’u önemli ölçüde azalttı. Çoğu veri tam olarak bir kere semantiğine ihtiyaç duymuyor.
Çözüm:
- QoS 0: Sık kritik olmayan güncellemeler (%80 mesaj)
- QoS 1: Teslimat onayı gereken önemli veri (%15 mesaj)
- QoS 2: Kritik tekrarlamayan komutlar (%5 mesaj)
Bu doğru boyutlandırma, filo takip sistemlerinde network trafiğini yaklaşık %50 azalttı.
3. Last Will and Testament’ı Unutmak
LWT konfigürasyonu olmadan, dashboard araçları zarif olmayan şekilde bağlantısı kesildiğinde “online” gösteriyordu. Bunu sadece network testleri sırasında keşfettik.
Çözüm: Her zaman bağlantıda LWT yapılandır:
client.will_set(
topic=f"fleet/{vehicle_id}/status",
payload="offline",
qos=1,
retain=True
)
4. Telemetry’de Retained Mesajlar
Yüksek frekanslı sıcaklık okumaları üzerinde retain=True ayarlamak, her yeni subscriber’ın eski veri alması anlamına geliyordu. Broker gereksiz geçmiş değerleri saklıyordu.
Çözüm: Sadece durum/status mesajlarını retain et, telemetry stream’lerini değil.
5. Clustering Planı Yapmamak
Tek broker instance ile başlamak, gerektiğinde “sonra” cluster yapmayı planlamak. Yük altında migration son derece zorlu oldu.
Çözüm: Managed servisler (AWS IoT Core) kullan veya baştan cluster deploy et. Sonradan migration engineering zamanında yaklaşık 10x daha fazla maliyetlendiriyor.
6. Yetersiz Monitoring
Mesaj teslimat sorunlarını kullanıcı raporlarından keşfettik, monitoring’den değil. Latency yüzdeliklerine, hata oranlarına veya queue derinliklerine görünürlük yoktu.
Çözüm: İlk günden kapsamlı monitoring implement et:
# İzlenecek Prometheus metrikleri
- mqtt_messages_published_total
- mqtt_messages_delivered_total
- mqtt_connection_count
- mqtt_message_latency_seconds
- mqtt_queue_depth
- mqtt_subscription_count
İşe Yarayan Mimari Pattern’ler
Pattern 1: Hibrit Protokol Stratejisi
Farklı layer’lar kısıtlamalarına göre farklı protokoller kullanıyor:
Her protokol mükemmel olduğu yerde çalışıyor:
- CoAP pil ile çalışan sensörler için (en düşük güç)
- DDS gerçek zamanlı otonom sistemler için (deterministik latency)
- MQTT cloud iletişimi için (güvenilirlik + basitlik)
- AMQP enterprise entegrasyon için (routing esnekliği)
Pattern 2: Cloud Aggregation ile Edge Processing
Lokal yüksek hızlı processing için ZeroMQ, cloud aggregation için MQTT kullanmak:
import * as zmq from 'zeromq';
import mqtt from 'mqtt';
class DataAggregator {
private data: any[] = [];
private windowSeconds: number;
private lastPublish: number;
constructor(windowSeconds: number) {
this.windowSeconds = windowSeconds;
this.lastPublish = Date.now();
}
add(sensorData: any): void {
this.data.push(sensorData);
}
shouldPublish(): boolean {
return (Date.now() - this.lastPublish) >= this.windowSeconds * 1000;
}
getSummary(): any {
const summary = {
count: this.data.length,
avg: this.calculateAverage(),
min: this.calculateMin(),
max: this.calculateMax(),
timestamp: new Date().toISOString()
};
this.data = [];
this.lastPublish = Date.now();
return summary;
}
private calculateAverage(): number {
if (this.data.length === 0) return 0;
const sum = this.data.reduce((acc, d) => acc + (d.value || 0), 0);
return sum / this.data.length;
}
private calculateMin(): number {
if (this.data.length === 0) return 0;
return Math.min(...this.data.map(d => d.value || 0));
}
private calculateMax(): number {
if (this.data.length === 0) return 0;
return Math.max(...this.data.map(d => d.value || 0));
}
}
// Edge: ZeroMQ for microsecond-latency local processing
async function edgeProcessor() {
// Receive from local sensors
const sensorSocket = new zmq.Pull();
await sensorSocket.bind('tcp://*:5557');
// Aggregate and forward to cloud via MQTT
const mqttClient = mqtt.connect('mqtts://mqtt-broker.example.com:8883');
const aggregator = new DataAggregator(60); // 60 seconds window
for await (const [msg] of sensorSocket) {
const sensorData = JSON.parse(msg.toString());
aggregator.add(sensorData);
// Send aggregated summaries to cloud every minute
if (aggregator.shouldPublish()) {
const summary = aggregator.getSummary();
mqttClient.publish(
'fleet/gateway-123/summary',
JSON.stringify(summary),
{ qos: 1 }
);
}
}
}
Bu, bir deployment’ta cloud bandwidth’i yaklaşık %90 azaltırken lokal gerçek zamanlı yanıt verme kabiliyetini korudu.
Pattern 3: Aşamalı İyileştirme
Basit başla, gerçek kullanıma göre optimize et:
- Faz 1: Hızlı deployment için AWS IoT Core
- Faz 2: Hacim arttıkça edge processing ekle
- Faz 3: Spesifik kısıtlamalar için özel protokoller (CoAP, DDS) ekle
- Faz 4: Kritik path’leri ZeroMQ ile optimize et
Bu yaklaşım, time-to-market ile uzun vadeli optimizasyon arasında denge kuruyor.
MQTT v5 İyileştirmeleri
Yeni başlıyorsan, MQTT v5 özelliklerini düşün (2019’da standardize edildi):
Shared Subscription (Load Balancing)
# Birden fazla consumer mesajları load-balance ediyor
client.subscribe("$share/workers/fleet/+/+/telemetry", qos=1)
Mesajlar gruptaki subscriber’lara dağılıyor, harici load balancer ihtiyacını ortadan kaldırıyor.
Request-Response Pattern
# Publisher response topic ayarlıyor
client.publish(
"vehicle/truck-123/command",
json.dumps({"command": "status"}),
properties=Properties(
ResponseTopic="response/truck-123",
CorrelationData=b"req-001"
)
)
# Responder response topic'e publish ediyor
def on_message(client, userdata, msg):
props = msg.properties
response_topic = props.ResponseTopic
correlation_data = props.CorrelationData
client.publish(
response_topic,
json.dumps({"status": "ok"}),
properties=Properties(CorrelationData=correlation_data)
)
Built-in correlation, custom topic correlation lojiğini ortadan kaldırıyor.
Message Expiry
# Mesaj 5 dakika sonra expire oluyor
client.publish(
"fleet/truck-123/gps",
json.dumps(location),
properties=Properties(MessageExpiryInterval=300)
)
Yeniden bağlanan client’lara eski konum verisinin teslimini önlüyor - gerçek zamanlı takip için kritik.
İzlenecek Performans Metrikleri
Operasyon deneyimine göre, en önemli metrikler:
Connection Metrikleri
- Eşzamanlı cihaz bağlantıları
- Connection establishment süresi
- Connection başarı oranı
- Reconnection sıklığı
Mesaj Teslimat Metrikleri
- End-to-end latency (p50, p95, p99)
- Mesaj throughput (mesaj/saniye)
- QoS’ye göre teslimat başarı oranı
- Mesaj kayıp oranı (QoS 0)
Kaynak Metrikleri
- CPU kullanımı (broker ve client)
- Connection başına memory
- Network bandwidth kullanımı
- Queue derinliği
Business Metrikleri
- Elde edilen GPS güncelleme sıklığı
- Alarm teslimat süresi (tespit - bildirim)
- Cihaz offline tespit latency’si
- Compliance raporlama için veri bütünlüğü
- Milyon mesaj başına maliyet
Maliyet Değerlendirmeleri
Protokol seçimi operasyonel maliyetleri önemli ölçüde etkiliyor:
Managed vs. Self-Hosted
AWS IoT Core (managed):
- ~$1.00 milyon mesaj başına
- Milyon connection dakikası başına $0.08
- Sıfır operasyonel overhead
- Otomatik scale
Self-hosted EMQX (tahmin):
- ~$0.10-0.20 milyon mesaj başına (instance maliyetleri)
- Bakım, scaling, güncelleme için ops ekibi gerekiyor
- Infrastructure üzerinde daha fazla kontrol
- Çok yüksek hacimlerde (10B+ mesaj/ay) daha ekonomik
QoS’nin Maliyete Etkisi
- QoS 0: Baseline maliyet
- QoS 1: ~2x network bandwidth (acknowledgment’lar)
- QoS 2: ~4x network bandwidth (four-way handshake)
Veri maliyetinin MB başına $0.10 olduğu cellular ağlarda, QoS seviye seçimi aylık faturaları doğrudan etkiliyor.
Payload Optimizasyonu
# JSON payload: ~150 bytes
json_payload = {
"vehicle_id": "truck-1234",
"latitude": 52.520008,
"longitude": 13.404954,
"speed": 85,
"fuel": 45.2,
"timestamp": "2025-11-09T10:30:00Z"
}
# Protocol Buffers: ~50 bytes (%67 azalma)
# .proto dosyası tanımla ve protobuf serialization kullan
Ayda 1 milyar mesaj için, JSON’dan Protocol Buffers’a payload sıkıştırma cellular ağlarda yıllık bandwidth maliyetlerini $50,000-100,000 azaltabilir.
Araçlar ve Implementasyonlar
MQTT Broker’lar
- HiveMQ v4.x: Enterprise odaklı, mükemmel clustering
- EMQX v5.x: Ekstrem scale, açık kaynak + enterprise
- Mosquitto v2.x: Hafif, edge deployment’lar
- AWS IoT Core: Managed servis, AWS ekosistemi
- VerneMQ: Erlang tabanlı, iyi clustering
Client Kütüphaneleri
- Paho MQTT: Python, Java, JavaScript, C/C++ (Eclipse Foundation)
- MQTT.js: Node.js, web için en popüler
- AWS IoT Device SDK: Python, JavaScript, Java, C++
AMQP Implementasyonları
- RabbitMQ v3.x: En popüler, Erlang tabanlı
- Azure Service Bus: Microsoft’tan managed AMQP
- Pika: RabbitMQ için Python AMQP client
ZeroMQ Kütüphaneleri
- libzmq: Core C library v4.x
- PyZMQ: Python bindings
- zeromq.js: Node.js bindings
Monitoring
- Prometheus + Grafana: Metrikler ve görselleştirme
- InfluxDB/TimescaleDB: Time-series storage
- CloudWatch: AWS-native monitoring
Önemli Çıkarımlar
Çeşitli lojistik senaryolarında bu protokollerle çalıştıktan sonra, en önemli noktalar:
-
MQTT çoğu IoT uygulaması için varsayılan seçim. Spesifik kısıtlamaların alternatif gerektirmediği sürece buradan başla.
-
QoS seviyelerini doğru boyutlandır. Güvenilirliği aşırı engineer etme. Sık kritik olmayan veri için QoS 0, önemli analytics için QoS 1, tekrarlamayan kritik komutlar için sadece QoS 2.
-
Baştan scale için tasarla. Clustering ve multi-region deployment, sonradan migrate etmekten başta implement etmek çok daha ucuz.
-
Kapsamlı monitoring implement et. Kullanıcılar rapor etmeden önce ne olduğunu bil. Latency yüzdeliklerini izle, sadece ortalamaları değil.
-
Hibrit mimarileri düşün. Farklı protokoller farklı görevlerde mükemmel. Edge’de CoAP, cloud’a MQTT, enterprise entegrasyon için AMQP birlikte etkili çalışabilir.
-
Protokol seçimi maliyetleri etkiliyor. QoS seviyeleri, payload formatı ve managed vs. self-hosted aylık giderleri önemli ölçüde etkiliyor.
-
Hata senaryolarını test et. Network partition’lar, broker hataları ve mesaj storm’ları production öncesi tasarım zayıflıklarını ortaya çıkarıyor.
Doğru protokol, senin spesifik kısıtlamalarına bağlı: network güvenilirliği, cihaz kaynakları, latency gereksinimleri ve scale. Bu trade-off’ları anlamak, trendleri takip etmek yerine bilinçli kararlar almana olanak sağlıyor.
İlgili yazılar
In-memory uygulama cache'lerinden distributed Redis cluster'lara ve CDN edge caching'e kadar çok katmanlı caching stratejilerini uygulamaya yönelik kapsamlı bir rehber. Cache-aside ve write-through pattern'leri ne zaman kullanılır, ElastiCache ile MemoryDB arasında nasıl seçim yapılır ve production'da cache stampede nasıl önlenir öğrenin.
Key-value storage hakkında dört temel soruyu yanıtlayan kapsamlı bir temel rehber: KV storage nedir? Nerede kullanılır? Neden KV storage seçilir? Hangi tech stack'lerde hangi çözümler var?
Projeniz için doğru veritabanını seçmek için kapsamlı rehber - SQL, NoSQL, NewSQL ve edge çözümlerini gerçek dünya implementasyon hikayeleri ve performans ölçümleri ile kapsıyor.
Stack'ten bağımsız bir WebAssembly haritası (tarayıcıda performans, sunucuda WASI runtime, edge'de compute) üç farklı bahsi ayırarak hangi konuşmanın hangisinden bahsettiğini anlamanı sağlar.
Bir keşif tezi: event-driven sistemlerde vendor lock-in runtime katmanında değil, bus topolojisinde yaşar; wasmCloud ve NATS ise bus'ı taşınabilir bir primitif haline getiriyor.