Post

Spring boot, Kafka 간단한 예제

Kafka를 단순한 Message Queue로서 사용하는 간단한 예제이다

Kafka를 단순한 Message Queue로서 사용하는 간단한 예제이다

예제에 사용된 소스 코드는 하단의 Repository 링크를 참고하면 된다

Spring boot

dependency

spring-boot-starter-web

org.springframework.boot:spring-boot-starter-web

Spring boot의 MVC 구조를 지원하고 logback logger와 JSON 관련 라이브러리인 jackson를 포함한다 추가로 기본 내장 웹서버인 tomcat을 포함하고 있다

spring-kafka

org.springframework.kafka:spring-kafka

Spring for Apache Kafka

Spring boot와 Kafka를 연동하기 위한 라이브러리

lombok

org.projectlombok:lombok

annotation 기반의 코드 자동 생성 라이브러리

application.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
server:
  port: 8081 # kafka ui가 8080으로 실행되어 있어서 spring boot는 8081로 설정
spring:
  kafka:
    bootstrap-servers: # kafka brokers
      - localhost:9092
      - localhost:9093
      - localhost:9094
    consumer:
      auto-offset-reset: earliest # consumer가 topic을 가져오는 순서, latest, earliest, none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          deserializer:
            value:
              delegate:
                class: org.springframework.kafka.support.serializer.JsonDeserializer
          json:
            trusted:
              packages: com.illdangag.kafka.data
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Kafka의 메시지 종류로는 Java bean을 사용하기 위해서 Spring boot와 Kafka는 객체를 JSON 형태로 직렬화/역직렬화 하여 메시지를 송수신 하여야 한다

Consumer는 org.springframework.kafka.support.serializer.JsonDeserializer를 사용하고 Producer는 org.springframework.kafka.support.serializer.JsonSerializer를 사용한다

위 application.yaml 설정을 보면 ErrorHandlingDeserializer를 사용하고 JsonDeserializer를 간접적으로 사용하고 있는데 이유는 역직렬화 과정에서 오류로 인하여 예외가 발생한다면 JsonDeserializer를 바로 사용 하는 경우에는 동일한 오류 메시지가 멈추지 않고 매우 빠른 속도로 많이 발생하는데 ErrorHandlingDeserializer를 사용하고 JsonDeserializer를 간접적으로 사용 한다면 오류 메시지는 한번 출력된다

consumer.properties.spring.json.trusted.packages는 역직렬화 하는 Class에 대한 whitelist이다 whitelist에 등록되지 않은 Class인 경우에는 역직렬화 과정에서 예외가 발생하여 Kafka로부터 메시지를 올바르게 소비 할 수 없다

1
2
Caused by: java.lang.IllegalArgumentException: The class 'com.illdangag.kafka.data.User' is not in the trusted packages: [java.util, java.lang, com.illdangag.kafka].
If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

Message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.illdangag.kafka.data;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@ToString
public class User {
    private String name;
    private int age;
}

Kafka를 통하여 송수신 할 Java bean 클래스이다

lombok 라이브러리를 사용하여 몇가지 메서드를 생성했다

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.illdangag.kafka.service;

import com.illdangag.kafka.data.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class KafkaProducer {
    private final KafkaTemplate<String, User> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(User user) {
        log.info("Produce: {}", user);
        this.kafkaTemplate.send("User", user); // topic, message
    }
}

Spring boot에서 Kafka로 메시지를 송신한다

KafkaTemplate.send 메서드의 첫번째 인자는 Kafka의 topic, 두번째 인자는 Kafka에 전송할 메시지이다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.illdangag.kafka.service;

import com.illdangag.kafka.data.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Slf4j
@Service
public class KafkaConsumer {
    @KafkaListener(topics = "User", groupId = "user-group")
    public void consume(User user) throws IOException {
        log.info("Consume: {}", user.toString());
    }
}

KafkaListener annotation으로 Kafka Broker로부터 Consumer가 구독 할 topic과 Consumer의 그룹을 설정한다

Producer로 부터 Kafka로 구독중인 topic에 메세지가 등록되면 KafkaListener annotation이 등록된 메서드가 호출 되고 메시지가 해당 메서드의 인자로 전달된다

Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.illdangag.kafka.controller;

import com.illdangag.kafka.data.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.illdangag.kafka.service.KafkaProducer;

@RestController
@RequestMapping(value = "/send")
public class UserController {
    private final KafkaProducer producer;

    @Autowired
    public UserController(KafkaProducer producer) {
        this.producer = producer;
    }

    @RequestMapping(method = RequestMethod.GET, value = "/user")
    public ResponseEntity<String> send00(@RequestParam(name = "name", defaultValue = "", required = false) String name,
                                         @RequestParam(name = "age", defaultValue = "", required = false) int age) {
        User user = new User(name, age);
        producer.sendMessage(user);

        return ResponseEntity.status(HttpStatus.OK).body("OK");
    }
}

Kafka로 메시지를 전송하기 위해서 간단히 구현한 REST API이다

간편하게 웹 브라우저를 사용하기 위해서 http method는 GET을 사용하고 User 객체에 설정할 이름과 나이는 query parameter로 전달 하도록 한다

서버 실행 및 테스트

KafkaApplication.main 메서드를 실행하여 Spring boot 서버를 실행한다

http://localhost:8081/send/user?name=kim&age=27

로컬 환경에 spring boot의 port가 상단 application.yaml 설정을 따른다면 위 URL로 Kafka에 메시지를 생성 할 수 있다

1
2
2023-05-20T15:00:37.209+09:00  INFO 49060 --- [nio-8081-exec-1] c.illdangag.kafka.service.KafkaProducer  : Produce: User(name=kim, age=27)
2023-05-20T15:00:37.306+09:00  INFO 49060 --- [ntainer#0-0-C-1] c.illdangag.kafka.service.KafkaConsumer  : Consume: User(name=kim, age=27)

Kafka-ui

spring-boot-kafka-sample-00
spring-boot-kafka-sample-01
kafka에 등록된 Consumer와 Kafka가 수신 한 메시지를 확인 할 수 있다

This post is licensed under CC BY 4.0 by the author.