DotNetCore.CAP

摘要:
运行以下命令以安装CAPPM>在项目中//选择并安装您正在使用的数据库>根据需要;安装PackageDotNetCore.CAP.MySqlPM>服务。AddCap(x=>x.UsePostgreSql(“数据库连接字符串”);安装PackageDotNetCore.CAP.Dashboardservices.AddCap(x=>

CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。

在我们构建 SOA 或者 微服务系统的过程中,我们通常需要使用事件来对各个服务进行集成,在这过程中简单的使用消息队列并不能保证数据的最终一致性, CAP 采用的是和当前数据库集成的本地消息表的方案来解决在分布式系统互相调用的各个环节可能出现的异常,它能够保证任何情况下事件消息都是不会丢失的。

你同样可以把 CAP 当做 EventBus 来使用,CAP提供了一种更加简单的方式来实现事件消息的发布和订阅,在订阅以及发布的过程中,你不需要继承或实现任何接口。

 

CAP集在ASP.NET Core 微服务架构中的一个示意图:

架构预览

DotNetCore.CAP第1张

新建项目添加以下包:

运行以下下命令在你的项目中安装 CAP

PM> Install-Package DotNetCore.CAP

CAP 支持 Kafka、RabbitMQ、AzureServiceBus 等消息队列,可以按需选择下面的包进行安装:

PM> Install-Package DotNetCore.CAP.Kafka

PM> Install-Package DotNetCore.CAP.RabbitMQ

PM> Install-Package DotNetCore.CAP.AzureServiceBus

CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 的扩展作为数据库存储:

// 按需选择安装你正在使用的数据库

PM> Install-Package DotNetCore.CAP.SqlServer

PM> Install-Package DotNetCore.CAP.MySql

PM> Install-Package DotNetCore.CAP.PostgreSql

PM> Install-Package DotNetCore.CAP.MongoDB

Configuration

首先配置CAP到 Startup.cs 文件中,如下:

public void ConfigureServices(IServiceCollection services)
{
    ......

    services.AddDbContext<AppDbContext>();

    services.AddCap(x =>
    {
        //如果你使用的 EF 进行数据操作,你需要添加如下配置:
        x.UseEntityFramework<AppDbContext>();  //可选项,你不需要再次配置 x.UseSqlServer 了
		
        //如果你使用的ADO.NET,根据数据库选择进行配置:
        x.UseSqlServer("数据库连接字符串");
        x.UseMySql("数据库连接字符串");
        x.UsePostgreSql("数据库连接字符串");


        //如果你使用的 MongoDB,你可以添加如下配置:
        x.UseMongoDB("ConnectionStrings");  //注意,仅支持MongoDB 4.0+集群
	
        //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作为MQ,根据使用选择配置:
        x.UseRabbitMQ("ConnectionStrings");
        x.UseKafka("ConnectionStrings");
        x.UseAzureServiceBus("ConnectionStrings");
    });
Dashboard:可选安装

CAP 2.1+ 以上版本中提供了仪表盘(Dashboard)功能,你可以很方便的查看发出和接收到的消息。除此之外,你还可以在仪表盘中实时查看发送或者接收到的消息。

使用一下命令安装 Dashboard:

PM> Install-Package DotNetCore.CAP.Dashboard

services.AddCap(x =>
{
    //...
    
    // 注册 Dashboard
    x.UseDashboard();
    
    // 注册节点到 Consul
    x.UseDiscovery(d =>
    {
        d.DiscoveryServerHostName = "localhost";
        d.DiscoveryServerPort = 8500;
        d.CurrentNodeHostName = "localhost";
        d.CurrentNodePort = 5800;
        d.NodeId = 1;
        d.NodeName = "CAP No.1 Node";
    });
});

仪表盘默认的访问地址是:http://localhost:xxx/cap,你可以在d.MatchPath配置项中修改cap路径后缀为其他的名字。

发布

在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 进行消息发送

public class PublishController : Controller
{
    private readonly ICapPublisher _capBus;


    public PublishController(ICapPublisher capPublisher)
    {
        _capBus = capPublisher;
    }
    
    //不使用事务
    [Route("~/without/transaction")]
    public IActionResult WithoutTransaction()
    {
        _capBus.Publish("xxx.services.show.time", DateTime.Now);
	
        return Ok();
    }


    //Ado.Net 中使用事务,自动提交
    [Route("~/adonet/transaction")]
    public IActionResult AdonetWithTransaction()
    {
        using (var connection = new MySqlConnection(ConnectionString))
        {
            using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
            {
                //业务代码


                _capBus.Publish("xxx.services.show.time", DateTime.Now);
            }
        }
        return Ok();
    }

    //EntityFramework 中使用事务,自动提交
    [Route("~/ef/transaction")]
    public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
    {
        using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
        {
            //业务代码


            _capBus.Publish("xxx.services.show.time", DateTime.Now);
        }
        return Ok();
    }
}

订阅

Action Method

在 Action 上添加 CapSubscribeAttribute 来订阅相关消息。


public class PublishController : Controller
{
    [CapSubscribe("xxx.services.show.time")]
    public void CheckReceivedMessage(DateTime datetime)
    {
        Console.WriteLine(datetime);
    }
}


Service Method


如果你的订阅方法没有位于 Controller 中,则你订阅的类需要继承 ICapSubscribe:


namespace xxx.Service
{
    public interface ISubscriberService
    {
        public void CheckReceivedMessage(DateTime datetime);
    }


    public class SubscriberService: ISubscriberService, ICapSubscribe
    {
        [CapSubscribe("xxx.services.show.time")]
        public void CheckReceivedMessage(DateTime datetime)
        {
  

然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService 类

public void ConfigureServices(IServiceCollection services)


{


//注意: 注入的服务需要在 `services.AddCap()` 之前


services.AddTransient<ISubscriberService,SubscriberService>();

services.AddCap(x=>{});

订阅者组

订阅者组的概念类似于 Kafka 中的消费者组,它和消息队列中的广播模式相同,用来处理不同微服务实例之间同时消费相同的消息。

当CAP启动的时候,她将创建一个默认的消费者组,如果多个相同消费者组的消费者消费同一个Topic消息的时候,只会有一个消费者被执行。 相反,如果消费者都位于不同的消费者组,则所有的消费者都会被执行。

相同的实例中,你可以通过下面的方式来指定他们位于不同的消费者组。

[CapSubscribe("xxx.services.show.time", Group = "group1" )]
public void ShowTime1(DateTime datetime)
{
}

[CapSubscribe("xxx.services.show.time", Group = "group2")]
public void ShowTime2(DateTime datetime)
{
}

ShowTime1 和 ShowTime2 处于不同的组,他们将会被同时调用。

PS,你可以通过下面的方式来指定默认的消费者组名称:

services.AddCap(x =>
{
    x.DefaultGroup = "default-group-name";  
});

免责声明:文章转载自《DotNetCore.CAP》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇SAS | 数据读入思路及代码安装XP和Ubuntu双系统问题——Ubuntu安装时无法识别原有系统下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

使用MQTTBox连接MindSphere IoT Extention

==静态模板== https://cumulocity.com/guides/device-sdk/mqtt/#mqtt-static-templates  ==创建连接== 打开MQTTBox,在Client创建界面中输入相应信息: 【MQTT Client Name】Client识别名 【MQTT Client Id】可使用 “Generate a r...

Netty游戏服务器之六服务端登录消息处理

客户端unity3d已经把消息发送到netty服务器上了,那么ServerHandler类的public void channelRead(ChannelHandlerContext ctx, Object msg) 就会触发, 所有我们在这里吧消息发送至各自处理的类,这里呢我根据不同的消息类型,定义了不同的消息分派类。如login消息就制定LoingDi...

使用MQTTnet部署MQTT服务

使用MQTTnet部署MQTT服务   下载地址:https://github.com/chkr1011/MQTTnet 引用地址:https://www.cnblogs.com/zhaoqm999/p/12960677.html 一. 服务端 1. 创建配置参数 可以使用 `var options = new MqttServerOptions();`...

RocketMQ 消息偏移量 Offset 和 CommitLog

消息偏移量 Offset 概念 message queue 是无限长的数组,一条消息进来下标就会涨1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。 message queue 中的 maxOffset 表示消息的最大 offset,...

Spring Cloud Stream学习笔记

目录 1 环境 2 简介 3 初见 1 创建项目 添加web rabbitmq stream依赖 2 rabbitmq配置 3 消息接收器 4 在rabbitmq中发送消息 5 查看结果 4 自定义消息通道 1 自定义接口 2 自定义接收器 3 controller进行测试 4 消息输入输出(通道对接) 5 启动、访问 5 消息分组 1...

SpringBoot+RabbitMQ学习笔记(二)使用RabbitMQ的三种交换器之Direct

一丶简介 Direct Exchange 处理路由键。需要将一个队列绑定到交换器上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换器上要求路由键 “test”,则只有被标记为“test”的消息才被转发,不会转发test.aaa,也不会转发dog.123,只会转发test。  业务场景,系统日志处理场景: 1.微服务产生日...