rabbitmq系列——(2 多生产多消费)

摘要:
订单的多任务处理可以允许生产商横向扩展并支持多个服务器;生产者集群架构、消费者集群架构;在同一队列的情况下,消息被划分,并设置了平衡消费者:channel。BasicQos(0,1,false)//预取数量设置为1;设置false nuget:1。生产者使用RabbitMQMsgProducer.MessageProducer;使用Microsoft.Extensions.C

 订单多任务处理

 能够让生产者的横向扩展,支持多个服务器;  生产者集群架构,消费端集群架构;

 同一个队列的话,消息是被瓜分掉的

  设置均衡消费端:

  channel.BasicQos(0, 1, false);
  //预取数量设置为1个; 设置false

  rabbitmq系列——(2 多生产多消费)第1张

nuget :

rabbitmq系列——(2 多生产多消费)第2张

1. 生产者

using RabbitMQMsgProducer.MessageProducer;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;

namespace RabbitMQMsgProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                {
                    //多生产者多消费者
                    //订单任务处理
                    IConfigurationRoot config = new ConfigurationBuilder()
                     .SetBasePath(Directory.GetCurrentDirectory())
                     .AddCommandLine(args)//支持命令行参数
                     .Build();

                    string strMinute = config["minute"];  //什么时候开始执行
                    string Num = config["num"]; //生产者编号 
                    int minute = Convert.ToInt32(strMinute);
                    bool flg = true;
                    while (flg)
                    {
                        if (DateTime.Now.Minute == minute)
                        {
                            Console.WriteLine($"到{strMinute}分钟,开始写入消息。。。");
                            flg = false;
                            MultiProducerMsg.Send(Num);
                        }
                    }
                }
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQMsgProducer.MessageProducer
{
    public class MultiProducerMsg
    {
        public static void Send(string Num)
        {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";
            connectionFactory.UserName = "guest";
            connectionFactory.Password = "guest";
            string exchangeName = "MultiProducerMsgExchange";
            string queueName = "MultiProducerMsgQueue";
            using (IConnection connection = connectionFactory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // 声明 exchange 类型为 Direct,durable 持久化为true
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    // 声明 queue
                    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // 绑定
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: string.Empty, arguments: null);
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine($"the producer {Num} is ready.");
                    while (true)
                    {
                        string msg = $"producer{Num} : message {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss fff")} .";
                        byte[] body = Encoding.UTF8.GetBytes(msg);
                        // 发送消息
                        channel.BasicPublish(exchange: exchangeName, routingKey: string.Empty, basicProperties: null, body: body);
                        Console.WriteLine($"the message : {msg} is send .");
                        Thread.Sleep(200);
                    }
                }
            }
        }
    }
}

2. 消费者

using RabbitMQMsgConsumer001.MessageConsumer;
using System;
using System.Threading.Tasks;

namespace RabbitMQMsgConsumer001
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                {
                    // 多生产多消费
                    // 规模处理任务清单操作
                    Task.Run(() => { MultiConsumerMsg.Receive001(); });
                    Task.Run(() => { MultiConsumerMsg.Receive002(); });
                    Task.Run(() => { MultiConsumerMsg.Receive003(); });
                }
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQMsgConsumer001.MessageConsumer
{
    public class MultiConsumerMsg
    {
        public static void Receive001()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";
            string queueName = "MultiProducerMsgQueue";
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"the consumer 001 received: {message}");
                        };
                        channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
            }
        }

        public static void Receive002()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";
            string queueName = "MultiProducerMsgQueue";
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"the consumer 002 received: {message}");
                        };
                        channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
            }
        }

        public static void Receive003()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";
            string queueName = "MultiProducerMsgQueue";
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"the consumer 003 received: {message}");
                        };
                        channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
            }
        }
    }
}

3. 运行效果

dotnet RabbitMQMsgProducer.dll num=001 minute=22

dotnet RabbitMQMsgConsumer001.dll 

rabbitmq系列——(2 多生产多消费)第3张

免责声明:文章转载自《rabbitmq系列——(2 多生产多消费)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇基于WPF系统框架设计(3)-Fluent Ribbon界面布局UOS 如何安装xdroid【x86】下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

springCloud组件详细解析

1 springcloud有哪些组件?  Eureka 服务注册中心  Ribbon 负载均衡  Zuul 网关  Fegin 客户端Web  Hsytri 熔断器  Bus 消息总线  Config 统一配置中心 2 什么是自我保护模式  springCould的服务注册中心会监控微服务的心跳。如果检测到心跳,那么这个时候服务注册中心进入自我保护模式...

SAP PI 如何实现消息定义查询

XI/PI系统的查询在7.1以前一直是让人头痛的,很多PI顾问也遇到过类似的需求,客户需要能够按关键字查询消息(message),有时PI顾问自己也需要根据关键字段查找来确定问题出在哪一条消息上,但这对于7.1以前来说是非常困难的,我在blog中以前提到过通过report的查询payload,确定message NO,但这个很多次都失灵。 在PI7.1...

PHP消息队列实现及应用

参考:https://www.cnblogs.com/JAYIT/p/10579888.html 目前对消息队列并不了解其原理,本篇文章主要是通过慕课网学习归纳的一些笔记,为后续学习打下基础。 众所周知在对网站设计的时候,会遇到给用户“群发短信”,“订单系统有大量的日志”,“秒杀设计”等,服务器没法处理这种瞬间迸发的压力,这种情况要保证系统正常有效的使用,...

ubuntu下DNS原理及相关设置

1.DNS原理分析如下: 当 DNS 客户机需要查询程序中使用的名称时,它会查询本地DNS 服务器来解析该名称。客户机发送的每条查询消息都包括3条信息,以指定服务器应回答的问题。● 指定的 DNS 域名,表示为完全合格的域名 (FQDN) 。● 指定的查询类型,它可根据类型指定资源记录,或作为查询操作的专门类型。● DNS域名的指定类别。对于DNS 服务器...

Perl模式匹配

       Perl 内置的模式匹配让你能够简便高效地搜索大量的数据。不管你是在一个巨型的商业门户站点上用于扫描每日感兴趣的珍闻报道,还是在一个政府组织里用于精确地描述人口统计(或者人类基因组图),或是在一个教育组织里用于在你的 web 站点上生成一些动态信息,Perl 都是你可选的工具。这里的一部分原因是 Perl 的数据库联接能力,但是更重要的原因是...

Spring Cloud Stream

1 前言 在实际的企业开发中,消息中间件是至关重要的组件之一。消息中间件主要解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性的架构。不同的中间件其实现方式,内部结构是不一样的。如常见的RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,如RabbitMQ有Exchange,Kafka有Topic、Partitio...