Kafka를 사용한 전달 방법
Kafka 개요
Kafka도 RabbitMQ와 마찬가지로 Producer Confirm과 Consumer Acknowledgement를 구현할 수 있다.
| 메커니즘 | 설명 |
|---|---|
| Producer Confirm | Producer → Kafka Topic 전달 확인 |
| Consumer Acknowledgement | Consumer의 메시지 처리 확인 (Offset Commit) |
Producer Confirm
KafkaTemplate 객체의 send() 메서드를 사용하면 Kafka에 메시지를 발행할 수 있다.
Producer.java
@Slf4j
@Component
@RequiredArgsConstructor
public class Producer {
public void sendEvent(CreateTaskEvent event) {
ListenableFuture<SendResult<String, CreateTaskEvent>> future = kafkaTemplate.send(TOPIC_TASK, event);
future.addCallback(
result -> log.info("offset: {}", result.getRecordMetadata().offset()),
throwable -> log.error("fail to publish", throwable)
);
}
}응답 값은 ListenableFuture로, ListenableFuture 객체에 두 개의 콜백을 등록할 수 있다:
- Success Callback: SendResult를 인자로 받을 수 있으며, 발행한 메시지의 메타 정보나 ProducerRecord의 값을 참조할 수 있다.
- Failure Callback: 예외를 인자로 받아 실패한 메시지의 예외를 처리할 수 있다. 로그 기록, 이메일 전송 등의 처리가 가능하다.
Consumer Acknowledgement
Consumer가 메시지를 정상적으로 처리했는지 Kafka에 알리는 메커니즘이다.
구현 방법
AcknowledgingMessageListener.java
@FunctionalInterface
public interface AcknowledgingMessageListener<K, V> extends MessageListener<K, V> {
default void onMessage(ConsumerRecord<K, V> data) {
throw new UnsupportedOperationException("Container should never call this");
}
void onMessage(ConsumerRecord<K, V> var1, Acknowledgment var2);
}* 두 번째 인자를 사용하여 Consumer Ack를 구현할 수 있다.
CustomKafkaListener.java
@Override
@KafkaListener(
// ...
containerFactory = "kafkaListenerContainerFactory"
)
public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
try {
// Do something
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Error to receive messages", e);
}
}두 번째 인자로 받은 Acknowledgment의 메서드를 호출하면 성공한 Consumer Ack가 전송된다.
설정
중요한 점은 설정에서 enable-auto-commit 설정값을 false로 지정하여 수동으로 Ack를 전송하도록 명시해야 한다.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}참고 자료
Last updated on