CQRS (Command Query Responsibility Segregation)
Overview
CQRS separates read and write operations into different models, optimizing each for its specific purpose. Commands modify state, while queries retrieve data without side effects.
Core Concepts
Command Model
Handles write operations and business logic.
// Command definition
/**
 * Command carrying the data needed to create a championship.
 *
 * A plain immutable value holder: the constructor only stores its arguments
 * and performs no validation.
 */
class CreateChampionshipCommand {
  public readonly name: string;
  public readonly startDate: Date;
  public readonly endDate: Date;
  public readonly maxTeams: number;
  public readonly rules: ChampionshipRules;

  constructor(
    name: string,
    startDate: Date,
    endDate: Date,
    maxTeams: number,
    rules: ChampionshipRules
  ) {
    this.name = name;
    this.startDate = startDate;
    this.endDate = endDate;
    this.maxTeams = maxTeams;
    this.rules = rules;
  }
}
// Command handler
/**
 * Handles {@link CreateChampionshipCommand}: validates the request, creates
 * the `Championship` aggregate, saves it through the repository and then
 * publishes the aggregate's uncommitted domain events.
 */
@CommandHandler(CreateChampionshipCommand)
class CreateChampionshipHandler {
  constructor(
    private readonly repository: ChampionshipRepository,
    private readonly eventBus: EventBus
  ) {}

  /**
   * Executes the command.
   *
   * @param command - the championship data to create
   * @throws BusinessRuleViolation when fewer than 2 teams are allowed, or when
   *   the end date is before the start date
   */
  async execute(command: CreateChampionshipCommand): Promise<void> {
    // Validate business rules before any state is created or persisted.
    if (command.maxTeams < 2) {
      throw new BusinessRuleViolation('Championship requires at least 2 teams');
    }
    // A championship cannot end before it starts. Compare epoch milliseconds
    // so the check does not rely on Date-to-number coercion.
    if (command.endDate.getTime() < command.startDate.getTime()) {
      throw new BusinessRuleViolation('Championship end date must not be before start date');
    }

    // Create aggregate
    const championship = Championship.create(
      command.name,
      command.startDate,
      command.endDate,
      command.maxTeams,
      command.rules
    );

    // Save to write store first; events are only published after the save resolves.
    await this.repository.save(championship);

    // Publish domain events
    const events = championship.getUncommittedEvents();
    await this.eventBus.publishAll(events);
  }
}
Query Model
Optimized for reading data.
// Query definition
/**
 * Query asking for the details of one championship.
 *
 * `includeStats` is opt-in and defaults to `false`.
 */
class GetChampionshipDetailsQuery {
  public readonly championshipId: string;
  public readonly includeStats: boolean;

  constructor(championshipId: string, includeStats: boolean = false) {
    this.championshipId = championshipId;
    this.includeStats = includeStats;
  }
}
// Query handler
/**
 * Handles {@link GetChampionshipDetailsQuery} by reading the denormalized
 * championship read model and, on request, attaching statistics.
 *
 * NOTE(review): `getChampionshipStats` is called below but is not declared in
 * this class — presumably provided elsewhere; confirm.
 */
@QueryHandler(GetChampionshipDetailsQuery)
class GetChampionshipDetailsHandler {
  constructor(private readonly readDb: ReadModelDatabase) {}

  /**
   * Loads the view for `query.championshipId`.
   *
   * @param query - the championship id and whether to include stats
   * @returns the championship view with camelCase fields
   * @throws Error when no championship row matches the id
   */
  async execute(query: GetChampionshipDetailsQuery): Promise<ChampionshipView> {
    // Read from denormalized view
    const rows = await this.readDb.query(`
SELECT
c.id,
c.name,
c.start_date,
c.end_date,
c.current_round,
COUNT(DISTINCT t.id) as total_teams,
COUNT(DISTINCT m.id) as total_matches,
COUNT(DISTINCT CASE WHEN m.status = 'completed' THEN m.id END) as completed_matches
FROM championship_view c
LEFT JOIN teams_view t ON t.championship_id = c.id
LEFT JOIN matches_view m ON m.championship_id = c.id
WHERE c.id = $1
GROUP BY c.id
`, [query.championshipId]);

    // Other callers in this file treat the query result as an array of
    // snake_case rows, so take the single grouped row rather than using the
    // result itself as the view.
    const row = rows[0];
    if (row === undefined) {
      throw new Error(`Championship not found: ${query.championshipId}`);
    }

    // Map columns to the camelCase shape consumers of the view expect.
    // Number(...) is a no-op for numeric values and normalizes drivers that
    // return COUNT results as strings.
    // NOTE(review): field names beyond id/name/total*/completed* are assumed
    // to follow the same camelCase convention — confirm against ChampionshipView.
    const view: ChampionshipView = {
      id: row.id,
      name: row.name,
      startDate: row.start_date,
      endDate: row.end_date,
      currentRound: row.current_round,
      totalTeams: Number(row.total_teams),
      totalMatches: Number(row.total_matches),
      completedMatches: Number(row.completed_matches)
    };

    if (query.includeStats) {
      view.stats = await this.getChampionshipStats(query.championshipId);
    }
    return view;
  }
}
Implementation Patterns
Event Sourcing with CQRS
Store all changes as events and build read models from them.
// Aggregate with event sourcing
/**
 * Championship aggregate. Command methods validate and then `apply` an event;
 * the `on…Event` handlers are the only place internal state is mutated.
 *
 * NOTE(review): `apply` comes from `AggregateRoot` and presumably dispatches
 * to the matching `on<EventName>` handler; `canScheduleMatch` is used but not
 * declared in this class — confirm both.
 */
class Championship extends AggregateRoot {
  private id: string;
  private name: string;
  // Upper bound checked by addTeam(); it was read there without being declared.
  private maxTeams: number;
  private teams: Team[] = [];
  private matches: Match[] = [];
  private status: ChampionshipStatus;

  // Commands modify state and produce events

  /**
   * Adds a team to the championship.
   *
   * @throws Error when the championship already holds `maxTeams` teams
   */
  addTeam(team: Team): void {
    // Validate
    if (this.teams.length >= this.maxTeams) {
      throw new Error('Championship is full');
    }
    // Apply event
    this.apply(new TeamAddedEvent(this.id, team));
  }

  /**
   * Schedules a match between two teams.
   *
   * @throws Error when `canScheduleMatch` rejects the combination
   */
  scheduleMatch(homeTeamId: string, awayTeamId: string, date: Date): void {
    // Business logic
    if (!this.canScheduleMatch(homeTeamId, awayTeamId, date)) {
      throw new Error('Cannot schedule match');
    }
    const match = new Match(uuid(), homeTeamId, awayTeamId, date);
    this.apply(new MatchScheduledEvent(this.id, match));
  }

  // Event handlers update internal state

  /** Records the team carried by the event. */
  protected onTeamAddedEvent(event: TeamAddedEvent): void {
    this.teams.push(event.team);
  }

  /** Records the match carried by the event. */
  protected onMatchScheduledEvent(event: MatchScheduledEvent): void {
    this.matches.push(event.match);
  }
}
Projection Building
Build read models from events.
// Projection handler updates read models
/**
 * Projects domain events into the read-side tables `teams_view`,
 * `matches_view` and `standings_view`.
 *
 * NOTE(review): `updateChampionshipStats` and `updateTeamStats` are called
 * below but not declared in this class — presumably defined elsewhere; confirm.
 */
@EventHandler(TeamAddedEvent, MatchScheduledEvent, MatchCompletedEvent)
class ChampionshipProjection {
  constructor(private readDb: ReadModelDatabase) {}

  /** Inserts the added team into `teams_view`, then refreshes championship stats. */
  async handleTeamAdded(event: TeamAddedEvent): Promise<void> {
    const insertTeamSql = `
INSERT INTO teams_view (id, championship_id, name, joined_at)
VALUES ($1, $2, $3, $4)
`;
    await this.readDb.execute(
      insertTeamSql,
      [event.team.id, event.championshipId, event.team.name, event.timestamp]
    );

    // Denormalized championship view is refreshed after the insert.
    await this.updateChampionshipStats(event.championshipId);
  }

  /**
   * Marks the match as completed with its final score, then recomputes the
   * standings and the statistics of both teams, in that order.
   */
  async handleMatchCompleted(event: MatchCompletedEvent): Promise<void> {
    const completeMatchSql = `
UPDATE matches_view
SET status = 'completed',
home_score = $1,
away_score = $2,
completed_at = $3
WHERE id = $4
`;
    await this.readDb.execute(
      completeMatchSql,
      [event.homeScore, event.awayScore, event.timestamp, event.matchId]
    );

    // Standings first, team statistics second.
    await this.updateStandings(event.championshipId);
    await this.updateTeamStats(event.homeTeamId, event.awayTeamId);
  }

  /**
   * Recomputes every team's standing row for one championship from its
   * completed matches (3 points per win, 1 per draw) and upserts the result
   * into `standings_view`.
   */
  private async updateStandings(championshipId: string): Promise<void> {
    const upsertStandingsSql = `
WITH match_results AS (
SELECT
CASE
WHEN home_score > away_score THEN home_team_id
WHEN away_score > home_score THEN away_team_id
ELSE NULL
END as winner_id,
CASE
WHEN home_score < away_score THEN home_team_id
WHEN away_score < home_score THEN away_team_id
ELSE NULL
END as loser_id,
CASE
WHEN home_score = away_score THEN home_team_id
END as draw_team1_id,
CASE
WHEN home_score = away_score THEN away_team_id
END as draw_team2_id
FROM matches_view
WHERE championship_id = $1 AND status = 'completed'
)
INSERT INTO standings_view (championship_id, team_id, points, wins, draws, losses, position)
SELECT
$1,
t.id,
COALESCE(w.wins, 0) * 3 + COALESCE(d.draws, 0) as points,
COALESCE(w.wins, 0),
COALESCE(d.draws, 0),
COALESCE(l.losses, 0),
ROW_NUMBER() OVER (ORDER BY COALESCE(w.wins, 0) * 3 + COALESCE(d.draws, 0) DESC)
FROM teams_view t
LEFT JOIN (
SELECT winner_id as team_id, COUNT(*) as wins
FROM match_results WHERE winner_id IS NOT NULL
GROUP BY winner_id
) w ON w.team_id = t.id
LEFT JOIN (
SELECT team_id, COUNT(*) as draws FROM (
SELECT draw_team1_id as team_id FROM match_results WHERE draw_team1_id IS NOT NULL
UNION ALL
SELECT draw_team2_id as team_id FROM match_results WHERE draw_team2_id IS NOT NULL
) draws GROUP BY team_id
) d ON d.team_id = t.id
LEFT JOIN (
SELECT loser_id as team_id, COUNT(*) as losses
FROM match_results WHERE loser_id IS NOT NULL
GROUP BY loser_id
) l ON l.team_id = t.id
WHERE t.championship_id = $1
ON CONFLICT (championship_id, team_id)
DO UPDATE SET
points = EXCLUDED.points,
wins = EXCLUDED.wins,
draws = EXCLUDED.draws,
losses = EXCLUDED.losses,
position = EXCLUDED.position
`;
    await this.readDb.execute(upsertStandingsSql, [championshipId]);
  }
}
Synchronization Strategies
Eventual Consistency
Read models are updated asynchronously.
/**
 * Applies domain events to the read-side projections and records, per
 * aggregate, how far the projections lag behind the event's timestamp.
 *
 * NOTE(review): `alerting`, `logger`, `failedEventsStore`, the `update*View`
 * methods and `markProjectionInconsistent` are used here but not declared in
 * this class — presumably injected or inherited; confirm.
 */
class EventualConsistencyManager {
  /** Lag above which a warning is raised, in milliseconds (5 seconds). */
  private static readonly LAG_ALERT_THRESHOLD_MS = 5000;

  /** Most recent projection lag in milliseconds, keyed by aggregate id. */
  private projectionLag = new Map<string, number>();

  /**
   * Updates all projections for `event` in parallel and tracks the lag.
   * A projection failure is routed to the error handler instead of being
   * rethrown to the caller.
   */
  async processEvent(event: DomainEvent): Promise<void> {
    try {
      // Update all projections
      await Promise.all([
        this.updateChampionshipView(event),
        this.updateStandingsView(event),
        this.updateStatisticsView(event)
      ]);

      // Track projection lag: time between the event's timestamp and now.
      const lag = Date.now() - event.timestamp.getTime();
      this.projectionLag.set(event.aggregateId, lag);

      // Alert if lag is too high
      if (lag > EventualConsistencyManager.LAG_ALERT_THRESHOLD_MS) {
        this.alerting.warn(`High projection lag: ${lag}ms for ${event.aggregateId}`);
      }
    } catch (error: unknown) {
      // Anything can be thrown in JavaScript; normalize to an Error so the
      // handler's `Error` parameter is honoured and logs carry a stack trace.
      const failure = error instanceof Error ? error : new Error(String(error));
      await this.handleProjectionError(event, failure);
    }
  }

  /**
   * Logs the failure, stores the event for retry and marks the aggregate's
   * projection as inconsistent.
   */
  private async handleProjectionError(event: DomainEvent, error: Error): Promise<void> {
    // Log error
    this.logger.error('Projection failed', { event, error });
    // Store for retry
    await this.failedEventsStore.add(event);
    // Mark projection as inconsistent
    await this.markProjectionInconsistent(event.aggregateId);
  }
}
Synchronous Projections
Update read models synchronously for critical data.
/**
 * Runs a command and its read-model projections inside one database
 * transaction, so the event store and the read model commit or roll back
 * together.
 *
 * NOTE(review): `db`, `commandHandler`, `eventStore` and `projectionHandler`
 * are used but not declared in this class — presumably injected; confirm.
 */
class SynchronousProjectionHandler {
  /**
   * Executes `command`, appends the resulting events and projects each one in
   * order, all within a single transaction. Any failure rolls the transaction
   * back and is rethrown unchanged.
   */
  async handle(command: Command): Promise<void> {
    const tx = await this.db.beginTransaction();

    // Everything that must succeed atomically, commit included.
    const runInTransaction = async (): Promise<void> => {
      const producedEvents = await this.commandHandler.execute(command);

      // Write model first ...
      await this.eventStore.append(producedEvents, tx);

      // ... then the read model, one event at a time, on the same transaction.
      for (const producedEvent of producedEvents) {
        await this.projectionHandler.project(producedEvent, tx);
      }

      await tx.commit();
    };

    try {
      await runInTransaction();
    } catch (failure) {
      await tx.rollback();
      throw failure;
    }
  }
}
Query Optimization
Materialized Views
Pre-compute complex queries.
-- Materialized view for championship dashboard
-- One row per championship (GROUP BY c.id) with pre-aggregated counts,
-- average goals per match, the top points value and the current leader's name.
-- NOTE(review): this reads from "championships" while the query handlers read
-- from "championship_view" — confirm which relation is intended.
CREATE MATERIALIZED VIEW championship_dashboard AS
SELECT
    c.id,
    c.name,
    c.status,
    COUNT(DISTINCT t.id) as total_teams,
    COUNT(DISTINCT m.id) as total_matches,
    COUNT(DISTINCT CASE WHEN m.status = 'completed' THEN m.id END) as completed_matches,
    AVG(m.home_score + m.away_score) as avg_goals_per_match,
    MAX(s.points) as leader_points,
    (SELECT name FROM teams_view WHERE id =
        (SELECT team_id FROM standings_view WHERE championship_id = c.id ORDER BY position LIMIT 1)
    ) as current_leader
FROM championships c
LEFT JOIN teams_view t ON t.championship_id = c.id
LEFT JOIN matches_view m ON m.championship_id = c.id
LEFT JOIN standings_view s ON s.championship_id = c.id
GROUP BY c.id;

-- REFRESH MATERIALIZED VIEW CONCURRENTLY (used by the refresh function in this
-- file) is only allowed when the materialized view has at least one unique
-- index on plain columns covering all rows. "id" is unique per row because
-- the view groups by c.id.
CREATE UNIQUE INDEX championship_dashboard_id_idx ON championship_dashboard (id);
-- Refresh strategy
-- Trigger function: rebuilds the dashboard without blocking readers.
-- The return value of an AFTER row trigger is ignored; NEW is returned by convention.
CREATE OR REPLACE FUNCTION refresh_championship_dashboard()
RETURNS trigger AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY championship_dashboard;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Fire only on the transition into 'completed'.
-- IS DISTINCT FROM is used instead of != so that a row whose previous status
-- was NULL still counts as "not completed"; with != the comparison would
-- evaluate to NULL and the trigger would be skipped.
-- Note: FOR EACH ROW means one refresh per qualifying row in a statement.
CREATE TRIGGER refresh_dashboard_on_match_complete
AFTER UPDATE ON matches_view
FOR EACH ROW
WHEN (NEW.status = 'completed' AND OLD.status IS DISTINCT FROM 'completed')
EXECUTE FUNCTION refresh_championship_dashboard();
Read Model Caching
Cache frequently accessed read models.
/**
 * Serves championship leaderboards from a cache, falling back to the read
 * model on a miss, and drops the cached entry when a match completes.
 */
class CachedQueryHandler {
  constructor(
    private cache: Redis,
    private db: ReadModelDatabase
  ) {}

  /**
   * Returns the leaderboard for a championship.
   *
   * A cached JSON entry is parsed and returned as-is. Otherwise the standings
   * are queried ordered by position, cached for 5 minutes and returned.
   */
  async getChampionshipLeaderboard(championshipId: string): Promise<Leaderboard> {
    const key = `leaderboard:${championshipId}`;
    const ttlSeconds = 300; // 5 minutes

    const hit = await this.cache.get(key);
    if (!hit) {
      // Cache miss: read from the database and populate the cache.
      const leaderboardSql = `
SELECT
t.id,
t.name,
s.points,
s.wins,
s.draws,
s.losses,
s.goals_for,
s.goals_against,
s.goal_difference,
s.position
FROM standings_view s
JOIN teams_view t ON t.id = s.team_id
WHERE s.championship_id = $1
ORDER BY s.position
`;
      const rows = await this.db.query(leaderboardSql, [championshipId]);
      await this.cache.setex(key, ttlSeconds, JSON.stringify(rows));
      return rows;
    }

    // Cache hit.
    return JSON.parse(hit);
  }

  /** Removes the cached leaderboard of the championship the match belongs to. */
  @EventHandler(MatchCompletedEvent)
  async invalidateLeaderboardCache(event: MatchCompletedEvent): Promise<void> {
    await this.cache.del(`leaderboard:${event.championshipId}`);
  }
}
Testing CQRS Systems
Command Testing
// Tests for the create-championship command handler, using an in-memory
// repository and a mock event bus so published events can be inspected.
describe('CreateChampionshipCommand', () => {
  let handler: CreateChampionshipHandler;
  let eventBus: MockEventBus;

  beforeEach(() => {
    // Fresh collaborators per test so published events do not leak between tests.
    eventBus = new MockEventBus();
    handler = new CreateChampionshipHandler(
      new InMemoryRepository(),
      eventBus
    );
  });

  it('should create championship and publish event', async () => {
    // Arrange
    const command = new CreateChampionshipCommand(
      'Premier League',
      new Date('2025-01-01'),
      new Date('2025-12-31'),
      20,
      defaultRules
    );

    // Act
    await handler.execute(command);

    // Assert
    const events = eventBus.getPublishedEvents();
    expect(events).toHaveLength(1);
    expect(events[0]).toBeInstanceOf(ChampionshipCreatedEvent);
    expect(events[0].name).toBe('Premier League');
  });

  it('should reject invalid championship', async () => {
    // Arrange
    const command = new CreateChampionshipCommand(
      'Invalid',
      new Date('2025-01-01'),
      new Date('2024-12-31'), // End before start
      1, // Too few teams
      defaultRules
    );

    // Act & Assert
    await expect(handler.execute(command))
      .rejects.toThrow(BusinessRuleViolation);
  });

  it('should reject championship with fewer than 2 teams', async () => {
    // Arrange: dates are valid, so only the team-count rule can fire.
    const command = new CreateChampionshipCommand(
      'Too Small',
      new Date('2025-01-01'),
      new Date('2025-12-31'),
      1, // Too few teams
      defaultRules
    );

    // Act & Assert
    await expect(handler.execute(command))
      .rejects.toThrow('Championship requires at least 2 teams');
    // A rejected command must not publish anything.
    expect(eventBus.getPublishedEvents()).toHaveLength(0);
  });
});
Query Testing
// Verifies the details query against a seeded read database, stats included.
describe('Championship Query Handler', () => {
  it('should return denormalized championship view', async () => {
    // Arrange
    const handler = new GetChampionshipDetailsHandler(testReadDb);
    await seedTestData();
    const detailsWithStats = new GetChampionshipDetailsQuery('champ-1', true);
    const expectedView = {
      id: 'champ-1',
      name: 'Test Championship',
      totalTeams: 4,
      totalMatches: 6,
      completedMatches: 3,
      stats: {
        averageGoalsPerMatch: 2.5,
        highestScoringTeam: 'Team A'
      }
    };

    // Act
    const result = await handler.execute(detailsWithStats);

    // Assert: partial match, so extra fields on the view are allowed.
    expect(result).toMatchObject(expectedView);
  });
});
Projection Testing
// Verifies that completing a match is reflected in the standings read model.
describe('Championship Projection', () => {
  it('should update standings when match completes', async () => {
    // Arrange
    const projection = new ChampionshipProjection(testReadDb);

    // Act: home team (team-a) wins 2-1.
    await projection.handleMatchCompleted(new MatchCompletedEvent(
      'match-1',
      'champ-1',
      'team-a',
      'team-b',
      2, // home score
      1 // away score
    ));

    // Assert
    const standings = await testReadDb.query(
      'SELECT * FROM standings_view WHERE championship_id = $1',
      ['champ-1']
    );
    const teamAStanding = standings.find(s => s.team_id === 'team-a');
    // `find` yields undefined when no row exists; assert presence first so a
    // missing row fails with a clear message instead of a TypeError.
    expect(teamAStanding).toBeDefined();
    expect(teamAStanding?.wins).toBe(1);
    expect(teamAStanding?.points).toBe(3);
  });
});
Benefits and Tradeoffs
Benefits
- Performance: Optimized read and write models
- Scalability: Scale reads and writes independently
- Flexibility: Different storage for different models
- Simplicity: Simpler queries on denormalized data
Tradeoffs
- Complexity: More moving parts
- Eventual Consistency: Read models may lag
- Storage: Duplicate data in multiple models
- Maintenance: Keep projections in sync
When to Use CQRS
- Complex domains with different read/write patterns
- High-performance requirements
- Need for multiple read models
- Event sourcing implementations
- Collaborative domains with many users
References
Last updated: 2025-09-09