게임 옥션 서버

RabbitMQ를 적용한 메시지 처리 방식 구현

KimGeonWoo 2023. 10. 23. 21:02

현재 프로젝트에서 MessageQueue를 사용하기 전에는 서비스 계층에서 Mapper를 사용한 쿼리문을 다루었습니다.

DBMS에 장애가 발생하여 입찰시 구매자의 돈은 소모되었으나 판매자가 등록한 상품을 획득하지 못하는 데이터 손실 혹은 장애 복구 후 A사용자가 먼저 즉시구매한 상품이 B사용자가 상품을 받을 수 있는 상황같은 장애복구시의 순서 보장같은 문제들을 예방하고, 해결하기위해 MessageQueue중 하나인 RabbitMQ를 도입하였습니다.

RabbitMQ는 고가용성과 비동기성, 확장성 등의 장점들을 가지고있는 메시지 큐 중 하나이며, 이번 포스팅에서 RabbitMQ란 어떤건지, 그리고 어떻게 구현을 하였는지에 대해 다뤄보겠습니다.

메시지 큐

메시지 큐(Message Queue)는 어플리케이션에서 다른 어플리케이션이 사용할 수 있도록 생성하는 데이터 패킷들을 전송되는 순서(FIFO)를 보장하며, 이용하는 어플리케이션에서 해당 메시지를 처리할 수 있는 상태가 될때까지 저장하는 큐를 의미합니다.

 

메시지 큐의 특징 :

  • 메시지 큐는 메시지를 임시로 저장하는 버퍼를 제공한다.
  • 메시지는 생산자(Producer)를 통해 메시지를 특정 큐 대기열에 추가하며, 소비자(Consumer)를 통해 지정한 큐의 대기열에 저장되어있는 메시지의 작업을 수행한다. 
  • Producer와 Consumer의 관계는 1:1 혹은 1:N 관계일 수 있다.

※FIFO(First-In-First-Out) : 선입 선출의 형태로 먼저 들어온 데이터가 먼저 사용되는 방식

 

메시지 큐의 장점 :

  • 비동기 : 생산자(Producer)는 EnQueue시 응답을 기다리지 않고 다른 작업을 수행 가능하다.
  • 고가용성(High Availability) : 클러스터를 구성하여 고가용성을 유지할 수 있다.
  • 데이터 유실 방지 : 큐 저장한 메시지는 서버 장애가 발생시에도 전달되지 않은 메시지를 삭제하지 않는다.
  • 확장성 : 낮은 결합성을 유지함으로써 시스템 일부가 장애가 발생하더라도 전체적으로 영향을 받지 않는다.

 

RabbitMQ

 

RabbitMQ의 정의 및 주요 특징 : 

  • Erlang으로 AMQP 프로토콜을 구현한 오픈소스 메시지 브로커 방법중 하나로 비동기 형태를 가지고 있다.
  • AMQP(Advanced Message Queuing Protocol) : 클라이언트가 메시지 미들웨어 브로커와 통신할 수 있도록 해주는 메시지 프로토콜
  • Queue : 선입선출(FIFO)의 구조로 먼저 들어온 메시지를 먼저 전송하는 형태를 가지고 있다.
  • Producer -> Broker[Exchange - Binding - Queue] -> Consumer 형태를 가지고 있다.
  • Exchange : Producer들에게 전달받은 메시지들이 어떤 Queue에게 전송할 것인지에 대한 역할을 담당한다.
    • Direct Exchange : BindingKey를 사용하여 특정 Queue에 직접 전송한다.
    • Topic Exchange : RoutingPattern과 매칭하여 각각의 패턴에 맞는 Queue와 매칭한다.
    • Fanout Exchange : 메시지의 내용을 고려하지 않고 바인딩된 모든 Queue에 같은 메시지를 전송한다.
  • RoutingKey : Exchange로 전달된 메시지가 어떤 큐에 전송할지를 담당하는 역할
  • Binding : 생성된 Exchange에서 전달받은 메시지를 원하는 Queue에게 전달하기위해 정의하는 규칙이다.

RabbitMQ의 장점 :

  • RabbitMQ는 메시지 전송 및 게시자 확인을 통한 메시지 전달을 보장한다.
  • 낮은 결합성을 통한 DB서버가 응답할때까지 대기하지 않아 DB장애시에도 어플리케이션은 정상작동할 수 있다.
  • 모니터링을 통한 데이터 전송 확인 및 장애 발생시의 문제점을 확인하고 대처할 수 있다.

 

MessageQueue 도입

준비

  • Gradle.build
	implementation 'org.springframework.boot:spring-boot-starter-amqp'
  • application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

 

 

RabbitMQConfig

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    @Value("${spring.rabbitmq.port}")
    private int rabbitmqPort;

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    public static final String bidQueueName = "Bid_Queue";
    public static final String exchangeName = "Bid_Exchange";
    public static final String routingKey = "Bid_Routing";

@Value("${}") : application.properties에서 설정한 내용을 각각의 변수에 주입한다.

 

    @Bean
    public Queue queue() {
        return QueueBuilder.durable(bidQueueName)
                .withArgument(deadExchangeName, "")
                .withArgument(deadRoutingKey, deadBidQueueName)
                .build();
    }

    @Bean
    public DirectExchange exchange() {

        return new DirectExchange(exchangeName);
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }


    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(rabbitmqPort);
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        return connectionFactory;
    }

@Bean을 주입한 메서드들은 RabbitMQ에서 사용하려는 Queue와 Exchange방식들을 Binding을 사용하여 설정을 하고있으며 ConnectionFactory에서는 사용되는 큐들이 어느곳에서 사용할 지에대한 설정을 하고 있습니다.

 

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

RabbitTemplate를 사용하여 어디로 전송할지, 그리고 직렬화와 역직렬화의 방식에 대한 설정입니다.

RestAPI 환경에서는 Request정보들을 JSON형식으로 받아와서 사용하여 Jackson2JsonMessageConverter()를 사용하였습니다.

 

 

 

MQProducer 

@Component
@RequiredArgsConstructor
public class BidMQProducer {

    private final RabbitTemplate rabbitTemplate;

    public void ProduceBidData(BidWithUserDTO bidWithUserDTO)
    {
        /**
         * RabbitMQConfig에서 빈으로 설정한 RabbitTemplate를 사용하여 메시지를 큐에 전송합니다.
         * RabbitTemplate는 SpringAMCP 프레임워크를 사용하여 고수준으로 사용이 가능한 Produce방법중 하나입니다.

         * */

        rabbitTemplate.convertAndSend(RabbitMQConfig.exchangeName,RabbitMQConfig.routingKey, bidWithUserDTO);
    }

}

 

 

RabbitTemplate : BidMQProducer 객체를 통해 받아오는 BidWithUserDTO의 정보를 직렬화를 통해 메시지로 변환, 전송 하기위해 사용합니다.

 

.convertAndSend : RabbitMQConfig 클래스에서 설정한 setMessageConverter를 기반으로 직렬화를 메시지 큐에 전송합니다.

 

 

MQConsumer

    @RabbitListener(queues = RabbitMQConfig.bidQueueName)
    public void receiveMessageQueueWithBidData(BidWithUserDTO bidWithUserDTO) {
        if (bidWithUserDTO.getPirceGold() == bidWithUserDTO.getBid().getPrice()) {
            bidMapper.updateInstantBid(bidWithUserDTO);

        }

        bidMapper.updateUserGold(bidWithUserDTO.getUserInfo().getUserId(),
                -bidWithUserDTO.getPirceGold());

        if(bidWithUserDTO.getBid().getHighestBidderId() != null)
        {
            bidMapper.updateUserGold(bidWithUserDTO.getBid().getHighestBidderId(),
                    bidWithUserDTO.getBid().getPresentPrice());
        }
    }

@RabbitListener의 동작순서

1. RabbitMQConfig에서 빈으로 등록한 Queue,Exchange,Routing을 사용하여 RabbitMQ에 메시지를 전송한다.(Producer)

2. Producer에서 전송된 메시지를 queue Attribute를 사용하여 맵핑한 큐와 일치한 메시지를 받아온다.

3. @RabbitListener 메서드의 파라미터를 역직렬화하여 객체 혹은 문자열 등으로 변환하여 받아온다.

4. 전송받아 역직렬화 한 메시지를 메서드 내의 로직을 수행한다.

 

비즈니스 로직내에서 Message 객체를 전송하는 예시코드

@Service
@AllArgsConstructor
public class BidService {
    private BidMQProducer bidMQProducer;
    
    public Bid updateItemWithBid(int bidId, String userId, int priceGold) {
       
        Bid bidItem = bidItemDAO.readBidWithCache(bidId);
        
        RequestUserInfo userInfo = userMapper.readUserInfo(userId);
        
        BidWithUserDTO bidWithUserDTO = BidWithUserDTO.builder()
                .bid(bidItem)
                .userInfo(userInfo)
                .pirceGold(priceGold)
                .build();
                
        bidMQProducer.ProduceBidData(bidWithUserDTO);
    }

BidMQProducer의 객체를 생성하여 해당 객체의 메서드를 호출, 메시지로 전송하려는 정보들을 파라미터로 전송하여 사용합니다.

 

결과

BidService내의 bidMQProducer를 호출시에 메시지가 정상적으로 전송되어있는것을 확인할 수 있습니다.

이로써 game-auction-server 모듈과 사용하는 DB서버가 분리되어 낮은결합성을 유지하여 확장성이 향상,

DBMS에서 장애가 발생하더라도 전송하려는 메시지들은 큐에 저장되어 데이터 유실방지,

그리고 Producer는 Enqueue시에 응답을 기다리지 않고 실행하는것과 동시에 다른 작업을 처리할수있다는 비동기성의 장점을 가질수 있게 되었습니다.

 

다음 포스팅에서는  application.yml의 설정을 통해 처리하려는 메시지가 정상적으로 처리되지 않을경우에 DeadLetterQueue방식을 사용하여 메시지를 재처리하는 방법에 대해 알아보겠습니다.