← Back to Home

Production-Ready Event-Driven Architecture Part 5 - Event Schema Evolution and Versioning

시리즈 소개

이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.

  1. Part 1: Event Sourcing 기초
  2. Part 2: Outbox Pattern 구현
  3. Part 3: CQRS와 Read/Write 모델 분리
  4. Part 4: Saga Pattern으로 분산 트랜잭션 처리
  5. Part 5: Event Schema 진화와 버전 관리 (현재 글)

스키마 진화의 필요성

이벤트 기반 시스템은 시간이 지나면서 변화합니다:

  • 새로운 필드 추가
  • 기존 필드 제거
  • 필드 타입 변경
  • 필드 이름 변경

이러한 변경이 기존 Consumer를 깨뜨리지 않도록 해야 합니다.

호환성 유형

1. Backward Compatibility (하위 호환성)

새로운 스키마로 이전 데이터를 읽을 수 있음

Producer (v1) ──▶ [Event v1] ──▶ Consumer (v2) ✓

허용되는 변경:

  • 기본값이 있는 새 필드 추가
  • 선택적 필드 제거

2. Forward Compatibility (상위 호환성)

이전 스키마로 새로운 데이터를 읽을 수 있음

Producer (v2) ──▶ [Event v2] ──▶ Consumer (v1) ✓

허용되는 변경:

  • 기본값이 있는 필드 제거
  • 선택적 필드 추가

3. Full Compatibility (완전 호환성)

양방향 모두 호환

Producer (v1) ◀──▶ Consumer (v2) ✓
Producer (v2) ◀──▶ Consumer (v1) ✓

가장 안전하지만 제약이 많음.

Avro를 활용한 스키마 관리

Avro Schema 정의

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events.order",
  "fields": [
    {
      "name": "orderId",
      "type": "string"
    },
    {
      "name": "customerId",
      "type": "string"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "price", "type": "double"}
          ]
        }
      }
    },
    {
      "name": "totalAmount",
      "type": "double"
    },
    {
      "name": "occurredAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

스키마 진화 예제

V1 → V2: 새 필드 추가 (하위 호환)

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events.order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "items", "type": {"type": "array", "items": "OrderItem"}},
    {"name": "totalAmount", "type": "double"},
    {"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {
      "name": "currency",
      "type": "string",
      "default": "KRW"
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null
    }
  ]
}

Spring Boot + Avro 설정

// build.gradle.kts
plugins {
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}
 
dependencies {
    implementation("org.apache.avro:avro:1.11.3")
    implementation("io.confluent:kafka-avro-serializer:7.5.0")
}
 
avro {
    setCreateSetters(false)
    setFieldVisibility("PRIVATE")
}
# application.yml
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: http://localhost:8081
      specific.avro.reader: true

Avro Producer

@Service
class OrderEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, OrderCreated>
) {
    fun publishOrderCreated(order: Order) {
        val event = OrderCreated.newBuilder()
            .setOrderId(order.id)
            .setCustomerId(order.customerId)
            .setItems(order.items.map { item ->
                OrderItem.newBuilder()
                    .setProductId(item.productId)
                    .setQuantity(item.quantity)
                    .setPrice(item.price.toDouble())
                    .build()
            })
            .setTotalAmount(order.totalAmount.toDouble())
            .setOccurredAt(Instant.now().toEpochMilli())
            .setCurrency("KRW")  // v2 필드
            .setMetadata(null)   // v2 필드
            .build()
 
        kafkaTemplate.send("order-events", order.id, event)
    }
}

Avro Consumer (V1 Consumer가 V2 메시지 처리)

@Component
class OrderEventConsumer {
 
    @KafkaListener(topics = ["order-events"])
    fun handleOrderCreated(event: OrderCreated) {
        // V1 Consumer는 currency, metadata 필드를 무시
        // Avro가 자동으로 처리
        processOrder(
            orderId = event.getOrderId(),
            customerId = event.getCustomerId(),
            items = event.getItems(),
            totalAmount = event.getTotalAmount()
        )
    }
}

Protocol Buffers를 활용한 스키마 관리

Proto 파일 정의

syntax = "proto3";
 
package com.example.events.order;
 
option java_multiple_files = true;
option java_package = "com.example.events.order";
 
import "google/protobuf/timestamp.proto";
 
message OrderCreated {
  string order_id = 1;
  string customer_id = 2;
  repeated OrderItem items = 3;
  double total_amount = 4;
  google.protobuf.Timestamp occurred_at = 5;
 
  // V2 additions
  string currency = 6;  // 새 필드 (선택적)
  map<string, string> metadata = 7;  // 새 필드 (선택적)
}
 
message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double price = 3;
}

Protobuf 진화 규칙

// 안전한 변경들:
// 1. 새 필드 추가 (고유한 필드 번호 사용)
message OrderCreatedV2 {
  string order_id = 1;
  string customer_id = 2;
  repeated OrderItem items = 3;
  double total_amount = 4;
  google.protobuf.Timestamp occurred_at = 5;
  string currency = 6;          // 새 필드
  string shipping_method = 7;   // 새 필드
}
 
// 2. 필드를 optional에서 repeated로 변경 (scalar types)
// 3. 호환되는 타입 간 변경 (int32 <-> int64)
 
// 위험한 변경들 (하지 말 것):
// 1. 필드 번호 변경
// 2. 필드 타입을 비호환 타입으로 변경
// 3. required 필드 추가 (proto2)

Spring Boot + Protobuf

// build.gradle.kts
plugins {
    id("com.google.protobuf") version "0.9.4"
}
 
dependencies {
    implementation("com.google.protobuf:protobuf-java:3.25.1")
    implementation("io.confluent:kafka-protobuf-serializer:7.5.0")
}
 
protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.25.1"
    }
}

Schema Registry 활용

Schema Registry 아키텍처

┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│   Producer   │────────▶│    Kafka     │────────▶│   Consumer   │
└──────┬───────┘         └──────────────┘         └──────┬───────┘
       │                                                  │
       │  Register Schema                    Get Schema   │
       │                                                  │
       ▼                                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Schema Registry                             │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Subject: order-events-value                             │    │
│  │  ├── Version 1: OrderCreated (v1)                       │    │
│  │  ├── Version 2: OrderCreated (v2) + currency            │    │
│  │  └── Version 3: OrderCreated (v3) + metadata            │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Docker Compose 설정

version: '3.8'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    hostname: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Schema Registry API 활용

@Configuration
class SchemaRegistryConfig {
 
    @Bean
    fun schemaRegistryClient(): SchemaRegistryClient {
        return CachedSchemaRegistryClient(
            "http://localhost:8081",
            100  // max schemas to cache
        )
    }
}
 
@Service
class SchemaService(
    private val schemaRegistryClient: SchemaRegistryClient
) {
    fun registerSchema(subject: String, schema: Schema): Int {
        return schemaRegistryClient.register(subject, schema)
    }
 
    fun getLatestSchema(subject: String): Schema {
        val metadata = schemaRegistryClient.getLatestSchemaMetadata(subject)
        return Schema.Parser().parse(metadata.schema)
    }
 
    fun checkCompatibility(subject: String, schema: Schema): Boolean {
        return schemaRegistryClient.testCompatibility(subject, schema)
    }
 
    fun getSchemaVersions(subject: String): List<Int> {
        return schemaRegistryClient.getAllVersions(subject)
    }
}

호환성 설정

# Subject 레벨 호환성 설정
curl -X PUT http://localhost:8081/config/order-events-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'
 
# 가능한 호환성 수준:
# - BACKWARD (기본값)
# - BACKWARD_TRANSITIVE
# - FORWARD
# - FORWARD_TRANSITIVE
# - FULL
# - FULL_TRANSITIVE
# - NONE

실전 스키마 진화 전략

1. 단계별 롤아웃

Day 1: Consumer V2 배포 (V1, V2 모두 처리 가능)
Day 2: Producer V2 배포 (V2 이벤트 발행 시작)
Day 7: 모니터링 후 V1 Consumer 제거

2. Feature Flag를 활용한 점진적 전환

@Service
class OrderEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, GenericRecord>,
    private val featureFlagService: FeatureFlagService,
    private val schemaV1: Schema,
    private val schemaV2: Schema
) {
    fun publishOrderCreated(order: Order) {
        val event = if (featureFlagService.isEnabled("use-order-event-v2")) {
            createV2Event(order)
        } else {
            createV1Event(order)
        }
 
        kafkaTemplate.send("order-events", order.id, event)
    }
 
    private fun createV1Event(order: Order): GenericRecord {
        return GenericRecordBuilder(schemaV1)
            .set("orderId", order.id)
            .set("customerId", order.customerId)
            .set("totalAmount", order.totalAmount.toDouble())
            .build()
    }
 
    private fun createV2Event(order: Order): GenericRecord {
        return GenericRecordBuilder(schemaV2)
            .set("orderId", order.id)
            .set("customerId", order.customerId)
            .set("totalAmount", order.totalAmount.toDouble())
            .set("currency", order.currency)
            .set("metadata", order.metadata)
            .build()
    }
}

3. 다중 스키마 Consumer

@Component
class MultiVersionOrderConsumer {
 
    @KafkaListener(topics = ["order-events"])
    fun handleOrderEvent(
        @Payload record: GenericRecord,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String
    ) {
        val schemaVersion = getSchemaVersion(record)
 
        when {
            schemaVersion < 2 -> handleV1(record)
            schemaVersion < 3 -> handleV2(record)
            else -> handleV3(record)
        }
    }
 
    private fun getSchemaVersion(record: GenericRecord): Int {
        return record.schema.getField("currency")?.let { 2 }
            ?: record.schema.getField("metadata")?.let { 3 }
            ?: 1
    }
 
    private fun handleV1(record: GenericRecord) {
        val orderId = record.get("orderId").toString()
        val totalAmount = record.get("totalAmount") as Double
        // V1 처리 로직 (currency 기본값 사용)
        processOrder(orderId, totalAmount, "KRW")
    }
 
    private fun handleV2(record: GenericRecord) {
        val orderId = record.get("orderId").toString()
        val totalAmount = record.get("totalAmount") as Double
        val currency = record.get("currency")?.toString() ?: "KRW"
        processOrder(orderId, totalAmount, currency)
    }
}

4. Dead Letter Queue와 스키마 불일치 처리

@Component
class SchemaAwareErrorHandler(
    private val deadLetterProducer: KafkaTemplate<String, ByteArray>
) : CommonErrorHandler {
 
    override fun handleRecord(
        thrownException: Exception,
        record: ConsumerRecord<*, *>,
        consumer: Consumer<*, *>,
        container: MessageListenerContainer
    ) {
        when (thrownException.cause) {
            is SerializationException -> {
                // 스키마 불일치 - DLQ로 전송
                sendToDeadLetter(record, thrownException)
            }
            else -> {
                // 다른 에러 처리
                throw thrownException
            }
        }
    }
 
    private fun sendToDeadLetter(
        record: ConsumerRecord<*, *>,
        exception: Exception
    ) {
        val headers = record.headers().toMutableList()
        headers.add(RecordHeader("error-reason", exception.message?.toByteArray()))
        headers.add(RecordHeader("original-topic", record.topic().toByteArray()))
 
        deadLetterProducer.send(
            ProducerRecord(
                "${record.topic()}-dlq",
                null,
                record.key() as? String,
                record.value() as? ByteArray,
                headers
            )
        )
    }
}

스키마 진화 체크리스트

변경 전

  • [ ] 현재 스키마와의 호환성 확인
  • [ ] Schema Registry에서 호환성 테스트
  • [ ] 모든 Consumer가 새 스키마 처리 가능한지 확인
  • [ ] 롤백 계획 수립

변경 중

  • [ ] Consumer 먼저 배포
  • [ ] 모니터링 강화
  • [ ] 점진적 트래픽 전환

변경 후

  • [ ] DLQ 모니터링
  • [ ] Consumer lag 확인
  • [ ] 에러율 모니터링

정리

스키마 진화의 핵심 원칙:

| 원칙 | 설명 | |------|------| | 항상 하위 호환성 유지 | 새 Consumer가 이전 이벤트를 처리 가능해야 함 | | 필드 번호/이름 재사용 금지 | 삭제된 필드 번호는 영구적으로 예약 | | 기본값 필수 | 새 필드는 항상 기본값 포함 | | 점진적 배포 | Consumer 먼저, Producer 나중에 | | Schema Registry 활용 | 중앙 집중식 스키마 관리 |

이것으로 Event-Driven Architecture 시리즈를 마칩니다. 이 시리즈에서 다룬 패턴들을 조합하면 확장 가능하고 유지보수 가능한 마이크로서비스를 구축할 수 있습니다.