Skip to Content
공부시스템 설계분산 시스템에서 데이터를 전달하는 효율적인 방법RabbitMQ를 사용한 전달 방법

RabbitMQ를 사용한 전달 방법

RabbitMQ 개요

RabbitMQ는 AMQP 프로토콜을 구현한 메시지 브로커다. Exchange가 하나 있고 뒤에 Queue가 달려 있는 구조다.

컴포넌트역할
Exchange메시지를 받아 적절한 Queue로 라우팅
Queue메시지를 저장
Routing KeyExchange가 Queue를 선택하는 기준

Publish-Subscribe 패턴을 지원한다. Ack (성공), Nack (실패) 응답 메커니즘을 제공한다.

메시지 처리 보장 메커니즘

Publisher에서 “나 잘 메시지를 집어넣었어”라는 과정이 Producer Confirm이고, Consumer 쪽에서 “응 나 메시지 잘 받았고 잘 처리했어”라고 하는 과정이 Consumer Acknowledgement다.

메커니즘설명
Producer ConfirmProducer → Exchange → Queue 전달 확인
Consumer AcknowledgementConsumer의 메시지 처리 확인

메시지 흐름

Producer가 메시지를 발행하면 Exchange가 먼저 받고, 적절한 Queue에 집어넣는다. Queue에 들어가면 Consumer가 메시지를 받아서 처리한다.

  • Exchange가 Queue로 라우팅하는데 성공하면 Ack, 실패하면 Nack를 Producer에게 응답한다. (Producer Confirm)
  • Consumer가 메시지를 처리하고 성공하면 Ack, 실패하면 Nack를 Queue에 응답한다. (Consumer Acknowledgement)

Producer Confirm

Producer가 발행한 메시지가 Exchange를 거쳐 Queue에 정상적으로 라우팅되었는지 확인하는 메커니즘이다.

Exchange가 “응 나 잘 라우팅 했거든”, “라우팅 하는데 실패했거든”이라는 Ack나 Nack 메시지를 응답할 수 있다. Producer 입장에서는 Ack를 받았으면 “어 잘 들어갔겠구나”, Nack를 받았으면 “실패했겠구나”라고 알 수 있다.

구현 방법

CorrelationData 클래스를 사용한다. Producer Confirm을 확인할 수 있는 기본 클래스로, Spring에서 제공한다 (org.springframework.amqp). CorrelationData가 중간 매개체 역할을 한다.

RabbitTemplate을 사용하여 메시지를 발행할 때 CorrelationData 객체를 같이 넘길 수 있고, 잘 처리됐는지 확인할 때 발행 시점에 만든 CorrelationData를 확인할 수 있다.

메시지 발행 시 CorrelationData 사용

MessagePublisher.java
@Service public class MessagePublisher { public void sendMessage(CreateTaskEvent createTaskEvent) throws JsonProcessingException { String json = objectMapper.writeValueAsString(createTaskEvent); rabbitTemplate.sned( EXCHANGE_NAME, ROUTING_KEY, new Message(json.getBytes(StandardCharsets.UTF_8)), new CorrelationData(UUID.randomUUID().toString()) ); } }

Confirm Callback 등록

RabbitTemplate의 setConfirmCallback에 구현체를 넣어준다. 3개 인자를 사용하여 Producer Confirm을 확인할 수 있다:

  • correlationData: 메시지 발행 시 생성한 객체
  • ack: 성공/실패 (boolean)
  • cause: 실패 원인 (ack=false일 때)
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); // ... rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) { Message message = correlationData.getReturned().getMessage(); byte[] body = message.getBody(); log.error("Fail to produce. ID: {}", correlationData.getId()); } }); return rabbitTemplate; }

Callback이다 보니까 실시간으로 즉각적으로 대응하기는 어렵다. 하지만 적어도 어떤 메시지가 실패했는지는 알 수 있다.

설정

Producer Confirm을 사용하려면 별도의 설정이 필요하다.

  1. Spring Boot 사용 시 application.properties에 설정:
application.properties
spring.rabbitmq.publisher-confirm-type=correlated
  1. 순수 Spring 사용 시 Connection Factory에 설정:
@Bean public ConnectionFactory rabbitConnectionFactory() { CachingConnectionFactory cf = new CachingConnectionFactory(); // ... cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); return cf; }

Consumer Acknowledgement

Consumer가 메시지를 정상적으로 처리했는지 Queue에 알리는 메커니즘이다. Consumer가 Ack를 날리게 되면 Queue의 데이터는 사라진다.

구현 방법

Channel 객체를 사용한다. 메시지 Queue와 애플리케이션의 커넥션을 추상화한 클래스로, Publish나 Listen 모두에 사용할 수 있다. Ack와 Nack를 직접 보낼 수 있는 메서드를 제공한다 (basicAck, basicNack).

@RabbitListener에서 Channel 사용

MessageListener.java
@Component public class MessageListener { @RabbitListener(queues = "dooray.task") public void receiveMessage(Message message, Channel channel) { // 수동 ACK 전송 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 수동 NACK 전송 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }

일반적으로 리스너를 구현할 때 Message 인자만 정의한다. 하지만 Channel 객체를 인자로 선언하면 Spring Framework가 타입을 보고 적절한 객체를 주입해준다.

Channel 없어도 내부 비즈니스 로직에서 Exception이 발생하면 Spring Framework가 Nack를 날려준다. 정상적으로 잘 수행하면 자동으로 잘 처리된다. 하지만 직접적으로 수동으로 Ack와 Nack를 전송할 수 있다.

Message Listener 인터페이스를 구현하여 Connection Factory에 직접 넣어주는 경우에는 ChannelAwareMessageListener 인터페이스를 사용하면 Channel 객체를 인자로 받을 수 있다.

Dead Letter Queue

문제와 해결책

Consumer에 버그가 있어서 계속 Nack만 발생하면, Queue에 처리 안 된 메시지가 계속 쌓이고, Producer가 생산한 새로운 메시지도 계속 추가되어 Queue 크기가 폭발한다.

이런 경우 Dead Letter를 설정한다. Queue에서 정상적으로 처리하지 못한 메시지를 Dead Letter Exchange로 넘기고, Dead Letter Exchange는 Routing Key를 활용해서 Dead Letter Queue에 던져준다.

RabbitMQ에서 Dead Letter로 이동하는 조건은 크게 3가지다:

  1. requeue=false로 Reject/Nack
  2. Queue의 메시지가 오래됨 (TTL 만료)
  3. Queue가 가득 참 (Max Length 초과)

재시도 정책

명시적으로 3번 실패했을 때 Dead Letter로 보내도록 코딩할 수 있다. RetryInterceptorBuilder를 사용해서 setAdviceChain에 넣어준다.

@Bean public SimpleRabbitListenerContainerFactory retryContainerFactory(ConnectionFactory connFactory) { // ... var containerFactory = new SimpleRabbitListenerContainerFactory(); containerFactory.setConnectionFactory(connFactory); containerFactory.setAdviceChain( RetryInterceptorBuilder.stateless() .maxAttempts(3) .backOffOptions(1000, 2, 2000) .recoverer(new RejectAndDontRequeueRecoverer()) .build() ); return containerFactory; }
  • maxAttempts(3): 최대 3번만 실행
  • backOffOptions(1000, 2, 2000): 첫 번째 간격 1초, 배수 2배, 최대 간격 2초
  • recoverer: RejectAndDontRequeueRecoverer 사용 (원본 Queue에 재큐잉하지 않고 Dead Letter로)

재시도 동작 방식:

만약 10개를 처리할 때 하나 문제가 생겼으면 계속해서 퍼다 나른다. Consumer에서 멱등성 있게 코딩을 해 놓으면 정상적으로 처리된다.

Dead Letter 처리

@RabbitListener( queues = "dooray.member.deadletter", containerFactory = "deadLetterContainerFactory" ) public void onDeadLetterMessage(Map<String, Object> rawEvent) { // Alert failure or fallback }

일반적으로 사용하는 @RabbitListener를 하나 달아서 Dead Letter 메시지를 받는다.

Dead Letter 메시지를 받으면 알림을 날리거나 폴백 처리를 할 수 있고, 알림을 받으면 발행해야 되는 메시지를 로그로 남겨서 수동으로라도 보낼 수 있다.

참고 자료

[NHN FORWARD 22] 분산 시스템에서 데이터를 전달하는 효율적인 방법 

Last updated on