Skip to Content

Kafka를 사용한 전달 방법

Kafka 개요

Kafka도 RabbitMQ와 마찬가지로 Producer ConfirmConsumer Acknowledgement를 구현할 수 있다.

메커니즘설명
Producer ConfirmProducer → Kafka Topic 전달 확인
Consumer AcknowledgementConsumer의 메시지 처리 확인 (Offset Commit)

Producer Confirm

KafkaTemplate 객체의 send() 메서드를 사용하면 Kafka에 메시지를 발행할 수 있다.

Producer.java
@Slf4j @Component @RequiredArgsConstructor public class Producer { public void sendEvent(CreateTaskEvent event) { ListenableFuture<SendResult<String, CreateTaskEvent>> future = kafkaTemplate.send(TOPIC_TASK, event); future.addCallback( result -> log.info("offset: {}", result.getRecordMetadata().offset()), throwable -> log.error("fail to publish", throwable) ); } }

응답 값은 ListenableFuture로, ListenableFuture 객체에 두 개의 콜백을 등록할 수 있다:

  1. Success Callback: SendResult를 인자로 받을 수 있으며, 발행한 메시지의 메타 정보ProducerRecord의 값을 참조할 수 있다.
  2. Failure Callback: 예외를 인자로 받아 실패한 메시지의 예외를 처리할 수 있다. 로그 기록, 이메일 전송 등의 처리가 가능하다.

Consumer Acknowledgement

Consumer가 메시지를 정상적으로 처리했는지 Kafka에 알리는 메커니즘이다.

구현 방법

AcknowledgingMessageListener.java
@FunctionalInterface public interface AcknowledgingMessageListener<K, V> extends MessageListener<K, V> { default void onMessage(ConsumerRecord<K, V> data) { throw new UnsupportedOperationException("Container should never call this"); } void onMessage(ConsumerRecord<K, V> var1, Acknowledgment var2); }

* 두 번째 인자를 사용하여 Consumer Ack를 구현할 수 있다.

CustomKafkaListener.java
@Override @KafkaListener( // ... containerFactory = "kafkaListenerContainerFactory" ) public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) { try { // Do something acknowledgment.acknowledge(); } catch (Exception e) { log.error("Error to receive messages", e); } }

두 번째 인자로 받은 Acknowledgment의 메서드를 호출하면 성공한 Consumer Ack가 전송된다.

설정

중요한 점은 설정에서 enable-auto-commit 설정값을 false로 지정하여 수동으로 Ack를 전송하도록 명시해야 한다.

@Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); return new DefaultKafkaConsumerFactory<>(props); }

참고 자료

[NHN FORWARD 22] 분산 시스템에서 데이터를 전달하는 효율적인 방법 

Last updated on