Event-Driven Architecture
Overview
Event-driven architecture (EDA) is a software design paradigm in which system components communicate by publishing and reacting to events rather than calling each other directly. Because producers do not need to know who consumes their events, components stay loosely coupled, can scale independently, and can react to changes as they happen.
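Core Concepts
Events
An event is a record of a significant state change or occurrence in the system.
interface ChampionshipEvent {
  eventId: string;          // Unique ID, also usable for deduplication
  timestamp: Date;          // When the event occurred
  aggregateId: string;      // Entity the event belongs to
  eventType: string;        // For example 'MatchCompleted'
  payload: unknown;         // Event-specific data
  metadata: {
    userId?: string;        // User who triggered the change, if any
    correlationId: string;  // Shared by all events in one business flow
    causationId?: string;   // ID of the event that directly caused this one
  };
}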
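The correlationId and causationId fields are easiest to understand when one event triggers another. The following is a minimal sketch of how they might be filled in, not the project's actual code: EventEnvelope, createEnvelope, and newEventId are hypothetical names, and the counter-based ID generator is an assumption made to keep the example deterministic.

```typescript
// Envelope shape mirrors the ChampionshipEvent interface above.
interface EventMetadata {
  userId?: string;
  correlationId: string;
  causationId?: string;
}

interface EventEnvelope<T> {
  eventId: string;
  timestamp: Date;
  aggregateId: string;
  eventType: string;
  payload: T;
  metadata: EventMetadata;
}

// Hypothetical ID generator. A counter keeps the sketch deterministic;
// a real system would more likely use UUIDs.
let eventCounter = 0;
function newEventId(): string {
  eventCounter += 1;
  return `evt-${eventCounter}`;
}

// Hypothetical helper: builds an envelope and derives its metadata from the
// event that caused it, if there is one.
function createEnvelope<T>(
  eventType: string,
  aggregateId: string,
  payload: T,
  cause?: EventEnvelope<unknown>,
): EventEnvelope<T> {
  const eventId = newEventId();
  const metadata: EventMetadata = cause
    ? { correlationId: cause.metadata.correlationId, causationId: cause.eventId }
    : { correlationId: eventId }; // first event in a flow starts a new correlation
  return { eventId, timestamp: new Date(), aggregateId, eventType, payload, metadata };
}

// A match result causes a standings update within the same business flow.
const matchDone = createEnvelope("MatchCompleted", "championship-1", { matchId: "match-1" });
const standingsDone = createEnvelope("StandingsUpdated", "championship-1", { teams: 2 }, matchDone);
```

Here standingsDone shares its correlationId with matchDone, and its causationId points at matchDone.eventId, which is what lets a trace viewer reconstruct the chain of events.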
Event Types in EasyChamp
Domain Events
// Championship Events (fields are readonly because events are immutable facts)
class ChampionshipCreated {
  constructor(
    public readonly championshipId: string,
    public readonly name: string,
    public readonly startDate: Date,
    public readonly rules: ChampionshipRules
  ) {}
}

class MatchCompleted {
  constructor(
    public readonly matchId: string,
    public readonly championshipId: string,
    public readonly homeScore: number,
    public readonly awayScore: number,
    public readonly completedAt: Date
  ) {}
}

class StandingsUpdated {
  constructor(
    public readonly championshipId: string,
    public readonly standings: TeamStanding[],
    public readonly calculatedAt: Date
  ) {}
}
Integration Events
// Cross-service events
class UserRegistered {
  constructor(
    public readonly userId: string,
    public readonly email: string,
    public readonly registeredAt: Date
  ) {}
}

class PaymentProcessed {
  constructor(
    public readonly paymentId: string,
    public readonly userId: string,
    public readonly amount: number,
    public readonly currency: string
  ) {}
}
Event Producers
Components that generate and publish events.
class ChampionshipService {
  constructor(
    private repository: ChampionshipRepository, // type name is illustrative
    private eventBus: EventBus
  ) {}

  async createChampionship(data: CreateChampionshipDto) {
    // Business logic: persist the new championship
    const championship = await this.repository.save(data);
    // Publish the event only after the save has succeeded
    await this.eventBus.publish(
      new ChampionshipCreated(
        championship.id,
        championship.name,
        championship.startDate,
        championship.rules
      )
    );
    return championship;
  }
}
Event Consumers
Components that subscribe to and process events.
@EventHandler(MatchCompleted)
class StandingsCalculator {
  constructor(private eventBus: EventBus) {}

  async handle(event: MatchCompleted) {
    // Recalculate standings (calculateStandings is not shown here)
    const standings = await this.calculateStandings(
      event.championshipId
    );
    // Publish updated standings
    await this.eventBus.publish(
      new StandingsUpdated(
        event.championshipId,
        standings,
        new Date()
      )
    );
  }
}
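The producer and consumer above both depend on an EventBus whose implementation is not shown. As a rough illustration of the publish/subscribe mechanics, here is a minimal in-memory sketch. InMemoryEventBus, EventSubscriber, and EventClass are hypothetical names, the event classes are simplified copies of the ones above, and explicit subscribe calls stand in for the @EventHandler decorator.

```typescript
type EventSubscriber<T> = (evt: T) => Promise<void> | void;
// Constructor type used as the subscription key.
type EventClass<T> = new (...args: any[]) => T;

class InMemoryEventBus {
  private subscribers = new Map<Function, EventSubscriber<any>[]>();

  subscribe<T>(eventClass: EventClass<T>, subscriber: EventSubscriber<T>): void {
    const existing = this.subscribers.get(eventClass) ?? [];
    this.subscribers.set(eventClass, [...existing, subscriber]);
  }

  // Delivers the event to every subscriber of its class, one after another.
  async publish(evt: object): Promise<void> {
    const targets = this.subscribers.get(evt.constructor) ?? [];
    for (const subscriber of targets) {
      await subscriber(evt);
    }
  }
}

// Simplified stand-ins for the event classes defined earlier.
class MatchCompleted {
  constructor(
    public readonly championshipId: string,
    public readonly homeScore: number,
    public readonly awayScore: number,
  ) {}
}
class StandingsUpdated {
  constructor(public readonly championshipId: string) {}
}

const bus = new InMemoryEventBus();
const seen: string[] = [];

// Consumer that reacts to a completed match by publishing a follow-up event.
bus.subscribe(MatchCompleted, async (match) => {
  seen.push(`match:${match.championshipId}`);
  await bus.publish(new StandingsUpdated(match.championshipId));
});
bus.subscribe(StandingsUpdated, (standings) => {
  seen.push(`standings:${standings.championshipId}`);
});

const busDone = bus.publish(new MatchCompleted("championship-1", 2, 1));
```

This sketch delivers events synchronously within one process, so a slow or failing subscriber blocks the publisher. A broker-backed bus would decouple the two, at the cost of the delivery concerns covered later in this page.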
Implementation Patterns
Event Sourcing
Store every state change as an append-only sequence of events and rebuild current state by replaying them.
interface EventStore {
  // Append events to a stream, using optimistic concurrency control
  append(streamId: string, events: Event[]): Promise<void>;
  // Load a stream's events in order so the caller can replay them to rebuild state
  loadEvents(streamId: string): Promise<Event[]>;
}
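To make the optimistic concurrency check concrete, the sketch below keeps streams in memory and rejects an append when the stream has grown since the writer last read it. InMemoryEventStore, StoredEvent, and the expectedVersion parameter are assumptions for illustration; the store above does not specify how the version is passed.

```typescript
interface StoredEvent {
  eventType: string;
  payload: unknown;
}

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  // expectedVersion (an assumed parameter) is the number of events the
  // writer saw when it last loaded the stream.
  append(streamId: string, events: StoredEvent[], expectedVersion: number): void {
    const stream = this.streams.get(streamId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new Error(
        `concurrency conflict: expected version ${expectedVersion}, stream is at ${stream.length}`,
      );
    }
    this.streams.set(streamId, [...stream, ...events]);
  }

  // Returns a copy so callers cannot mutate the stored history.
  loadEvents(streamId: string): StoredEvent[] {
    return [...(this.streams.get(streamId) ?? [])];
  }
}

const store = new InMemoryEventStore();
store.append("championship-1", [{ eventType: "ChampionshipCreated", payload: {} }], 0);
store.append("championship-1", [{ eventType: "MatchCompleted", payload: {} }], 1);

// A stale writer that still believes the stream holds one event is rejected.
let staleWriteRejected = false;
try {
  store.append("championship-1", [{ eventType: "MatchCompleted", payload: {} }], 1);
} catch {
  staleWriteRejected = true;
}

// Rebuilding state is a fold over the stream; here, a count of completed matches.
const completedMatches = store
  .loadEvents("championship-1")
  .filter((stored) => stored.eventType === "MatchCompleted").length;
```

The rejected writer would normally reload the stream, re-run its business rules against the newer state, and try again.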
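CQRS with Events
Separate the write model from the read model: the write side emits events, and the read side consumes them to keep query-optimized views up to date.
// Write side
class ChampionshipAggregate {
  private events: Event[] = [];

  createMatch(data: MatchData) {
    // Validate business rules, then record the change as an event
    this.applyEvent(new MatchCreated(data));
  }

  getUncommittedEvents() {
    return this.events;
  }

  private applyEvent(event: Event) {
    // Update in-memory state here, then queue the event for persistence
    this.events.push(event);
  }
}
// Read side projection
@EventHandler(MatchCreated, MatchCompleted)
class ChampionshipReadModel {
  async handle(event: Event) {
    // Update denormalized read model
    await this.updateProjection(event);
  }
}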
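As an illustration of what a read-side projection might do, the sketch below folds completed matches into a standings table that is ready to query. It rests on several assumptions that are not in the original: the payload carries team names (homeTeam, awayTeam), scoring is 3 points for a win and 1 for a draw, and StandingsProjection and pointsFor are hypothetical names.

```typescript
// Assumed payload shape; the MatchCompleted class above carries no team names.
interface MatchCompletedData {
  homeTeam: string;
  awayTeam: string;
  homeScore: number;
  awayScore: number;
}

interface StandingRow {
  team: string;
  played: number;
  points: number;
}

// Assumed scoring rule: 3 points for a win, 1 each for a draw.
function pointsFor(home: number, away: number): [number, number] {
  if (home > away) return [3, 0];
  if (home < away) return [0, 3];
  return [1, 1];
}

class StandingsProjection {
  private rows = new Map<string, StandingRow>();

  // Called once per MatchCompleted event, in order.
  apply(match: MatchCompletedData): void {
    const [homePoints, awayPoints] = pointsFor(match.homeScore, match.awayScore);
    this.credit(match.homeTeam, homePoints);
    this.credit(match.awayTeam, awayPoints);
  }

  // Query side: the table is already denormalized, so reading is a cheap sort.
  table(): StandingRow[] {
    return Array.from(this.rows.values()).sort(
      (a, b) => b.points - a.points || a.team.localeCompare(b.team),
    );
  }

  private credit(team: string, points: number): void {
    const row = this.rows.get(team) ?? { team, played: 0, points: 0 };
    this.rows.set(team, { team, played: row.played + 1, points: row.points + points });
  }
}

const projection = new StandingsProjection();
projection.apply({ homeTeam: "Lions", awayTeam: "Tigers", homeScore: 2, awayScore: 1 });
projection.apply({ homeTeam: "Tigers", awayTeam: "Bears", homeScore: 0, awayScore: 0 });
const standingsSummary = projection.table().map((row) => `${row.team}:${row.points}`).join(",");
```

Because the projection is derived entirely from events, it can be discarded and rebuilt by replaying the event history whenever its shape needs to change.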
Saga Pattern
Coordinate a long-running transaction across services as a sequence of steps, where each step is triggered by the event that the previous step produced.
class ChampionshipRegistrationSaga {
  private state: SagaState;

  constructor(private commandBus: CommandBus) {}

  @SagaEventHandler(UserRegistered)
  async onUserRegistered(event: UserRegistered) {
    // Start registration process
    await this.commandBus.send(
      new CreatePlayerProfile(event.userId)
    );
  }

  @SagaEventHandler(PlayerProfileCreated)
  async onProfileCreated(event: PlayerProfileCreated) {
    // Continue saga
    await this.commandBus.send(
      new AssignToDefaultLeague(event.playerId)
    );
  }

  @SagaEventHandler(LeagueAssigned)
  async onLeagueAssigned(event: LeagueAssigned) {
    // Complete saga
    await this.commandBus.send(
      new SendWelcomeEmail(event.playerId)
    );
    // complete() is assumed to be provided by the saga infrastructure
    this.complete();
  }
}
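The saga above covers only the happy path. When a step fails, a saga typically undoes the steps that already succeeded by running compensating actions in reverse order. The sketch below shows that idea in an orchestrated form, which differs from the event-driven handlers above; SagaStep and runSaga are hypothetical names, and the step bodies are placeholders that only write to a log.

```typescript
interface SagaStep {
  label: string;
  run: () => Promise<void>;
  compensate: () => Promise<void>;
}

// Runs steps in order. If one fails, the steps that already completed are
// compensated in reverse order and the saga reports "compensated".
async function runSaga(steps: SagaStep[]): Promise<"completed" | "compensated"> {
  const finishedSteps: SagaStep[] = [];
  for (const step of steps) {
    try {
      await step.run();
      finishedSteps.push(step);
    } catch {
      for (const finished of finishedSteps.reverse()) {
        await finished.compensate();
      }
      return "compensated";
    }
  }
  return "completed";
}

const sagaLog: string[] = [];
const registrationSteps: SagaStep[] = [
  {
    label: "CreatePlayerProfile",
    run: async () => { sagaLog.push("profile created"); },
    compensate: async () => { sagaLog.push("profile deleted"); },
  },
  {
    label: "AssignToDefaultLeague",
    // Simulated failure for the demonstration.
    run: async () => { throw new Error("league is full"); },
    compensate: async () => { sagaLog.push("league assignment removed"); },
  },
  {
    label: "SendWelcomeEmail",
    run: async () => { sagaLog.push("welcome email sent"); },
    compensate: async () => { sagaLog.push("apology email sent"); },
  },
];

const sagaOutcome = runSaga(registrationSteps);
```

In this run the league assignment fails, so only the profile creation is compensated and the welcome email is never sent.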
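Message Brokers
RabbitMQ Configuration
# docker-compose.yml
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
    environment:
      # Development-only credentials; change them for any shared environment
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
# Named volumes must be declared at the top level
volumes:
  rabbitmq_data: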
Kafka for High Throughput
// Kafka producer configuration (kafkajs)
import { Kafka, CompressionTypes } from 'kafkajs';

const producer = new Kafka({
  clientId: 'championship-service',
  brokers: ['localhost:9092'],
}).producer();
// The producer must be connected before it can send
await producer.connect();
// Publish with partitioning
await producer.send({
  topic: 'championship-events',
  compression: CompressionTypes.GZIP, // Compression is configured per send call
  messages: [{
    key: event.aggregateId, // Same key, same partition: ordering per aggregate
    value: JSON.stringify(event),
    headers: {
      'event-type': event.eventType,
      'correlation-id': event.metadata.correlationId,
    },
  }],
});
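Using the aggregate ID as the message key preserves ordering because the partition is derived from the key, and a partition is consumed in order. The sketch below illustrates only that principle. hashKey and partitionFor are hypothetical helpers, and the hash is a simple djb2-style function chosen for brevity; it is not the partitioner Kafka clients actually use.

```typescript
// Simple deterministic string hash (djb2 variant). Illustrative only:
// this is not the hash used by Kafka clients.
function hashKey(key: string): number {
  let hash = 5381;
  for (let i = 0; i < key.length; i++) {
    hash = ((hash * 33) ^ key.charCodeAt(i)) >>> 0; // keep as unsigned 32-bit
  }
  return hash;
}

// The same key always maps to the same partition for a fixed partition count.
function partitionFor(key: string, partitionCount: number): number {
  return hashKey(key) % partitionCount;
}

const firstLookup = partitionFor("championship-1", 6);
const secondLookup = partitionFor("championship-1", 6);
```

Both lookups return the same partition, so every event for championship-1 lands in one ordered log. Note that changing the partition count changes the mapping, which is one reason partition counts are usually chosen up front.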
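Event Delivery Guarantees
At-Least-Once Delivery
The publisher retries a failed publish and, once retries are exhausted, moves the event to a dead letter queue. An event may therefore be delivered more than once, but it is not silently dropped.
class ReliableEventPublisher {
  async publish(event: Event) {
    const maxRetries = 3;
    let attempt = 0;
    while (attempt < maxRetries) {
      try {
        await this.broker.publish(event);
        await this.outbox.markAsPublished(event.eventId);
        return;
      } catch (error) {
        attempt++;
        if (attempt === maxRetries) {
          // Retries exhausted: park the event for inspection and surface the error
          await this.deadLetterQueue.send(event);
          throw error;
        }
        // Exponential backoff: 2 s, then 4 s
        await this.delay(Math.pow(2, attempt) * 1000);
      }
    }
  }
}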
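The publisher above calls this.outbox.markAsPublished, which suggests an outbox table: events are first written alongside the business data and a relay later forwards them to the broker. The following is a minimal in-memory sketch of that relay under stated assumptions. InMemoryOutbox, OutboxRecord, SendToBroker, and relayOnce are hypothetical names, and an array stands in for the database table.

```typescript
interface OutboxRecord {
  eventId: string;
  body: string;
  published: boolean;
}

class InMemoryOutbox {
  private records: OutboxRecord[] = [];

  // In a real database this insert would share a transaction with the business write.
  add(eventId: string, body: string): void {
    this.records.push({ eventId, body, published: false });
  }

  pending(): OutboxRecord[] {
    return this.records.filter((record) => !record.published);
  }

  markAsPublished(eventId: string): void {
    for (const record of this.records) {
      if (record.eventId === eventId) record.published = true;
    }
  }
}

type SendToBroker = (body: string) => Promise<void>;

// One relay pass. It stops at the first failure so pending records keep
// their order; whatever is left is retried on the next pass.
async function relayOnce(outbox: InMemoryOutbox, send: SendToBroker): Promise<number> {
  let sent = 0;
  for (const record of outbox.pending()) {
    try {
      await send(record.body);
      outbox.markAsPublished(record.eventId);
      sent += 1;
    } catch {
      break;
    }
  }
  return sent;
}

const outbox = new InMemoryOutbox();
outbox.add("evt-1", "MatchCompleted");
outbox.add("evt-2", "StandingsUpdated");

// Simulated broker that is unavailable for the first call only.
const delivered: string[] = [];
let brokerCalls = 0;
const flakySend: SendToBroker = async (body) => {
  brokerCalls += 1;
  if (brokerCalls === 1) throw new Error("broker unavailable");
  delivered.push(body);
};

const relayPasses = relayOnce(outbox, flakySend).then(async (firstPass) => ({
  firstPass,
  secondPass: await relayOnce(outbox, flakySend),
}));
```

If the process crashes after send succeeds but before markAsPublished runs, the next pass sends the same record again. That duplicate is the reason consumers need to be idempotent.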
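Idempotent Consumers
Because at-least-once delivery can produce duplicates, consumers should make processing the same event twice harmless.
class IdempotentEventHandler {
  async handle(event: Event) {
    // Check if already processed.
    // Note: this check and the later set are not atomic, so two concurrent
    // deliveries of the same event can both pass the check.
    const processed = await this.cache.get(
      `processed:${event.eventId}`
    );
    if (processed) {
      return; // Skip duplicate
    }
    // Process event
    await this.processEvent(event);
    // Mark as processed
    await this.cache.set(
      `processed:${event.eventId}`,
      true,
      { ttl: 86400 } // 24 hours, assuming the TTL is in seconds
    );
  }
}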
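One way to close the gap between checking and marking is to claim the event ID in a single atomic step before doing any work, and to release the claim if the work fails. The sketch below demonstrates the idea with an in-memory set. ProcessedEventLog and handleOnce are hypothetical names; with Redis, the claim would typically be a `SET key value NX EX seconds` command, which only succeeds for the first caller.

```typescript
class ProcessedEventLog {
  private seenIds = new Set<string>();

  // Returns true only for the first caller that claims this ID. The method is
  // synchronous, so the check and the insert cannot interleave.
  claim(eventId: string): boolean {
    if (this.seenIds.has(eventId)) return false;
    this.seenIds.add(eventId);
    return true;
  }

  release(eventId: string): void {
    this.seenIds.delete(eventId);
  }
}

// Runs work at most once per event ID. Resolves to false for duplicates.
async function handleOnce(
  log: ProcessedEventLog,
  eventId: string,
  work: () => Promise<void>,
): Promise<boolean> {
  if (!log.claim(eventId)) return false;
  try {
    await work();
    return true;
  } catch (error) {
    // Release the claim so a redelivery can retry the failed work.
    log.release(eventId);
    throw error;
  }
}

const processedLog = new ProcessedEventLog();
let standingsRecalculations = 0;
const recalculate = async (): Promise<void> => {
  standingsRecalculations += 1;
};

// Two concurrent deliveries of the same event.
const duplicateResults = Promise.all([
  handleOnce(processedLog, "evt-42", recalculate),
  handleOnce(processedLog, "evt-42", recalculate),
]);
```

The trade-off is that a crash between the claim and the end of the work leaves the event claimed but unprocessed, so real systems often pair the claim with an expiry or store it in the same transaction as the side effect.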
Monitoring and Observability
Event Metrics
class EventMetrics {
  private prometheus = new PrometheusClient();

  // Count every published event, labeled by type
  recordEventPublished(eventType: string) {
    this.prometheus.counter('events_published_total', {
      type: eventType,
    }).inc();
  }

  // Track how long handlers take, labeled by type
  recordEventProcessed(eventType: string, duration: number) {
    this.prometheus.histogram('event_processing_duration', {
      type: eventType,
    }).observe(duration);
  }

  // Count failures. Pass an error class or code rather than a raw message,
  // so the label does not create an unbounded number of time series.
  recordEventFailed(eventType: string, error: string) {
    this.prometheus.counter('events_failed_total', {
      type: eventType,
      error,
    }).inc();
  }
}
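Event Tracing
class EventTracing {
  async publishWithTracing(event: Event, span: Span) {
    // Create a child span so the publish shows up inside the caller's trace
    const childSpan = this.tracer.startSpan('event.publish', {
      parent: span,
      tags: {
        'event.type': event.constructor.name,
        'event.id': event.eventId,
      },
    });
    try {
      await this.eventBus.publish(event);
      childSpan.setTag('status', 'success');
    } catch (error) {
      childSpan.setTag('error', true);
      // A caught value is not guaranteed to be an Error instance
      const message = error instanceof Error ? error.message : String(error);
      childSpan.log({ error: message });
      throw error;
    } finally {
      // Always close the span, whether the publish succeeded or failed
      childSpan.finish();
    }
  }
}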
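Best Practices
Event Design
- Immutable Events: Events record facts that have already happened; never modify them after publishing
- Self-Contained: Include the data consumers need so they do not have to call back to the producer
- Versioning: Version event schemas so they can evolve without breaking existing consumers
- Meaningful Names: Use past tense (OrderPlaced, not PlaceOrder)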
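Schema evolution is often handled by upcasting: old event versions are converted to the current shape when they are read, so handlers only ever deal with one version. The sketch below is an illustration under invented assumptions. Both schema versions are hypothetical, including the idea that version 1 stored the score as a single string, and upcastMatchCompleted is a made-up helper name.

```typescript
// Hypothetical version 1: the score was stored as one string such as "2-1".
interface MatchCompletedV1 {
  version: 1;
  matchId: string;
  score: string;
}

// Hypothetical version 2: separate numeric fields.
interface MatchCompletedV2 {
  version: 2;
  matchId: string;
  homeScore: number;
  awayScore: number;
}

type AnyMatchCompleted = MatchCompletedV1 | MatchCompletedV2;

// Converts any stored version to the current one. Stored events are never
// rewritten; the conversion happens on read.
function upcastMatchCompleted(stored: AnyMatchCompleted): MatchCompletedV2 {
  if (stored.version === 2) return stored;
  const parts = stored.score.split("-").map(Number);
  const home = parts[0];
  const away = parts[1];
  if (parts.length !== 2 || home === undefined || away === undefined || isNaN(home) || isNaN(away)) {
    throw new Error(`cannot upcast score "${stored.score}"`);
  }
  return { version: 2, matchId: stored.matchId, homeScore: home, awayScore: away };
}

const legacyEvent: MatchCompletedV1 = { version: 1, matchId: "match-1", score: "2-1" };
const upcasted = upcastMatchCompleted(legacyEvent);
```

Keeping the conversion in one function means that adding a version 3 later only requires one more branch, while existing handlers stay untouched.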
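Error Handling
- Retry Logic: Retry transient failures with exponential backoff
- Dead Letter Queues: Route messages that keep failing (poison messages) to a separate queue for inspection
- Circuit Breakers: Stop calling a failing dependency to prevent cascading failures
- Compensating Transactions: Undo completed steps when a later saga step fails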
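Of the items above, the circuit breaker is the least obvious to implement, so here is a minimal sketch of its three states. CircuitBreaker, its constructor parameters, and the injected clock are assumptions made for illustration and are not taken from any particular library.

```typescript
type BreakerState = "closed" | "open" | "half-open";

class CircuitBreaker {
  private failures = 0;
  private openedAt = 0;
  private state: BreakerState = "closed";

  constructor(
    private readonly failureThreshold: number,
    private readonly resetAfterMs: number,
    // Injectable clock so the behavior can be demonstrated without waiting.
    private readonly now: () => number = () => Date.now(),
  ) {}

  async call<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === "open") {
      if (this.now() - this.openedAt < this.resetAfterMs) {
        throw new Error("circuit open: call rejected without reaching the dependency");
      }
      this.state = "half-open"; // reset window elapsed: allow one trial call
    }
    try {
      const result = await operation();
      this.failures = 0;
      this.state = "closed";
      return result;
    } catch (error) {
      this.failures += 1;
      // A failed trial call, or too many consecutive failures, opens the circuit.
      if (this.state === "half-open" || this.failures >= this.failureThreshold) {
        this.state = "open";
        this.openedAt = this.now();
      }
      throw error;
    }
  }

  currentState(): BreakerState {
    return this.state;
  }
}

let fakeNow = 0;
const breaker = new CircuitBreaker(2, 1000, () => fakeNow);
const failingPublish = async (): Promise<string> => { throw new Error("broker down"); };
const workingPublish = async (): Promise<string> => "published";

async function demoBreaker(): Promise<string[]> {
  const states: string[] = [];
  await breaker.call(failingPublish).catch(() => undefined);
  states.push(breaker.currentState()); // one failure, still below the threshold
  await breaker.call(failingPublish).catch(() => undefined);
  states.push(breaker.currentState()); // threshold reached
  fakeNow = 1500; // move past the reset window
  await breaker.call(workingPublish);
  states.push(breaker.currentState()); // trial call succeeded
  return states;
}

const breakerStates = demoBreaker();
```

While the circuit is open, callers fail immediately instead of queuing up behind a dependency that is already struggling, which is what limits the spread of a failure.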
Performance
- Batch Processing: Group events when possible to reduce per-message overhead
- Async Processing: Don't block request handling on event publishing
- Partitioning: Distribute load across consumers
- Caching: Cache frequently accessed projections
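As a small illustration of batch processing, the sketch below buffers events and hands them to a callback once a size limit is reached. EventBatcher and its parameters are hypothetical. A production batcher would usually also flush on a timer so that a quiet period does not leave events waiting; that part is omitted here to keep the example deterministic.

```typescript
class EventBatcher<T> {
  private buffer: T[] = [];

  constructor(
    private readonly maxSize: number,
    private readonly flushBatch: (batch: T[]) => void,
  ) {}

  // Buffers the item and flushes automatically when the batch is full.
  add(item: T): void {
    this.buffer.push(item);
    if (this.buffer.length >= this.maxSize) {
      this.flush();
    }
  }

  // Sends whatever is buffered; call this on shutdown to avoid losing a partial batch.
  flush(): void {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.flushBatch(batch);
  }
}

const batchSizes: number[] = [];
const batcher = new EventBatcher<string>(3, (batch) => {
  batchSizes.push(batch.length);
});

for (let i = 1; i <= 7; i++) {
  batcher.add(`match-${i}`);
}
batcher.flush(); // flush the final partial batch
```

Seven events with a batch size of three produce two full batches and one partial batch from the final flush.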
Real-World Example: EasyChamp Live Updates
// Real-time match updates using Server-Sent Events
class LiveUpdateService {
  // Each match can have many connected clients
  private connections = new Map<string, Set<Response>>();

  @EventHandler(MatchUpdated)
  async handleMatchUpdate(event: MatchUpdated) {
    // Get all clients watching this match
    const clients = this.connections.get(event.matchId);
    // Stream update to connected clients
    clients?.forEach(client => {
      client.write(`data: ${JSON.stringify({
        type: 'match-update',
        matchId: event.matchId,
        score: event.score,
        time: event.gameTime,
      })}\n\n`);
    });
  }

  streamUpdates(matchId: string, response: Response) {
    // Set up SSE
    response.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    });
    // Store the connection alongside other clients watching the same match
    const clients = this.connections.get(matchId) ?? new Set<Response>();
    clients.add(response);
    this.connections.set(matchId, clients);
    // Clean up only this client on disconnect
    response.on('close', () => {
      clients.delete(response);
      if (clients.size === 0) {
        this.connections.delete(matchId);
      }
    });
  }
}
Testing Event-Driven Systems
describe('Championship Event Flow', () => {
  let eventBus: TestEventBus;

  beforeEach(() => {
    eventBus = new TestEventBus();
  });

  it('should update standings when match completes', async () => {
    // Arrange: constructing the handler is assumed to register it with the bus
    new StandingsCalculator(eventBus);
    // Act
    await eventBus.publish(new MatchCompleted(
      'match-1',
      'championship-1',
      2,
      1,
      new Date()
    ));
    // Assert: check the event class explicitly, then its contents
    const standingsEvents = eventBus
      .getPublishedEvents()
      .filter(event => event instanceof StandingsUpdated);
    expect(standingsEvents).toContainEqual(
      expect.objectContaining({
        championshipId: 'championship-1',
      })
    );
  });
});
Last updated: 2025-09-09