← Back to Home

Production-Ready Event-Driven Architecture Part 4 - Saga Pattern for Distributed Transactions

시리즈 소개

이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.

  1. Part 1: Event Sourcing 기초
  2. Part 2: Outbox Pattern 구현
  3. Part 3: CQRS와 Read/Write 모델 분리
  4. Part 4: Saga Pattern으로 분산 트랜잭션 처리 (현재 글)
  5. Part 5: Event Schema 진화와 버전 관리

분산 트랜잭션의 문제

마이크로서비스 환경에서는 하나의 비즈니스 트랜잭션이 여러 서비스에 걸쳐 있습니다.

예: 주문 처리 프로세스

주문 생성 → 재고 확인 → 결제 처리 → 배송 예약

각 단계가 다른 서비스에서 처리되며, 전통적인 ACID 트랜잭션을 사용할 수 없습니다.

Saga Pattern 소개

Saga는 일련의 로컬 트랜잭션으로 구성됩니다. 각 로컬 트랜잭션은 자기 서비스의 데이터만 커밋한 뒤 다음 단계를 트리거합니다. 어떤 단계가 실패하면, 이미 커밋된 이전 단계들의 효과를 되돌리는 보상 트랜잭션(compensating transaction)을 역순으로 실행합니다.

두 가지 구현 방식

  1. Choreography: 각 서비스가 이벤트를 발행하고 구독
  2. Orchestration: 중앙 조정자가 Saga를 관리

Choreography 방식

아키텍처

┌─────────┐     ┌─────────┐     ┌─────────┐     ┌─────────┐
│  Order  │────▶│Inventory│────▶│ Payment │────▶│Shipping │
│ Service │     │ Service │     │ Service │     │ Service │
└────┬────┘     └────┬────┘     └────┬────┘     └────┬────┘
     │               │               │               │
     │  OrderCreated │ StockReserved │PaymentCompleted│
     └───────────────┴───────────────┴───────────────┘
                    Event Bus (Kafka)

Order Service

/**
 * Order participant in the choreography-based saga.
 *
 * Creates orders (starting the saga with [OrderCreated]) and reacts to the outcome events that
 * the inventory and payment services publish. Those events come from other services over
 * Kafka, so they are consumed with a class-level [KafkaListener] and dispatched by payload
 * type to the [KafkaHandler] methods below. `@TransactionalEventListener` only receives
 * in-process Spring application events and would never see them.
 */
@Service
@KafkaListener(topics = ["payment-events", "inventory-events"])
class OrderService(
    private val orderRepository: OrderRepository,
    private val eventPublisher: OrderEventPublisher
) {
    /**
     * Creates and stores a new order, then publishes [OrderCreated] to start the saga.
     *
     * NOTE(review): the event is published inside the DB transaction. If the publisher writes
     * straight to Kafka, this is a dual write (the event can be sent while the DB commit fails,
     * or vice versa); use the Outbox pattern (Part 2) in production.
     */
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = Order.create(
            customerId = command.customerId,
            items = command.items
        )

        orderRepository.save(order)

        eventPublisher.publish(
            OrderCreated(
                orderId = order.id,
                customerId = order.customerId,
                items = order.items.map { OrderItemDto(it) },
                totalAmount = order.totalAmount
            )
        )

        return order
    }

    /** Marks the order as paid once the payment service reports success. */
    @KafkaHandler
    @Transactional
    fun onPaymentCompleted(event: PaymentCompleted) {
        val order = orderRepository.findById(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)

        order.markAsPaid()
        orderRepository.save(order)
    }

    /** Cancels the order when stock could not be reserved, and announces the cancellation. */
    @KafkaHandler
    @Transactional
    fun onStockReservationFailed(event: StockReservationFailed) {
        val order = orderRepository.findById(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)

        order.cancel("Stock reservation failed: ${event.reason}")
        orderRepository.save(order)

        eventPublisher.publish(OrderCancelled(order.id, event.reason))
    }

    /**
     * Cancels the order when payment fails. Without this handler the order would stay pending
     * forever, because payment failure is otherwise only handled by the inventory service.
     *
     * NOTE(review): the inventory service already releases stock on `PaymentFailed`; make sure
     * consumers of [OrderCancelled] do not compensate the stock a second time.
     */
    @KafkaHandler
    @Transactional
    fun onPaymentFailed(event: PaymentFailed) {
        val order = orderRepository.findById(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)

        order.cancel("Payment failed: ${event.reason}")
        orderRepository.save(order)

        eventPublisher.publish(OrderCancelled(order.id, event.reason))
    }

    /**
     * Fallback for the other event types on the subscribed topics (e.g. StockReserved,
     * StockReleased). The order service has nothing to do for them.
     */
    @KafkaHandler(isDefault = true)
    fun onUnhandledEvent(event: Any) {
        // Intentionally ignored.
    }
}

Inventory Service

/**
 * Inventory participant in the choreography-based saga.
 *
 * Reserves stock when an order is created. When payment for that order fails, it releases
 * the stock again as a compensating transaction.
 *
 * NOTE(review): each topic carries several event types (e.g. `payment-events` also carries
 * `PaymentCompleted`). A method-level listener typed to a single event assumes the message
 * converter filters or routes by type. Confirm this, or switch to a class-level
 * `@KafkaListener` with `@KafkaHandler` methods.
 */
@Service
class InventoryService(
    private val inventoryRepository: InventoryRepository,
    private val eventPublisher: InventoryEventPublisher,
    // Required by both handlers below (persist / release reservations).
    private val stockReservationRepository: StockReservationRepository
) {
    /**
     * Reserves stock for every item of a newly created order.
     *
     * Publishes [StockReserved] on success. Publishes [StockReservationFailed] when a product
     * is unknown or has insufficient stock.
     *
     * Every item is validated before any stock is touched. If a business failure were caught
     * after some items had already been reserved, `@Transactional` would commit those partial
     * reservations with no compensation to undo them. Any other error during the reservation
     * phase propagates, so the whole transaction rolls back and the message is redelivered.
     *
     * NOTE(review): assumes [StockReserved.reservations] accepts the [StockReservation] entity;
     * map to a DTO if the event uses a separate type. Duplicate product lines in one order are
     * validated individually, not in aggregate.
     */
    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun onOrderCreated(event: OrderCreated) {
        // At-least-once delivery: if reservations already exist, this order was handled before.
        if (stockReservationRepository.findByOrderId(event.orderId).any()) {
            return
        }

        // Phase 1: look up and validate every line item without mutating anything.
        val lines = try {
            event.items.map { item ->
                val inventory = inventoryRepository.findByProductId(item.productId)
                    ?: throw ProductNotFoundException(item.productId)

                if (inventory.availableQuantity < item.quantity) {
                    throw InsufficientStockException(item.productId)
                }

                item to inventory
            }
        } catch (e: ProductNotFoundException) {
            publishReservationFailed(event, e)
            return
        } catch (e: InsufficientStockException) {
            publishReservationFailed(event, e)
            return
        }

        // Phase 2: reserve and persist each reservation, so onPaymentFailed can release it.
        val reservations = lines.map { (item, inventory) ->
            inventory.reserve(item.quantity)
            inventoryRepository.save(inventory)

            StockReservation(
                productId = item.productId,
                orderId = event.orderId,
                quantity = item.quantity
            ).also { stockReservationRepository.save(it) }
        }

        eventPublisher.publish(
            StockReserved(
                orderId = event.orderId,
                reservations = reservations
            )
        )
    }

    /**
     * Compensating transaction: releases every reservation held for the failed order,
     * deletes the reservation records, and publishes [StockReleased].
     */
    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentFailed(event: PaymentFailed) {
        val reservations = stockReservationRepository.findByOrderId(event.orderId)

        reservations.forEach { reservation ->
            val inventory = inventoryRepository.findByProductId(reservation.productId)
                ?: throw ProductNotFoundException(reservation.productId)
            inventory.releaseReservation(reservation.quantity)
            inventoryRepository.save(inventory)
        }

        stockReservationRepository.deleteAll(reservations)

        eventPublisher.publish(
            StockReleased(orderId = event.orderId)
        )
    }

    /** Publishes [StockReservationFailed] for [event], using [cause]'s message as the reason. */
    private fun publishReservationFailed(event: OrderCreated, cause: Exception) {
        eventPublisher.publish(
            StockReservationFailed(
                orderId = event.orderId,
                reason = cause.message ?: "Unknown error"
            )
        )
    }
}

Payment Service

/**
 * Payment participant in the choreography-based saga.
 *
 * Charges the customer once stock has been reserved. It then publishes [PaymentCompleted],
 * or [PaymentFailed] when the gateway rejects the charge.
 */
@Service
class PaymentService(
    private val paymentRepository: PaymentRepository,
    private val paymentGateway: PaymentGateway,
    private val eventPublisher: PaymentEventPublisher,
    // Used to look up the order's customer and amount.
    private val orderClient: OrderClient
) {
    /**
     * Charges the order's total amount and records the payment.
     *
     * Only [PaymentException] is turned into [PaymentFailed]. Any other error propagates, so
     * the transaction rolls back and the message is redelivered.
     *
     * NOTE(review): the external charge cannot be rolled back by `@Transactional`. If saving or
     * publishing fails after a successful charge, a redelivery charges again. Pass the orderId
     * as an idempotency key to the gateway if it supports one.
     */
    @KafkaListener(topics = ["inventory-events"])
    @Transactional
    fun onStockReserved(event: StockReserved) {
        // Kafka delivers at least once: a redelivered StockReserved for an order that already
        // has a payment must not charge the customer again.
        // NOTE(review): requires `fun existsByOrderId(orderId: String): Boolean` on
        // PaymentRepository (a Spring Data derived query).
        if (paymentRepository.existsByOrderId(event.orderId)) {
            return
        }

        try {
            val order = orderClient.getOrder(event.orderId)

            val paymentResult = paymentGateway.charge(
                customerId = order.customerId,
                amount = order.totalAmount
            )

            val payment = Payment(
                orderId = event.orderId,
                amount = order.totalAmount,
                transactionId = paymentResult.transactionId,
                status = PaymentStatus.COMPLETED
            )
            paymentRepository.save(payment)

            eventPublisher.publish(
                PaymentCompleted(
                    orderId = event.orderId,
                    transactionId = paymentResult.transactionId
                )
            )
        } catch (e: PaymentException) {
            eventPublisher.publish(
                PaymentFailed(
                    orderId = event.orderId,
                    reason = e.message ?: "Payment failed"
                )
            )
        }
    }
}

Choreography의 장단점

| 장점 | 단점 |
|------|------|
| 느슨한 결합 | 전체 흐름 파악 어려움 |
| 단순한 구현 | 순환 의존성 위험 |
| 서비스 자율성 | 디버깅 어려움 |
| 확장성 | 테스트 복잡 |

Orchestration 방식

아키텍처

                    ┌─────────────────┐
                    │     Saga        │
                    │  Orchestrator   │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
    ┌─────────┐        ┌─────────┐        ┌─────────┐
    │  Order  │        │Inventory│        │ Payment │
    │ Service │        │ Service │        │ Service │
    └─────────┘        └─────────┘        └─────────┘

Saga State Machine

/**
 * States of the orchestrated order saga.
 *
 * [OrderSaga.state] stores these by name (`EnumType.STRING`), so renaming a constant requires
 * a data migration. Declaration order is kept as-is.
 */
enum class OrderSagaState {
    // Initial state; advanced immediately by the START event.
    STARTED,

    // Inventory step
    STOCK_RESERVING,
    STOCK_RESERVED,
    STOCK_RESERVATION_FAILED,

    // Payment step
    PAYMENT_PROCESSING,
    PAYMENT_COMPLETED,
    PAYMENT_FAILED,

    // Shipping step / success
    SHIPPING_SCHEDULED,
    COMPLETED,

    // Rollback and terminal failure
    COMPENSATING,
    COMPENSATED,
    FAILED,
}
 
/**
 * Inputs to the order saga state machine.
 *
 * Each participant step reports either a *_SUCCESS or a *_FAIL event. START advances the
 * pass-through states, and COMPENSATE_COMPLETE ends a rollback.
 */
enum class OrderSagaEvent {
    START,
    STOCK_RESERVE_SUCCESS,
    STOCK_RESERVE_FAIL,
    PAYMENT_SUCCESS,
    PAYMENT_FAIL,
    SHIPPING_SUCCESS,
    SHIPPING_FAIL,
    COMPENSATE_COMPLETE,
}
 
/**
 * Persistent record of one orchestrated order saga.
 *
 * @property sagaId primary key; a random UUID unless supplied
 * @property orderId the order this saga coordinates (NOT NULL column)
 * @property state current state-machine state, stored by enum name
 * @property context JSON object text holding step results; starts as an empty object
 * @property createdAt creation timestamp
 * @property updatedAt timestamp of the last update
 */
@Entity
@Table(name = "order_saga")
class OrderSaga(
    @field:Id
    val sagaId: String = UUID.randomUUID().toString(),

    @field:Column(nullable = false)
    val orderId: String,

    @field:Enumerated(EnumType.STRING)
    var state: OrderSagaState = OrderSagaState.STARTED,

    // Stored as TEXT so the accumulated JSON is not limited by a VARCHAR length.
    @field:Column(columnDefinition = "TEXT")
    var context: String = "{}",

    val createdAt: Instant = Instant.now(),
    var updatedAt: Instant = Instant.now(),
)

Saga Orchestrator

/**
 * Central coordinator for the order saga (orchestration style).
 *
 * It persists progress in [OrderSaga] and drives the inventory, payment and shipping
 * participants in turn. On failure it runs compensations for the steps that already succeeded.
 *
 * NOTE(review): every step method is invoked through `this`. Spring's proxy-based `@Async`
 * and `@Transactional` therefore have no effect on these internal calls. The whole saga runs
 * synchronously on the caller's thread, inside the transaction opened by [startSaga] (or by
 * [handleEvent] when called from outside). Move the step methods into a separate bean, or
 * drive the saga from participant reply events, if real asynchrony or per-step transactions
 * are required.
 */
@Service
class OrderSagaOrchestrator(
    private val sagaRepository: OrderSagaRepository,
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient,
    private val orderClient: OrderClient,
    private val objectMapper: ObjectMapper
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /** Creates and persists a saga for [orderId], then starts executing its first step. */
    @Transactional
    fun startSaga(orderId: String): OrderSaga {
        val saga = OrderSaga(orderId = orderId)
        sagaRepository.save(saga)

        processNextStep(saga)
        return saga
    }

    /**
     * Applies [event] to the saga and records [payload], if any, in the saga context under
     * the event's name. It then persists the saga and runs the action for the new state.
     *
     * @throws SagaNotFoundException if no saga exists with [sagaId]
     * @throws InvalidStateTransitionException if [event] is not allowed in the current state
     */
    @Transactional
    fun handleEvent(sagaId: String, event: OrderSagaEvent, payload: Any? = null) {
        val saga = sagaRepository.findById(sagaId)
            ?: throw SagaNotFoundException(sagaId)

        saga.state = transition(saga.state, event)
        saga.updatedAt = Instant.now()

        if (payload != null) {
            // Step results (e.g. PAYMENT_SUCCESS.transactionId) are what compensate() reads later.
            // readTree already returns a fresh tree, so no defensive copy is needed.
            val context = objectMapper.readTree(saga.context) as ObjectNode
            context.set<ObjectNode>(event.name, objectMapper.valueToTree(payload))
            saga.context = objectMapper.writeValueAsString(context)
        }

        sagaRepository.save(saga)

        processNextStep(saga)
    }

    /**
     * The saga's state-transition table.
     *
     * @throws InvalidStateTransitionException for any (state, event) pair not listed
     */
    private fun transition(currentState: OrderSagaState, event: OrderSagaEvent): OrderSagaState {
        return when (currentState to event) {
            OrderSagaState.STARTED to OrderSagaEvent.START ->
                OrderSagaState.STOCK_RESERVING

            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_SUCCESS ->
                OrderSagaState.STOCK_RESERVED

            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_FAIL ->
                OrderSagaState.STOCK_RESERVATION_FAILED

            OrderSagaState.STOCK_RESERVED to OrderSagaEvent.START ->
                OrderSagaState.PAYMENT_PROCESSING

            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_SUCCESS ->
                OrderSagaState.PAYMENT_COMPLETED

            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_FAIL ->
                OrderSagaState.COMPENSATING

            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_SUCCESS ->
                OrderSagaState.COMPLETED

            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_FAIL ->
                OrderSagaState.COMPENSATING

            OrderSagaState.COMPENSATING to OrderSagaEvent.COMPENSATE_COMPLETE ->
                OrderSagaState.COMPENSATED

            else -> throw InvalidStateTransitionException(currentState, event)
        }
    }

    /**
     * Runs the action associated with the saga's current state.
     *
     * The `when` is exhaustive on purpose, so the compiler flags any new state that has no
     * handling.
     */
    private fun processNextStep(saga: OrderSaga) {
        when (saga.state) {
            // Pass-through states: the START event moves them to the next step.
            OrderSagaState.STARTED,
            OrderSagaState.STOCK_RESERVED -> handleEvent(saga.sagaId, OrderSagaEvent.START)

            OrderSagaState.STOCK_RESERVING -> reserveStock(saga)

            OrderSagaState.PAYMENT_PROCESSING -> processPayment(saga)

            OrderSagaState.PAYMENT_COMPLETED -> scheduleShipping(saga)

            OrderSagaState.COMPENSATING -> compensate(saga)

            OrderSagaState.COMPLETED ->
                logger.info("Saga completed successfully: ${saga.sagaId}")

            OrderSagaState.COMPENSATED -> {
                logger.info("Saga compensated: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }

            // No participant succeeded, so there is nothing to compensate. The order still has
            // to be failed; otherwise the saga would end here with the order left pending.
            OrderSagaState.STOCK_RESERVATION_FAILED -> {
                logger.info("Saga failed at stock reservation: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }

            // Not produced by transition(); FAILED is only set by markForManualIntervention.
            OrderSagaState.PAYMENT_FAILED,
            OrderSagaState.SHIPPING_SCHEDULED,
            OrderSagaState.FAILED ->
                logger.warn("No action for state: ${saga.state}")
        }
    }

    /**
     * Asks the inventory service to reserve stock for the order's items and reports the outcome.
     *
     * Only the participant call is guarded. The success event is dispatched outside the `try`
     * because it synchronously runs the remaining steps (see class note), and their failures
     * must not be reported as a stock-reservation failure.
     */
    @Async
    fun reserveStock(saga: OrderSaga) {
        val result = try {
            val order = orderClient.getOrder(saga.orderId)
            inventoryClient.reserveStock(
                ReserveStockRequest(
                    orderId = saga.orderId,
                    items = order.items
                )
            )
        } catch (e: Exception) {
            logger.error("Stock reservation failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.STOCK_RESERVE_FAIL, e.message)
            return
        }
        handleEvent(saga.sagaId, OrderSagaEvent.STOCK_RESERVE_SUCCESS, result)
    }

    /**
     * Charges the customer for the order and reports the outcome.
     * Only the participant call is guarded, for the same reason as in [reserveStock].
     */
    @Async
    fun processPayment(saga: OrderSaga) {
        val result = try {
            val order = orderClient.getOrder(saga.orderId)
            paymentClient.processPayment(
                ProcessPaymentRequest(
                    orderId = saga.orderId,
                    customerId = order.customerId,
                    amount = order.totalAmount
                )
            )
        } catch (e: Exception) {
            logger.error("Payment failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.PAYMENT_FAIL, e.message)
            return
        }
        handleEvent(saga.sagaId, OrderSagaEvent.PAYMENT_SUCCESS, result)
    }

    /**
     * Schedules shipping for the order and reports the outcome.
     * Only the participant call is guarded, for the same reason as in [reserveStock].
     */
    @Async
    fun scheduleShipping(saga: OrderSaga) {
        val result = try {
            shippingClient.scheduleShipping(
                ScheduleShippingRequest(orderId = saga.orderId)
            )
        } catch (e: Exception) {
            logger.error("Shipping scheduling failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.SHIPPING_FAIL, e.message)
            return
        }
        handleEvent(saga.sagaId, OrderSagaEvent.SHIPPING_SUCCESS, result)
    }

    /**
     * Undoes the steps recorded as successful in the saga context, in reverse order:
     * first the payment refund, then the stock release.
     *
     * If any compensation call (or reading the context) fails, the saga is flagged for manual
     * intervention. COMPENSATE_COMPLETE is dispatched outside the `try`, so failures in the
     * follow-up handling are not mistaken for a failed compensation.
     */
    @Async
    fun compensate(saga: OrderSaga) {
        logger.info("Starting compensation for saga: ${saga.sagaId}")

        try {
            val context = objectMapper.readTree(saga.context)

            // Payment compensation (only if the charge succeeded)
            if (context.has("PAYMENT_SUCCESS")) {
                val paymentInfo = context.get("PAYMENT_SUCCESS")
                paymentClient.refund(
                    RefundRequest(
                        orderId = saga.orderId,
                        transactionId = paymentInfo.get("transactionId").asText()
                    )
                )
            }

            // Stock compensation (only if the reservation succeeded)
            if (context.has("STOCK_RESERVE_SUCCESS")) {
                inventoryClient.releaseStock(
                    ReleaseStockRequest(orderId = saga.orderId)
                )
            }
        } catch (e: Exception) {
            logger.error("Compensation failed for saga: ${saga.sagaId}", e)
            // A failed compensation cannot be retried automatically here; a human has to step in.
            markForManualIntervention(saga, e)
            return
        }

        handleEvent(saga.sagaId, OrderSagaEvent.COMPENSATE_COMPLETE)
    }

    /** Tells the order service that the order failed. */
    private fun markOrderAsFailed(saga: OrderSaga) {
        orderClient.updateOrderStatus(saga.orderId, "FAILED")
    }

    /**
     * Moves the saga straight to FAILED, bypassing [transition], and persists it so operators
     * can find it (e.g. via the state filter of the saga API).
     */
    private fun markForManualIntervention(saga: OrderSaga, error: Exception) {
        saga.state = OrderSagaState.FAILED
        saga.updatedAt = Instant.now()
        sagaRepository.save(saga)
        logger.error(
            "Saga ${saga.sagaId} (order ${saga.orderId}) requires manual intervention: ${error.message}"
        )
        // TODO: raise an alert / notify operators.
    }
}

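Saga Step Definition (더 우아한 방법)

아래 코드는 각 단계와 그 보상 동작을 선언적으로 정의하는 방식을 보여주는 의사 코드입니다. `SagaDefinition` 빌더는 Eventuate Tram Sagas 같은 프레임워크의 DSL을 단순화한 가상의 API입니다.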

/**
 * Declarative definition of the order saga. Each step pairs a participant call with the
 * compensation that undoes it.
 *
 * The participant clients are constructor-injected because the step lambdas reference them.
 *
 * NOTE(review): with typical saga DSLs, compensations of completed steps run in reverse order
 * when a later step fails, and the last step's compensation only matters if a step follows
 * it. Confirm these semantics against the saga framework actually used.
 */
@Configuration
class OrderSagaDefinition(
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient
) {

    /** Builds the three-step saga: reserve stock → process payment → schedule shipping. */
    @Bean
    fun orderSaga(): SagaDefinition<OrderSagaData> {
        return SagaDefinition.builder<OrderSagaData>()
            .step("reserve-stock")
                .invokeParticipant { data -> inventoryClient.reserveStock(data.orderId, data.items) }
                .withCompensation { data -> inventoryClient.releaseStock(data.orderId) }
            .step("process-payment")
                .invokeParticipant { data -> paymentClient.charge(data.orderId, data.amount) }
                .withCompensation { data -> paymentClient.refund(data.orderId) }
            .step("schedule-shipping")
                .invokeParticipant { data -> shippingClient.schedule(data.orderId) }
                .withCompensation { data -> shippingClient.cancel(data.orderId) }
            .build()
    }
}
 
/**
 * Immutable input carried through every step of the declarative order saga.
 *
 * @property orderId order being processed
 * @property customerId customer to charge
 * @property items line items whose stock is reserved
 * @property amount amount to charge
 */
data class OrderSagaData(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val amount: BigDecimal,
)

Orchestration의 장단점

| 장점 | 단점 |
|------|------|
| 전체 흐름 파악 용이 | 중앙 집중화 |
| 쉬운 디버깅 | 단일 장애점 |
| 명확한 책임 | Orchestrator 복잡도 |
| 테스트 용이 | 결합도 증가 |

보상 트랜잭션 전략

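Semantic Lock

진행 중인 Saga가 선점한 자원에 RESERVED 같은 상태 값을 기록해, 다른 트랜잭션이 아직 확정되지 않은 자원임을 알 수 있게 하는 기법입니다. 아래 예시는 예약에 만료 시각(expiresAt)을 두어, 보상 이벤트가 유실되더라도 스케줄러가 만료된 예약을 해제하도록 합니다.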

/**
 * Semantic lock: a reservation of stock held on behalf of an in-flight order saga.
 *
 * @property id primary key; a random UUID unless supplied
 * @property productId reserved product
 * @property orderId order holding the reservation
 * @property quantity reserved quantity
 * @property status lifecycle state of the lock, stored by name
 * @property expiresAt time after which the reservation may be released (default: 15 minutes
 *   after creation)
 */
@Entity
class StockReservation(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val productId: String,
    val orderId: String,
    val quantity: Int,
    // `var` so the lock can move to CONFIRMED / RELEASED / EXPIRED; as a `val` it could never
    // leave RESERVED. STRING mapping keeps stored values valid if constants are reordered
    // (JPA's default is ORDINAL).
    @Enumerated(EnumType.STRING)
    var status: ReservationStatus = ReservationStatus.RESERVED,
    val expiresAt: Instant = Instant.now().plus(Duration.ofMinutes(15))
)
 
/** Lifecycle states of a [StockReservation] (semantic lock). */
enum class ReservationStatus {
    RESERVED,
    CONFIRMED,
    RELEASED,
    EXPIRED,
}
 
/**
 * Releases every reservation that the repository reports as expired at the current instant.
 * Runs with a 60-second delay between the end of one run and the start of the next.
 *
 * NOTE(review): `@Scheduled` is only honored on methods of Spring-managed beans, so this is
 * presumably a member of the inventory service, where `reservationRepository` and
 * `releaseReservation` are defined. Confirm that `releaseReservation` also changes the
 * reservation's status, so the next run does not pick it up again.
 */
@Scheduled(fixedDelay = 60000)
fun releaseExpiredReservations() {
    val now = Instant.now()
    for (reservation in reservationRepository.findExpiredReservations(now)) {
        releaseReservation(reservation)
    }
}

Compensating Transaction Log

/**
 * Durable record of one compensation that still has to run (or has run) for a saga step.
 *
 * @property id primary key; a random UUID unless supplied
 * @property sagaId saga the compensation belongs to
 * @property stepName saga step being compensated
 * @property compensationData payload needed to execute the compensation
 * @property status execution status, stored by name
 * @property createdAt creation timestamp
 * @property executedAt when the compensation completed; null until then
 */
@Entity
@Table(name = "compensation_log")
class CompensationLog(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val sagaId: String,
    val stepName: String,
    // Presumably a serialized payload: TEXT avoids the default VARCHAR(255) truncating it.
    @Column(columnDefinition = "TEXT")
    val compensationData: String,
    // STRING mapping keeps stored values valid if enum constants are reordered (JPA default: ORDINAL).
    @Enumerated(EnumType.STRING)
    var status: CompensationStatus = CompensationStatus.PENDING,
    val createdAt: Instant = Instant.now(),
    var executedAt: Instant? = null
)
 
/**
 * Background worker that executes pending compensation-log entries.
 *
 * Every 10 seconds (fixed delay) it loads all PENDING entries and runs each compensation.
 * It records COMPLETED plus the execution time on success, or FAILED on any exception.
 *
 * NOTE(review):
 * - Entries marked FAILED are never selected again; add a retry count / back-off, or alerting.
 * - The whole batch shares one transaction. A failing save rolls back the status of entries
 *   already executed, so compensations must be idempotent.
 * - `executeCompensation` is not shown here.
 */
@Service
class CompensationExecutor(
    private val compensationLogRepository: CompensationLogRepository
) {
    @Scheduled(fixedDelay = 10000)
    @Transactional
    fun executePendingCompensations() {
        for (entry in compensationLogRepository.findByStatus(CompensationStatus.PENDING)) {
            try {
                executeCompensation(entry)
                entry.status = CompensationStatus.COMPLETED
                entry.executedAt = Instant.now()
            } catch (e: Exception) {
                // Retry logic or alerting belongs here.
                entry.status = CompensationStatus.FAILED
            }
            compensationLogRepository.save(entry)
        }
    }
}

모니터링과 가시성

Saga 상태 조회 API

/**
 * Read-only REST API for inspecting order sagas (monitoring / manual intervention).
 *
 * NOTE(review): the raw saga context is returned as-is and may include step results such as
 * payment transaction ids. Restrict access to this endpoint accordingly.
 */
@RestController
@RequestMapping("/api/sagas")
class SagaController(
    private val sagaRepository: OrderSagaRepository
) {
    /**
     * Returns the status of a single saga.
     *
     * @throws SagaNotFoundException if no saga exists with [sagaId]
     */
    @GetMapping("/{sagaId}")
    fun getSaga(@PathVariable sagaId: String): ResponseEntity<SagaStatusResponse> {
        val saga = sagaRepository.findById(sagaId)
            ?: throw SagaNotFoundException(sagaId)

        return ResponseEntity.ok(saga.toStatusResponse())
    }

    /** Lists sagas page by page, optionally filtered by [state] (e.g. FAILED sagas that need attention). */
    @GetMapping
    fun listSagas(
        @RequestParam(required = false) state: OrderSagaState?,
        pageable: Pageable
    ): ResponseEntity<Page<SagaStatusResponse>> {
        val sagas = if (state != null) {
            sagaRepository.findByState(state, pageable)
        } else {
            sagaRepository.findAll(pageable)
        }

        return ResponseEntity.ok(sagas.map { it.toStatusResponse() })
    }

    /** Single mapping from entity to response, shared by both endpoints so their output stays identical. */
    private fun OrderSaga.toStatusResponse(): SagaStatusResponse =
        SagaStatusResponse(
            sagaId = sagaId,
            orderId = orderId,
            state = state.name,
            context = context,
            createdAt = createdAt,
            updatedAt = updatedAt
        )
}

정리

Saga Pattern 선택 가이드:

| 상황 | 권장 방식 |
|------|----------|
| 단순한 플로우 (2-3 서비스) | Choreography |
| 복잡한 플로우 | Orchestration |
| 서비스 자율성 중요 | Choreography |
| 가시성/디버깅 중요 | Orchestration |
| 빈번한 플로우 변경 | Orchestration |

다음 글에서는 Event Schema의 진화와 버전 관리를 다루겠습니다.