Event-Driven Architecture

Overview

Event-driven architecture (EDA) is a software design paradigm where system components communicate through events rather than direct calls. This approach enables loose coupling, scalability, and real-time processing.

Core Concepts

Events

An event represents a significant change in state or an occurrence in the system.

interface ChampionshipEvent {
  eventId: string;
  timestamp: Date;
  aggregateId: string;
  eventType: string;
  payload: unknown;
  metadata: {
    userId?: string;
    correlationId: string;
    causationId?: string;
  };
}
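
The `correlationId` and `causationId` fields are easiest to follow by watching how they propagate from one event to the next. The following is a minimal sketch, not EasyChamp's actual code: the `createEvent` helper and the injected `newId` generator are hypothetical names introduced for illustration.

```typescript
// Mirrors the ChampionshipEvent interface above.
interface ChampionshipEvent {
  eventId: string;
  timestamp: Date;
  aggregateId: string;
  eventType: string;
  payload: unknown;
  metadata: { userId?: string; correlationId: string; causationId?: string };
}

// Hypothetical helper: builds an event and inherits tracing metadata
// from the event that caused it, if there is one.
function createEvent(
  newId: () => string, // injected so callers can choose UUIDs, ULIDs, etc.
  eventType: string,
  aggregateId: string,
  payload: unknown,
  cause?: ChampionshipEvent
): ChampionshipEvent {
  const eventId = newId();
  return {
    eventId,
    timestamp: new Date(),
    aggregateId,
    eventType,
    payload,
    metadata: {
      userId: cause?.metadata.userId,
      // A root event starts a new correlation; follow-up events keep the original.
      correlationId: cause ? cause.metadata.correlationId : eventId,
      // Points at the direct parent so the causal chain can be walked back.
      causationId: cause?.eventId,
    },
  };
}

// Counter-based ids, for this example only.
let idCounter = 0;
const nextId = () => `evt-${++idCounter}`;

const completed = createEvent(nextId, 'MatchCompleted', 'championship-1', { matchId: 'match-1' });
const standings = createEvent(nextId, 'StandingsUpdated', 'championship-1', {}, completed);
```

Both events share one `correlationId`, so a whole workflow can be found with a single query, while `causationId` records which event directly triggered which.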

Event Types in EasyChamp

Domain Events

// Championship Events
// Fields are readonly: an event is a fact and must not change after creation
class ChampionshipCreated {
  constructor(
    public readonly championshipId: string,
    public readonly name: string,
    public readonly startDate: Date,
    public readonly rules: ChampionshipRules
  ) {}
}

class MatchCompleted {
  constructor(
    public readonly matchId: string,
    public readonly championshipId: string,
    public readonly homeScore: number,
    public readonly awayScore: number,
    public readonly completedAt: Date
  ) {}
}

class StandingsUpdated {
  constructor(
    public readonly championshipId: string,
    public readonly standings: TeamStanding[],
    public readonly calculatedAt: Date
  ) {}
}

Integration Events

// Cross-service events
class UserRegistered {
  constructor(
    public readonly userId: string,
    public readonly email: string,
    public readonly registeredAt: Date
  ) {}
}

class PaymentProcessed {
  constructor(
    public readonly paymentId: string,
    public readonly userId: string,
    public readonly amount: number,
    public readonly currency: string
  ) {}
}

Event Producers

Components that generate and publish events.

class ChampionshipService {
  constructor(
    private repository: ChampionshipRepository,
    private eventBus: EventBus
  ) {}

  async createChampionship(data: CreateChampionshipDto) {
    // Business logic
    const championship = await this.repository.save(data);

    // Publish event
    await this.eventBus.publish(
      new ChampionshipCreated(
        championship.id,
        championship.name,
        championship.startDate,
        championship.rules
      )
    );

    return championship;
  }
}

Event Consumers

Components that subscribe to and process events.

@EventHandler(MatchCompleted)
class StandingsCalculator {
  constructor(private eventBus: EventBus) {}

  async handle(event: MatchCompleted) {
    // Recalculate standings
    const standings = await this.calculateStandings(
      event.championshipId
    );

    // Publish updated standings
    await this.eventBus.publish(
      new StandingsUpdated(
        event.championshipId,
        standings,
        new Date()
      )
    );
  }
}
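
The examples above rely on an `EventBus` and an `@EventHandler` decorator that are not defined on this page. As a rough illustration of what such a bus does, here is a minimal in-memory sketch. `InMemoryEventBus`, `subscribe`, and the simplified event classes are assumptions made for this example, not the EasyChamp implementation.

```typescript
type Handler<E> = (event: E) => Promise<void> | void;
type EventClass<E> = new (...args: any[]) => E;

// Hypothetical in-memory bus: dispatches each event to the handlers
// registered for its class and records everything that was published.
class InMemoryEventBus {
  private handlers = new Map<Function, Handler<any>[]>();
  readonly published: object[] = [];

  subscribe<E>(type: EventClass<E>, handler: Handler<E>): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  async publish(published: object): Promise<void> {
    this.published.push(published);
    // Handlers for this event class run sequentially, in subscription order.
    for (const handler of this.handlers.get(published.constructor) ?? []) {
      await handler(published);
    }
  }
}

// Simplified versions of the events defined earlier.
class MatchCompleted {
  constructor(public readonly matchId: string, public readonly championshipId: string) {}
}
class StandingsUpdated {
  constructor(public readonly championshipId: string) {}
}

const bus = new InMemoryEventBus();

// Consumer: reacts to MatchCompleted by publishing StandingsUpdated.
bus.subscribe(MatchCompleted, async (matchCompleted) => {
  await bus.publish(new StandingsUpdated(matchCompleted.championshipId));
});
```

Publishing a `MatchCompleted` through this bus leaves two entries in `bus.published`: the original event, then the `StandingsUpdated` the consumer produced. A real broker-backed bus would dispatch asynchronously and across processes, which is where the delivery guarantees discussed later come in.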

Implementation Patterns

Event Sourcing

Store all changes as a sequence of events.

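// In-memory sketch; a production store would persist each stream durably
class EventStore {
  private streams = new Map<string, Event[]>();

  async append(streamId: string, events: Event[], expectedVersion?: number) {
    const stream = this.streams.get(streamId) ?? [];
    // Optimistic concurrency control: reject the write if the stream has moved on
    if (expectedVersion !== undefined && stream.length !== expectedVersion) {
      throw new Error(`Concurrency conflict on stream ${streamId}`);
    }
    this.streams.set(streamId, [...stream, ...events]);
  }

  async loadEvents(streamId: string): Promise<Event[]> {
    // Return the full history; callers replay it to rebuild state
    return [...(this.streams.get(streamId) ?? [])];
  }
}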
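
Replaying events to rebuild state amounts to folding the event history through a pure function. The sketch below assumes a simplified match aggregate. The `GoalScored` event and the `MatchState` shape are hypothetical and used only to make the replay visible.

```typescript
type MatchEvent =
  | { type: 'MatchCreated'; matchId: string }
  | { type: 'GoalScored'; matchId: string; side: 'home' | 'away' } // hypothetical event
  | { type: 'MatchCompleted'; matchId: string };

interface MatchState {
  matchId: string;
  homeScore: number;
  awayScore: number;
  completed: boolean;
}

// Pure transition function: previous state + one event = next state.
function applyEvent(state: MatchState | undefined, change: MatchEvent): MatchState {
  switch (change.type) {
    case 'MatchCreated':
      return { matchId: change.matchId, homeScore: 0, awayScore: 0, completed: false };
    case 'GoalScored':
      if (!state) throw new Error('GoalScored before MatchCreated');
      return change.side === 'home'
        ? { ...state, homeScore: state.homeScore + 1 }
        : { ...state, awayScore: state.awayScore + 1 };
    case 'MatchCompleted':
      if (!state) throw new Error('MatchCompleted before MatchCreated');
      return { ...state, completed: true };
  }
}

// Rebuilding state is a left fold over the stream, oldest event first.
function rebuild(history: MatchEvent[]): MatchState | undefined {
  return history.reduce<MatchState | undefined>(applyEvent, undefined);
}

const matchHistory: MatchEvent[] = [
  { type: 'MatchCreated', matchId: 'match-1' },
  { type: 'GoalScored', matchId: 'match-1', side: 'home' },
  { type: 'GoalScored', matchId: 'match-1', side: 'away' },
  { type: 'GoalScored', matchId: 'match-1', side: 'home' },
  { type: 'MatchCompleted', matchId: 'match-1' },
];
const current = rebuild(matchHistory);
```

Because `applyEvent` has no side effects, the same history always yields the same state, which is what makes the event log usable as the source of truth.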

CQRS with Events

Separate read and write models using events.

// Write side
class ChampionshipAggregate {
  private events: Event[] = [];

  createMatch(data: MatchData) {
    // Validate business rules
    this.applyEvent(new MatchCreated(data));
  }

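  private applyEvent(event: Event) {
    // Record the event so it can be persisted and published once the command succeeds
    this.events.push(event);
  }

  getUncommittedEvents() {
    return this.events;
  }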
}

// Read side projection
@EventHandler(MatchCreated, MatchCompleted)
class ChampionshipReadModel {
  async handle(event: Event) {
    // Update denormalized read model
    await this.updateProjection(event);
  }
}
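
To make the read side concrete, the following sketch shows a projection that keeps a denormalized standings table up to date from completed matches. It is an illustration under assumptions: the `homeTeam` and `awayTeam` fields and the 3/1/0 points rule are not part of the `MatchCompleted` event defined earlier.

```typescript
// Assumed event shape for this example (team names are not in the original event).
interface MatchCompletedEvent {
  homeTeam: string;
  awayTeam: string;
  homeScore: number;
  awayScore: number;
}

interface StandingRow {
  team: string;
  played: number;
  points: number;
  goalDifference: number;
}

class StandingsProjection {
  private rows = new Map<string, StandingRow>();

  // Write path: called once per MatchCompleted event.
  handle(match: MatchCompletedEvent): void {
    const diff = match.homeScore - match.awayScore;
    this.update(match.homeTeam, diff);
    this.update(match.awayTeam, -diff);
  }

  private update(team: string, goalDiff: number): void {
    const row = this.rows.get(team) ?? { team, played: 0, points: 0, goalDifference: 0 };
    row.played += 1;
    row.goalDifference += goalDiff;
    row.points += goalDiff > 0 ? 3 : goalDiff === 0 ? 1 : 0; // assumed 3/1/0 scoring
    this.rows.set(team, row);
  }

  // Read path: queries are served from the precomputed rows only.
  table(): StandingRow[] {
    return Array.from(this.rows.values()).sort(
      (a, b) => b.points - a.points || b.goalDifference - a.goalDifference
    );
  }
}

const projection = new StandingsProjection();
projection.handle({ homeTeam: 'Lions', awayTeam: 'Tigers', homeScore: 2, awayScore: 1 });
projection.handle({ homeTeam: 'Tigers', awayTeam: 'Bears', homeScore: 0, awayScore: 0 });
const standingsTable = projection.table();
```

Queries read the precomputed table and never touch the write model, so the read side can be reshaped or rebuilt from the event history without affecting command handling.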

Saga Pattern

Coordinate long-running transactions across services.

class ChampionshipRegistrationSaga {
  private state: SagaState;

  @SagaEventHandler(UserRegistered)
  async onUserRegistered(event: UserRegistered) {
    // Start registration process
    await this.commandBus.send(
      new CreatePlayerProfile(event.userId)
    );
  }

  @SagaEventHandler(PlayerProfileCreated)
  async onProfileCreated(event: PlayerProfileCreated) {
    // Continue saga
    await this.commandBus.send(
      new AssignToDefaultLeague(event.playerId)
    );
  }

  @SagaEventHandler(LeagueAssigned)
  async onLeagueAssigned(event: LeagueAssigned) {
    // Complete saga
    await this.commandBus.send(
      new SendWelcomeEmail(event.playerId)
    );
    this.complete();
  }
}
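
The saga above covers only the happy path. When a step fails, the steps that already completed usually need to be undone with compensating actions, in reverse order. The sketch below shows that control flow in isolation. `SagaStep` and `runSaga` are hypothetical names, and the step bodies are stand-ins for real commands.

```typescript
interface SagaStep {
  label: string;
  action: () => Promise<void>;
  compensate: () => Promise<void>;
}

// Runs steps in order; on failure, compensates completed steps in reverse.
async function runSaga(steps: SagaStep[]): Promise<'completed' | 'compensated'> {
  const done: SagaStep[] = [];
  for (const step of steps) {
    try {
      await step.action();
      done.push(step);
    } catch {
      for (const finished of done.slice().reverse()) {
        await finished.compensate();
      }
      return 'compensated';
    }
  }
  return 'completed';
}

// Example: league assignment fails, so the profile creation is rolled back.
const sagaLog: string[] = [];
const registrationSteps: SagaStep[] = [
  {
    label: 'CreatePlayerProfile',
    action: async () => { sagaLog.push('profile created'); },
    compensate: async () => { sagaLog.push('profile deleted'); },
  },
  {
    label: 'AssignToDefaultLeague',
    action: async () => { throw new Error('league is full'); },
    compensate: async () => { sagaLog.push('league unassigned'); },
  },
  {
    label: 'SendWelcomeEmail',
    action: async () => { sagaLog.push('email sent'); },
    compensate: async () => {},
  },
];

const sagaOutcome = runSaga(registrationSteps);
```

Only steps that actually completed are compensated: the failed step and the ones after it never ran, so they have nothing to undo.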

Message Brokers

RabbitMQ Configuration

# docker-compose.yml
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
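    environment:
      # Development-only credentials; override them outside local setups
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

# Named volumes must be declared at the top level
volumes:
  rabbitmq_data: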

Kafka for High Throughput

// Assumes the kafkajs client
import { Kafka, CompressionTypes } from 'kafkajs';

// Kafka producer configuration
const producer = new Kafka({
  clientId: 'championship-service',
  brokers: ['localhost:9092'],
}).producer();

await producer.connect();

// Publish with partitioning
await producer.send({
  topic: 'championship-events',
  compression: CompressionTypes.GZIP, // compression is set per send() call
  messages: [{
    key: event.aggregateId, // Same key goes to the same partition, preserving order per aggregate
    value: JSON.stringify(event),
    headers: {
      'event-type': event.eventType,
      'correlation-id': event.metadata.correlationId,
    },
  }],
});
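
Keying messages by aggregate preserves order because a key always hashes to the same partition, and a partition is consumed in order. The sketch below illustrates the idea with a simple FNV-1a-style hash. It is an assumption made for illustration and does not reproduce the partitioner any Kafka client actually uses.

```typescript
// FNV-1a-style hash over UTF-16 code units. Illustrative only: Kafka
// clients ship their own partitioners, which this does not reproduce.
function hashKey(key: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return h >>> 0; // force an unsigned 32-bit result
}

function partitionFor(key: string, partitionCount: number): number {
  return hashKey(key) % partitionCount;
}

interface KeyedEvent {
  aggregateId: string;
  sequence: number;
}

// Distributes events across partitions while keeping arrival order within each.
function assignPartitions(incoming: KeyedEvent[], partitionCount: number): KeyedEvent[][] {
  const partitions: KeyedEvent[][] = [];
  for (let p = 0; p < partitionCount; p++) partitions.push([]);
  for (const item of incoming) {
    partitions[partitionFor(item.aggregateId, partitionCount)].push(item);
  }
  return partitions;
}

const incomingEvents: KeyedEvent[] = [
  { aggregateId: 'championship-1', sequence: 1 },
  { aggregateId: 'championship-2', sequence: 1 },
  { aggregateId: 'championship-1', sequence: 2 },
  { aggregateId: 'championship-3', sequence: 1 },
  { aggregateId: 'championship-1', sequence: 3 },
];
const assigned = assignPartitions(incomingEvents, 4);
```

All three `championship-1` events land in one partition in their original order, while other aggregates may be processed in parallel on other partitions. Ordering across different aggregates is not guaranteed.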

Event Delivery Guarantees

At-Least-Once Delivery

class ReliableEventPublisher {
  async publish(event: Event) {
    const maxRetries = 3;
    let attempt = 0;

    while (attempt < maxRetries) {
      try {
        await this.broker.publish(event);
        await this.outbox.markAsPublished(event.eventId);
        return;
      } catch (error) {
        attempt++;
        if (attempt === maxRetries) {
          await this.deadLetterQueue.send(event);
          throw error;
        }
        await this.delay(Math.pow(2, attempt) * 1000);
      }
    }
  }
}
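
The publisher above marks events in an outbox, which suggests the transactional outbox pattern: events are first written to a table alongside the state change, and a separate relay publishes whatever is still pending. The following is a minimal in-memory sketch of that relay. `InMemoryOutbox` and `relayOutbox` are hypothetical names, and a real outbox would be a database table written in the same transaction as the business data.

```typescript
interface OutboxEntry {
  eventId: string;
  payload: string;
  published: boolean;
}

// In-memory stand-in for an outbox table in the service's own database.
class InMemoryOutbox {
  private entries: OutboxEntry[] = [];

  // In a real system this insert shares a transaction with the state change.
  add(eventId: string, payload: string): void {
    this.entries.push({ eventId, payload, published: false });
  }

  pending(): OutboxEntry[] {
    return this.entries.filter((entry) => !entry.published);
  }

  markAsPublished(eventId: string): void {
    for (const entry of this.entries) {
      if (entry.eventId === eventId) entry.published = true;
    }
  }
}

// Publishes pending entries in order and stops at the first failure,
// leaving the remainder for the next polling run.
async function relayOutbox(
  outbox: InMemoryOutbox,
  publish: (entry: OutboxEntry) => Promise<void>
): Promise<number> {
  let publishedCount = 0;
  for (const entry of outbox.pending()) {
    try {
      await publish(entry);
    } catch {
      break;
    }
    // A crash between publish and this line causes a re-send: at-least-once.
    outbox.markAsPublished(entry.eventId);
    publishedCount++;
  }
  return publishedCount;
}

// Example: the broker rejects the second event once, then recovers.
const outbox = new InMemoryOutbox();
outbox.add('e1', '{"type":"MatchCompleted"}');
outbox.add('e2', '{"type":"StandingsUpdated"}');
outbox.add('e3', '{"type":"MatchCompleted"}');

const sent: string[] = [];
let failNext = true;
const flakyPublish = async (entry: OutboxEntry): Promise<void> => {
  if (entry.eventId === 'e2' && failNext) {
    failNext = false;
    throw new Error('broker unavailable');
  }
  sent.push(entry.eventId);
};

async function runRelayTwice(): Promise<number[]> {
  const first = await relayOutbox(outbox, flakyPublish);  // publishes e1, stops at e2
  const second = await relayOutbox(outbox, flakyPublish); // publishes e2 and e3
  return [first, second];
}
```

Because an entry is only marked after a successful publish, no event is lost, but a crash at the wrong moment can publish the same event twice. That is why at-least-once delivery needs idempotent consumers.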

Idempotent Consumers

class IdempotentEventHandler {
  async handle(event: Event) {
    // Check if already processed
    const processed = await this.cache.get(
      `processed:${event.eventId}`
    );

    if (processed) {
      return; // Skip duplicate
    }

    // Process event
    await this.processEvent(event);

    // Mark as processed
    await this.cache.set(
      `processed:${event.eventId}`,
      true,
      { ttl: 86400 } // 24 hours
    );
  }
}
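
One caveat with the handler above: the check and the write are separate steps, so two concurrent deliveries of the same event can both pass the check before either one marks it as processed. A common remedy is to claim the event id atomically before processing. The sketch below shows the idea with an in-memory store. `ProcessedEvents` and `handleOnce` are hypothetical names. With Redis, the claim would typically map to a set-if-not-exists write such as `SET` with the `NX` option.

```typescript
class ProcessedEvents {
  private seen = new Set<string>();

  // Returns true only for the first caller; check and insert happen in
  // one synchronous step, so no other delivery can interleave.
  claim(eventId: string): boolean {
    if (this.seen.has(eventId)) return false;
    this.seen.add(eventId);
    return true;
  }

  // Gives the claim back so a failed event can be retried later.
  release(eventId: string): void {
    this.seen.delete(eventId);
  }
}

// Returns true if the work ran, false if the event was a duplicate.
async function handleOnce(
  store: ProcessedEvents,
  eventId: string,
  work: () => Promise<void>
): Promise<boolean> {
  if (!store.claim(eventId)) return false;
  try {
    await work();
    return true;
  } catch (error) {
    store.release(eventId);
    throw error;
  }
}

// Example: the same event is delivered twice at the same time.
const processedStore = new ProcessedEvents();
let processedCount = 0;
const applyUpdate = async () => { processedCount++; };

const deliveries = Promise.all([
  handleOnce(processedStore, 'evt-1', applyUpdate),
  handleOnce(processedStore, 'evt-1', applyUpdate), // duplicate delivery
]);
```

The trade-off is that a handler which crashes after claiming but before releasing leaves the event marked as processed, so production versions usually pair the claim with an expiry or store it in the same transaction as the handler's side effects.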

Monitoring and Observability

Event Metrics

class EventMetrics {
  private prometheus = new PrometheusClient();

  recordEventPublished(eventType: string) {
    this.prometheus.counter('events_published_total', {
      type: eventType,
    }).inc();
  }

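  recordEventProcessed(eventType: string, durationSeconds: number) {
    // Prometheus convention: include the base unit in the metric name
    this.prometheus.histogram('event_processing_duration_seconds', {
      type: eventType,
    }).observe(durationSeconds);
  }

  recordEventFailed(eventType: string, errorCode: string) {
    this.prometheus.counter('events_failed_total', {
      type: eventType,
      // Use a bounded error code, not a raw message, to keep label cardinality low
      error: errorCode,
    }).inc();
  }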
}

Event Tracing

class EventTracing {
  async publishWithTracing(event: Event, span: Span) {
    const childSpan = this.tracer.startSpan('event.publish', {
      parent: span,
      tags: {
        'event.type': event.constructor.name,
        'event.id': event.eventId,
      },
    });

    try {
      await this.eventBus.publish(event);
      childSpan.setTag('status', 'success');
    } catch (error) {
      childSpan.setTag('error', true);
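      // A caught value is not guaranteed to be an Error instance
      childSpan.log({ error: error instanceof Error ? error.message : String(error) });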
      throw error;
    } finally {
      childSpan.finish();
    }
  }
}

Best Practices

Event Design

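  1. Immutable Events: Events record facts that have already happened, so never modify them after publishing
  2. Self-Contained: Include all the data consumers need to act without calling back to the producer
  3. Versioning: Version event schemas so they can evolve without breaking existing consumers
  4. Meaningful Names: Use past tense (OrderPlaced, not PlaceOrder)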
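
One common way to support schema evolution is upcasting: old event versions are converted to the newest shape when they are read, so handlers only ever deal with one version. The sketch below is hypothetical. Neither the `version` field nor a string-based `score` in version 1 appears in the events defined earlier.

```typescript
// Hypothetical version 1: the score was stored as a single string, e.g. "2-1".
interface MatchCompletedV1 {
  version: 1;
  matchId: string;
  score: string;
}

// Hypothetical version 2: separate numeric scores.
interface MatchCompletedV2 {
  version: 2;
  matchId: string;
  homeScore: number;
  awayScore: number;
}

// Converts any stored version to the latest one. Stored events stay untouched.
function upcast(stored: MatchCompletedV1 | MatchCompletedV2): MatchCompletedV2 {
  if (stored.version === 2) return stored;

  const [homeScore, awayScore] = stored.score.split('-').map(Number);
  if (isNaN(homeScore) || isNaN(awayScore)) {
    throw new Error(`Cannot upcast score "${stored.score}" for match ${stored.matchId}`);
  }
  return { version: 2, matchId: stored.matchId, homeScore, awayScore };
}

const legacy: MatchCompletedV1 = { version: 1, matchId: 'match-1', score: '2-1' };
const upgraded = upcast(legacy);
```

Upcasting on read keeps the stored history immutable, which is consistent with treating events as facts. The cost is that every past version needs a conversion path for as long as old events remain in the store.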

Error Handling

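  1. Retry Logic: Retry transient failures with exponential backoff
  2. Dead Letter Queues: Route poison messages (messages that keep failing) to a dead letter queue for inspection
  3. Circuit Breakers: Stop calling a failing dependency to prevent cascading failures
  4. Compensating Transactions: Undo already completed steps when a later saga step fails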
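
Of the practices above, the circuit breaker is the least obvious to implement, so here is a minimal sketch. The `CircuitBreaker` class, its thresholds, and the injected clock are assumptions for illustration. It also simplifies the half-open state by letting any call through once the timeout has elapsed.

```typescript
class CircuitBreaker {
  private failures = 0;
  private openedAt: number | null = null;

  constructor(
    private failureThreshold: number,
    private resetTimeoutMs: number,
    private now: () => number = () => Date.now() // injectable for tests
  ) {}

  async call<T>(operation: () => Promise<T>): Promise<T> {
    if (this.openedAt !== null && this.now() - this.openedAt < this.resetTimeoutMs) {
      // Open: fail fast without touching the dependency.
      throw new Error('Circuit open: call rejected');
    }
    // Closed, or the timeout elapsed (half-open): let the call through.
    try {
      const result = await operation();
      this.failures = 0;
      this.openedAt = null;
      return result;
    } catch (error) {
      this.failures++;
      if (this.openedAt !== null || this.failures >= this.failureThreshold) {
        this.openedAt = this.now(); // trip, or re-trip after a failed trial call
      }
      throw error;
    }
  }
}

// Example with a fake clock: two failures trip the breaker.
let clock = 0;
let brokerCalls = 0;
const breaker = new CircuitBreaker(2, 1000, () => clock);
const failing = async (): Promise<string> => { brokerCalls++; throw new Error('broker down'); };
const healthy = async (): Promise<string> => { brokerCalls++; return 'ok'; };

async function runBreakerDemo(): Promise<string[]> {
  const outcomes: string[] = [];
  const attempt = async (operation: () => Promise<string>) => {
    try {
      outcomes.push(await breaker.call(operation));
    } catch (error) {
      outcomes.push(error instanceof Error ? error.message : String(error));
    }
  };
  await attempt(failing); // failure 1
  await attempt(failing); // failure 2: trips the breaker
  await attempt(healthy); // rejected without calling the operation
  clock = 1000;           // reset timeout elapses
  await attempt(healthy); // trial call succeeds, breaker closes
  return outcomes;
}
```

While the breaker is open, callers get an immediate error instead of waiting on a dependency that is known to be failing, which gives the dependency room to recover.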

Performance

  1. Batch Processing: Group events when possible
  2. Async Processing: Don't block on event publishing
  3. Partitioning: Distribute load across consumers
  4. Caching: Cache frequently accessed projections
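
Batch processing can be sketched as a small buffer that flushes once it reaches a size limit. `EventBatcher` and its parameters are hypothetical names. A production version would normally also flush on a timer so that a quiet stream does not hold events indefinitely.

```typescript
class EventBatcher<E> {
  private buffer: E[] = [];

  constructor(
    private readonly maxBatchSize: number,
    private readonly flushBatch: (batch: E[]) => Promise<void>
  ) {}

  async add(item: E): Promise<void> {
    this.buffer.push(item);
    if (this.buffer.length >= this.maxBatchSize) {
      await this.flush();
    }
  }

  async flush(): Promise<void> {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = []; // swap first so items added during the flush are kept
    await this.flushBatch(batch);
  }
}

// Example: five events with a batch size of two.
const batches: number[][] = [];
const batcher = new EventBatcher<number>(2, async (batch) => {
  batches.push(batch);
});

async function runBatchDemo(): Promise<void> {
  for (const n of [1, 2, 3, 4, 5]) {
    await batcher.add(n);
  }
  await batcher.flush(); // drain the partial batch, e.g. on shutdown
}
```

Batching trades latency for throughput: fewer round trips to the broker or database, at the cost of events waiting in memory until the batch fills or is flushed.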

Real-World Example: EasyChamp Live Updates

import { ServerResponse } from 'node:http';

// Real-time match updates using Server-Sent Events
class LiveUpdateService {
  // One match can have many viewers, so keep a set of connections per match
  private connections = new Map<string, Set<ServerResponse>>();

  @EventHandler(MatchUpdated)
  async handleMatchUpdate(event: MatchUpdated) {
    // Get all clients watching this match
    const clients = this.connections.get(event.matchId);

    // Stream update to connected clients
    clients?.forEach(client => {
      client.write(`data: ${JSON.stringify({
        type: 'match-update',
        matchId: event.matchId,
        score: event.score,
        time: event.gameTime,
      })}\n\n`);
    });
  }

  streamUpdates(matchId: string, response: ServerResponse) {
    // Set up SSE
    response.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    });

    // Store the connection alongside other viewers of the same match
    const clients = this.connections.get(matchId) ?? new Set<ServerResponse>();
    clients.add(response);
    this.connections.set(matchId, clients);

    // Clean up on disconnect; drop the match entry once nobody is watching
    response.on('close', () => {
      clients.delete(response);
      if (clients.size === 0) {
        this.connections.delete(matchId);
      }
    });
  }
}
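
The wire format written by `client.write` above is worth isolating: each Server-Sent Events message consists of `field: value` lines and ends with a blank line, and multi-line data needs one `data:` line per line of text. The helper below is a small sketch of that framing. `formatSseMessage` is a hypothetical name, not part of EasyChamp.

```typescript
// Builds one SSE message. The optional `event` and `id` fields are part of
// the SSE format: `event` names the message type and `id` lets the browser
// resume with a Last-Event-ID header after reconnecting.
function formatSseMessage(
  data: string,
  options: { event?: string; id?: string } = {}
): string {
  const lines: string[] = [];
  if (options.event) lines.push(`event: ${options.event}`);
  if (options.id) lines.push(`id: ${options.id}`);
  // Every line of the payload needs its own "data:" prefix.
  for (const line of data.split('\n')) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the message.
  return lines.join('\n') + '\n\n';
}

const frame = formatSseMessage(JSON.stringify({ matchId: 'match-1', score: '2-1' }), {
  event: 'match-update',
  id: '42',
});
```

On the client, a named event like this is received with `addEventListener('match-update', ...)` on an `EventSource`, while messages without an `event` field arrive through `onmessage`.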

Testing Event-Driven Systems

describe('Championship Event Flow', () => {
  let eventBus: TestEventBus;

  beforeEach(() => {
    eventBus = new TestEventBus();
  });

  it('should update standings when match completes', async () => {
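    // Arrange
    const handler = new StandingsCalculator(eventBus);

    // Act: call the handler directly so the test does not depend on
    // how TestEventBus dispatches events to subscribers
    await handler.handle(new MatchCompleted(
      'match-1',
      'championship-1',
      2,
      1,
      new Date()
    ));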

    // Assert
    const publishedEvents = eventBus.getPublishedEvents();
    expect(publishedEvents).toContainEqual(
      expect.objectContaining({
        constructor: StandingsUpdated,
        championshipId: 'championship-1',
      })
    );
  });
});

Last updated: 2025-09-09