Skip to content

CQRS (Command Query Responsibility Segregation)

Overview

CQRS separates read and write operations into different models, optimizing each for its specific purpose. Commands modify state, while queries retrieve data without side effects.

Core Concepts

Command Model

Handles write operations and business logic.

// Command definition
class CreateChampionshipCommand {
  constructor(
    public readonly name: string,
    public readonly startDate: Date,
    public readonly endDate: Date,
    public readonly maxTeams: number,
    public readonly rules: ChampionshipRules
  ) {}
}

// Command handler
@CommandHandler(CreateChampionshipCommand)
class CreateChampionshipHandler {
  constructor(
    private repository: ChampionshipRepository,
    private eventBus: EventBus
  ) {}

  async execute(command: CreateChampionshipCommand): Promise<void> {
    // Validate business rules
    if (command.maxTeams < 2) {
      throw new BusinessRuleViolation('Championship requires at least 2 teams');
    }

    // Create aggregate
    const championship = Championship.create(
      command.name,
      command.startDate,
      command.endDate,
      command.maxTeams,
      command.rules
    );

    // Save to write store
    await this.repository.save(championship);

    // Publish domain events
    const events = championship.getUncommittedEvents();
    await this.eventBus.publishAll(events);
  }
}

Query Model

Optimized for reading data.

// Query definition
class GetChampionshipDetailsQuery {
  constructor(
    public readonly championshipId: string,
    public readonly includeStats: boolean = false
  ) {}
}

// Query handler
@QueryHandler(GetChampionshipDetailsQuery)
class GetChampionshipDetailsHandler {
  constructor(private readDb: ReadModelDatabase) {}

  async execute(query: GetChampionshipDetailsQuery): Promise<ChampionshipView> {
    // Read from denormalized view
    const view = await this.readDb.query(`
      SELECT 
        c.id,
        c.name,
        c.start_date,
        c.end_date,
        c.current_round,
        COUNT(DISTINCT t.id) as total_teams,
        COUNT(DISTINCT m.id) as total_matches,
        COUNT(DISTINCT CASE WHEN m.status = 'completed' THEN m.id END) as completed_matches
      FROM championship_view c
      LEFT JOIN teams_view t ON t.championship_id = c.id
      LEFT JOIN matches_view m ON m.championship_id = c.id
      WHERE c.id = $1
      GROUP BY c.id
    `, [query.championshipId]);

    if (query.includeStats) {
      view.stats = await this.getChampionshipStats(query.championshipId);
    }

    return view;
  }
}

Implementation Patterns

Event Sourcing with CQRS

Store all changes as events and build read models from them.

// Aggregate with event sourcing
class Championship extends AggregateRoot {
  private id: string;
  private name: string;
  private teams: Team[] = [];
  private matches: Match[] = [];
  private status: ChampionshipStatus;

  // Commands modify state and produce events
  addTeam(team: Team): void {
    // Validate
    if (this.teams.length >= this.maxTeams) {
      throw new Error('Championship is full');
    }

    // Apply event
    this.apply(new TeamAddedEvent(this.id, team));
  }

  scheduleMatch(homeTeamId: string, awayTeamId: string, date: Date): void {
    // Business logic
    if (!this.canScheduleMatch(homeTeamId, awayTeamId, date)) {
      throw new Error('Cannot schedule match');
    }

    const match = new Match(uuid(), homeTeamId, awayTeamId, date);
    this.apply(new MatchScheduledEvent(this.id, match));
  }

  // Event handlers update internal state
  protected onTeamAddedEvent(event: TeamAddedEvent): void {
    this.teams.push(event.team);
  }

  protected onMatchScheduledEvent(event: MatchScheduledEvent): void {
    this.matches.push(event.match);
  }
}

Projection Building

Build read models from events.

// Projection handler updates read models
@EventHandler(TeamAddedEvent, MatchScheduledEvent, MatchCompletedEvent)
class ChampionshipProjection {
  constructor(private readDb: ReadModelDatabase) {}

  async handleTeamAdded(event: TeamAddedEvent): Promise<void> {
    await this.readDb.execute(`
      INSERT INTO teams_view (id, championship_id, name, joined_at)
      VALUES ($1, $2, $3, $4)
    `, [event.team.id, event.championshipId, event.team.name, event.timestamp]);

    // Update denormalized championship view
    await this.updateChampionshipStats(event.championshipId);
  }

  async handleMatchCompleted(event: MatchCompletedEvent): Promise<void> {
    // Update match view
    await this.readDb.execute(`
      UPDATE matches_view 
      SET status = 'completed',
          home_score = $1,
          away_score = $2,
          completed_at = $3
      WHERE id = $4
    `, [event.homeScore, event.awayScore, event.timestamp, event.matchId]);

    // Update standings view
    await this.updateStandings(event.championshipId);

    // Update team statistics
    await this.updateTeamStats(event.homeTeamId, event.awayTeamId);
  }

  private async updateStandings(championshipId: string): Promise<void> {
    // Complex calculation for standings
    await this.readDb.execute(`
      WITH match_results AS (
        SELECT 
          CASE 
            WHEN home_score > away_score THEN home_team_id
            WHEN away_score > home_score THEN away_team_id
            ELSE NULL
          END as winner_id,
          CASE 
            WHEN home_score < away_score THEN home_team_id
            WHEN away_score < home_score THEN away_team_id
            ELSE NULL
          END as loser_id,
          CASE 
            WHEN home_score = away_score THEN home_team_id
          END as draw_team1_id,
          CASE 
            WHEN home_score = away_score THEN away_team_id
          END as draw_team2_id
        FROM matches_view
        WHERE championship_id = $1 AND status = 'completed'
      )
      INSERT INTO standings_view (championship_id, team_id, points, wins, draws, losses, position)
      SELECT 
        $1,
        t.id,
        COALESCE(w.wins, 0) * 3 + COALESCE(d.draws, 0) as points,
        COALESCE(w.wins, 0),
        COALESCE(d.draws, 0),
        COALESCE(l.losses, 0),
        ROW_NUMBER() OVER (ORDER BY COALESCE(w.wins, 0) * 3 + COALESCE(d.draws, 0) DESC)
      FROM teams_view t
      LEFT JOIN (
        SELECT winner_id as team_id, COUNT(*) as wins
        FROM match_results WHERE winner_id IS NOT NULL
        GROUP BY winner_id
      ) w ON w.team_id = t.id
      LEFT JOIN (
        SELECT team_id, COUNT(*) as draws FROM (
          SELECT draw_team1_id as team_id FROM match_results WHERE draw_team1_id IS NOT NULL
          UNION ALL
          SELECT draw_team2_id as team_id FROM match_results WHERE draw_team2_id IS NOT NULL
        ) draws GROUP BY team_id
      ) d ON d.team_id = t.id
      LEFT JOIN (
        SELECT loser_id as team_id, COUNT(*) as losses
        FROM match_results WHERE loser_id IS NOT NULL
        GROUP BY loser_id
      ) l ON l.team_id = t.id
      WHERE t.championship_id = $1
      ON CONFLICT (championship_id, team_id) 
      DO UPDATE SET 
        points = EXCLUDED.points,
        wins = EXCLUDED.wins,
        draws = EXCLUDED.draws,
        losses = EXCLUDED.losses,
        position = EXCLUDED.position
    `, [championshipId]);
  }
}

Synchronization Strategies

Eventual Consistency

Read models are updated asynchronously.

class EventualConsistencyManager {
  private projectionLag = new Map<string, number>();

  async processEvent(event: DomainEvent): Promise<void> {
    const startTime = Date.now();

    try {
      // Update all projections
      await Promise.all([
        this.updateChampionshipView(event),
        this.updateStandingsView(event),
        this.updateStatisticsView(event)
      ]);

      // Track projection lag
      const lag = Date.now() - event.timestamp.getTime();
      this.projectionLag.set(event.aggregateId, lag);

      // Alert if lag is too high
      if (lag > 5000) { // 5 seconds
        this.alerting.warn(`High projection lag: ${lag}ms for ${event.aggregateId}`);
      }
    } catch (error) {
      // Handle projection errors
      await this.handleProjectionError(event, error);
    }
  }

  private async handleProjectionError(event: DomainEvent, error: Error): Promise<void> {
    // Log error
    this.logger.error('Projection failed', { event, error });

    // Store for retry
    await this.failedEventsStore.add(event);

    // Mark projection as inconsistent
    await this.markProjectionInconsistent(event.aggregateId);
  }
}

Synchronous Projections

Update read models synchronously for critical data.

class SynchronousProjectionHandler {
  async handle(command: Command): Promise<void> {
    const transaction = await this.db.beginTransaction();

    try {
      // Execute command
      const events = await this.commandHandler.execute(command);

      // Update write model
      await this.eventStore.append(events, transaction);

      // Update read model in same transaction
      for (const event of events) {
        await this.projectionHandler.project(event, transaction);
      }

      await transaction.commit();
    } catch (error) {
      await transaction.rollback();
      throw error;
    }
  }
}

Query Optimization

Materialized Views

Pre-compute complex queries.

-- Materialized view for championship dashboard
CREATE MATERIALIZED VIEW championship_dashboard AS
SELECT 
  c.id,
  c.name,
  c.status,
  COUNT(DISTINCT t.id) as total_teams,
  COUNT(DISTINCT m.id) as total_matches,
  COUNT(DISTINCT CASE WHEN m.status = 'completed' THEN m.id END) as completed_matches,
  AVG(m.home_score + m.away_score) as avg_goals_per_match,
  MAX(s.points) as leader_points,
  (SELECT name FROM teams_view WHERE id = 
    (SELECT team_id FROM standings_view WHERE championship_id = c.id ORDER BY position LIMIT 1)
  ) as current_leader
FROM championships c
LEFT JOIN teams_view t ON t.championship_id = c.id
LEFT JOIN matches_view m ON m.championship_id = c.id
LEFT JOIN standings_view s ON s.championship_id = c.id
GROUP BY c.id;

-- Refresh strategy
CREATE OR REPLACE FUNCTION refresh_championship_dashboard()
RETURNS trigger AS $$
BEGIN
  REFRESH MATERIALIZED VIEW CONCURRENTLY championship_dashboard;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER refresh_dashboard_on_match_complete
AFTER UPDATE ON matches_view
FOR EACH ROW
WHEN (NEW.status = 'completed' AND OLD.status != 'completed')
EXECUTE FUNCTION refresh_championship_dashboard();

Read Model Caching

Cache frequently accessed read models.

class CachedQueryHandler {
  constructor(
    private cache: Redis,
    private db: ReadModelDatabase
  ) {}

  async getChampionshipLeaderboard(championshipId: string): Promise<Leaderboard> {
    const cacheKey = `leaderboard:${championshipId}`;

    // Try cache first
    const cached = await this.cache.get(cacheKey);
    if (cached) {
      return JSON.parse(cached);
    }

    // Query database
    const leaderboard = await this.db.query(`
      SELECT 
        t.id,
        t.name,
        s.points,
        s.wins,
        s.draws,
        s.losses,
        s.goals_for,
        s.goals_against,
        s.goal_difference,
        s.position
      FROM standings_view s
      JOIN teams_view t ON t.id = s.team_id
      WHERE s.championship_id = $1
      ORDER BY s.position
    `, [championshipId]);

    // Cache with TTL
    await this.cache.setex(
      cacheKey,
      300, // 5 minutes
      JSON.stringify(leaderboard)
    );

    return leaderboard;
  }

  // Invalidate cache on updates
  @EventHandler(MatchCompletedEvent)
  async invalidateLeaderboardCache(event: MatchCompletedEvent): Promise<void> {
    await this.cache.del(`leaderboard:${event.championshipId}`);
  }
}

Testing CQRS Systems

Command Testing

describe('CreateChampionshipCommand', () => {
  let handler: CreateChampionshipHandler;
  let eventBus: MockEventBus;

  beforeEach(() => {
    eventBus = new MockEventBus();
    handler = new CreateChampionshipHandler(
      new InMemoryRepository(),
      eventBus
    );
  });

  it('should create championship and publish event', async () => {
    // Arrange
    const command = new CreateChampionshipCommand(
      'Premier League',
      new Date('2025-01-01'),
      new Date('2025-12-31'),
      20,
      defaultRules
    );

    // Act
    await handler.execute(command);

    // Assert
    const events = eventBus.getPublishedEvents();
    expect(events).toHaveLength(1);
    expect(events[0]).toBeInstanceOf(ChampionshipCreatedEvent);
    expect(events[0].name).toBe('Premier League');
  });

  it('should reject invalid championship', async () => {
    // Arrange
    const command = new CreateChampionshipCommand(
      'Invalid',
      new Date('2025-01-01'),
      new Date('2024-12-31'), // End before start
      1, // Too few teams
      defaultRules
    );

    // Act & Assert
    await expect(handler.execute(command))
      .rejects.toThrow(BusinessRuleViolation);
  });
});

Query Testing

describe('Championship Query Handler', () => {
  it('should return denormalized championship view', async () => {
    // Arrange
    const handler = new GetChampionshipDetailsHandler(testReadDb);
    await seedTestData();

    // Act
    const result = await handler.execute(
      new GetChampionshipDetailsQuery('champ-1', true)
    );

    // Assert
    expect(result).toMatchObject({
      id: 'champ-1',
      name: 'Test Championship',
      totalTeams: 4,
      totalMatches: 6,
      completedMatches: 3,
      stats: {
        averageGoalsPerMatch: 2.5,
        highestScoringTeam: 'Team A'
      }
    });
  });
});

Projection Testing

describe('Championship Projection', () => {
  it('should update standings when match completes', async () => {
    // Arrange
    const projection = new ChampionshipProjection(testReadDb);

    // Act
    await projection.handleMatchCompleted(new MatchCompletedEvent(
      'match-1',
      'champ-1',
      'team-a',
      'team-b',
      2, // home score
      1  // away score
    ));

    // Assert
    const standings = await testReadDb.query(
      'SELECT * FROM standings_view WHERE championship_id = $1',
      ['champ-1']
    );

    const teamAStanding = standings.find(s => s.team_id === 'team-a');
    expect(teamAStanding.wins).toBe(1);
    expect(teamAStanding.points).toBe(3);
  });
});

Benefits and Tradeoffs

Benefits

  • Performance: Optimized read and write models
  • Scalability: Scale reads and writes independently
  • Flexibility: Different storage for different models
  • Simplicity: Simpler queries on denormalized data

Tradeoffs

  • Complexity: More moving parts
  • Eventual Consistency: Read models may lag
  • Storage: Duplicate data in multiple models
  • Maintenance: Keep projections in sync

When to Use CQRS

  • Complex domains with different read/write patterns
  • High-performance requirements
  • Need for multiple read models
  • Event sourcing implementations
  • Collaborative domains with many users

References


Last updated: 2025-09-09