RabbitMQ 설치
우선 RabbitMQ를 설치해보자.
https://www.rabbitmq.com/download.html
윈도우 사용자라면,
exe 파일로 설치가 가능하다.
만약 erlang이 설치되어 있지 않다면, erlang을 설치하라고 뜰 것이다.
*erlang은 함수형 언어인데, RabbitMQ가 erlang으로 만들어졌기 때문에 필요하다.
https://www.erlang.org/downloads
erlang을 설치하자.
erlang이 설치가 다 되면, RabbitMQ도 설치하고, /sbin 폴더를 환경변수까지 등록해주자.
기본 포트는 5672이다.
아래를 실행하면 서버가 실행된다.
cmd창을 켜고 rabbitmq-plugins enable rabbitmq_management 명령어를 실행하면 GUI로 실행할 수 있다.
포트는 15672이고, 최초 username, password는 guest이다.
연결된 서버가 없으면 no connections 라고 뜬다.
연결된 모습
Messaging Queue
우선 이런 구조로 Messaging Queue를 구현해볼 것이다.
* Messaging Queue 사용하는 이유는 크게 2가지가 있다.
1. 분산 시스템에서 유용하다. 즉, 컴포넌트 간에 메세지를 전송하고, 수신하는 시스템에서 유용하다.
2. 비동기 통신이 가능하다. 메세지를 큐에 저장하고, 수신자는 큐에서 메세지를 받아오는 구조이므로, 비동기적으로 다른 일을 처리하는 것이 가능하다.
코드
build.gradle
plugins {
id 'java'
id 'org.springframework.boot' version '3.1.0'
id 'io.spring.dependency-management' version '1.1.0'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
java {
sourceCompatibility = '17'
}
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
}
tasks.named('test') {
useJUnitPlatform()
}
application.yml
spring:
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
template:
reply-timeout: 60000
RabbitmqConfig
package com.example.rabbitmq.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
private static final String topicExchangeName = "my_topic";
private static final String queueName = "my_queue";
private static final String routingKey = "foo.bar.#";
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.template.reply-timeout}")
private Integer replyTimeout;
// durable: true는 rabbitmq가 재실행되어도 큐가 소멸되지 않는다
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(routingKey);
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
rabbitTemplate.setReplyTimeout(replyTimeout);
return rabbitTemplate;
}
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
MessageConverter messageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
return new Jackson2JsonMessageConverter(objectMapper);
}
}
ConnectionFactory는 RabbitMQ와 연결하기 위한 빈이다.
MessageConverter는 JSON 형태의 데이터를 파싱하기 위한 빈이다.
RabbitTemplate은 RabbitMQ와 비동기적 접근을 하게 해주는 Helper class이다.
ConnectionFactory 빈과 MessageConverter 빈을 등록해주자.
나머지 코드들은 amqp 프로토콜에 대해 알고 있다면 충분히 이해할 수 있을 것이다.
amqp 프로토콜에 대해 공부가 필요하다면, 아래 필자의 블로그를 참조하자.
https://white-developer.tistory.com/79
Message
package com.example.rabbitmq.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class Message {
private String routingKey;
private String sender;
private String message;
}
자신이 원하는 형태의 클래스로 만들면 된다.
Message 객체에 routingKey를 담아서 전송하는 식으로 만들어 보았다.
Producer
package com.example.rabbitmq.procon;
import com.example.rabbitmq.model.Message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class Producer {
private final RabbitTemplate rabbitTemplate;
private final TopicExchange topicExchange;
public void sendMessage(Message message) {
rabbitTemplate.convertAndSend(topicExchange.getName(), message.getRoutingKey(), message);
log.info("메세지 발신 성공");
}
}
RabbitTemplate의 convertAndSend 메서드를 사용해서, exchange, routing key, 전달할 message 객체를 넣어준다.
메서드가 정상적으로 발동됐다면 info 로그가 찍힐 것이다.
Consumer
package com.example.rabbitmq.procon;
import com.example.rabbitmq.model.Message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@RabbitListener(queues = "#{queue.name}")
@Slf4j
public class Consumer {
@RabbitHandler
public void receiveMessage(final Message message) {
log.info("메세지 수신 성공! " + message.getSender() + ": " + message.getMessage());
}
}
클래스에 @RabbitListener를 달아주고, @RabbitHandler를 메서드에 붙여주면, 해당 메서드를 타켓 메서드로 인식한다.
매개변수에 final을 붙인 이유는 내부적으로 해당 변수를 재할당해서 사용하는 경우를 막기 위함이다.
역시 메세지를 정상적으로 수신했다면, 로그에 출력해볼 것이다.
ChatController
package com.example.rabbitmq.controller;
import com.example.rabbitmq.model.Message;
import com.example.rabbitmq.procon.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class ChatController {
private final Producer producer;
@PostMapping("/chat")
public void sendMessage(@RequestBody Message message) {
producer.sendMessage(message);
}
}
마지막으로 컨트롤러 코드이다.
테스트
테스트는 apic으로 해보았다.
아래와 같이 Post 요청을 해보자.
참고로 routingKey를 "foo.bar.#" 으로 설정해 놓았기 때문에,
foo.bar이든 foo.bar.ok이든 foo.bar.ok.ok이든 상관없이 전송된다.
RabbitMQ GUI에서도 메세지가 전송된 것을 확인할 수 있다.
Publish까지 0.2s 걸린 것도 확인할 수 있다.
스프링부트 서버에서도 정상적으로 로그가 출력된 것을 확인할 수 있다.
Reference
https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/package-summary.html
'Skills > Springboot Chat' 카테고리의 다른 글
Spring boot 채팅 구현 #4 STOMP 기반 채팅 구현 (0) | 2023.06.15 |
---|---|
Spring boot 채팅 구현 #3 AMQP 개념 정리 (0) | 2023.06.15 |
Spring boot 채팅 구현 #2 Web Socket, STOMP, Message Broker 개념 정리 (0) | 2023.06.14 |
Spring boot 채팅 구현 #1 HTTP, Polling, Long Polling 개념 정리 (0) | 2023.06.14 |