보라코딩
Day 23, RabbitMQ 본문
RabbitMQ
- Advanced Message Queuing Protocol 구현한 메시지 브로커
 - 데이터 바이너리 메세지 받아서 저장하고 전달하는 우체국 같은 개념
 - AMQP라는 표준 MQ 프로토콜로 만들어져 있고 Cluster 구성이 쉽고 ManageUI가 제공되고 성능 뛰어남
 - 확장성이 뛰어남 (ManagementUI, AutoCluster, MQTT Convert, STOMP)
 - Producer : 메세지 보내는 application
 - Producing : 메세지 전송
 - Consumer : 메세지 받는 User Applilcation
 - consuming : 메세지 수신
 - Queue : MailBox로 RabbitMQ 시스템 내에 위치함. 메시지는 Queue안에 저장됨
- 여러 Producer들은 하나의 Queue를 통해 메시지를 보낼 수 있고 Consumer들은 Queue로 데이터 받을 수 있음
 
 - Publish/Subscribe : 하나의 메시지를 여러 개의 Consumer에게 전달할 때 쓰이는 패턴
- Subscribe에서 Consumer가 메시지 수신하기 위해 Queue가 실시간으로 리스닝하게 함
 
 - Exchange : Producer가 전달한 메시지를 Queue에 전달하는 역할. 메시지가 Queue에 직접 전달되지 않고 Exchange type 정의대로 동작함
 - Bindings : Exchange와 Queue 연결해줌
 - Routing : Exchange가 Queue에 메세지 전달하는 과정
 - RountingKey : Exchage와 Queue가 Binding 될 때 Exchage가 Queue에 메시지를 전달할지 결정하는 기준
 
RabbitMQ 설치 (Window)
- Erlang 설치
 - RabbitMQ 설치
 RabbitMQ Command Prompt실행rabbitmq-plugins enable rabbitmq_management입력
http://localhost:15672로 접속해서 guest/guest로 로그인 가능- 관리자 계정 추가하기
 

RabbitMQ 설치 (Ubuntu)
sudo apt-key adv - keyserver "hkps.pool.sks-keyservers.net"-- recv-keys "0x6B73A36E6026DFCA": 저장소 서명 키 추가sudo apt-get install apt-transport-https: APT로 받기 위해 apt-transport-https 패키지 먼저 설치sudo apt-get update -y: 패키지 업데이트sudo apt-get install -y erlang-*: erlang 설치sudo apt-get install -y rabbitmq-server: rabbitmq-server 설치rabbitmq-plugins enable rabbitmq_management: 플러그인 설치- 사용자 추가
- rabbitmqctl add_user admin 1234
 - rabbitmqctl set_user_tags admin administrator
 - rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
 
 
RabbitMQ 따라하기
1. Hello World!

- Nuget 패키지에서 RabbitMQ.Client 설치
 - 보내기
- 큐 생성
 - 메세지 생성
 - 큐에 메세지 전송(publish)
 - 메세지 전송 확인
 
 
- 받기
- 큐 생성
 - consumer 생성
 - 메시지 수신 시 호출될 핸들러 등록(consume)
 - hello 이름의 큐에서 메시지 수신
 
 
Send
using RabbitMQ.Client;
using System.Text;
namespace Send
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                        durable: false, // durable : RabbitMQ 서버 재시작해도 큐 살아있게
                                        exclusive: false, // exclusive : 하나의 connection에서만 사용되며 닫힐때 queue 닫힘
                                        autoDelete: false, // autoDelete : 마지막 consumer가 구독 취소시 하나 이상의 consumer가 있는 queue 삭제
                                        arguments: null); // arguments : 플러그인 또는 큐 길이 제한 등의 기능
                // 메세지 생성
                string message = "Hello ~ MIRERO~ ";
                var body = Encoding.UTF8.GetBytes(message);
                // hello라는 이름의 큐에 메세지 전송
                channel.BasicPublish(exchange: "", // 기본 exchage 사용
                    routingKey: "hello", // 메세지 전송될 큐 지정
                    basicProperties: null, // 메세지 추가 속성 설정 안함
                    body: body); // body 속성을 바이트 배열로 설정
                // 메세지 전송 확인
                Console.WriteLine("[x] Sent {0}", message);
            }
            Console.WriteLine("Press [enter] to exit");
            Console.ReadLine();
        }
    }
}
Receive
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
namespace Receive
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);
                // 메시지 수신 이벤트 처리할 consumer 생성
                var consumer = new EventingBasicConsumer(channel);
                // Received 이벤트에 구독해서 메시지 수신 시 호출될 핸들러 등록
                consumer.Received += (model, ea) =>
                {
                    // 바이트 배열 가져옴
                    var body = ea.Body.ToArray();
                    // 디코딩
                    var message = Encoding.UTF8.GetString(body);
                };
                // hello 이름의 큐에서 메시지 수신
                channel.BasicConsume(queue: "hello",
                                    autoAck: true, // 메시지 처리한 후 자동으로 확인
                                    consumer: consumer);
                Console.WriteLine(" Press [enter] to exit. ");
                Console.ReadLine();
            }
        }
    }
}

- Task Queue : 병렬 작업 가능
 
2. Work Queues

Worker
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Channels;
namespace Worker
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue", // task_queue 이름의 큐 생성
                                    durable: true, // RabbitMQ 서버 재시작시 큐 삭제되지 않도록
                                    exclusive: false, // 하나의 connection에서만 사용되도록
                                    autoDelete: false, // 마지막 consumer가 구독 취소시 큐 삭제되지 않도록
                                    arguments: null); // 추가 설정 안함
                // count 1이면 한번에 하나의 메세지만 처리 가능
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                Console.WriteLine(" [*] Waiting for messages.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);
                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);
                    Console.WriteLine(" [x] Done ");
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                // task_queue 이름의 큐에서 메세지 수신 시작함
                channel.BasicConsume(queue: "task_queue",
                                    autoAck: false,
                                    consumer: consumer);
                Console.WriteLine(" Press [enter] to exit ");
                Console.ReadLine();
            }
        }
    }
}
NewTask
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Channels;
namespace NewTask
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // task_queue라는 이름의 큐 생성
                    channel.QueueDeclare(queue: "task_queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
                    var message = GetMessage(args);
                    var body = Encoding.UTF8.GetBytes(message);
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    channel.BasicPublish(exchange: "",
                        routingKey: "task_queue", // task_queue 큐에 메시지를 전송
                        basicProperties: properties,
                        body: body);
                    Console.WriteLine(" [x] Sent {0}", message);
                }
            }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
        private static string GetMessage(string[] args)
        {
            // 명령줄 인수가 있으면 그것을 메세지로 사용하고 그렇지 않으면 hello wow를 메세지로 사용함
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello wow~~~");
        }
    }
}
3. Publish / Subscribe

EmitLog
using RabbitMQ.Client;
using System.Text;
namespace EmitLog
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes((string)message);         //string 바꾸는게 맞나요
                channel.BasicPublish(exchange: "logs",
                                routingKey: "",
                                basicProperties: null,
                                body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
            Console.WriteLine(" Press [enter] to exit");
            Console.ReadLine();
        }
        private static object GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello MIRERO~~~");
        }
    }
}
ReceiveLogs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
namespace ReceiveLogs
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                exchange: "logs",
                                routingKey: "");
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                      autoAck: true,
                                      consumer: consumer);
                Console.WriteLine(" Press [enter] to exit ");
                Console.ReadLine();
            }
        }
    }
}
4. Routing

EmitLogDirect
using RabbitMQ.Client;
using System.Text;
namespace EmitLogDirect
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // direct_logs 이름의 직접 교환기 선언
                // 라우팅키 기반으로 특정 큐 전달
                channel.ExchangeDeclare(exchange: "direct_logs",
                                            type: "direct");
                // 메시지의 심각도 가져옴
                var severity = (args.Length > 0) ? args[0] : "info";
                // 메시지 가져옴
                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World~~~";
                var body = Encoding.UTF8.GetBytes(message);
                // direct_logs 교환기에 메시지 보냄
                channel.BasicPublish(exchange: "direct_logs",
                                     routingKey: severity, // 라우팅키 지정
                                     basicProperties: null,
                                     body: body);
                // 발행한 메시지 정보 출력
                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
            }
            Console.WriteLine(" Press [enter] to exit");
            Console.ReadLine();
        }
    }
}
ReceiveLogsDirect
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
namespace ReceiveLogsDirect
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() {  HostName = "localhost" };
            using(var connection = factory.CreateConnection()) 
            using(var channel = connection.CreateModel())
            {
                // direct_logs 이름의 직접 교환기 선언
                // 라우팅키 기반으로 특정 큐 전달
                channel.ExchangeDeclare(exchange: "direct_logs",
                                        type: "direct");
                // 큐 이름 자동 생성
                var queueName = channel.QueueDeclare().QueueName;
                // 명령줄에서 라우팅 키 받아오지 못한 경우 사용법 출력하고 종료
                if(args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                            Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit ");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
                // 명령줄에서 받은 라우팅키에 해당하는 큐와 교환기 바인딩!
                foreach(var severity in args)
                {
                    channel.QueueBind(queue: queueName,
                                      exchange: "direct_logs",
                                      routingKey: severity);
                }
                Console.WriteLine(" [*] Waiting for messages");
                // 메세지 받을 수 있는 consumer 생성
                var consumer = new EventingBasicConsumer(channel);
                // 메시지 수신 이벤트 핸들러 정의
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                                        routingKey, message);
                };
                // 큐에서 메시지 consume
                channel.BasicConsume(queue: queueName,
                                        autoAck: true,
                                        consumer: consumer);
                Console.WriteLine(" Press [enter] to exit ");
                Console.ReadLine();
            }
        }
    }
}
'개발자가 되었다?' 카테고리의 다른 글
| Day 25, JS, React (0) | 2023.09.19 | 
|---|---|
| Day 24, JS (0) | 2023.09.18 | 
| Day 22, Akka (0) | 2023.09.14 | 
| Day 21, EF Core (0) | 2023.09.13 | 
| Day 20, C# (0) | 2023.09.12 |