프로젝트에 비동기 처리를 위해 큐를 사용해야 할 일이 생겼습니다. 그래서 개인적으로 사용해보고 싶었던 카프카를 스프링에서 사용하는 예제를 공유하는 글입니다.
카프카에 대한 설명은 하지 않습니다.
준비 사항
우선 의존성을 추가해줍니다.
1
implementation 'org.springframework.kafka:spring-kafka:3.0.8'
그리고 yml 파일에 필요한 정보를 정의해줍니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spring:
kafka:
bootstrap-servers: 3.34.97.97:9092
consumer:
# consumer bootstrap servers가 따로 존재하면 설정
# bootstrap-servers: 3.34.97.97:9092
# 식별 가능한 Consumer Group Id
group-id: waveofmymind # 임의로 작성
# Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 존재하지 않을 경우 수행할 작업을 설정
# latest: 가장 최근에 생산된 메시지로 offeset reset
# earliest: 가장 오래된 메시지로 offeset reset
# none: offset 정보가 없으면 Exception 발생
auto-offset-reset: earliest
# 데이터를 받아올 때, key/value를 역직렬화
# JSON 데이터를 받아올 것이라면 JsonDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# producer bootstrap servers가 따로 존재하면 설정
# bootstrap-servers: 3.34.97.97:9092
# 데이터를 보낼 때, key/value를 직렬화
# JSON 데이터를 보낼 것이라면 JsonDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
이외에도 더 많은 설정을 할 수 있는데, 여기를 참고해주세요
이제 간단한 설정을 다 끝났고, 간단한 Pub/Sub를 해봅시다.
Spring에서
우선 포스트맨으로 요청을 보내면 프로젝트에서 카프카 토픽을 생성해서 메시지를 보내는 간단한 예제입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@RestController
@RequestMapping("/kafka")
class KafkaController(
private val KafkaProducer producer
) {
@PostMapping("/publish")
fun publish(@RequestParam message: String): String {
producer.sendMessage(message)
return "success"
}
}
@Service
class KafkaProducer(
private val KafkaTemplate<String, String> kafkaTemplate
) {
private val= TOPIC="Test"
fun sendMessage(message: String) {
println(String.format("Produce message : %s", message))
kafkaTemplate.send(TOPIC, message)
}
}
@Service
class KafkaConsumer {
@KafkaListener(topics = "Test", groupId = "waveofmymind")
fun consume(message: String) {
println(String.format("Consumed message : %s", message))
}
}
이제 기본적인 생성은 다 끝났습니다.
이제 서버를 실행하고, 포스트맨으로 테스트를 해봅시다.
저는 쿠버네티스에 카프카를 실행시켜놓았기 때문에, 콘솔로 접속해서 확인해보겠습니다.
잘 연결이 된 것 같으니, 쿠버네티스 콘솔에서 토픽 수신을 대기해놓고, 포스트맨을 통해 요청해보겠습니다.
위와 같이 정상적으로 수신이 된 것을 확인할 수 있습니다.