본문 바로가기
Skills/Springboot Chat

Spring boot 채팅 구현 #5 RabbitMQ 기반 메시지 큐 구현

by Hoseok 2023. 6. 23.
728x90
반응형

 

 

RabbitMQ 설치

 

우선 RabbitMQ를 설치해보자.

 

https://www.rabbitmq.com/download.html

 

Downloading and Installing RabbitMQ — RabbitMQ

Downloading and Installing RabbitMQ The latest release of RabbitMQ is 3.12.0. See change log for release notes. See RabbitMQ support timeline to find out what release series are supported. Experimenting with RabbitMQ on your workstation? Try the community

www.rabbitmq.com

 

윈도우 사용자라면,

 

 

 

exe 파일로 설치가 가능하다.

 

 

 

만약 erlang이 설치되어 있지 않다면, erlang을 설치하라고 뜰 것이다.

 

*erlang은 함수형 언어인데, RabbitMQ가 erlang으로 만들어졌기 때문에 필요하다.

 

https://www.erlang.org/downloads

 

Index - Erlang/OTP

The official home of the Erlang Programming Language

www.erlang.org

 

 

erlang을 설치하자.

 

 

erlang이 설치가 다 되면, RabbitMQ도 설치하고, /sbin 폴더를 환경변수까지 등록해주자.

 

 

기본 포트는 5672이다.

 

 

 

아래를 실행하면 서버가 실행된다.

 

 

cmd창을 켜고 rabbitmq-plugins enable rabbitmq_management 명령어를 실행하면 GUI로 실행할 수 있다.

 

포트는 15672이고, 최초 username, password는 guest이다.

 

연결된 서버가 없으면 no connections 라고 뜬다.

 

 

연결된 모습

 

 

Messaging Queue

 

 

우선 이런 구조로 Messaging Queue를 구현해볼 것이다.

 

* Messaging Queue 사용하는 이유는 크게 2가지가 있다.

 

1. 분산 시스템에서 유용하다. 즉, 컴포넌트 간에 메세지를 전송하고, 수신하는 시스템에서 유용하다.

 

2. 비동기 통신이 가능하다. 메세지를 큐에 저장하고, 수신자는 큐에서 메세지를 받아오는 구조이므로, 비동기적으로 다른 일을 처리하는 것이 가능하다.

 

 

코드

 

build.gradle

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.1.0'
    id 'io.spring.dependency-management' version '1.1.0'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

tasks.named('test') {
    useJUnitPlatform()
}

 

 

application.yml

spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672
    template:
      reply-timeout: 60000

 

 

RabbitmqConfig

package com.example.rabbitmq.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {

    private static final String topicExchangeName = "my_topic";

    private static final String queueName = "my_queue";

    private static final String routingKey = "foo.bar.#";

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.template.reply-timeout}")
    private Integer replyTimeout;

    // durable: true는 rabbitmq가 재실행되어도 큐가 소멸되지 않는다
    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with(routingKey);
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        rabbitTemplate.setReplyTimeout(replyTimeout);
        return rabbitTemplate;
    }

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    MessageConverter messageConverter() {
        ObjectMapper objectMapper = new ObjectMapper();
        return new Jackson2JsonMessageConverter(objectMapper);
    }

}

 

ConnectionFactory는 RabbitMQ와 연결하기 위한 빈이다.

 

MessageConverter는 JSON 형태의 데이터를 파싱하기 위한 빈이다.

 

RabbitTemplateRabbitMQ와 비동기적 접근을 하게 해주는 Helper class이다.

ConnectionFactory 빈과 MessageConverter 빈을 등록해주자.

 

나머지 코드들은 amqp 프로토콜에 대해 알고 있다면 충분히 이해할 수 있을 것이다.

 

amqp 프로토콜에 대해 공부가 필요하다면, 아래 필자의 블로그를 참조하자.

 

https://white-developer.tistory.com/79

 

Spring boot 채팅 구현 #3 AMQP 개념 정리

이 포스팅을 작성하기 위해, RabbitMQ 공식 사이트를 참조하였습니다. https://www.rabbitmq.com/tutorials/amqp-concepts.html AMQP 0-9-1 Model Explained — RabbitMQ AMQP 0-9-1 Model Explained This guide provides an overview of the AMQP 0

white-developer.tistory.com

 

 

Message

package com.example.rabbitmq.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String routingKey;
    private String sender;
    private String message;
}

자신이 원하는 형태의 클래스로 만들면 된다.

 

Message 객체에 routingKey를 담아서 전송하는 식으로 만들어 보았다.

 

 

Producer

package com.example.rabbitmq.procon;

import com.example.rabbitmq.model.Message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class Producer {

    private final RabbitTemplate rabbitTemplate;
    private final TopicExchange topicExchange;

    public void sendMessage(Message message) {
        rabbitTemplate.convertAndSend(topicExchange.getName(), message.getRoutingKey(), message);
        log.info("메세지 발신 성공");
    }
}

 

RabbitTemplateconvertAndSend 메서드를 사용해서, exchange, routing key, 전달할 message 객체를 넣어준다.

 

메서드가 정상적으로 발동됐다면 info 로그가 찍힐 것이다.

 

 

Consumer

package com.example.rabbitmq.procon;

import com.example.rabbitmq.model.Message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@RabbitListener(queues = "#{queue.name}")
@Slf4j
public class Consumer {

    @RabbitHandler
    public void receiveMessage(final Message message) {

        log.info("메세지 수신 성공! " + message.getSender() + ": " + message.getMessage());
    }
}

 

클래스에 @RabbitListener를 달아주고, @RabbitHandler를 메서드에 붙여주면, 해당 메서드를 타켓 메서드로 인식한다.

 

매개변수에 final을 붙인 이유는 내부적으로 해당 변수를 재할당해서 사용하는 경우를 막기 위함이다.

 

역시 메세지를 정상적으로 수신했다면, 로그에 출력해볼 것이다.

 

 

ChatController

package com.example.rabbitmq.controller;

import com.example.rabbitmq.model.Message;
import com.example.rabbitmq.procon.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class ChatController {

    private final Producer producer;

    @PostMapping("/chat")
    public void sendMessage(@RequestBody Message message) {
        producer.sendMessage(message);
    }

}

 

마지막으로 컨트롤러 코드이다.

 

테스트

 

테스트는 apic으로 해보았다.

 

아래와 같이 Post 요청을 해보자.

 

참고로 routingKey를 "foo.bar.#" 으로 설정해 놓았기 때문에,

 

foo.bar이든 foo.bar.ok이든 foo.bar.ok.ok이든 상관없이 전송된다.

 

 

 

RabbitMQ GUI에서도 메세지가 전송된 것을 확인할 수 있다.

 

Publish까지 0.2s 걸린 것도 확인할 수 있다.

 

 

스프링부트 서버에서도 정상적으로 로그가 출력된 것을 확인할 수 있다.

 

 

 

 

Reference

 

https://www.springcloud.io/post/2022-03/messaging-using-rabbitmq-in-spring-boot-application/#gsc.tab=0

 

Messaging with RabbitMQ in Spring Boot Application

In this article, we will learn about RabbitMQ and explore its common use-cases. We will also walk through a step-by-step guide to implement messaging using RabbitMQ in a Spring Boot Application and will see how to publish and consume messages in a queue us

www.springcloud.io

 

https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/package-summary.html

 

org.springframework.amqp.rabbit (Spring AMQP 3.0.5 API)

package org.springframework.amqp.rabbit Provides top-level classes for Spring Rabbit.

docs.spring.io

 

728x90
반응형