RabbitMQ---6、客户端 API 的简介

摘要:
接口和类定义核心的API的接口和类在RabbitMQ下定义。客户端命名空间:用户和RabbitMQServer ConnectionFactory之间的连接:尽管私有命名空间的成员通常可以使用库的应用程序,应用程序不能依赖于在库版本中保持稳定的私有命名空间中出现的任何类。NET客户端使用比其他客户端更严格的AMQP0-9-1URI规范。

1、主要的命名空间,接口和类

  定义核心的API的接口和类被定义在RabbitMQ.Client这个命名空间下面:

  所以要想使用RabbitMQ的功能,需要以下代码
 
   using RabbitMQ.Client;

   【1】、核心API的接口和类如下:

    IModel:表示一个符合AMQP 0-9-1 协议的通道,并且提供了很多的操作方法

    IConnection:表示一个符合AMQP 0-9-1协议的连接对象,用户和RabbitMQ 服务端的连接

    ConnectionFactory:可以创建一个IConnection对象的实例。

    IBasicConsumer:表示一个消息的消费者,或者是使用者。

   【2】、其他有用的接口和类包含如下:

    DefaultBasicConsumer:通常用作消费者的基类,如果要编写自己的消费者程序,可以从该类继承。

   【3】、除RabbitMQ.Client之外的公共命名空间包括:   

    RabbitMQ.Client.Events:作为客户端库一部分的各种事件和事件处理程序。

       包括EventingBasicConsumer,一个基于C#事件处理程序构建的消费者实现。

    RabbitMQ.Client.Exceptions: 对用户可见的一些异常对象。

    所有其他命名空间都保留用于库的私有实现细节,尽管私有命名空间的成员通常可以使用该库的应用程序使用,以便允许开发人员实现其在库实现中发现的故障或设计错误的解决方法。 应用程序不能依赖于在库的版本中保持稳定的私有命名空间中出现的任何类,接口,成员变量等。


2、创建到代理的连接

  要连接到RabbitMQ,需要实例化一个ConnectionFactory并将其配置为使用所需的主机名,虚拟主机和凭据。 然后使用ConnectionFactory.CreateConnection()打开一个连接。 以下两个代码片段连接到hostName上的RabbitMQ节点:

复制代码
ConnectionFactory factory = new ConnectionFactory();
   // "guest"/"guest" by default, limited to localhost connections
   factory.UserName = user;
   factory.Password = pass;
   factory.VirtualHost = vhost;
   factory.HostName = hostName;
   IConnection conn = factory.CreateConnection();


   ConnectionFactory factory = new ConnectionFactory();
   factory.Uri = "amqp://user:pass@hostName:port/vhost";
   IConnection conn = factory.CreateConnection();
复制代码


   由于.NET客户端使用比其他客户端更严格的AMQP 0-9-1 URI规范解释,因此在使用URI时必须小心。 特别是,主机部分不能被忽略,具有空名称的虚拟主机不可寻址。 所有出厂属性都有默认值。 如果属性在创建连接之前保持未分配,则将使用属性的默认值:

复制代码
Username
    "guest"
   Password
    "guest"
   Virtual host
    "/"
   Hostname
    "localhost"
   port
    5672 是针对一般而言的连接, 5671 是针对 TLS 连接的 
复制代码


   IConnection接口可以打开一个通道:

 IModel channel = conn.CreateModel();


   通道Channel用于接收和发送消息

3、使用消息交换机和队列

    客户端应用程序将与消息交换机和消息队列(AMQP 0-9-1的高级构建块)配合工作。 【消息交换机】和【消息队列】在使用之前必须先声明他们。 声明任何类型的对象只是确保其中一个名称存在,如有必要,创建它。 继续前面的例子,以下代码声明一个消息交换机和一个队列,然后将它们绑定在一起。

model.ExchangeDeclare(exchangeName, ExchangeType.Direct);
   model.QueueDeclare(queueName, false, false, false, null);
   model.QueueBind(queueName, exchangeName, routingKey, null);


  这将声明了以下两个对象:

  【1】、非持久、非自动删除的、交换类型为“direct”的消息交换机;

  【2】、非持久、非自动删除、非排他的消息队列

  可以通过使用附加参数来定制消息交换机。 然后,上面的代码使用给定的路由键将队列绑定到消息交换机。 请注意,许多通道API(IModel)方法重载。 ExchangeDeclare方便的简单形式使用合理的默认值。 还有更多的表单具有更多的参数,可以根据需要修改这些默认值,并在需要时进行完全控制。 在整个API中使用这种“短版本,长版本”模式。

4、发布消息

  要将消息发布到消息交换机,请使用IModel.BasicPublish,如下所示:

 byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
  model.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);


  为了精细控制,您可以使用重载变量来指定强制标志,或指定消息属性:

复制代码
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
  IBasicProperties props = model.CreateBasicProperties();
  props.ContentType = "text/plain";
  props.DeliveryMode = 2;
  model.BasicPublish(exchangeName,
                   routingKey, props,
                   messageBodyBytes);
复制代码


   这将发送一个带有发送模式为2(持久性)和内容类型是“text / plain”的消息。 有关可用消息属性的更多信息,请参阅IBasicProperties接口的定义。

   在以下示例中,我们使用自定义标头发布消息:

复制代码
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

   IBasicProperties props = model.CreateBasicProperties();
   props.ContentType = "text/plain";
   props.DeliveryMode = 2;
   props.Headers = new Dictionary<string, object>();
   props.Headers.Add("latitude",  51.5252949);
   props.Headers.Add("longitude", -0.0905493);

   model.BasicPublish(exchangeName,
                   routingKey, props,
                   messageBodyBytes);
复制代码


   下面的示例设置消息过期:

复制代码
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

   IBasicProperties props = model.CreateBasicProperties();
   props.ContentType = "text/plain";
   props.DeliveryMode = 2;
   props.Expiration = "36000000"

   mode.BasicPublish(exchangeName,
                  routingKey, props,
                  messageBodyBytes);
复制代码


5、获取个人消息(“拉API”)

   要检索单个消息,请使用IModel.BasicGet。 返回的值是BasicGetResult的实例,可以从中提取基本属性和消息体:

复制代码
bool noAck = false;
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null) {
    // No message available at this time.
} else {
    IBasicProperties props = result.BasicProperties;
    byte[] body = result.Body;
    ...
复制代码


由于noAck = false,您还必须调用IModel.BasicAck来确认您已成功接收并处理该消息:

  ...
    // acknowledge receipt of the message
    channel.BasicAck(result.DeliveryTag, false);
}


请注意,使用此API获取消息效率相对较低。 如果您希望RabbitMQ将消息推送给客户端,请参阅下一节。

6、通过订阅检索邮件(“推送API”)

   接收消息的另一种方法是使用IBasicConsumer接口建立订阅。 然后,消息将在其到达时自动发送,而不必主动请求。 实现【消费者】的一种方法是使用便利类EventingBasicConsumer,它将传送和其他【消费者】生命周期事件以C#事件:

复制代码
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // ... process the message
                    channel.BasicAck(ea.DeliveryTag, false);
                };
String consumerTag = channel.BasicConsume(queueName, false, consumer);
复制代码


   另一个选项是根据子类DefaultBasicConsumer,覆盖方法,或者直接实现IBasicConsumer。 你一般会想实现核心方法IBasicConsumer.HandleBasicDeliver。 更复杂的消费者将需要实施进一步的方法。 特别地,HandleModelShutdown使通道/连接关闭。 【消费者】还可以实现HandleBasicCancelOk以获得取消通知。 DefaultBasicConsumer的ConsumerTag属性可用于检索服务器生成的使用者标签,如果没有提供给原始的IModel.BasicConsume调用。 您可以使用IModel.BasicCancel取消活跃的消费者:

channel.BasicCancel(consumerTag);


在调用API方法时,您总是通过【消费者】标签来引用【消费者】,【消费者】标签可以是客户端或服务器生成的,如AMQP 0-9-1规范文档中所述。

7、【消费者】的并发注意事项

    在当前实现中,每个IConnection实例都由一个从套接字读取的后台线程支持,并将生成的事件分派到应用程序。 如果启用了心跳,则从3.5.0版开始,它们将以.NET定时器方式实现。 因此,通常使用此库的应用程序中至少有两个线程处于活动状态:

应用程序线程

     包含应用程序逻辑,并调用IModel方法来执行协议操作。

I / O活动线程

     隐藏并由IConnection实例完全管理。

     在应用程序中可以看到线程模型的本质的一个地方是在库中的应用程序注册的任何回调。 这种回调包括:

     任何IBasicConsumer方法
     在IModel上的BasicReturn事件
     IConnection,IModel等各种关机事件

8、【消费者】回调和订阅

    从版本3.5.0开始,应用程序回调处理程序可以调用阻塞操作(如IModel.QueueDeclare或IModel.BasicCancel)。 IBasicConsumer回调同时被调用。 但是,每通道操作命令被保留。 换句话说,如果消息A和B按照同一个频道的顺序传送,则按照此顺序进行处理。 如果消息A和B在不同的信道上传送,则可以以任何顺序(或并行)处理消息。 由【.NET运行时】提供的、默认的TaskScheduler的任务分发器中调用【消费者】的回调。

9、使用自定义任务计划程序

   通过设置ConnectionFactory.TaskScheduler可以使用自定义任务调度程序:

复制代码
public class CustomTaskScheduler:TaskScheduler
{
   // ...
}

var cf = new ConnectionFactory();
cf.TaskScheduler = new CustomTaskScheduler();
复制代码


例如,这可以用于通过自定义TaskScheduler来限制并发程度。

10、在多线程中共享通道Chanel

作为经验法则,IModel实例不应同时被多个线程使用:应用程序代码应该保持对IModel实例的线程所有权的清晰认识。 如果多个线程需要访问特定的IModel实例,应用程序应该强制执行互斥。 实现此目的的一种方法是为IModel的所有用户锁定实例本身:

IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
  ch.BasicPublish(...);
}


IModel操作错误一系列的症状包括但不限于,

   在线上发送无效帧序列(例如,如果同时运行多个BasicPublish操作,则发生)和/或从类RpcContinuationQueue中的方法抛出NotSupportedException异常,提示“管道的请求是被禁止的【Pipelining of requests forbidden】”(在同时运行多个AMQP 0-9-1同步操作(如ExchangeDeclare)的情况下发生)。

11、处理不可路由的消息

如果发布了一个设置了“强制”标志的消息,但未能送达,则代理将消息返回给发送的客户端(通过basic.return AMQP 0-9-1命令)。 要获得此类通知,客户端可以订阅IModel.BasicReturn事件。 如果没有附加事件的监听器,则返回的消息将被静默地丢弃。

model.BasicReturn +=
  new RabbitMQ.Client.Events.BasicReturnEventHandler(...);


如果客户端把一条标识为“mandatory”的消息发送到了类型为【Direct】的【消息交换机Exchange】,但是这个exchange还没有绑定到一个消息队列的时候,BasicReturn事件就会被触发。

12、从RabbitMQ断开连接

要断开连接,只需关闭通道和连接:

channel.Close(200, "Goodbye");
conn.Close();


请注意,关闭通道是很好的做法,但不是必要的 - 当底层连接关闭时,它将自动完成。 在某些情况下,您可能希望连接上的最后一个打开的通道关闭的时候连接也关闭, 要实现此目的,请将IConnection.AutoClose属性设置为true,但仅在创建第一个通道后:

IConnection conn = factory.CreateConnection(...);
IModel channel = conn.CreateModel();
conn.AutoClose = true;


当AutoClose为true时,最后关闭的通道也将导致连接关闭。 如果在创建任何通道之前设置为true,则连接将在那时关闭。

好了,暂时翻译到此吧,其实还有一些没翻译完,暂时就不翻译了,有时间再继续。

免责声明:文章转载自《RabbitMQ---6、客户端 API 的简介》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇驱动:电阻屏触摸芯片NS2009System.DllNotFoundException: Unable to load shared library 'libdl' or one of its dependencies .NET Core 图片操作在 Linux/Docker 下的坑下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

VUE 或者JS 常用数据类型及方法:字符串、数组、对象

字符串 1. 字符串转换  2. 字符串分割  3. 字符串替换  4. 获取字符串长度 var mystr="qingchenghuwoguoxiansheng,woaishenghuo,woaiziji"; var arrLength=mystr.length; 5. 字符串切割 有三种可以从字符串中抽取和切割的方法: 第一种,slice()函...

参数估计

根据样本推断总体的分布或退而求其次推断分布的数字特征,都称为统计推断,它是统计 学的核心 ,这之前我们就说过,你作样本的分析不是最终目的,我们最终是要得到总体 的。 而统计推断的问题又可以分为两大类,一类是估计问题,另一类是假设检验问题。很多实际问题中,总体 的分布类型已知,但它包含一个或几个未知的参数,总体的参数完全 由所含参数决定,这样就需要对未知参数...

深入分析Java反射(四)-动态代理

动态代理的简介 Java动态代理机制的出现,使得Java开发人员不用手工编写代理类,只要简单地指定一组接口及委托类对象,便能动态地获得代理类。代理类会负责将所有的方法调用分派到委托对象上反射执行,在分派执行的过程中,开发人员还可以按需调整委托类对象及其功能,这是一套非常灵活有弹性的代理框架。Java动态代理实际上通过反射技术,把代理对象和被代理对象(真实对...

ActiveMQ教程(简介与安装)

  ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。   一、ActiveMq的特性:   ⒈ 多种语言和协议编写客户端。语言: Java,...

iOS开发之使用Runtime给Model类赋值

  本篇博客算是给网络缓存打个基础吧,本篇博客先给出简单也是最容易使用的把字典转成实体类的方法,然后在给出如何使用Runtime来给Model实体类赋值。本篇博客会介绍一部分,主要是字典的key与Model的属性名相同时,使用Runtime来进行赋值,下篇博客会给出字典key的值和Model的名字不同时的解决方案,并给出使用Runtime打印实体类属性值的...

4.14Java游戏小项目之键盘控制原理

4.14Java游戏小项目之键盘控制原理本质理解 键盘和程序交互原理: 每次按下键、松开键触发响应的键盘事件。 按下和松开用true和false表示 将事件封装到KeyEvent对象中 识别按下哪个键就是对键盘进行编号 编号通过KeyEvent对象来查询 本项目有四个方向,设置四个基本方向,按下=true,松开=false,左右移动只需要对其进行加、...