보라코딩
Day 23, RabbitMQ 본문
RabbitMQ
- Advanced Message Queuing Protocol 구현한 메시지 브로커
- 데이터 바이너리 메세지 받아서 저장하고 전달하는 우체국 같은 개념
- AMQP라는 표준 MQ 프로토콜로 만들어져 있고 Cluster 구성이 쉽고 ManageUI가 제공되고 성능 뛰어남
- 확장성이 뛰어남 (ManagementUI, AutoCluster, MQTT Convert, STOMP)
- Producer : 메세지 보내는 application
- Producing : 메세지 전송
- Consumer : 메세지 받는 User Applilcation
- consuming : 메세지 수신
- Queue : MailBox로 RabbitMQ 시스템 내에 위치함. 메시지는 Queue안에 저장됨
- 여러 Producer들은 하나의 Queue를 통해 메시지를 보낼 수 있고 Consumer들은 Queue로 데이터 받을 수 있음
- Publish/Subscribe : 하나의 메시지를 여러 개의 Consumer에게 전달할 때 쓰이는 패턴
- Subscribe에서 Consumer가 메시지 수신하기 위해 Queue가 실시간으로 리스닝하게 함
- Exchange : Producer가 전달한 메시지를 Queue에 전달하는 역할. 메시지가 Queue에 직접 전달되지 않고 Exchange type 정의대로 동작함
- Bindings : Exchange와 Queue 연결해줌
- Routing : Exchange가 Queue에 메세지 전달하는 과정
- RountingKey : Exchage와 Queue가 Binding 될 때 Exchage가 Queue에 메시지를 전달할지 결정하는 기준
RabbitMQ 설치 (Window)
- Erlang 설치
- RabbitMQ 설치
RabbitMQ Command Prompt
실행rabbitmq-plugins enable rabbitmq_management
입력
http://localhost:15672
로 접속해서 guest/guest로 로그인 가능- 관리자 계정 추가하기
RabbitMQ 설치 (Ubuntu)
sudo apt-key adv - keyserver "hkps.pool.sks-keyservers.net"-- recv-keys "0x6B73A36E6026DFCA"
: 저장소 서명 키 추가sudo apt-get install apt-transport-https
: APT로 받기 위해 apt-transport-https 패키지 먼저 설치sudo apt-get update -y
: 패키지 업데이트sudo apt-get install -y erlang-*
: erlang 설치sudo apt-get install -y rabbitmq-server
: rabbitmq-server 설치rabbitmq-plugins enable rabbitmq_management
: 플러그인 설치- 사용자 추가
- rabbitmqctl add_user admin 1234
- rabbitmqctl set_user_tags admin administrator
- rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
RabbitMQ 따라하기
1. Hello World!
- Nuget 패키지에서 RabbitMQ.Client 설치
- 보내기
- 큐 생성
- 메세지 생성
- 큐에 메세지 전송(publish)
- 메세지 전송 확인
- 받기
- 큐 생성
- consumer 생성
- 메시지 수신 시 호출될 핸들러 등록(consume)
- hello 이름의 큐에서 메시지 수신
Send
using RabbitMQ.Client;
using System.Text;
namespace Send
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false, // durable : RabbitMQ 서버 재시작해도 큐 살아있게
exclusive: false, // exclusive : 하나의 connection에서만 사용되며 닫힐때 queue 닫힘
autoDelete: false, // autoDelete : 마지막 consumer가 구독 취소시 하나 이상의 consumer가 있는 queue 삭제
arguments: null); // arguments : 플러그인 또는 큐 길이 제한 등의 기능
// 메세지 생성
string message = "Hello ~ MIRERO~ ";
var body = Encoding.UTF8.GetBytes(message);
// hello라는 이름의 큐에 메세지 전송
channel.BasicPublish(exchange: "", // 기본 exchage 사용
routingKey: "hello", // 메세지 전송될 큐 지정
basicProperties: null, // 메세지 추가 속성 설정 안함
body: body); // body 속성을 바이트 배열로 설정
// 메세지 전송 확인
Console.WriteLine("[x] Sent {0}", message);
}
Console.WriteLine("Press [enter] to exit");
Console.ReadLine();
}
}
}
Receive
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
namespace Receive
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
// 메시지 수신 이벤트 처리할 consumer 생성
var consumer = new EventingBasicConsumer(channel);
// Received 이벤트에 구독해서 메시지 수신 시 호출될 핸들러 등록
consumer.Received += (model, ea) =>
{
// 바이트 배열 가져옴
var body = ea.Body.ToArray();
// 디코딩
var message = Encoding.UTF8.GetString(body);
};
// hello 이름의 큐에서 메시지 수신
channel.BasicConsume(queue: "hello",
autoAck: true, // 메시지 처리한 후 자동으로 확인
consumer: consumer);
Console.WriteLine(" Press [enter] to exit. ");
Console.ReadLine();
}
}
}
}
- Task Queue : 병렬 작업 가능
2. Work Queues
Worker
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Channels;
namespace Worker
{
internal class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue", // task_queue 이름의 큐 생성
durable: true, // RabbitMQ 서버 재시작시 큐 삭제되지 않도록
exclusive: false, // 하나의 connection에서만 사용되도록
autoDelete: false, // 마지막 consumer가 구독 취소시 큐 삭제되지 않도록
arguments: null); // 추가 설정 안함
// count 1이면 한번에 하나의 메세지만 처리 가능
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done ");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// task_queue 이름의 큐에서 메세지 수신 시작함
channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit ");
Console.ReadLine();
}
}
}
}
NewTask
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Channels;
namespace NewTask
{
internal class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// task_queue라는 이름의 큐 생성
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue", // task_queue 큐에 메시지를 전송
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
// 명령줄 인수가 있으면 그것을 메세지로 사용하고 그렇지 않으면 hello wow를 메세지로 사용함
return ((args.Length > 0) ? string.Join(" ", args) : "Hello wow~~~");
}
}
}
3. Publish / Subscribe
EmitLog
using RabbitMQ.Client;
using System.Text;
namespace EmitLog
{
internal class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes((string)message); //string 바꾸는게 맞나요
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit");
Console.ReadLine();
}
private static object GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello MIRERO~~~");
}
}
}
ReceiveLogs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
namespace ReceiveLogs
{
internal class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit ");
Console.ReadLine();
}
}
}
}
4. Routing
EmitLogDirect
using RabbitMQ.Client;
using System.Text;
namespace EmitLogDirect
{
internal class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// direct_logs 이름의 직접 교환기 선언
// 라우팅키 기반으로 특정 큐 전달
channel.ExchangeDeclare(exchange: "direct_logs",
type: "direct");
// 메시지의 심각도 가져옴
var severity = (args.Length > 0) ? args[0] : "info";
// 메시지 가져옴
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World~~~";
var body = Encoding.UTF8.GetBytes(message);
// direct_logs 교환기에 메시지 보냄
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity, // 라우팅키 지정
basicProperties: null,
body: body);
// 발행한 메시지 정보 출력
Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
}
Console.WriteLine(" Press [enter] to exit");
Console.ReadLine();
}
}
}
ReceiveLogsDirect
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
namespace ReceiveLogsDirect
{
internal class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
// direct_logs 이름의 직접 교환기 선언
// 라우팅키 기반으로 특정 큐 전달
channel.ExchangeDeclare(exchange: "direct_logs",
type: "direct");
// 큐 이름 자동 생성
var queueName = channel.QueueDeclare().QueueName;
// 명령줄에서 라우팅 키 받아오지 못한 경우 사용법 출력하고 종료
if(args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit ");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
// 명령줄에서 받은 라우팅키에 해당하는 큐와 교환기 바인딩!
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
Console.WriteLine(" [*] Waiting for messages");
// 메세지 받을 수 있는 consumer 생성
var consumer = new EventingBasicConsumer(channel);
// 메시지 수신 이벤트 핸들러 정의
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey, message);
};
// 큐에서 메시지 consume
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit ");
Console.ReadLine();
}
}
}
}
'개발자가 되었다?' 카테고리의 다른 글
Day 25, JS, React (0) | 2023.09.19 |
---|---|
Day 24, JS (0) | 2023.09.18 |
Day 22, Akka (0) | 2023.09.14 |
Day 21, EF Core (0) | 2023.09.13 |
Day 20, C# (0) | 2023.09.12 |