Production-Ready Event-Driven Architecture Part 2 - Implementing the Outbox Pattern
SERIES
event-driven-architecture
- 1.Production-Ready Event-Driven Architecture Part 5 - Event Schema Evolution and Versioning
- 2.Production-Ready Event-Driven Architecture Part 4 - Saga Pattern for Distributed Transactions
- 3.Production-Ready Event-Driven Architecture Part 3 - CQRS with Separate Read/Write Models
- 4.Production-Ready Event-Driven Architecture Part 2 - Implementing the Outbox PatternReading
- 5.Production-Ready Event-Driven Architecture Part 1 - Event Sourcing Fundamentals
시리즈 소개
이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.
- Part 1: Event Sourcing 기초
- Part 2: Outbox Pattern 구현 (현재 글)
- Part 3: CQRS와 Read/Write 모델 분리
- Part 4: Saga Pattern으로 분산 트랜잭션 처리
- Part 5: Event Schema 진화와 버전 관리
문제: 이중 쓰기(Dual Write) 문제
마이크로서비스에서 자주 발생하는 문제를 살펴보겠습니다:
@Service
class OrderService(
private val orderRepository: OrderRepository,
private val kafkaTemplate: KafkaTemplate<String, String>
) {
@Transactional
fun createOrder(command: CreateOrderCommand): Order {
// 1. 데이터베이스에 저장
val order = orderRepository.save(Order.create(command))
// 2. Kafka에 이벤트 발행
kafkaTemplate.send("order-events", OrderCreatedEvent(order))
return order
}
}이 코드의 문제점:
- DB 저장 성공 후 Kafka 전송 실패 → 데이터 불일치
- Kafka 전송 성공 후 트랜잭션 롤백 → 이벤트는 발행되었으나 데이터 없음
- 두 시스템에 대한 원자적 쓰기가 불가능
해결책: Transactional Outbox Pattern
Outbox Pattern은 이벤트를 같은 트랜잭션 내에서 데이터베이스의 Outbox 테이블에 저장하고, 별도의 프로세스가 이를 메시지 브로커로 전달합니다.
아키텍처
┌─────────────────────────────────────────────────────┐
│ Application │
│ ┌─────────────┐ ┌─────────────────────────────┐ │
│ │ Order │ │ Outbox Table │ │
│ │ Table │ │ (Same Transaction) │ │
│ └─────────────┘ └─────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
│
▼
┌─────────────────┐
│ Outbox Relay │
│ (Polling or │
│ CDC) │
└─────────────────┘
│
▼
┌─────────────────┐
│ Kafka │
└─────────────────┘
구현 방법 1: Polling Publisher
Outbox 테이블 설계
@Entity
@Table(name = "outbox_events")
class OutboxEvent(
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long? = null,
@Column(nullable = false)
val aggregateType: String,
@Column(nullable = false)
val aggregateId: String,
@Column(nullable = false)
val eventType: String,
@Column(columnDefinition = "TEXT", nullable = false)
val payload: String,
@Column(nullable = false)
val createdAt: Instant = Instant.now(),
@Enumerated(EnumType.STRING)
var status: OutboxStatus = OutboxStatus.PENDING,
var processedAt: Instant? = null
)
enum class OutboxStatus {
PENDING, PROCESSED, FAILED
}Service Layer 수정
@Service
class OrderService(
private val orderRepository: OrderRepository,
private val outboxRepository: OutboxRepository,
private val objectMapper: ObjectMapper
) {
@Transactional
fun createOrder(command: CreateOrderCommand): Order {
// 1. Order 저장
val order = orderRepository.save(Order.create(command))
// 2. 같은 트랜잭션 내에서 Outbox에 이벤트 저장
val event = OrderCreatedEvent(
orderId = order.id,
customerId = order.customerId,
items = order.items,
totalAmount = order.totalAmount
)
outboxRepository.save(
OutboxEvent(
aggregateType = "Order",
aggregateId = order.id,
eventType = "OrderCreated",
payload = objectMapper.writeValueAsString(event)
)
)
return order
}
}Polling Publisher
@Component
class OutboxPollingPublisher(
private val outboxRepository: OutboxRepository,
private val kafkaTemplate: KafkaTemplate<String, String>
) {
private val logger = LoggerFactory.getLogger(javaClass)
@Scheduled(fixedDelay = 1000) // 1초마다 실행
@Transactional
fun publishPendingEvents() {
val pendingEvents = outboxRepository.findByStatusOrderByCreatedAtAsc(
OutboxStatus.PENDING,
PageRequest.of(0, 100)
)
pendingEvents.forEach { event ->
try {
val topic = "${event.aggregateType.lowercase()}-events"
kafkaTemplate.send(topic, event.aggregateId, event.payload)
.get() // 동기 전송으로 확실히 전송
event.status = OutboxStatus.PROCESSED
event.processedAt = Instant.now()
outboxRepository.save(event)
logger.info("Published event: ${event.eventType} for ${event.aggregateId}")
} catch (e: Exception) {
logger.error("Failed to publish event: ${event.id}", e)
event.status = OutboxStatus.FAILED
outboxRepository.save(event)
}
}
}
}Polling 방식의 한계
- 지연 시간: 폴링 간격만큼의 지연 발생
- 부하: 지속적인 DB 폴링으로 인한 부하
- 중복: 장애 상황에서 중복 전송 가능
구현 방법 2: Debezium CDC (Change Data Capture)
Debezium은 데이터베이스의 변경 사항을 실시간으로 캡처하여 Kafka로 스트리밍합니다.
Docker Compose 설정
version: '3.8'
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: orderdb
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
command:
- "postgres"
- "-c"
- "wal_level=logical" # CDC를 위해 필수
ports:
- "5432:5432"
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
connect:
image: debezium/connect:2.4
depends_on:
- kafka
- postgres
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:29092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statusesDebezium Connector 설정
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "orderdb",
"database.server.name": "orderdb",
"table.include.list": "public.outbox_events",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.topic.replacement": "${routedByValue}-events",
"transforms.outbox.table.fields.additional.placement": "aggregate_type:header:aggregateType"
}
}Connector 등록
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @outbox-connector.jsonDebezium Outbox Event Router
Debezium의 Outbox Event Router는 outbox 테이블의 레코드를 자동으로 적절한 토픽으로 라우팅합니다.
// Outbox 테이블 구조 (Debezium용)
@Entity
@Table(name = "outbox_events")
class OutboxEvent(
@Id
val id: UUID = UUID.randomUUID(),
@Column(name = "aggregate_type", nullable = false)
val aggregateType: String,
@Column(name = "aggregate_id", nullable = false)
val aggregateId: String,
@Column(name = "event_type", nullable = false)
val eventType: String,
@Column(columnDefinition = "JSONB", nullable = false)
val payload: String
)Service Layer (CDC 방식)
@Service
class OrderService(
private val orderRepository: OrderRepository,
private val outboxRepository: OutboxRepository,
private val objectMapper: ObjectMapper
) {
@Transactional
fun createOrder(command: CreateOrderCommand): Order {
val order = orderRepository.save(Order.create(command))
// Outbox에 저장하면 Debezium이 자동으로 Kafka로 전송
outboxRepository.save(
OutboxEvent(
aggregateType = "Order",
aggregateId = order.id,
eventType = "OrderCreated",
payload = objectMapper.writeValueAsString(
OrderCreatedEvent(order)
)
)
)
return order
}
}Exactly-Once Semantics 보장
Consumer 측 Idempotency
@Component
class OrderEventConsumer(
private val processedEventRepository: ProcessedEventRepository
) {
@KafkaListener(topics = ["order-events"])
@Transactional
fun handleOrderEvent(
@Payload payload: String,
@Header(KafkaHeaders.RECEIVED_KEY) key: String,
@Header("id") eventId: String
) {
// 이미 처리된 이벤트인지 확인
if (processedEventRepository.existsById(eventId)) {
logger.info("Event already processed: $eventId")
return
}
// 이벤트 처리
processEvent(payload)
// 처리 완료 기록
processedEventRepository.save(ProcessedEvent(eventId, Instant.now()))
}
}
@Entity
@Table(name = "processed_events")
class ProcessedEvent(
@Id
val eventId: String,
val processedAt: Instant
)Outbox 테이블 정리
Outbox 테이블이 무한히 커지는 것을 방지하기 위한 정리 작업:
@Component
class OutboxCleaner(
private val outboxRepository: OutboxRepository
) {
@Scheduled(cron = "0 0 2 * * *") // 매일 새벽 2시
@Transactional
fun cleanOldEvents() {
val cutoffTime = Instant.now().minus(Duration.ofDays(7))
val deletedCount = outboxRepository.deleteByStatusAndCreatedAtBefore(
OutboxStatus.PROCESSED,
cutoffTime
)
logger.info("Deleted $deletedCount old outbox events")
}
}성능 최적화
Batch Processing
@Component
class BatchOutboxPublisher(
private val outboxRepository: OutboxRepository,
private val kafkaTemplate: KafkaTemplate<String, String>
) {
@Scheduled(fixedDelay = 500)
@Transactional
fun publishBatch() {
val events = outboxRepository.findPendingEvents(limit = 1000)
if (events.isEmpty()) return
val futures = events.map { event ->
val topic = "${event.aggregateType.lowercase()}-events"
kafkaTemplate.send(topic, event.aggregateId, event.payload)
}
// 모든 전송 완료 대기
futures.forEach { it.get() }
// Batch update
outboxRepository.markAsProcessed(events.map { it.id!! })
}
}정리
Outbox Pattern은 다음과 같은 이점을 제공합니다:
| 특성 | Polling | CDC (Debezium) | |------|---------|----------------| | 지연 시간 | 폴링 간격 | 거의 실시간 | | 구현 복잡도 | 낮음 | 중간 | | 인프라 요구사항 | 낮음 | Debezium 필요 | | DB 부하 | 중간 | 낮음 | | 확장성 | 제한적 | 높음 |
다음 글에서는 CQRS 패턴을 통해 읽기와 쓰기 모델을 분리하는 방법을 다루겠습니다.