C# Издатель-Подписчик через RabbitMQ

250
07 февраля 2018, 10:15

Решил разобраться с меж сервисным взаимодействием через брокер сервис очереди RabbitMQ. И подвернулась задача где его можно использовать для обмена данными между программами.

Есть 4 физических компьютера в сети:

1 - Издатель. раз в секунду отправляет состояние модели.

2 - Брокер сервер с установленным и запущенным сервисом RabbitMq

3 - Подписчик 1. Получает состояния модели.

4 - Подписчик 2. Получает состояния модели.

Для быстрого тестирования создал 2 проекта WinForms.

Server.Publisher

   public partial class Form1 : Form
{
    public ConnectionFactory ConnectionFactory { get; set; }
    public IConnection Connect { get; set; }
    public IModel Channel { get; set; }
    public bool IsConnectToBus { get; set; }
    public bool IsSubscriberNotified { get; set; }// Подписчики оповещены
    public Form1()
    {
        InitializeComponent();
    }
    private void btnConnect_Click(object sender, EventArgs e)
    {
        try
        {
            var hostName = @"192.168.1.40";
            ConnectionFactory = new ConnectionFactory
            {
                HostName = hostName,
                Port = 5672,
                UserName = "user",
                Password = "password",
                VirtualHost = "/",
                AutomaticRecoveryEnabled = true,
                TopologyRecoveryEnabled = true,
                NetworkRecoveryInterval = TimeSpan.FromSeconds(3)
            };
            //Подключение
            Connect = ConnectionFactory.CreateConnection();
            //Создание канала обмена
            Channel = Connect.CreateModel();
            Channel.ExchangeDeclare(exchange: "dataExchange", type: "direct");
            IsConnectToBus = Connect.IsOpen;
            tb_isConnect.Text = IsConnectToBus.ToString();
        }
        catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException ex)
        {
            MessageBox.Show($"Соединение прерванно {ex.ToString()}");
        }
        catch (Exception ex)
        {
            MessageBox.Show($"ИСКЛЮЧЕНИЕ {ex.ToString()}");
        }
    }
    private void BtnSend_Click(object sender, EventArgs e)
    {
        var message = tb_message.Text;
        if (string.IsNullOrEmpty(message))
        {
            MessageBox.Show("Mrssage = NULL");
            return;
        }
        var routingKey = tb_route.Text;
        if (string.IsNullOrEmpty(routingKey))
        {
            MessageBox.Show("routingKey = NULL");
            return;
        }
        try
        {
            var body = Encoding.UTF8.GetBytes(message);
            Channel.BasicPublish(exchange: "dataExchange", routingKey: routingKey, basicProperties: null, body: body);
        }
        catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException ex)
        {
            MessageBox.Show($"Соединение прерванно {ex.ToString()}");
        }
        catch (Exception ex)
        {
            MessageBox.Show($"ИСКЛЮЧЕНИЕ {ex.ToString()}");
        }
    }

    private void btn_DisConnect_Click(object sender, EventArgs e)
    {
        Channel?.Close(200, "Goodbye");
        Connect?.Close();
    }

    protected override void OnClosed(EventArgs e)
    {
        Channel?.Close(200, "Goodbye");
        Connect?.Close();
        base.OnClosed(e);
    }
}

Client.Consumer

    public partial class Form1 : Form
{
    public ConnectionFactory ConnectionFactory { get; set; }
    public IConnection Connect { get; set; }
    public IModel Channel { get; set; }
    public bool IsConnectToBus { get; set; }
    public bool IsSubscriberNotified { get; set; }// Подписчики оповещенны
    public Form1()
    {
        InitializeComponent();
    }
    private void btnConnect_Click(object sender, EventArgs e)
    {
        try
        {
            var hostName = @"192.168.1.40";
            ConnectionFactory = new ConnectionFactory
            {
                HostName = hostName,
                Port = 5672,
                UserName = "user",
                Password = "password",
                VirtualHost = "/",
                AutomaticRecoveryEnabled = true,
                TopologyRecoveryEnabled = true,
                NetworkRecoveryInterval = TimeSpan.FromSeconds(3)
            };
            //Подключение
            Connect = ConnectionFactory.CreateConnection();
            //Создание канала обмена
            Channel = Connect.CreateModel();
            Channel.ExchangeDeclare(exchange: "dataExchange", type: "direct");
            var queueName = Channel.QueueDeclare().QueueName;
            Channel.QueueBind(queue: queueName, exchange: "dataExchange", routingKey: tb_route.Text);
            //Подписка на событие получения данных
            var consumer = new EventingBasicConsumer(Channel);
            consumer.Received += Consumer_Received;  
            Channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
            IsConnectToBus = Connect.IsOpen;
            tb_isConnect.Text = IsConnectToBus.ToString();
        }
        catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException ex)
        {
            MessageBox.Show($"Соединение прерванно {ex.ToString()}");
        }
        catch (Exception ex)
        {
            MessageBox.Show($"ИСКЛЮЧЕНИЕ {ex.ToString()}");
        }
    }
    private void Consumer_Received(object model, BasicDeliverEventArgs ea)
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        var routingKey = ea.RoutingKey;
        tb_ReciveData.Text = message;
    }
    private void btn_DisConnect_Click(object sender, EventArgs e)
    {
        Channel?.Close(200, "Goodbye");
        Connect?.Close();
    }

    protected override void OnClosed(EventArgs e)
    {
        Channel?.Close(200, "Goodbye");
        Connect?.Close();
        base.OnClosed(e);
    }
}

ВОПРОСЫ:

1.Как издатель поймет что его подписчики уведомлены?

  1. Как издатель и подписчик отреагируют на выключение из сети Брокер сервиса RabbitMQ? Т.е. мне нужно просто выставить состояние "IsConnect = false". Сработает какое-то событие или я пойму только по Exception при отправке от издателя? Как тогда понять подписчику что созданная очередь для этого обмена больше не доступна?

  2. Будет ли RabbitMQ пытаться восстанавливать соединение в случае выключения из сети Брокер сервиса RabbitMQ если выставлено AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true?

    Как я пойму что соединение восстановлено "IsConnect = true"?

READ ALSO
Свойства класса в C# [требует правки]

Свойства класса в C# [требует правки]

Мне потребовалось создать свой список, на подобии List<T> в C#Не спрашивайте зачем, просто примите как дaное

254
Быстрый доступ к закрытому массиву

Быстрый доступ к закрытому массиву

Есть некоторый класс, содержащий массив с координатами в виде закрытого поля, и абстрактный класс для создания расширений:

228
Как можно время на сервере сделать локальным?

Как можно время на сервере сделать локальным?

Раньше у меня хостинг бы в Москве, в моей временной зоне, и сайт работал как ожидалосьСейчас переезжаем на новый хостинг (SmarterAsp, Shared) у которого...

217
Реализация WCF контрактов

Реализация WCF контрактов

ЗдравствуйтеПару дней уже не могу решить проблему

222