.NET Core分布式事件总线、分布式事务解决方案:CAP

摘要:
消息队列使用RabbitMQ<PackageReferenceInclude=“DotNetCore.CAP”版本=“3.1.2”/>PackageReferenceInclude=“DotNetCore.CAP.SqlServer”版本=“3.1.2”/>
简介

CAP 是一个遵循 .NET Standard 标准库的C#库,用来处理分布式事务以及提供EventBus的功能,它具有轻量级,高性能,易使用等特点。
分布式事务是在分布式系统中不可避免的一个硬性需求,CAP 没有采用两阶段提交(2PC)这种事务机制,而是采用的 本地消息表+MQ 这种经典的实现方式,这种方式又叫做 异步确保。
CAP 实现了 EventBus 中的发布/订阅,它具有 EventBus 的所有功能。也就是说你可以像使用 EventBus 一样来使用 CAP,另外 CAP 的 EventBus 是具有高可用性的, CAP 借助于本地消息表来对 EventBus 中的消息进行了持久化,这样可以保证 EventBus 发出的消息是可靠的,当消息队列出现宕机或者连接失败的情况时,消息也不会丢失。

源码:
https://github.com/dotnetcore/CAP

目前, CAP 同时支持使用 RabbitMQ,Kafka,Azure Service Bus 等进行底层之间的消息发送,你不需要具备这些消息队列的使用经验,仍然可以轻松的集成到项目中。
CAP 目前支持使用 Sql Server,MySql,PostgreSql,MongoDB 数据库的项目。
CAP 同时支持使用 EntityFrameworkCore 和 ADO.NET 的项目,你可以根据需要选择不同的配置方式。
下面是CAP在系统中的一个不完全示意图:
.NET Core分布式事件总线、分布式事务解决方案:CAP第1张

图中实线部分代表用户代码,虚线部分代表CAP内部实现。

消息表

CAP框架会在数据库中添加两个表,以保证消息在任何情况下都会被成功发送、消费
cap.Published:消息在发送到队列之前会在此表中添加一条记录,防止特殊原因导致消息未成功发送到队列
cap.Received:在从队列接收到消息后在此表添加一条记录,CAP定时轮询从此表读取未成功消费的消息,交给订阅方法处理,直到成功消费(订阅方法内部不抛出异常即成功消费);同时也能防止消息被重复消费

CAP集成到项目

添加Nuget包

数据库使用Sqlserver,消息队列使用RabbitMQ

<PackageReference Include="DotNetCore.CAP" Version="3.1.2" />
<PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="3.1.2" />
<PackageReference Include="DotNetCore.CAP.SqlServer" Version="3.1.2" />

Startup添加配置

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllersWithViews();

            services.AddCap(option =>
            {
                //如果你使用的 EF 进行数据操作,你需要添加如下配置:
                //option.UseEntityFramework<AppDbContext>();  //可选项,你不需要再次配置 option.UseSqlServer 了

                //如果你使用的ADO.NET,根据数据库选择进行配置:
                option.UseSqlServer("Server=SC-202003151209\SL1;Database=CAPDB;User=sa;Password=123456;");
                //option.UseMySql("连接字符串")

                //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作为MQ,根据使用选择配置:
                option.UseRabbitMQ(rabbitOption => {
                    rabbitOption.HostName = "xxxx";
                    rabbitOption.Port = 5672;
                    rabbitOption.Password = "xxxx";
                    rabbitOption.UserName = "xxxx";
                    rabbitOption.VirtualHost = "xxxx";
                });
            });
        }

仪表盘

仪表盘可以显示订阅列表、发送、接收、失败消息,并且可以操作消息的重复消费
添加Nuget:

<PackageReference Include="DotNetCore.CAP.Dashboard" Version="3.1.2" />

UseDashboard:

services.AddCap(option =>
{
option.UseDashboard();
}

访问仪表盘:http://xxxx/cap

发布事件



        /// <summary>
        /// 不使用事务
        /// </summary>
        /// <param name="capBus"></param>
        /// <returns></returns>
        public IActionResult Index1([FromServices] ICapPublisher capBus)
        {
            capBus.Publish("Meshop.PayService.Refund", new RefundMessage { OrderID = 1, RefundPrice = 100M });
            return View();
        }
        /// <summary>
        /// 使用事务,自动提交
        /// </summary>
        /// <param name="capBus"></param>
        /// <returns></returns>
        public IActionResult Index2([FromServices] ICapPublisher capBus)
        {
            using (var conn = new SqlConnection("Server=SC-202003151209\SL1;Database=CAPDB;User=sa;Password=123456;"))
            {
                using (var tran = conn.BeginTransaction(capBus, true))
                {
                    var orderMaster = new OrderMaster { OrderState = 1 };
                    long id = conn.Insert(orderMaster, tran);
                    capBus.Publish("Meshop.PayService.Refund", new RefundMessage { OrderID = 1, RefundPrice = 100M });
                }
            }
            return View();
        }

订阅事件

如果是在Controller中,直接添加[CapSubscribe("")]来订阅相关消息。


        [CapSubscribe("Meshop.PayService.Refund")]
        public Task Refund(RefundMessage message)
        {
            return Task.CompletedTask;
        }

如果你的方法没有位于Controller 中,那么你订阅的类需要继承 ICapSubscribe,然后添加[CapSubscribe("")]标记:

namespace xxx.Service
{
    public interface ISubscriberService
    {
    	public void CheckReceivedMessage(DateTime time);
    }
    
    
    public class SubscriberService: ISubscriberService, ICapSubscribe
    {
    	[CapSubscribe("xxxx.services.show.time")]
    	public void CheckReceivedMessage(DateTime time)
    	{
    		
    	}
    }
}

然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService

public void ConfigureServices(IServiceCollection services)
{
    services.AddTransient<ISubscriberService,SubscriberService>();
}

如何接收返回参数

在调用Publish方法发布消息时传一个callbackName参数,并添加订阅,异步接收返回数据,如下:

public IActionResult Index1([FromServices] ICapPublisher capBus)
        {
            capBus.Publish("Meshop.PayService.Refund", new RefundMessage { OrderID = 1, RefundPrice = 100M }, "Meshop.OrderService.Return");
            return View();
        }
[CapSubscribe("Meshop.OrderService.Return")]
        public IActionResult IndexReturn(RefundMessage message)
        {
            return null;
        }

版本号


参考:
https://www.cnblogs.com/cmliu/p/11767343.html
https://www.cnblogs.com/savorboard/p/cap.html

免责声明:文章转载自《.NET Core分布式事件总线、分布式事务解决方案:CAP》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇多表关联! AHOI/HNOI2017大佬下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

用Go写Windows桌面应用 使用Form

几个月以前看到了Go的消息,读完入门PPT之后,便有种感觉,这就是我想象中的语言。语法简单,写起来手感极好,设计则处处透着简洁。 随后便开始用它代替python写一些常用小工具,堪称得心应手。几个月以后,日益离不开了,再用别的语言时总觉得缺了点什么。 我很喜欢写桌面应用,之前一直用C++,但实在不喜欢它那复杂的设计,而且也没有找到合心意的UI库。此时心里冒...

supervisor 的使用 (fastcgi管理)

本文主要介绍 supervisor 对 fastcgi 进程的管理 fastcgi 进程的管理 在php 中,php-fpm 有主进程来管理和维护子进程的数量。但是并不是所有的服务都有类似的主进程来做子进程的维护。 在很多其他语言中,有很多比较有名的fastcgi 服务,例如py 的flup, c++ 实现的 FastCgi++等。如果这些服务在单机中启动...

[SQL Server] 数据库日志文件自动增长导致连接超时的分析

1、现象、问题描述 客户反映某客户端登陆不了,客户端程序日志显示“连接数据库超时”;检查对应的数据库服务器,日志显示“Autogrow of file '某数据库日志文件' in database '某数据库' was cancelled by user or timed out after 2391 milliseconds. Use ALTER DAT...

使用MQTTnet部署MQTT服务

使用MQTTnet部署MQTT服务   下载地址:https://github.com/chkr1011/MQTTnet 引用地址:https://www.cnblogs.com/zhaoqm999/p/12960677.html 一. 服务端 1. 创建配置参数 可以使用 `var options = new MqttServerOptions();`...

HTTPS SSL 中间人劫持攻击原理

中间人攻击(Man-in-the-Middle Attack, MITM)就是通过拦截正常的网络通信数据,并进行数据篡改和嗅探,而通信的双方却毫不知情。 假设爱丽丝(Alice)希望与鲍伯(Bob)通信。同时,马洛里(Mallory)希望拦截窃会话以进行窃听并可能在某些时候传送给鲍伯一个虚假的消息。 首先,爱丽丝会向鲍勃索取他的公钥。如果Bob将他的公钥发...

Filebeat命令参考

 Filebeat命令参考: Filebeat提供了一个命令行界面,用于启动Filebeat并执行常见任务,例如测试配置文件和加载仪表板。命令行还支持用于控制全局行为的全局标志。 命令: export 将配置或索引模板导出到stdout。 help 显示任何命令的帮助。 keystore 管...