C#利用RabbitMQ实现点对点消息传输

摘要:
RabbitMQ作为消息代理,负责接收和转发消息。拉比MQ可以比作邮箱、邮局和邮递员。本文主要使用一个简单的示例来简要描述RabbitMQ消息传输的相关内容,这仅用于学习和共享。如果有缺点,请改正。RabbitMQ设置RabbitMQ-通过交换机将消息转发到相应的队列,因此队列需要与交换机绑定。

RabbitMQ做为消息代理,负责接收和转发消息,可以将RabbitMQ比喻为一个邮筒、一个邮局和一个邮递员。本文主要以一个简单的小例子,简述RabbitMQ实现消息传输的相关内容,仅供学习分享使用,如有不足之处,还请指正。

消息队列模型

所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

C#利用RabbitMQ实现点对点消息传输第1张

RabbitMQ设置

RabbitMQ是通过交换机将消息转发到对应队列,所以队列需要和交换机进行绑定。本例将队列绑定到默认的amq.direct交换机,并设置Routing key,如下图所示:

C#利用RabbitMQ实现点对点消息传输第2张

RabbitMQ动态库安装

通过NuGet包管理器进行安装RabbitMQ.Client,如下所示:

C#利用RabbitMQ实现点对点消息传输第3张

RabbitMQ.Client相关知识点

  • ConnectionFactory:构造一个实例,主要创建连接。
  • IConnection:表示一个基于AMQP协议的连接。
  • IModel:表示一个RabbitMQ通道,可用于声明一个队列,然后开始消费。
  • EventingBasicConsumer:基于独立事件监听的基础消费者,可以监听并接收消息。
  • 生产者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 开始生产并发布消息
  • 消费者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 创建消费者,5. 绑定通道和消费者,并开始消费

示例效果图

本例主要有一个生产者,一个消费者,通过消息队列进行消息转发和接收。

生产者负责消息发送,如下图所示:

C#利用RabbitMQ实现点对点消息传输第4张

消费者负责消息接收,如下图所示:

C#利用RabbitMQ实现点对点消息传输第5张

核心代码

代码结构:主要包括生产者,消费者,公共基础代码,如下所示:

C#利用RabbitMQ实现点对点消息传输第6张

RabbitMqHelper主要创建连接,如下所示:

 1     public class RabbitMqHelper
 2     {
 3         
 4         /// <summary>
 5         /// 创建连接
 6         /// </summary>
 7         /// <returns></returns>
 8         public IConnection GetConnection()
 9         {
10             try
11             {
12                 var factory = new ConnectionFactory()
13                 {
14                     HostName = "127.0.0.1",
15                     Port = 5672,
16                     UserName = "guest",
17                     Password = "guest",
18                     VirtualHost = "/ShortMsgHost"
19                 };
20                 var conn = factory.CreateConnection();
21                 return conn;
22             }
23             catch (Exception ex) {
24                 throw ex;
25             }
26         }
27 
28    
29 
30     }

RabbmitMqSendHelper用于发送消息,如下所示:

 1     public class RabbmitMqSendHelper : RabbitMqHelper
 2     {
 3         /// <summary>
 4         /// 发送消息
 5         /// </summary>
 6         /// <param name="msg"></param>
 7         /// <returns></returns>
 8         public bool SendMsg(string msg)
 9         {
10             try
11             {
12                 using (var conn = GetConnection())
13                 {
14                     using (var channel = conn.CreateModel())
15                     {
16                         channel.QueueDeclare(queue: "ShortMsgQueue",
17                                      durable: true,
18                                      exclusive: false,
19                                      autoDelete: false,
20                                      arguments: null);
21                         var body = Encoding.UTF8.GetBytes(msg);
22 
23                         channel.BasicPublish(exchange: "amq.direct",
24                                              routingKey: "ShortMsgKey",
25                                              basicProperties: null,
26                                              body: body);
27 
28                         //Console.WriteLine(" [x] Sent {0}", message);
29                     };
30                 };
31                 return true;
32             }
33             catch (Exception ex)
34             {
35                 throw ex;
36             }
37         }
38     }

RabbitMqReceiveHelper主要用于接收信息,如下所示:

 1     public class RabbitMqReceiveHelper : RabbitMqHelper
 2     {
 3         public RabbitMqReceiveEventHandler OnReceiveEvent;
 4 
 5         private IConnection conn;
 6 
 7         private IModel channel;
 8 
 9         private EventingBasicConsumer consumer;
10 
11         public bool StartReceiveMsg()
12         {
13             try
14             {
15                 conn = GetConnection();
16 
17                 channel = conn.CreateModel();
18 
19                 channel.QueueDeclare(queue: "ShortMsgQueue",
20                                 durable: true,
21                                 exclusive: false,
22                                 autoDelete: false,
23                                 arguments: null);
24 
25                 consumer = new EventingBasicConsumer(channel);
26                 consumer.Received += (model, ea) =>
27                 {
28                     var body = ea.Body.ToArray();
29                     var message = Encoding.UTF8.GetString(body);
30                     //Console.WriteLine(" [x] Received {0}", message);
31                     if (OnReceiveEvent != null)
32                     {
33                         OnReceiveEvent(message);
34                     }
35                 };
36                 channel.BasicConsume(queue: "ShortMsgQueue",
37                                         autoAck: true,
38                                         consumer: consumer);
39                 return true;
40             }
41             catch (Exception ex)
42             {
43                 throw ex;
44             }
45         }
46     }

关于RabbitMQ的基础知识介绍,可参考前几篇博文。

备注

浣溪沙·堤上游人逐画船

欧阳修 〔宋代〕

堤上游人逐画船,拍堤春水四垂天。绿杨楼外出秋千。
白发戴花君莫笑,六幺催拍盏频传。人生何处似尊前! 

免责声明:文章转载自《C#利用RabbitMQ实现点对点消息传输》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇fastJson与jackson性能对比TCP keepalive长连接心跳保活下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Springboot项目集成JPush极光推送(Java SDK)

1.由于项目的需求,需要在Android APP上实现消息推送功能,所以引用了极光推送(官网:https://www.jiguang.cn/,  文档:http://docs.jiguang.cn/) 2.极光推送是经过考验的大规模app推送平台,极光推送目前每天推送消息数超过20亿条。 开发者集成SDK后,可以通过调用API推送消息。同时,极光推送提供可...

RabbitMQ 的配置文件

Ubuntu系统上RabbitMQ的配置文件应该存储在/etc/rabbitmq/rabbitmq.conf 如果没有,在RabbitMQ的启动log里面会有如下的信息: Starting RabbitMQ 3.7.7 on Erlang 21.0 Copyright (C) 2007-2018 Pivotal Software, Inc. Licen...

golang-nsq高性能消息队列

前言 tips:如果本文对你有用,请爱心点个赞,提高排名,让这篇文章帮助更多的人。谢谢大家!比心❤~ 如果解决不了,可以在文末加我微信,进群交流。 NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。 NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。 N...

Windows消息机制详解

消息是指什么? 消息系统对于一个win32程序来说十分重要,它是一个程序运行的动力源泉。一个消息,是系统定义的一个32位的值,他唯一的定义了一个事件,向 Windows发出一个通知,告诉应用程序某个事情发生了。例如,单击鼠标、改变窗口尺寸、按下键盘上的一个键都会使Windows发送一个消息给应用程序。 消息本身是作为一个记录传递给应用程序的,这个记录中包含...

Linux 安装 erlang 和 rabbitmq

1. 更新基本系统 安装任何软件包之前,建议使用以下命令更新软件包和存储库 yum -y update 2. 安装Erlang 由于RabbitMQ是基于Erlang(面向高并发的语言)语言开发,所以在安装RabbitMQ之前,需要先安装Erlang。在本教程中我们将安装最新版本的Erlang到服务器中。 Erlang在默认的YUM存储库中不可用,因此您...

使用wireshark抓包分析SOCKS5协议

目录 编写SOCKS5服务器运行代码(参考自Python编写socks5服务器) 使用SOCKS5服务器脚本和curl命令 分析抓取到的数据包理解SOCKS5协议的工作过程(感谢socks5代理服务器协议的说明让我预先知道SOCKS5协议数据消息传递的机理) 通信软件课选择了分析SOCKS5协议,想看一下这个协议在网络通信中是如何进行的,遂抓包...