← Back to Home

Production-Ready Event-Driven Architecture Part 3 - CQRS with Separate Read/Write Models

시리즈 소개

이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.

  1. Part 1: Event Sourcing 기초
  2. Part 2: Outbox Pattern 구현
  3. Part 3: CQRS와 Read/Write 모델 분리 (현재 글)
  4. Part 4: Saga Pattern으로 분산 트랜잭션 처리
  5. Part 5: Event Schema 진화와 버전 관리

CQRS란?

CQRS(Command Query Responsibility Segregation)는 읽기(Query)와 쓰기(Command)를 분리하는 패턴입니다.

왜 CQRS인가?

전통적인 CRUD 모델의 한계:

  • 읽기와 쓰기의 요구사항이 다름
  • 복잡한 쿼리를 위해 여러 테이블 조인 필요
  • 읽기 최적화와 쓰기 최적화가 충돌
  • 스케일링이 어려움

CQRS의 장점:

  • 읽기/쓰기 각각에 최적화된 모델 사용 가능
  • 독립적인 스케일링
  • 복잡한 도메인 로직과 쿼리 로직 분리
  • Event Sourcing과의 자연스러운 결합

아키텍처 개요

                    ┌─────────────────┐
                    │     Client      │
                    └────────┬────────┘
                             │
           ┌─────────────────┴─────────────────┐
           │                                   │
           ▼                                   ▼
    ┌─────────────┐                    ┌───────────────┐
    │  Command    │                    │     Query     │
    │  Service    │                    │    Service    │
    └──────┬──────┘                    └───────┬───────┘
           │                                   │
           ▼                                   ▼
    ┌─────────────┐                    ┌───────────────┐
    │   Write     │                    │     Read      │
    │   Model     │─────Events────────▶│     Model     │
    │ (PostgreSQL)│                    │(Elasticsearch)│
    └─────────────┘                    └───────────────┘

구현

Command Side

Command 정의

// Commands
/**
 * Common type of every command handled on the write side.
 *
 * The interface is `sealed`, so the set of command types is closed and known at compile time.
 */
sealed interface OrderCommand {
    /** Identifier of the order the command applies to. */
    val orderId: String
}
 
/**
 * Command requesting the creation of a new order.
 *
 * @property orderId identifier of the order to create; defaults to a freshly generated random UUID string.
 * @property customerId identifier of the customer placing the order.
 * @property items line items of the order.
 */
data class CreateOrderCommand(
    override val orderId: String = UUID.randomUUID().toString(),
    val customerId: String,
    val items: List<OrderItemDto>
) : OrderCommand
 
/**
 * Command requesting that an existing order be shipped.
 *
 * @property orderId identifier of the order to ship.
 * @property shippingAddress address the order is shipped to.
 * @property trackingNumber tracking number of the shipment.
 */
data class ShipOrderCommand(
    override val orderId: String,
    val shippingAddress: String,
    val trackingNumber: String
) : OrderCommand
 
/**
 * Command requesting that an existing order be cancelled.
 *
 * @property orderId identifier of the order to cancel.
 * @property reason reason given for the cancellation.
 */
data class CancelOrderCommand(
    override val orderId: String,
    val reason: String
) : OrderCommand

Command Handler

/**
 * Write-side application service.
 *
 * Each `handle` overload applies one command to an `Order`, saves it through
 * [orderRepository] and publishes the events the order reports as pending.
 * Every overload is annotated with `@Transactional`.
 */
@Service
class OrderCommandHandler(
    private val orderRepository: OrderRepository,
    private val eventPublisher: ApplicationEventPublisher
) {
    /**
     * Creates a new order from [command], saves it and publishes its pending events.
     *
     * @return the id of the created order
     */
    @Transactional
    fun handle(command: CreateOrderCommand): String {
        val orderItems = command.items.map { dto -> dto.toOrderItem() }
        val created = Order.create(
            orderId = command.orderId,
            customerId = command.customerId,
            items = orderItems
        )

        saveAndPublish(created)

        return created.id
    }

    /**
     * Ships the order referenced by [command].
     *
     * @throws OrderNotFoundException if the repository has no order with that id
     */
    @Transactional
    fun handle(command: ShipOrderCommand) {
        val target = loadOrder(command.orderId)
        target.ship(command.shippingAddress, command.trackingNumber)
        saveAndPublish(target)
    }

    /**
     * Cancels the order referenced by [command].
     *
     * @throws OrderNotFoundException if the repository has no order with that id
     */
    @Transactional
    fun handle(command: CancelOrderCommand) {
        val target = loadOrder(command.orderId)
        target.cancel(command.reason)
        saveAndPublish(target)
    }

    /** Looks up an order by id, failing with [OrderNotFoundException] when the lookup yields null. */
    private fun loadOrder(orderId: String): Order =
        orderRepository.findById(orderId) ?: throw OrderNotFoundException(orderId)

    /** Saves [order], publishes each of its pending events in order, then clears them. */
    private fun saveAndPublish(order: Order) {
        orderRepository.save(order)

        order.getPendingEvents().forEach { pending ->
            eventPublisher.publishEvent(pending)
        }
        order.clearPendingEvents()
    }
}

Command Controller

/**
 * HTTP entry point for the write side, mounted under `/api/orders`.
 *
 * Each endpoint translates the request into a command and delegates to [OrderCommandHandler].
 */
@RestController
@RequestMapping("/api/orders")
class OrderCommandController(
    private val commandHandler: OrderCommandHandler
) {
    /**
     * `POST /api/orders` — creates an order.
     *
     * @return a "201 Created" response whose `Location` is `/api/orders/{orderId}` and whose body carries the new id
     */
    @PostMapping
    fun createOrder(@RequestBody request: CreateOrderRequest): ResponseEntity<OrderIdResponse> {
        val command = CreateOrderCommand(
            customerId = request.customerId,
            items = request.items
        )
        val orderId = commandHandler.handle(command)
        val location = URI.create("/api/orders/$orderId")

        return ResponseEntity.created(location).body(OrderIdResponse(orderId))
    }

    /** `POST /api/orders/{orderId}/ship` — ships the order and answers with an empty "200 OK". */
    @PostMapping("/{orderId}/ship")
    fun shipOrder(
        @PathVariable orderId: String,
        @RequestBody request: ShipOrderRequest
    ): ResponseEntity<Unit> {
        val command = ShipOrderCommand(
            orderId = orderId,
            shippingAddress = request.shippingAddress,
            trackingNumber = request.trackingNumber
        )
        commandHandler.handle(command)

        return ResponseEntity.ok().build()
    }

    /** `POST /api/orders/{orderId}/cancel` — cancels the order and answers with an empty "200 OK". */
    @PostMapping("/{orderId}/cancel")
    fun cancelOrder(
        @PathVariable orderId: String,
        @RequestBody request: CancelOrderRequest
    ): ResponseEntity<Unit> {
        val command = CancelOrderCommand(
            orderId = orderId,
            reason = request.reason
        )
        commandHandler.handle(command)

        return ResponseEntity.ok().build()
    }
}

Query Side

Read Model

/**
 * Read-side representation of an order, stored as a document in the `orders` index.
 *
 * The model is denormalized: it carries the customer's name and per-item product names
 * alongside the ids, so a query does not have to look them up elsewhere.
 *
 * @property orderId document id; the id of the order.
 * @property status order status as a plain string.
 * @property shippingAddress shipping address, or `null` when none has been recorded.
 * @property trackingNumber tracking number, or `null` when none has been recorded.
 * @property createdAt timestamp the document was created with.
 * @property updatedAt timestamp of the latest change applied to the document.
 */
@Document(indexName = "orders")
data class OrderReadModel(
    @Id
    val orderId: String,
    val customerId: String,
    val customerName: String,
    val status: String,
    val items: List<OrderItemReadModel>,
    val totalAmount: BigDecimal,
    val itemCount: Int,
    val shippingAddress: String?,
    val trackingNumber: String?,
    val createdAt: Instant,
    val updatedAt: Instant
)
 
/**
 * A single line item inside a read-side order document.
 *
 * @property productId identifier of the ordered product.
 * @property productName display name of the product, stored with the item.
 * @property quantity number of units ordered.
 * @property price price recorded for the item.
 */
data class OrderItemReadModel(
    val productId: String,
    val productName: String,
    val quantity: Int,
    val price: BigDecimal
)

Event Handler (Projector)

/**
 * Projects order events onto the read model kept in [OrderReadModelRepository].
 *
 * Every handler is an `@Async` `@EventListener`.
 *
 * NOTE(review): a plain `@EventListener` is invoked when the event is published, which for
 * the command handler is while its transaction is still open, and `@Async` handlers give no
 * ordering guarantee between events of the same order. Consider
 * `@TransactionalEventListener(phase = AFTER_COMMIT)` and ordered delivery — confirm against
 * the event infrastructure actually in use.
 */
@Component
class OrderProjector(
    private val orderReadModelRepository: OrderReadModelRepository,
    private val customerService: CustomerService,
    private val productService: ProductService
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Builds the initial read model for a newly created order, enriching it with the
     * customer name and product names, and saves it.
     *
     * @throws IllegalStateException if a product referenced by the event is not returned by the product service
     */
    @EventListener
    @Async
    fun on(event: OrderCreated) {
        logger.info("Projecting OrderCreated: ${event.aggregateId}")

        val customer = customerService.getCustomer(event.customerId)
        // Index the products once instead of scanning the whole list for every item.
        val productsById = productService.getProducts(event.items.map { it.productId })
            .associateBy { it.id }

        val readModel = OrderReadModel(
            orderId = event.aggregateId,
            customerId = event.customerId,
            customerName = customer.name,
            status = "CREATED",
            items = event.items.map { item ->
                // Fail with a message that names the product and order, not a bare NPE.
                val product = checkNotNull(productsById[item.productId]) {
                    "Product ${item.productId} not found while projecting order ${event.aggregateId}"
                }
                OrderItemReadModel(
                    productId = item.productId,
                    productName = product.name,
                    quantity = item.quantity,
                    price = item.price
                )
            },
            totalAmount = event.totalAmount,
            itemCount = event.items.size,
            shippingAddress = null,
            trackingNumber = null,
            createdAt = event.occurredAt,
            updatedAt = event.occurredAt
        )

        orderReadModelRepository.save(readModel)
    }

    /** Marks the stored read model as shipped and records the address and tracking number. */
    @EventListener
    @Async
    fun on(event: OrderShipped) {
        logger.info("Projecting OrderShipped: ${event.aggregateId}")

        updateExisting(event.aggregateId) { order ->
            order.copy(
                status = "SHIPPED",
                shippingAddress = event.shippingAddress,
                trackingNumber = event.trackingNumber,
                updatedAt = event.occurredAt
            )
        }
    }

    /** Marks the stored read model as cancelled. */
    @EventListener
    @Async
    fun on(event: OrderCancelled) {
        logger.info("Projecting OrderCancelled: ${event.aggregateId}")

        updateExisting(event.aggregateId) { order ->
            order.copy(
                status = "CANCELLED",
                updatedAt = event.occurredAt
            )
        }
    }

    /**
     * Applies [transform] to the stored read model of [orderId] and saves the result.
     *
     * When no read model exists the update is skipped, as before, but a warning is logged so
     * that a dropped event is visible instead of disappearing silently.
     */
    private fun updateExisting(orderId: String, transform: (OrderReadModel) -> OrderReadModel) {
        val current = orderReadModelRepository.findById(orderId).orElse(null)
        if (current == null) {
            logger.warn("Read model for order $orderId not found; skipping projection")
            return
        }
        orderReadModelRepository.save(transform(current))
    }
}

Query Service

/**
 * Read-side application service; every method is answered from [OrderReadModelRepository].
 */
@Service
class OrderQueryService(
    private val orderReadModelRepository: OrderReadModelRepository
) {
    /** Returns the read model of [orderId], or `null` when the repository has none. */
    fun findById(orderId: String): OrderReadModel? {
        return orderReadModelRepository.findById(orderId).orElse(null)
    }

    /** Returns one page of the orders of [customerId]. */
    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel> {
        return orderReadModelRepository.findByCustomerId(customerId, pageable)
    }

    /** Returns one page of the orders matching [criteria]. */
    fun searchOrders(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
        return orderReadModelRepository.search(criteria, pageable)
    }

    /**
     * Aggregates the orders of [customerId] into order count, total amount, average amount
     * and a per-status count.
     *
     * The average is [BigDecimal.ZERO] when the customer has no orders. Otherwise it is
     * rounded half-even to at least [AVERAGE_MIN_SCALE] decimal places.
     *
     * Note: all orders returned by the unpaged `findByCustomerId` are aggregated in memory.
     */
    fun getOrderStatistics(customerId: String): OrderStatistics {
        val orders = orderReadModelRepository.findByCustomerId(customerId)
        val totalAmount = orders.sumOf { it.totalAmount }

        val averageOrderAmount =
            if (orders.isEmpty()) {
                BigDecimal.ZERO
            } else {
                // Kotlin's `/` on BigDecimal keeps the dividend's scale, so whole-number
                // amounts would give a truncated average (10 / 3 == 3). Divide with an
                // explicit scale instead.
                totalAmount.divide(
                    orders.size.toBigDecimal(),
                    maxOf(totalAmount.scale(), AVERAGE_MIN_SCALE),
                    java.math.RoundingMode.HALF_EVEN
                )
            }

        return OrderStatistics(
            totalOrders = orders.size,
            totalAmount = totalAmount,
            averageOrderAmount = averageOrderAmount,
            ordersByStatus = orders.groupingBy { it.status }.eachCount()
        )
    }

    private companion object {
        /** Minimum number of decimal places kept in the average order amount. */
        const val AVERAGE_MIN_SCALE = 2
    }
}

Query Controller

/**
 * HTTP entry point for the read side, mounted under `/api/orders`.
 *
 * All endpoints delegate to [OrderQueryService].
 */
@RestController
@RequestMapping("/api/orders")
class OrderQueryController(
    private val queryService: OrderQueryService
) {
    /**
     * `GET /api/orders/{orderId}` — returns a single order.
     *
     * @throws OrderNotFoundException if the query service has no read model for [orderId]
     */
    @GetMapping("/{orderId}")
    fun getOrder(@PathVariable orderId: String): ResponseEntity<OrderReadModel> {
        val found = queryService.findById(orderId) ?: throw OrderNotFoundException(orderId)
        return ResponseEntity.ok(found)
    }

    /** `GET /api/orders` — pages through orders; every filter parameter is optional. */
    @GetMapping
    fun searchOrders(
        @RequestParam(required = false) customerId: String?,
        @RequestParam(required = false) status: String?,
        @RequestParam(required = false) fromDate: Instant?,
        @RequestParam(required = false) toDate: Instant?,
        pageable: Pageable
    ): ResponseEntity<Page<OrderReadModel>> {
        val page = queryService.searchOrders(
            OrderSearchCriteria(
                customerId = customerId,
                status = status,
                fromDate = fromDate,
                toDate = toDate
            ),
            pageable
        )
        return ResponseEntity.ok(page)
    }

    /** `GET /api/orders/customers/{customerId}/statistics` — returns the customer's order statistics. */
    @GetMapping("/customers/{customerId}/statistics")
    fun getCustomerStatistics(
        @PathVariable customerId: String
    ): ResponseEntity<OrderStatistics> {
        val statistics = queryService.getOrderStatistics(customerId)
        return ResponseEntity.ok(statistics)
    }
}

Elasticsearch Repository

/**
 * Custom repository fragment for queries that are assembled programmatically.
 *
 * It is implemented by `OrderReadModelRepositoryCustomImpl` and mixed into
 * [OrderReadModelRepository] so that callers see a single repository type.
 */
interface OrderReadModelRepositoryCustom {

    /** Returns one page of the orders matching [criteria]. */
    fun search(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel>
}

/**
 * Repository of [OrderReadModel] documents.
 *
 * It extends [OrderReadModelRepositoryCustom] so that `search(...)`, which the query service
 * calls on this repository, is part of its API.
 */
interface OrderReadModelRepository :
    ElasticsearchRepository<OrderReadModel, String>,
    OrderReadModelRepositoryCustom {

    /** Returns one page of the orders of [customerId]. */
    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel>

    /** Returns the orders of [customerId] as a list, without paging. */
    fun findByCustomerId(customerId: String): List<OrderReadModel>

    /** Returns one page of the orders whose status equals [status]. */
    fun findByStatus(status: String, pageable: Pageable): Page<OrderReadModel>

    /**
     * Returns one page of the orders of [customerId] whose `createdAt` lies between
     * [fromDate] and [toDate], both inclusive (`gte` / `lte`).
     */
    @Query("""
        {
            "bool": {
                "must": [
                    {"match": {"customerId": "?0"}}
                ],
                "filter": [
                    {"range": {"createdAt": {"gte": "?1", "lte": "?2"}}}
                ]
            }
        }
    """)
    fun findByCustomerIdAndDateRange(
        customerId: String,
        fromDate: Instant,
        toDate: Instant,
        pageable: Pageable
    ): Page<OrderReadModel>
}
 
/**
 * Implementation of the custom `search` query, built on [ElasticsearchOperations].
 */
@Repository
class OrderReadModelRepositoryCustomImpl(
    private val elasticsearchOperations: ElasticsearchOperations
) : OrderReadModelRepositoryCustom {

    /**
     * Builds a bool query from the non-null parts of [criteria] and runs it for the requested page.
     *
     * `customerId` and `status` become `must` match clauses. `fromDate` and `toDate` become a
     * single `createdAt` range filter with inclusive bounds. Criteria that are `null` add no clause.
     */
    override fun search(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
        val bool = BoolQuery.Builder()

        criteria.customerId?.let { customerId -> bool.must(matchOn("customerId", customerId)) }
        criteria.status?.let { status -> bool.must(matchOn("status", status)) }

        val from = criteria.fromDate
        val to = criteria.toDate
        if (from != null || to != null) {
            val createdAtRange = RangeQuery.of { range ->
                range.field("createdAt")
                if (from != null) range.gte(JsonData.of(from.toString()))
                if (to != null) range.lte(JsonData.of(to.toString()))
                range
            }
            bool.filter(createdAtRange._toQuery())
        }

        val nativeQuery = NativeQuery.builder()
            .withQuery(bool.build()._toQuery())
            .withPageable(pageable)
            .build()

        val result = elasticsearchOperations.search(nativeQuery, OrderReadModel::class.java)
        val content = result.searchHits.map { hit -> hit.content }

        return PageImpl(content, pageable, result.totalHits)
    }

    /** Builds a match query for [value] on [field]. */
    private fun matchOn(field: String, value: String) =
        MatchQuery.of { match -> match.field(field).query(value) }._toQuery()
}

Eventual Consistency 처리

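CQRS에서는 이벤트가 비동기로 읽기 모델에 반영되기 때문에, 읽기 모델은 쓰기 모델과 즉시가 아니라 최종적으로(eventually) 일관성을 갖습니다. 즉, 커맨드가 성공한 직후에 조회하면 변경 사항이 아직 읽기 모델에 보이지 않을 수 있습니다. 이를 처리하는 전략은 다음과 같습니다: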

1. Optimistic UI Update

// Frontend: React example
/**
 * Creates an order with an optimistic UI update.
 *
 * A placeholder order with a temporary id and status 'CREATING' is appended to local state
 * before the request is sent. On success the placeholder takes the real order id and status
 * 'CREATED'. On failure the placeholder is removed and the error is rethrown.
 */
const createOrder = async (orderData: CreateOrderRequest) => {
  const placeholderId = generateTempId();
  const isPlaceholder = (order: { id: string }) => order.id === placeholderId;

  // Show the order immediately, before the server has answered.
  setOrders(current => [
    ...current,
    { ...orderData, id: placeholderId, status: 'CREATING' },
  ]);

  try {
    const response = await api.post('/orders', orderData);

    // Swap the placeholder for the confirmed order.
    setOrders(current =>
      current.map(order =>
        isPlaceholder(order)
          ? { ...order, id: response.data.orderId, status: 'CREATED' }
          : order
      )
    );
  } catch (error) {
    // Roll the optimistic entry back.
    setOrders(current => current.filter(order => !isPlaceholder(order)));
    throw error;
  }
};

2. Polling for Consistency

/**
 * Read-side controller variant that can wait until a freshly written order becomes
 * visible in the read model.
 */
@RestController
@RequestMapping("/api/orders")
class OrderQueryController(
    private val queryService: OrderQueryService
) {
    /**
     * `GET /api/orders/{orderId}` — returns a single order.
     *
     * @param waitForConsistency when `true`, poll the read model until the order appears or the timeout elapses
     * @param timeoutMs maximum time to wait in milliseconds; values outside `0..MAX_WAIT_MS` are clamped
     * @throws OrderNotFoundException if the order is not found (within the timeout, when waiting)
     */
    @GetMapping("/{orderId}")
    fun getOrder(
        @PathVariable orderId: String,
        @RequestParam(required = false, defaultValue = "false") waitForConsistency: Boolean,
        @RequestParam(required = false, defaultValue = "5000") timeoutMs: Long
    ): ResponseEntity<OrderReadModel> {
        if (waitForConsistency) {
            return waitForOrder(orderId, timeoutMs)
        }

        val order = queryService.findById(orderId)
            ?: throw OrderNotFoundException(orderId)
        return ResponseEntity.ok(order)
    }

    /**
     * Polls the read model every [POLL_INTERVAL_MS] milliseconds until the order is found
     * or the (clamped) timeout has elapsed.
     *
     * The read model is always queried at least once, even for a zero or negative timeout,
     * and once more after the final sleep. Note that the calling request thread is blocked
     * while waiting.
     */
    private fun waitForOrder(orderId: String, timeoutMs: Long): ResponseEntity<OrderReadModel> {
        // The timeout comes from the client: cap it so a request cannot pin a thread indefinitely.
        val boundedTimeoutMs = timeoutMs.coerceIn(0L, MAX_WAIT_MS)
        // nanoTime is monotonic, so wall-clock adjustments cannot stretch or shrink the wait.
        val deadlineNanos = System.nanoTime() + boundedTimeoutMs * NANOS_PER_MILLI

        while (true) {
            queryService.findById(orderId)?.let {
                return ResponseEntity.ok(it)
            }

            val remainingMs = (deadlineNanos - System.nanoTime()) / NANOS_PER_MILLI
            if (remainingMs <= 0) break
            Thread.sleep(minOf(POLL_INTERVAL_MS, remainingMs))
        }

        throw OrderNotFoundException(orderId)
    }

    private companion object {
        /** Upper bound for the client-supplied wait timeout. */
        const val MAX_WAIT_MS = 30_000L

        /** Pause between two lookups while waiting. */
        const val POLL_INTERVAL_MS = 100L

        const val NANOS_PER_MILLI = 1_000_000L
    }
}

3. Version-based Consistency Check

/**
 * Read model extended with a version number (abbreviated excerpt; the remaining fields are elided).
 */
@Document(indexName = "orders")
data class OrderReadModel(
    @Id
    val orderId: String,
    val version: Long,  // New field: 1 on creation, incremented by the projector on each update
    // ... other fields
)
 
/**
 * Abbreviated projector showing how the read model's `version` is maintained
 * (the constructor and the remaining fields are elided in this excerpt).
 */
@Component
class OrderProjector {
    // A newly created read model always starts at version 1.
    @EventListener
    @Async
    fun on(event: OrderCreated) {
        val readModel = OrderReadModel(
            orderId = event.aggregateId,
            version = 1,
            // ...
        )
        orderReadModelRepository.save(readModel)
    }
 
    // Each applied update bumps the stored version by one.
    // NOTE(review): the version is derived from the stored document, not from the event, so
    // handling the same event twice would bump it twice. If events carry their own version or
    // sequence number, copying that would make the projection idempotent — confirm.
    @EventListener
    @Async
    fun on(event: OrderShipped) {
        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "SHIPPED",
                version = order.version + 1,
                // ...
            )
            orderReadModelRepository.save(updated)
        }
    }
}

Read Model Rebuild

이벤트 소싱과 결합된 CQRS의 장점 중 하나는 Read Model을 언제든지 재구축할 수 있다는 것입니다.

/**
 * Rebuilds the order read model from scratch by replaying the stored events of every
 * `Order` aggregate through [OrderProjector].
 */
@Service
class ReadModelRebuilder(
    private val eventStore: EventStore,
    private val orderReadModelRepository: OrderReadModelRepository,
    private val orderProjector: OrderProjector
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Deletes all order read models and replays every `Order` event, aggregate by aggregate.
     *
     * NOTE(review): the read model is empty from `deleteAll()` until the replay has caught up,
     * so queries return incomplete results in the meantime.
     *
     * NOTE(review): the projector's `on` methods are annotated `@Async`. If these calls go
     * through Spring's async proxy, the events of one aggregate may be applied concurrently
     * and out of order, and the completion log below only means that all events were
     * submitted — confirm, and consider a synchronous projection path for rebuilds.
     */
    @Async
    fun rebuildOrderReadModels() {
        logger.info("Starting read model rebuild...")

        // Clear existing read models
        orderReadModelRepository.deleteAll()

        // Replay all events
        val aggregateIds = eventStore.getAllAggregateIds("Order")

        aggregateIds.forEach { aggregateId ->
            eventStore.getEvents(aggregateId).forEach { event ->
                when (event) {
                    is OrderCreated -> orderProjector.on(event)
                    is OrderShipped -> orderProjector.on(event)
                    is OrderCancelled -> orderProjector.on(event)
                    // Make skipped event types visible instead of ignoring them silently.
                    else -> logger.warn(
                        "Skipping unsupported event ${event.javaClass.name} for aggregate $aggregateId"
                    )
                }
            }
        }

        logger.info("Read model rebuild completed for ${aggregateIds.size} aggregates")
    }
}

정리

CQRS 적용 시 고려사항:

| 장점 | 단점 |
|------|------|
| 읽기/쓰기 독립적 최적화 | 복잡성 증가 |
| 독립적 스케일링 | 최종적 일관성 처리 필요 |
| 유연한 쿼리 모델 | 인프라 비용 증가 |
| Event Sourcing과 자연스러운 결합 | 러닝 커브 |

다음 글에서는 분산 트랜잭션을 처리하는 Saga Pattern을 다루겠습니다.