Event-Driven Architecture with Node.js and Apache Kafka
Introduction
Event-driven architecture (EDA) decouples services by having them communicate through events rather than direct API calls. Services publish events when something happens; interested services subscribe and react. The result: loose coupling, independent scalability, and natural audit trails.
Kafka is the backbone of this pattern at most large-scale companies. This guide walks through a production implementation with Node.js.
Core Concepts
- Producer — publishes events to a topic
- Topic — a named, ordered, durable log of events
- Partition — a topic is split into partitions for parallelism
- Consumer — reads events from a topic
- Consumer Group — multiple consumers sharing work across partitions
- Offset — a consumer's position in a partition
Order Service (Producer) → [orders topic] → Inventory Service (Consumer)
→ Notification Service (Consumer)
→ Analytics Service (Consumer)
Setup with KafkaJS
npm install kafkajs
// lib/kafka.ts
import { Kafka, logLevel } from 'kafkajs';
export const kafka = new Kafka({
clientId: 'order-service',
brokers: process.env.KAFKA_BROKERS!.split(','),
ssl: process.env.NODE_ENV === 'production',
sasl: process.env.NODE_ENV === 'production' ? {
mechanism: 'scram-sha-512',
username: process.env.KAFKA_USERNAME!,
password: process.env.KAFKA_PASSWORD!,
} : undefined,
logLevel: logLevel.WARN,
});
Producer: Publishing Events
// services/order/events/publisher.ts
import { kafka } from '@/lib/kafka';
import type { Order } from '../types';
const producer = kafka.producer({
idempotent: true, // Exactly-once delivery guarantee
maxInFlightRequests: 5,
});
let connected = false;
async function ensureConnected() {
if (!connected) {
await producer.connect();
connected = true;
}
}
export async function publishOrderCreated(order: Order) {
await ensureConnected();
await producer.send({
topic: 'orders',
messages: [
{
key: order.id, // Same key = same partition = ordered delivery
value: JSON.stringify({
eventType: 'ORDER_CREATED',
eventId: crypto.randomUUID(),
occurredAt: new Date().toISOString(),
payload: {
orderId: order.id,
userId: order.userId,
items: order.items,
totalAmount: order.totalAmount,
},
}),
headers: {
'content-type': 'application/json',
'schema-version': '1',
},
},
],
});
}
Consumer: Processing Events
// services/inventory/consumers/orderConsumer.ts
import { kafka } from '@/lib/kafka';
import { db } from '@/lib/db';
const consumer = kafka.consumer({
groupId: 'inventory-service-group',
});
export async function startOrderConsumer() {
await consumer.connect();
await consumer.subscribe({ topics: ['orders'], fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value!.toString());
if (event.eventType !== 'ORDER_CREATED') return;
try {
await processOrderCreated(event.payload);
// Offset is committed automatically after eachMessage resolves
} catch (error) {
console.error('Failed to process event:', event.eventId, error);
// Send to dead-letter topic
await publishToDLQ(topic, message);
}
},
});
}
async function processOrderCreated(payload: { orderId: string; items: Item[] }) {
// Idempotency check — avoid reprocessing on consumer restart
const alreadyProcessed = await db.processedEvent.findUnique({
where: { orderId: payload.orderId },
});
if (alreadyProcessed) return;
// Reduce inventory
await db.$transaction(async (tx) => {
for (const item of payload.items) {
await tx.inventory.update({
where: { productId: item.productId },
data: { quantity: { decrement: item.quantity } },
});
}
await tx.processedEvent.create({
data: { orderId: payload.orderId, processedAt: new Date() },
});
});
}
Dead Letter Queue Pattern
// lib/dlq.ts
const dlqProducer = kafka.producer();
export async function publishToDLQ(originalTopic: string, message: KafkaMessage) {
await dlqProducer.send({
topic: `${originalTopic}.dlq`,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'dlq-original-topic': originalTopic,
'dlq-failed-at': new Date().toISOString(),
},
}],
});
}
Schema Evolution with Avro
As your event schema changes, you need backward compatibility. Use Confluent Schema Registry:
// Version 1
const orderCreatedV1 = {
type: 'record',
name: 'OrderCreated',
fields: [
{ name: 'orderId', type: 'string' },
{ name: 'userId', type: 'string' },
{ name: 'totalAmount', type: 'double' },
],
};
// Version 2 — adding optional field (backward compatible)
const orderCreatedV2 = {
type: 'record',
name: 'OrderCreated',
fields: [
{ name: 'orderId', type: 'string' },
{ name: 'userId', type: 'string' },
{ name: 'totalAmount', type: 'double' },
{ name: 'currency', type: ['null', 'string'], default: null }, // NULLABLE = safe addition
],
};
Conclusion
Event-driven architecture with Kafka makes your services genuinely independent. Failures in one service don't cascade; new services can subscribe without touching existing ones; every state change is durably recorded.
Key takeaways:
- Use message keys to ensure ordered delivery per entity
- Always implement idempotency — consumers will reprocess on restart
- Dead-letter queues are non-negotiable for production
- Schema registry prevents breaking changes in event contracts
Related: See the post on API Gateway patterns for how to combine synchronous and asynchronous communication in one architecture.