← Back to Home

Production-Ready Event-Driven Architecture Part 2 - Implementing the Outbox Pattern

시리즈 소개

이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.

  1. Part 1: Event Sourcing 기초
  2. Part 2: Outbox Pattern 구현 (현재 글)
  3. Part 3: CQRS와 Read/Write 모델 분리
  4. Part 4: Saga Pattern으로 분산 트랜잭션 처리
  5. Part 5: Event Schema 진화와 버전 관리

문제: 이중 쓰기(Dual Write) 문제

마이크로서비스에서 자주 발생하는 문제를 살펴보겠습니다:

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. 데이터베이스에 저장
        val order = orderRepository.save(Order.create(command))
 
        // 2. Kafka에 이벤트 발행
        kafkaTemplate.send("order-events", OrderCreatedEvent(order))
 
        return order
    }
}

이 코드의 문제점:

  • DB 저장 성공 후 Kafka 전송 실패 → 데이터 불일치
  • Kafka 전송 성공 후 트랜잭션 롤백 → 이벤트는 발행되었으나 데이터 없음
  • 두 시스템에 대한 원자적 쓰기가 불가능

해결책: Transactional Outbox Pattern

Outbox Pattern은 이벤트를 같은 트랜잭션 내에서 데이터베이스의 Outbox 테이블에 저장하고, 별도의 프로세스가 이를 메시지 브로커로 전달합니다.

아키텍처

┌─────────────────────────────────────────────────────┐
│                   Application                        │
│  ┌─────────────┐    ┌─────────────────────────────┐ │
│  │   Order     │    │        Outbox Table         │ │
│  │   Table     │    │  (Same Transaction)         │ │
│  └─────────────┘    └─────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │  Outbox Relay   │
                    │  (Polling or    │
                    │   CDC)          │
                    └─────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │     Kafka       │
                    └─────────────────┘

구현 방법 1: Polling Publisher

Outbox 테이블 설계

@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,
 
    @Column(nullable = false)
    val aggregateType: String,
 
    @Column(nullable = false)
    val aggregateId: String,
 
    @Column(nullable = false)
    val eventType: String,
 
    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,
 
    @Column(nullable = false)
    val createdAt: Instant = Instant.now(),
 
    @Enumerated(EnumType.STRING)
    var status: OutboxStatus = OutboxStatus.PENDING,
 
    var processedAt: Instant? = null
)
 
enum class OutboxStatus {
    PENDING, PROCESSED, FAILED
}

Service Layer 수정

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Order 저장
        val order = orderRepository.save(Order.create(command))
 
        // 2. 같은 트랜잭션 내에서 Outbox에 이벤트 저장
        val event = OrderCreatedEvent(
            orderId = order.id,
            customerId = order.customerId,
            items = order.items,
            totalAmount = order.totalAmount
        )
 
        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id,
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(event)
            )
        )
 
        return order
    }
}

Polling Publisher

@Component
class OutboxPollingPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    private val logger = LoggerFactory.getLogger(javaClass)
 
    @Scheduled(fixedDelay = 1000) // 1초마다 실행
    @Transactional
    fun publishPendingEvents() {
        val pendingEvents = outboxRepository.findByStatusOrderByCreatedAtAsc(
            OutboxStatus.PENDING,
            PageRequest.of(0, 100)
        )
 
        pendingEvents.forEach { event ->
            try {
                val topic = "${event.aggregateType.lowercase()}-events"
 
                kafkaTemplate.send(topic, event.aggregateId, event.payload)
                    .get() // 동기 전송으로 확실히 전송
 
                event.status = OutboxStatus.PROCESSED
                event.processedAt = Instant.now()
                outboxRepository.save(event)
 
                logger.info("Published event: ${event.eventType} for ${event.aggregateId}")
            } catch (e: Exception) {
                logger.error("Failed to publish event: ${event.id}", e)
                event.status = OutboxStatus.FAILED
                outboxRepository.save(event)
            }
        }
    }
}

Polling 방식의 한계

  • 지연 시간: 폴링 간격만큼의 지연 발생
  • 부하: 지속적인 DB 폴링으로 인한 부하
  • 중복: 장애 상황에서 중복 전송 가능

구현 방법 2: Debezium CDC (Change Data Capture)

Debezium은 데이터베이스의 변경 사항을 실시간으로 캡처하여 Kafka로 스트리밍합니다.

Docker Compose 설정

version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: orderdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"  # CDC를 위해 필수
    ports:
      - "5432:5432"
 
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
 
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
 
  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Debezium Connector 설정

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orderdb",
    "database.server.name": "orderdb",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events",
    "transforms.outbox.table.fields.additional.placement": "aggregate_type:header:aggregateType"
  }
}

Connector 등록

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @outbox-connector.json

Debezium Outbox Event Router

Debezium의 Outbox Event Router는 outbox 테이블의 레코드를 자동으로 적절한 토픽으로 라우팅합니다.

// Outbox 테이블 구조 (Debezium용)
@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    val id: UUID = UUID.randomUUID(),
 
    @Column(name = "aggregate_type", nullable = false)
    val aggregateType: String,
 
    @Column(name = "aggregate_id", nullable = false)
    val aggregateId: String,
 
    @Column(name = "event_type", nullable = false)
    val eventType: String,
 
    @Column(columnDefinition = "JSONB", nullable = false)
    val payload: String
)

Service Layer (CDC 방식)

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = orderRepository.save(Order.create(command))
 
        // Outbox에 저장하면 Debezium이 자동으로 Kafka로 전송
        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id,
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(
                    OrderCreatedEvent(order)
                )
            )
        )
 
        return order
    }
}

Exactly-Once Semantics 보장

Consumer 측 Idempotency

@Component
class OrderEventConsumer(
    private val processedEventRepository: ProcessedEventRepository
) {
    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun handleOrderEvent(
        @Payload payload: String,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String,
        @Header("id") eventId: String
    ) {
        // 이미 처리된 이벤트인지 확인
        if (processedEventRepository.existsById(eventId)) {
            logger.info("Event already processed: $eventId")
            return
        }
 
        // 이벤트 처리
        processEvent(payload)
 
        // 처리 완료 기록
        processedEventRepository.save(ProcessedEvent(eventId, Instant.now()))
    }
}
 
@Entity
@Table(name = "processed_events")
class ProcessedEvent(
    @Id
    val eventId: String,
    val processedAt: Instant
)

Outbox 테이블 정리

Outbox 테이블이 무한히 커지는 것을 방지하기 위한 정리 작업:

@Component
class OutboxCleaner(
    private val outboxRepository: OutboxRepository
) {
    @Scheduled(cron = "0 0 2 * * *") // 매일 새벽 2시
    @Transactional
    fun cleanOldEvents() {
        val cutoffTime = Instant.now().minus(Duration.ofDays(7))
        val deletedCount = outboxRepository.deleteByStatusAndCreatedAtBefore(
            OutboxStatus.PROCESSED,
            cutoffTime
        )
        logger.info("Deleted $deletedCount old outbox events")
    }
}

성능 최적화

Batch Processing

@Component
class BatchOutboxPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Scheduled(fixedDelay = 500)
    @Transactional
    fun publishBatch() {
        val events = outboxRepository.findPendingEvents(limit = 1000)
 
        if (events.isEmpty()) return
 
        val futures = events.map { event ->
            val topic = "${event.aggregateType.lowercase()}-events"
            kafkaTemplate.send(topic, event.aggregateId, event.payload)
        }
 
        // 모든 전송 완료 대기
        futures.forEach { it.get() }
 
        // Batch update
        outboxRepository.markAsProcessed(events.map { it.id!! })
    }
}

정리

Outbox Pattern은 다음과 같은 이점을 제공합니다:

| 특성 | Polling | CDC (Debezium) | |------|---------|----------------| | 지연 시간 | 폴링 간격 | 거의 실시간 | | 구현 복잡도 | 낮음 | 중간 | | 인프라 요구사항 | 낮음 | Debezium 필요 | | DB 부하 | 중간 | 낮음 | | 확장성 | 제한적 | 높음 |

다음 글에서는 CQRS 패턴을 통해 읽기와 쓰기 모델을 분리하는 방법을 다루겠습니다.