SpringBoot에서 Kafka Event Message 보내고 받기
당연한 얘기지만 Kafka를 사용하기 위해서는 Kakfa가 설치되어있는 서버를 사용하거나 직접 설치해서 사용해야 한다
나는 Docker를 사용해서 Kafka를 로컬 개발환경에 설치 한 후, 사용하려고 한다.
다른곳에 설치되어있는 Kafka를 사용한다면 오늘 사용하는 Kafka 주소만 변경 해 주면 될 것 같다
간단하게 개념정리
Kafka Broker
- 단일 Kafka 클러스터는 브로커로 구성
- 생산자와 소비자를 처리하고 클러스터에 복제된 데이터를 유지하는 역할
Kafka Topic
- 레코드가 게시되는 범주
- 카프카 메시지의 주제
Kafka Producer
- Kafka에 데이터를 가져오기 위해 작성하는 애플리케이션
- 데이터 생산자
Kafka Consumer
- Kafka에서 데이터를 가져오기 위해 작성하는 프로그램
- 데이터 소비자
Zookeeper
- Kafka 클러스터를 관리하고, 노드 상태를 추적하고, 주제 및 메시지 목록을 유지 관리하는 데 사용
Obviously, to use Kafka, you either need to use a server where Kafka is already installed or install it yourself.
I'm going to install Kafka in my local development environment using Docker.
If you're using Kafka installed elsewhere, you just need to change the Kafka address used today.
Quick Concept Overview
Kafka Broker
- A single Kafka cluster consists of brokers
- Responsible for handling producers and consumers and maintaining replicated data in the cluster
Kafka Topic
- A category where records are published
- The subject of Kafka messages
Kafka Producer
- An application written to send data to Kafka
- Data producer
Kafka Consumer
- A program written to retrieve data from Kafka
- Data consumer
Zookeeper
- Used to manage the Kafka cluster, track node status, and maintain a list of topics and messages
zookeeper와 kafka를 설치하려고한다
따로 Docker파일을 작성하지 않고 docker-compose.yml로 작성하려고 한다
I'm going to install zookeeper and kafka.
Instead of writing a separate Dockerfile, I'll use docker-compose.yml.
#docker-compose.yml - kafka/zookeper
version: '3'
services:
zookeeper:
image: arm64v8/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181docker ps 명령어를 통해서 해당 프로세스들이 잘 실행되고 있는지 확인한다
Use the docker ps command to verify that these processes are running properly.
Kafka 연결하기
Zookeeper 및 Kafka 컨테이너가 실행되면, 카프카에 접속 해 준다
Connecting to Kafka
Once the Zookeeper and Kafka containers are running, connect to Kafka.
docker exec -it kafka /bin/shKafka 컨테이너 내에 카프카 스크립트들은 opt 폴더 내의 kafka_<버전> 폴더 내의 bin 아래에 있다.
나의 경우, opt/kafka_2.13-2.81/bin 에서 작업을 했다 (설치 시기나 이미지에 따라서 카프카버전이 다를 것)
/opt/kafka_2.13-2.81/bin경로로 들어가서 아래와 같은 명령어로 토픽을 생성한다
The Kafka scripts inside the Kafka container are located in the bin folder under the kafka_
In my case, I worked in opt/kafka_2.13-2.81/bin (the Kafka version may differ depending on when you installed it or the image).
Navigate to /opt/kafka_2.13-2.81/bin and create a topic with the following command.
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic wool_kafka_topickafka-topics.sh --list --zookeeper zookeeper:2181간단한 토픽 관련된 명령어 및 python으로 핸들링 하는 방법은 미리 포스팅 해 본 적이 있다.
I've previously posted about simple topic-related commands and how to handle them with Python.
SpringBoot를 gradle이나 maven으로 시작했는지 잘 기억하고, 각각 환경에 맞는 방법으로 설치 해 주면 될 것 같다.
혹은, Intellij에서 SpringBoot를 시작할때 Spring Starter에서 Web과 Kafka를 선택해서 설치 해 주어도 된다.
Gradle -> build.gradle
Remember whether you started SpringBoot with Gradle or Maven, and install it using the appropriate method for your environment.
Alternatively, when starting SpringBoot in IntelliJ, you can select Web and Kafka in Spring Starter to install them.
Gradle -> build.gradle
// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
implementation 'org.springframework.kafka:spring-kafka:2.8.5'
maven -> pom.xml
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.5</version>
</dependency>파이썬으로 작성한 Producer,Consumer Application
SpringBoot Application 하나에 Producer, Consumer 모두 작성 할 수 있지만 서로 다른 스프링부트 어플리케이션에서 데이터를 주고받는 작업을 진행 해 보려고한다.
Producer Application - config
스프링부트 어플리케이션을 생성하고, applicaion.yml을 만들어 서버 기본정보를 세팅하려고한다 (기존에 미리 세팅되어있는 application.properties는 삭제 해준다)
Producer and Consumer Applications written in Python
While you can write both Producer and Consumer in a single SpringBoot Application, I'm going to work on sending and receiving data between different SpringBoot applications.
Producer Application - config
I'll create a SpringBoot application and set up basic server information in application.yml (delete the pre-configured application.properties).
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializerProducer Application - Controller
package com.example.producerapplication.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
@RestController
public class ProduceController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/publish")
public String publish() {
int leftLimit = 48; // numeral '0'
int rightLimit = 122; // letter 'z'
int targetStringLength = 10;
Random random = new Random();
String generatedString = random.ints(leftLimit, rightLimit + 1)
.filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
.limit(targetStringLength)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
this.kafkaTemplate.send("wool_kafka_topic", generatedString);
return "success";
}
}Consumer Application - config
컨슈머 어플리케이션의 정보는 기본 카프카의 정보와 더불어 서버 포트를 달리 해 주는 설정까지 추가했다
Consumer Application - config
For the consumer application, I added settings for basic Kafka information plus a different server port.
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: my_group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
port: 8081Consumer Application - Service
package com.example.consumerapplication.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class ConsumerService {
@KafkaListener(topics = "wool_kafka_topic")
public void receive(String message) {
System.out.println(message);
}
}- Producer 역할을 하는 SpringBoot Application을 시작한다 (localhost:8080)
- 마찬가지로 Consumer SpringBoot Application을 시작한다 (localhost:8081)
- Producer에서
/publish의 url 주소를 호출 해 준다- 이 때 , 우리가 Random으로 생성한 문자열을 kafka로 보내게 된다
- Consumer Application의 콘솔에서 kafka에서 보낸 랜덤스트링이 있는지 확인한다
- Start the SpringBoot Application acting as Producer (localhost:8080)
- Similarly, start the Consumer SpringBoot Application (localhost:8081)
- Call the
/publishURL endpoint from the Producer- At this point, the randomly generated string is sent to Kafka
- Check the Consumer Application console to see if the random string sent from Kafka is received