RabbitMQ를 사용하게 된 계기
- 소프티어 부트캠프에서 진행중인 두리번 프로젝트에서 유저가 출발하기를 한 후 3시간 뒤에 유저의 상태를 피드백 상태로 바꾸는 기능이 필요했다.
- 처음에는 이를 스프링의 TaskScheduler를 통해 구현했지만 메모리에 저장되는 방식이라 서비스 도중 서버 장애로 Restart 되는 경우가 발생한다면 전부 날아가 버린다.
- 또한 분산 서버 환경에서도 사용할 수 없다.(위와 같은 이유)
- 그래서 대체재를 찾던 도중 RabbitMQ에서 TTL 설정을 통한 지연메시지 발급이 가능하다는 것을 알게 되었고 이를 사용해 보기로 하였다.
RabbitMQ 구조도
- Exchange의 경우에는 로드밸런서처럼 앞단에서 라우팅하는 역할이라 생각하면 편하다.
- 또한 Direct, Topic, Fanout과 같은 서로 다른 기능을 하는 타입이 있다.
- Routing Key를 통해 Exchange에서 라우팅 처리를 하며 이후 Queue로 이동한다.
- Queue에 도착한 메시지들은 해당 Queue를 구독하고 있는 Consumer가 소모할 수 있게 된다.
RabbitMQ 구성
1. Exchange (익스체인지)
- 역할: 메시지를 받아서 적절한 큐(queue)로 전달하는 역할을 함.
- 비유: 우체국 (편지를 받아서 적절한 주소로 보내줌)
- 종류:
- Direct Exchange: 특정 라우팅 키와 일치하는 큐로 메시지를 전달.
- Fanout Exchange: 모든 큐에 메시지를 복사해서 보냄.
- Topic Exchange: 특정 패턴에 맞는 큐에 메시지를 전달.
- Headers Exchange: 메시지의 헤더 값에 따라 큐로 전달.
2. Queue (큐)
- 역할: 메시지를 보관했다가 소비자(Consumer)가 가져가도록 하는 공간.
- 비유: 우체통 (편지를 보관하고 있다가 수신자가 꺼내서 읽음)
- 예시
order_queue
라는 큐에order.created
메시지가 저장되어 있다가, 주문 서비스(Consumer)가 가져감.
3. Routing Key (라우팅 키)
- 역할: 메시지가 어떤 큐로 가야 하는지 결정하는 키.
- 비유: 주소 (편지가 어느 집으로 가야 하는지 알려주는 역할)
- 예시
order.created
라는 라우팅 키를 가진 메시지가 오면,order_queue
에 저장됨.
4. Binding (바인딩)
- 역할: 특정 큐와 익스체인지를 연결하는 설정.
- 비유: 주소록 (우체국이 특정 주소의 우체통과 연결되도록 설정)
- 예시
order_exchange
와order_queue
를order.created
라우팅 키로 바인딩하면, 해당 키의 메시지는order_queue
로 들어감.
Exchange와 Queue는 "다대다(Many-to-Many)" 관계를 가질 수 있다
이 관계는 Binding을 통해 설정 가능하다
RabbitMQ에서 x-
접두사 기능
x-
가 붙은 대표적인 RabbitMQ 확장 기능들
속성명 | 설명 |
---|---|
x-dead-letter-exchange |
메시지가 만료되거나 거부될 때 이동할 Dead Letter Exchange 설정 |
x-dead-letter-routing-key |
Dead Letter로 이동할 때 사용할 새로운 라우팅 키 |
x-message-ttl |
메시지의 Time-To-Live(살아있는 시간)를 설정 |
x-max-length |
큐에 저장할 최대 메시지 개수를 제한 |
x-max-length-bytes |
큐에 저장할 메시지들의 총 바이트 크기를 제한 |
x-expires |
큐 자체가 유지될 최대 시간을 설정 |
x-priority |
메시지의 우선순위를 설정 |
x-queue-mode |
큐 모드를 lazy 로 설정하여 디스크에 저장(메모리 절약) |
- x-dead-letter-exchange 속성의 경우 지연 메시지 발급에 이용한다.
- 아래에서 실제 프로젝트에서 사용했던 지연메시지 발급 설정 코드를 첨부한다.
RabbitMQ를 통한 지연메시지 발급 설정(Java, Spring)
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MemberRabbitMQConfig {
public static final String DELAY_QUEUE_NAME = "member.delay.queue";
public static final String DLX_QUEUE_NAME = "member.dlx.queue";
public static final String MEMBER_EXCHANGE_NAME = "member.exchange";
public static final String DELAY_ROUTING_KEY = "member.delay.key";
public static final String DLX_ROUTING_KEY = "member.dlx.key";
@Value("${spring.rabbitmq.ttl.member-status}")
private long pushDelayMs;
@Bean
public Queue memberDelayQueue() {
return QueueBuilder.durable(DELAY_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", MEMBER_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", DLX_ROUTING_KEY)
.withArgument("x-message-ttl", pushDelayMs)
.build();
}
@Bean
public Queue memberDlxQueue() {
return QueueBuilder.durable(DLX_QUEUE_NAME).build();
}
@Bean
public DirectExchange memberExchange() {
return new DirectExchange(MEMBER_EXCHANGE_NAME);
}
@Bean
public Binding memberDelayQueueBinding(Queue memberDelayQueue, DirectExchange memberExchange) {
return BindingBuilder.bind(memberDelayQueue)
.to(memberExchange)
.with(DELAY_ROUTING_KEY);
}
@Bean
public Binding memberDlxQueueBinding(Queue memberDlxQueue, DirectExchange memberExchange) {
return BindingBuilder.bind(memberDlxQueue)
.to(memberExchange)
.with(DLX_ROUTING_KEY);
}
}
- 코드에 대해 간단히 설명하면,
- 지연 메시지가 담기는 DELAY_QUEUE와 TTL이 만료된 메시지가 담기는 DLX_QUEUE 2가지가 있다.
- Exchange는 MEMBER_EXCHANGE 하나로 두고 이곳에 두가지 Queue들을 Binding 시켜 연결한다.
- 다만 주의할 점은 지연 발급된 메시지를 취소 시키고 싶을 때, 메시지 큐에 접근하여 삭제시키는 방법은 매우 비효율적이라 메시지를 받은 후 처리되지 않도록 로직을 구현해야한다.
언제 Exchange를 추가해야 할까?
1. 서로 다른 서비스에서 메시지를 분리할 때
상황
- 여러 개의 독립적인 서비스가 있고, 각각 다른 유형의 메시지를 처리해야 할 때.
- 예를 들어, "주문 처리"와 "결제 처리"가 분리된 서비스라면, 하나의 Exchange에서 관리하기 어려움.
설계
order.exchange
→ 주문 관련 메시지를 처리 (order.created
,order.cancelled
등)payment.exchange
→ 결제 관련 메시지를 처리 (payment.success
,payment.failed
등)
2. 같은 메시지를 여러 개의 소비자에게 동시에 보내야 할 때 (Fanout)
상황
- 하나의 메시지를 여러 개의 서비스에서 동시에 소비해야 할 때.
- 예를 들어, 회원 가입 메시지를 여러 개의 시스템에서 사용한다고 가정
- 이메일 알림 시스템
- 사용자 데이터 분석 시스템
- 마케팅 시스템
설계
user.registration.exchange
(Fanout Exchange) →email.queue
,analytics.queue
,marketing.queue
에 동시에 메시지 전달
3. 특정 패턴의 메시지만 필터링할 때 (Topic Exchange)
상황
- 메시지 타입이 여러 개일 때 특정 조건에 맞는 메시지만 구독하고 싶을 때.
- 예를 들어,
log.info
,log.warn
,log.error
와 같은 로그 메시지가 있을 때,error_logs.queue
→log.error
메시지만 받고all_logs.queue
→ 모든 로그 메시지를 받고 싶다면?
설계
log.exchange
(Topic Exchange) →log.error
,log.warn
,log.info
등 패턴 기반으로 라우팅.
예제
@Bean
public TopicExchange logExchange() {
return new TopicExchange("log.exchange");
}
log.error
메시지는"log.error"
라우팅 키를 설정한 큐로 전달log.*
와 같은 패턴을 사용하면"log.warn"
,"log.info"
도 동일한 큐에서 받을 수 있음
4. 메시지를 특정 그룹별로 격리할 때
상황
- 동일한 메시지 타입이어도 테넌트(고객)별로 분리해서 처리해야 할 때.
- 예를 들어, SaaS(Software-as-a-Service) 플랫폼에서 고객 A의 데이터는 고객 A의 시스템에서만 처리되어야 하는 경우.
설계
tenant-a.exchange
,tenant-b.exchange
를 각각 생성하여 해당 고객의 메시지를 개별 관리.- 이렇게 하면 고객별로 완전히 독립적인 메시징 시스템을 구축 가능.
5. Delay Queue (지연 메시지) 처리를 위해 추가할 때
상황
- 메시지를 일정 시간 후에 소비하도록 하고 싶을 때.
- 예를 들어, 15분 후에 자동 결제 취소 알림을 보내는 기능이 필요할 때.
설계
delay.exchange
→delay.queue
(지연 메시지를 저장)dlx.exchange
→dlx.queue
(지연이 끝난 후 실제로 처리할 큐)
정리
Exchange 추가 이유 | 설명 | 예제 |
---|---|---|
서비스별 메시지 분리 | 주문, 결제 등 서비스별로 메시지를 나눌 때 | order.exchange , payment.exchange |
하나의 메시지를 여러 곳에서 사용 | Fanout을 활용해 한 메시지를 여러 소비자가 받을 때 | user.registration.exchange (이메일, 마케팅, 분석) |
특정 패턴의 메시지만 필터링 | 로그 레벨별로 메시지를 다르게 보낼 때 | log.exchange (log.error , log.info ) |
특정 그룹(테넌트)별로 격리 | 고객 데이터를 분리하여 관리할 때 | tenant-a.exchange , tenant-b.exchange |
지연 메시지를 처리할 때 | 일정 시간 후에 메시지를 보내야 할 때 | delay.exchange , dlx.exchange |