보라코딩

Day 23, RabbitMQ 본문

개발자가 되었다?

Day 23, RabbitMQ

new 보라 2023. 9. 15. 18:27

RabbitMQ

  • Advanced Message Queuing Protocol 구현한 메시지 브로커
  • 데이터 바이너리 메세지 받아서 저장하고 전달하는 우체국 같은 개념
  • AMQP라는 표준 MQ 프로토콜로 만들어져 있고 Cluster 구성이 쉽고 ManageUI가 제공되고 성능 뛰어남
  • 확장성이 뛰어남 (ManagementUI, AutoCluster, MQTT Convert, STOMP)
  • Producer : 메세지 보내는 application
  • Producing : 메세지 전송
  • Consumer : 메세지 받는 User Applilcation
  • consuming : 메세지 수신
  • Queue : MailBox로 RabbitMQ 시스템 내에 위치함. 메시지는 Queue안에 저장됨
    • 여러 Producer들은 하나의 Queue를 통해 메시지를 보낼 수 있고 Consumer들은 Queue로 데이터 받을 수 있음
  • Publish/Subscribe : 하나의 메시지를 여러 개의 Consumer에게 전달할 때 쓰이는 패턴
    • Subscribe에서 Consumer가 메시지 수신하기 위해 Queue가 실시간으로 리스닝하게 함
  • Exchange : Producer가 전달한 메시지를 Queue에 전달하는 역할. 메시지가 Queue에 직접 전달되지 않고 Exchange type 정의대로 동작함
  • Bindings : Exchange와 Queue 연결해줌
  • Routing : Exchange가 Queue에 메세지 전달하는 과정
  • RountingKey : Exchage와 Queue가 Binding 될 때 Exchage가 Queue에 메시지를 전달할지 결정하는 기준

 

RabbitMQ 설치 (Window)

  1. Erlang 설치
  2. RabbitMQ 설치
  3. RabbitMQ Command Prompt 실행
    • rabbitmq-plugins enable rabbitmq_management 입력
  4. http://localhost:15672로 접속해서 guest/guest로 로그인 가능
  5. 관리자 계정 추가하기

 

 

RabbitMQ 설치 (Ubuntu)

  1. sudo apt-key adv - keyserver "hkps.pool.sks-keyservers.net"-- recv-keys "0x6B73A36E6026DFCA" : 저장소 서명 키 추가
  2. sudo apt-get install apt-transport-https : APT로 받기 위해 apt-transport-https 패키지 먼저 설치
  3. sudo apt-get update -y : 패키지 업데이트
  4. sudo apt-get install -y erlang-* : erlang 설치
  5. sudo apt-get install -y rabbitmq-server : rabbitmq-server 설치
  6. rabbitmq-plugins enable rabbitmq_management : 플러그인 설치
  7. 사용자 추가
    • rabbitmqctl add_user admin 1234
    • rabbitmqctl set_user_tags admin administrator
    • rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

 

RabbitMQ 따라하기

1. Hello World!

  • Nuget 패키지에서 RabbitMQ.Client 설치
  • 보내기
    • 큐 생성
    • 메세지 생성
    • 큐에 메세지 전송(publish)
    • 메세지 전송 확인
  • 받기
    • 큐 생성
    • consumer 생성
    • 메시지 수신 시 호출될 핸들러 등록(consume)
    • hello 이름의 큐에서 메시지 수신

 

Send

using RabbitMQ.Client;
using System.Text;

namespace Send
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                        durable: false, // durable : RabbitMQ 서버 재시작해도 큐 살아있게
                                        exclusive: false, // exclusive : 하나의 connection에서만 사용되며 닫힐때 queue 닫힘
                                        autoDelete: false, // autoDelete : 마지막 consumer가 구독 취소시 하나 이상의 consumer가 있는 queue 삭제
                                        arguments: null); // arguments : 플러그인 또는 큐 길이 제한 등의 기능

                // 메세지 생성
                string message = "Hello ~ MIRERO~ ";
                var body = Encoding.UTF8.GetBytes(message);

                // hello라는 이름의 큐에 메세지 전송
                channel.BasicPublish(exchange: "", // 기본 exchage 사용
                    routingKey: "hello", // 메세지 전송될 큐 지정
                    basicProperties: null, // 메세지 추가 속성 설정 안함
                    body: body); // body 속성을 바이트 배열로 설정

                // 메세지 전송 확인
                Console.WriteLine("[x] Sent {0}", message);
            }

            Console.WriteLine("Press [enter] to exit");
            Console.ReadLine();
        }
    }
}

 

Receive

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;


namespace Receive
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                // 메시지 수신 이벤트 처리할 consumer 생성
                var consumer = new EventingBasicConsumer(channel);

                // Received 이벤트에 구독해서 메시지 수신 시 호출될 핸들러 등록
                consumer.Received += (model, ea) =>
                {
                    // 바이트 배열 가져옴
                    var body = ea.Body.ToArray();

                    // 디코딩
                    var message = Encoding.UTF8.GetString(body);
                };

                // hello 이름의 큐에서 메시지 수신
                channel.BasicConsume(queue: "hello",
                                    autoAck: true, // 메시지 처리한 후 자동으로 확인
                                    consumer: consumer);

                Console.WriteLine(" Press [enter] to exit. ");
                Console.ReadLine();
            }
        }
    }
}

 

  • Task Queue : 병렬 작업 가능



2. Work Queues

Worker

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Channels;

namespace Worker
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue", // task_queue 이름의 큐 생성
                                    durable: true, // RabbitMQ 서버 재시작시 큐 삭제되지 않도록
                                    exclusive: false, // 하나의 connection에서만 사용되도록
                                    autoDelete: false, // 마지막 consumer가 구독 취소시 큐 삭제되지 않도록
                                    arguments: null); // 추가 설정 안함

                // count 1이면 한번에 하나의 메세지만 처리 가능
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                Console.WriteLine(" [*] Waiting for messages.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);

                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);

                    Console.WriteLine(" [x] Done ");

                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                // task_queue 이름의 큐에서 메세지 수신 시작함
                channel.BasicConsume(queue: "task_queue",
                                    autoAck: false,
                                    consumer: consumer);

                Console.WriteLine(" Press [enter] to exit ");
                Console.ReadLine();
            }

        }


    }
}

 

NewTask

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Channels;

namespace NewTask
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // task_queue라는 이름의 큐 생성
                    channel.QueueDeclare(queue: "task_queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    var message = GetMessage(args);
                    var body = Encoding.UTF8.GetBytes(message);

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;


                    channel.BasicPublish(exchange: "",
                        routingKey: "task_queue", // task_queue 큐에 메시지를 전송
                        basicProperties: properties,
                        body: body);

                    Console.WriteLine(" [x] Sent {0}", message);
                }
            }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

        private static string GetMessage(string[] args)
        {
            // 명령줄 인수가 있으면 그것을 메세지로 사용하고 그렇지 않으면 hello wow를 메세지로 사용함
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello wow~~~");
        }
    }
}



3. Publish / Subscribe

EmitLog

using RabbitMQ.Client;
using System.Text;

namespace EmitLog
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes((string)message);         //string 바꾸는게 맞나요
                channel.BasicPublish(exchange: "logs",
                                routingKey: "",
                                basicProperties: null,
                                body: body);

                Console.WriteLine(" [x] Sent {0}", message);

            }

            Console.WriteLine(" Press [enter] to exit");
            Console.ReadLine();
        }

        private static object GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello MIRERO~~~");
        }
    }
}

 

ReceiveLogs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

namespace ReceiveLogs
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                exchange: "logs",
                                routingKey: "");

                Console.WriteLine(" [*] Waiting for logs.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                      autoAck: true,
                                      consumer: consumer);

                Console.WriteLine(" Press [enter] to exit ");
                Console.ReadLine();
            }
        }
    }
}



4. Routing

EmitLogDirect

using RabbitMQ.Client;
using System.Text;

namespace EmitLogDirect
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // direct_logs 이름의 직접 교환기 선언
                // 라우팅키 기반으로 특정 큐 전달
                channel.ExchangeDeclare(exchange: "direct_logs",
                                            type: "direct");

                // 메시지의 심각도 가져옴
                var severity = (args.Length > 0) ? args[0] : "info";

                // 메시지 가져옴
                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World~~~";

                var body = Encoding.UTF8.GetBytes(message);

                // direct_logs 교환기에 메시지 보냄
                channel.BasicPublish(exchange: "direct_logs",
                                     routingKey: severity, // 라우팅키 지정
                                     basicProperties: null,
                                     body: body);

                // 발행한 메시지 정보 출력
                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
            }
            Console.WriteLine(" Press [enter] to exit");
            Console.ReadLine();
        }
    }
}

 

ReceiveLogsDirect

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

namespace ReceiveLogsDirect
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() {  HostName = "localhost" };
            using(var connection = factory.CreateConnection()) 
            using(var channel = connection.CreateModel())
            {
                // direct_logs 이름의 직접 교환기 선언
                // 라우팅키 기반으로 특정 큐 전달
                channel.ExchangeDeclare(exchange: "direct_logs",
                                        type: "direct");

                // 큐 이름 자동 생성
                var queueName = channel.QueueDeclare().QueueName;

                // 명령줄에서 라우팅 키 받아오지 못한 경우 사용법 출력하고 종료
                if(args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                            Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit ");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }

                // 명령줄에서 받은 라우팅키에 해당하는 큐와 교환기 바인딩!
                foreach(var severity in args)
                {
                    channel.QueueBind(queue: queueName,
                                      exchange: "direct_logs",
                                      routingKey: severity);
                }

                Console.WriteLine(" [*] Waiting for messages");

                // 메세지 받을 수 있는 consumer 생성
                var consumer = new EventingBasicConsumer(channel);

                // 메시지 수신 이벤트 핸들러 정의
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                                        routingKey, message);
                };

                // 큐에서 메시지 consume
                channel.BasicConsume(queue: queueName,
                                        autoAck: true,
                                        consumer: consumer);

                Console.WriteLine(" Press [enter] to exit ");
                Console.ReadLine();
            }
        }
    }
}



'개발자가 되었다?' 카테고리의 다른 글

Day 25, JS, React  (0) 2023.09.19
Day 24, JS  (0) 2023.09.18
Day 22, Akka  (0) 2023.09.14
Day 21, EF Core  (0) 2023.09.13
Day 20, C#  (0) 2023.09.12