RabbitMQ 发布/订阅

摘要:
即,已发布的日志消息将转发给所有收件人。1.ExchangesRabbitMQ消息模型的核心思想是,生产者永远不会直接向队列发送任何消息。通常,生产者甚至不知道消息应该发送到哪个队列。生产者只能向转发者(Exchange)发送消息。这些规则由转发器的类型定义。

  我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式)。

      为了验证这种模式,我们准备构建一个简单的日志系统。这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志。

      在我们的日志系统中,每一个运行的接收者程序都会收到日志。然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。本质上来说,就是发布的日志消息会转发给所有的接收者。

  1、转发器(Exchanges)

  RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。

  相反的,生产者只能发送消息给转发器(Exchange)。转发器是非常简单的,一边接收从生产者发来的消息,另一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过转发器的类型进行定义。

RabbitMQ 发布/订阅第1张

  下面列出一些可用的转发器类型:

  Direct

  Topic

  Headers

  Fanout

  目前我们关注最后一个fanout,声明转发器类型的代码:

1 channel.exchangeDeclare("logs","fanout");

  fanout类型转发器特别简单,把所有它介绍到的消息,广播到所有它所知道的队列。不过这正是我们前述的日志系统所需要的。

  2、匿名转发器(nameless exchange)

  前面说到生产者只能发送消息给转发器(Exchange),我们仍然可以发送和接收消息。这是因为我们使用了一个默认的转发器,它的标识符为””。之前发送消息的代码:

1 channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

  第一个参数为转发器的名称,我们设置为”” : 如果存在routingKey(第二个参数),消息由routingKey决定发送到哪个队列。

  现在我们可以指定消息发送到的转发器:

1 channel.basicPublish( "logs","", null, message.getBytes());

  3、临时队列(Temporary queues)

  前面的博客中我们都为队列指定了一个特定的名称。能够为队列命名对我们来说是很关键的,我们需要指定消费者为某个队列。当我们希望在生产者和消费者间共享队列时,为队列命名是很重要的。
  不过,对于我们的日志系统我们并不关心队列的名称。我们想要接收到所有的消息,而且我们也只对当前正在传递的数据的感兴趣。为了满足我们的需求,需要做两件事:
  第一, 无论什么时间连接到Rabbit我们都需要一个新的空的队列。为了实现,我们可以使用随机数创建队列,或者更好的,让服务器给我们提供一个随机的名称。
  第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。
  Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。

1 String queueName = channel.queueDeclare().getQueue();

  一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似。

  4、绑定(Bindings)

   RabbitMQ 发布/订阅第2张

  我们已经创建了一个fanout转发器和队列,我们现在需要通过binding告诉转发器把消息发送给我们的队列。
  channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称

  5、完整的例子
  RabbitMQ 发布/订阅第3张
  发送端:
 1 public class EmitLog  
 2 {  
 3     private final static String EXCHANGE_NAME = "ex_log";  
 4   
 5     public static void main(String[] args) throws IOException  
 6     {  
 7         // 创建连接和频道  
 8         ConnectionFactory factory = new ConnectionFactory();  
 9         factory.setHost("localhost");  
10         Connection connection = factory.newConnection();  
11         Channel channel = connection.createChannel();  
12         // 声明转发器和类型  
13         channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );  
14           
15         String message = new Date().toLocaleString()+" : log something";  
16         // 往转发器上发送消息  
17         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());  
18   
19         System.out.println(" [x] Sent '" + message + "'");  
20   
21         channel.close();  
22         connection.close();  
23   
24     }  
25   
26 }  

  接收端1:

 1 public class ReceiveLogsToSave  
 2 {  
 3     private final static String EXCHANGE_NAME = "ex_log";  
 4   
 5     public static void main(String[] argv) throws java.io.IOException,  
 6             java.lang.InterruptedException  
 7     {  
 8         // 创建连接和频道  
 9         ConnectionFactory factory = new ConnectionFactory();  
10         factory.setHost("localhost");  
11         Connection connection = factory.newConnection();  
12         Channel channel = connection.createChannel();  
13   
14         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
15         // 创建一个非持久的、唯一的且自动删除的队列  
16         String queueName = channel.queueDeclare().getQueue();  
17         // 为转发器指定队列,设置binding  
18         channel.queueBind(queueName, EXCHANGE_NAME, "");  
19   
20         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
21   
22         QueueingConsumer consumer = new QueueingConsumer(channel);  
23         // 指定接收者,第二个参数为自动应答,无需手动应答  
24         channel.basicConsume(queueName, true, consumer);  
25   
26         while (true)  
27         {  
28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
29             String message = new String(delivery.getBody());  
30   
31             print2File(message);  
32         }  
33   
34     }  
35   
36     private static void print2File(String msg)  
37     {  
38         try  
39         {  
40             String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();  
41             String logFileName = new SimpleDateFormat("yyyy-MM-dd")  
42                     .format(new Date());  
43             File file = new File(dir, logFileName+".txt");  
44             FileOutputStream fos = new FileOutputStream(file, true);  
45             fos.write((msg + "
").getBytes());  
46             fos.flush();  
47             fos.close();  
48         } catch (FileNotFoundException e)  
49         {  
50             e.printStackTrace();  
51         } catch (IOException e)  
52         {  
53             e.printStackTrace();  
54         }  
55     }  
56 }  

  随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后写入日志文件。

 1 public class ReceiveLogsToConsole  
 2 {  
 3     private final static String EXCHANGE_NAME = "ex_log";  
 4   
 5     public static void main(String[] argv) throws java.io.IOException,  
 6             java.lang.InterruptedException  
 7     {  
 8         // 创建连接和频道  
 9         ConnectionFactory factory = new ConnectionFactory();  
10         factory.setHost("localhost");  
11         Connection connection = factory.newConnection();  
12         Channel channel = connection.createChannel();  
13   
14         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
15         // 创建一个非持久的、唯一的且自动删除的队列  
16         String queueName = channel.queueDeclare().getQueue();  
17         // 为转发器指定队列,设置binding  
18         channel.queueBind(queueName, EXCHANGE_NAME, "");  
19   
20         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
21   
22         QueueingConsumer consumer = new QueueingConsumer(channel);  
23         // 指定接收者,第二个参数为自动应答,无需手动应答  
24         channel.basicConsume(queueName, true, consumer);  
25   
26         while (true)  
27         {  
28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
29             String message = new String(delivery.getBody());  
30             System.out.println(" [x] Received '" + message + "'");  
31   
32         }  
33   
34     }  
35   
36 }  

  随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后打印到控制台。

    现在把两个接收端运行,然后运行3次发送端:

  输出结果:

  发送端:

 [x] Sent '2014-7-10 16:04:54 : log something'

 [x] Sent '2014-7-10 16:04:58 : log something'

 [x] Sent '2014-7-10 16:05:02 : log something'

  接收端1:

RabbitMQ 发布/订阅第4张

  接收端2:

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '2014-7-10 16:04:54 : log something'
 [x] Received '2014-7-10 16:04:58 : log something'
 [x] Received '2014-7-10 16:05:02 : log something'

  这个例子实现了我们文章开头所描述的日志系统,利用了转发器的类型:fanout。

  参考文档:http://blog.csdn.net/lmj623565791/article/details/37657225

免责声明:文章转载自《RabbitMQ 发布/订阅》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Django项目常见面试问题Git和Github库详细使用教程下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Semaphore 与ThreadPoolExecutor 的使用

1、 Semaphore 信号量  (阻塞) 优点:可以控制线程的数量,不会超出线程范围 缺点:当线程死锁时,永远没法释放,导致一直阻塞 在java中,提供了信号量Semaphore的支持。 Semaphore类是一个计数信号量,必须由获取它的线程释放, 通常用于限制可以访问某些资源(物理或逻辑的)线程数目。 一个信号量有且仅有3种操作,且它们全部是原子的...

Android -- TypedArray

当我们自定义View的时候,在给View赋值一些长度宽度的时候,一般都是在layout布局文件中进行的。,比如android:layout_height="wrap_content",除此之外,我们也可以自己定义属性,这样在使用的时候我们就可以使用形如 myapp:myTextSize="20sp"的方式了。 values/attrs.xml 首先要创建变...

【Bullet引擎】刚体类 —— btRigidBody

btRigidBody类主要用于刚体数据的计算。 在模拟刚体动画过程中,可以使用btRigidBody类获取所保存的刚体对象,进而控制刚体对象的旋转和位移。进行刚体模拟计算需要经常用到此类。 API:http://bulletphysics.org/Bullet/BulletFull/classbtRigidBody.html 创建刚体对象 btC...

python+opencv中最近出现的一些变化( OpenCV 官方的 Python tutorial目前好像还没有改过来?) 记一次全景图像的拼接

最近在学习过程中发现opencv有了很多变动, OpenCV 官方的 Python tutorial目前好像还没有改过来,导致大家在学习上面都出现了一些问题,现在做一个小小的罗列,希望对大家有用 做的是关于全景图像的拼接,关于sift和surf的语法之后有需要会另开文章具体阐述,此篇主要是解决大家困惑许久的问题。 笔者python3.x 首先是安装上,必须...

Tushare模块

.TuShare简介和环境安装 TuShare是一个著名的免费、开源的python财经数据接口包。其官网主页为:TuShare -财经数据接口包。该接口包如今提供了大量的金融数据,涵盖了股票、基本面、宏观、新闻的等诸多类别数据(具体请自行查看官网),并还在不断更新中。TuShare可以基本满足量化初学者的回测需求 环境安装: pip install tu...

关于Delphi XE2的FMX的一点点研究之消息篇

 Delphi XE2出来了一阵子了,里面比较抢眼的东西,除了VCLStyle这个换肤的东西之外,另外最让人眼亮的应该是FMX这个东西了。万一的博客上都连载了一票的关于FMX的使用心得了。我还是没咋去关注,因为技术这个东西,天天在变,跟着他跑,俺伤不起啊!直到今天,看了一下盒子,然后群中也有人说关于FMX在Windows下面如何来发送消息的问题,说发送不了...