Producer를 개발해서 메시지를 Queue에 보낼 수 있게 해놓았으면 이게 그 메시지를 받아 DB에 적재하든지, 아니면 다른 방법으로 활용하기 위한 Consumer를 개발해야 할 것이다.
이것도 수신 기능을 구현하는 것 자체는 해보면 간단하다..
SpringBoot 프로젝트를 하나 새로 생성한다.
Kafka Consumer
1.build.gradle 에 spring-kafka 라이브러리 의존성을 추가한다.
dependencies {
// Kafka
implementation 'org.springframework.kafka:spring-kafka:2.8.2'
}
2.application.yml에 메시지를 수신할 Kafka Broker 서버 정보와, 컨슈머 그룹ID를 설정한다.
spring:
kafka:
consumer:
group-id: test_consumer_group_01
bootstrap-servers: kafka.01.server.com:9092,kafka.02.server.com:9092,kafka.03.server.com:9092
컨슈머 그룹ID는 임의의 값을 직접 설정해서 쓰면 된다. 같은 컨슈머 그룹 내에서는 메시지를 중복 수신하지 않는다. 다른 컨슈머 그룹끼리는 동일한 메시지를 중복 수신할 수 있다.
컨슈머 그룹은 Kafka Topic의 offset에 등록되어 메시지가 모든 컨슈머 그룹에서 수신이 완료되지 않았을 경우 서버에 Lag으로 쌓이게 된다.
3.Listener 역할을 할 클래스를 만들고 @KafkaListener 애노테이션으로 메시지 수신 후 로직을 담을 메서드를 만든다.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsume {
@KafkaListener(topics = "TEST-TOPIC", concurrency = "1", autoStartup = "false")
public void listener(ConsumerRecord<String, String> payload) {
System.out.println("Received Key : " + payload.key());
System.out.println("Received Message : " + payload.value());
}
}
옵션
- topics : 수신 대상이 되는 Kafka Topic의 이름을 넣는다.
- concurrency : 이 옵션에 적시할 수대로 thread가 발생하여 돌아간다. 통상 partition의 숫자만큼 thread가 동작하는 것이 가장 좋다고 한다.
- autoStartup : true 로 지정 시 애플리케이션이 start될 때 자동으로 컨슈밍이 시작되며, false 로 되어 있으면 수동으로 시작 메서드를 실행시켜줘야 한다.
4.Graceful Up/Down 만들기
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;
@Service
public class KafkaListenerService {
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
public KafkaListenerService(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
}
public void stopListener() {
kafkaListenerEndpointRegistry.stop();
}
public void startListener() {
for (MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
}
}
}
적당히 서비스 클래스를 만들고 KafkaListenerEndpointRegistry 빈 의존성을 주입한다.
kafkaListenerEndpointRegistry 객체의 stop() 메서드를 실행하면 모든 컨슈밍 작동이 중단되며, 등록된 listenerContainer 객체들의 start() 메서드를 실행시켜 주면 컨슈밍이 재개된다.
SQS Consumer
1.build.gradle에 Spring Cloud AWS의 라이브러리 중 SQS에 관련된 요소의 의존성을 추가한다.
dependencies {
// Sqs
implementation 'io.awspring.cloud:spring-cloud-aws-autoconfigure:2.4.1'
implementation 'io.awspring.cloud:spring-cloud-aws-messaging:2.4.1'
implementation 'io.awspring.cloud:spring-cloud-starter-aws:2.4.1'
}
2.application.yml에 AWS Credential 과 Region 관련 설정을 넣는다.
cloud:
aws:
credentials:
accessKey: # access-key 사용시 적시
secretKey: # secret-key 사용시 적시
use-default-aws-credentials-chain: true
region:
static: ap-northeast-2
stack:
auto: false
cloud.aws.stack.auto는 기본값 설정이 true인데 AWS CloudFormation이 셋팅되어 있지 않으면 에러를 발생시키므로 false로 설정한다.
위 설정에서 accessKey와 secretKey는 AWS가 아닌 다른 환경에서 구동할 때 필요하다.
AWS SDK에서는 credential 관련해서 6가지의 인증 옵션을 제공하는데, 아무런 설정을 하지 않으면 우선 순위에 의해서 인증 옵션이 순차적으로 적용된다.(Provider Chain)
Default Credential Provider Chain : https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
별도의 설정을 하고 싶지 않으면 use-default-aws-credentials-chain의 값을 true로 지정해줘야 Provider Chain이 작동된다. (기본값이 false)
특정한 인증수단을 지정해주고 싶으면, SQS나 S3의 Client 빈을 별도로 생성해주고 인증수단을 적시해놓는다.(아래 예시)
@Bean
public AmazonSQS amazonSQS() {
return AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new WebIdentityTokenCredentialsProvider())
.withRegion(Regions.AP_NORTHEAST_2)
.build();
}
3.Listener 클래스를 만들고 @SqsListener 애노테이션으로 수신 후 로직을 담을 메서드를 만든다.
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
import static org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy.ON_SUCCESS;
@Component
public class SqsConsume {
@SqsListener(value = "TEST-QUEUE", deletionPolicy = ON_SUCCESS)
public void listener(@Payload String payload, @Headers Map<String, String> headers) {
log.info("Received Header : {}", headers);
log.info("Received Payload : {}", payload);
}
}
매개변수 중 대개는 payload만 활용한다. (Message Body)
옵션
- value : 수신 대상이 되는 SQS 대기열(queue)의 이름이나, URL을 넣는다.
- deletionPolicy : 수신 후 대기열에 남아있는 메시지를 어떻게 할 것인지 정책.
- ALWAYS : 메서드로 메시지가 수신되면 바로 삭제한다.
- NEVER : 메시지를 삭제하지 않는다.
- NO_REDRIVE : DLQ(Dead-letter queue) 가 설정되지 않으면 삭제한다.
- ON_SUCCESS : Exception이 발생하지 않고 메서드가 정상 종료되면 삭제한다.
- DEFAULT : 아무 것도 정의하지 않으면 DEFAULT로 구동되는데, NO_REDRIVE로 작동한다.
deletionPolicy를 NEVER로 하고 Acknowledgment 파라메터의 메서드로 메시지를 삭제하는 CASE
@SqsListener(value = "TEST-QUEUE", deletionPolicy = NEVER)
public void onMessage(@Payload String payload, Acknowledgment ack) {
try {
transferService.convertAndTransfer(payload);
ack.acknowledge();
} catch (Exception e) {
e.printStackTrace();
}
}
4.Graceful Up/Down 만들기
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
public class ConsumeSwitchService {
private final SimpleMessageListenerContainer simpleMessageListenerContainer;
public ConsumeSwitchService(SimpleMessageListenerContainer simpleMessageListenerContainer) {
this.simpleMessageListenerContainer = simpleMessageListenerContainer;
}
public void start() {
simpleMessageListenerContainer.start();
}
public void stop() {
simpleMessageListenerContainer.stop();
}
public boolean isRunning() {
return simpleMessageListenerContainer.isRunning();
}
}
적당히 서비스 클래스를 만들고 SimpleMessageListenerContainer 빈 의존성을 주입한다.
simpleMessageListenerContainer 객체의 start() 메서드를 실행하면 컨슈밍이 시작되고, stop() 메서드를 실행하면 컨슈밍이 중단된다.
API를 만들어 외부에서 호출하여 Start/Stop을 컨트롤할 수 있게 해줘도 좋을 것이다.
simpleMessageListenerContainer는 기본 설정이 autoStartup = true 이다. 빈 생성시나 서비스가 시작할 때 false로 별도 셋팅해주면 된다.
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSqs, QueueMessageHandler queueMessageHandler) {
final SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
simpleMessageListenerContainerFactory.setAmazonSqs(amazonSqs);
SimpleMessageListenerContainer simpleMessageListenerContainer = simpleMessageListenerContainerFactory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
simpleMessageListenerContainer.setAutoStartup(false); // 자동 시작 여부
simpleMessageListenerContainer.setMaxNumberOfMessages(10); // Consume 1회시 수신할 수 있는 최대의 메시지 갯수 : 10이 Maximun 값
return simpleMessageListenerContainer;
}