基于Abp VNext框架设计

摘要:
当前UnitOfWork完成后,触发IEventBus的PublishAsync将在无事务环境中同步调用IEventBus中的PublishAsync。默认情况下,实现基于RabbitMq消息队列Volo.Abb。EventBus RabbitMQ实现分布式消息的发布和订阅。基于abp默认实现的DistributedEventBus无法满足以下场景:发布者生产者无法保证消息将被传递到MQ。我们引入Masstransit来提高abp的消息管理能力。事务活动补偿机制消息审核消息管道处理机制Abp框架下的事件消息集成使用MassTransit重新实现IDistributedEventBus。将用户身份信息传递给消费者。使用Asp-NetCoreWebHost作为消费者主机。初始化模块后,将注入并启动集成的MassTransit实例。

abp 通过IDistributedEventBus接口集成自IEventBus实现分布式事件消息的发布订阅。

IEventBus在什么时机触发PublishAsync?

  1. 当前UnitOfWork完成时,触发IEventBusPublishAsync
  2. 在没有事务环境下,同步调用IEventBusPublishAsync

abp 默认实现基于RabbitMq消息队列Volo.Abp.EventBus.RabbitMQ实现分布式消息的发布与订阅。

消息治理核心问题:

  1. 生产端如何保证投递成功的消息不能丢失。
  2. Mq自身如何保证消息不丢失。
  3. 消费段如何保证消费端的消息不丢失。

基于abp 默认实现的DistributedEventBus不能满足以下场景:

  1. Publisher 生产者无法保证消息一定能投递到MQ。
  2. Consumer 消费端在消息消费时,出现异常时,没有异常错误处理机制(确保消费失败的消息能重新被消费)。

我们引入Masstransit,来提升abp对消息治理能力。

Masstransit提供以下开箱即用功能:

  1. Publish/Send/Request-Response等几种消息投递机制。
  2. 多种IOC容器支持。
  3. 异常机制。
  4. Saga事务管理。
  5. 事务活动补偿机制(Courier)
  6. 消息审计
  7. 消息管道处理机制

Abp 框架下事件消息集成

  1. 使用MassTransit重新实现IDistributedEventBus
  2. 在消费端Consumer传递用户身份信息。
  3. 使用Asp.Net Core Web Host 作消费端Consumer宿主。

集成MassTransit

在Module初始化时,注入MassTransit实例,并启动。

/// <summary>
/// 配置DistributedEventBus
/// </summary>
/// <param name="context"></param>
/// <param name="configuration"></param>
/// <param name="hostingEnvironment"></param>
private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
{
    var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();

    var mqConnectionString = "rabbitmq://" + options.ConnectionString;


    context.Services.AddMassTransit(mtConfig =>
    {
        //inject consumers into IOC from assembly
        mtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));


        mtConfig.AddBus(provider =>
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>
                {
                    var host = mqConfig.Host(new Uri(mqConnectionString), h =>
                    {
                        h.Username(options.UserName);
                        h.Password(options.Password);
                    });
                // set special message serializer
                    mqConfig.UseBsonSerializer();

                    // integrated existed logger compontent
                    mqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());

                    mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>
                    {
                        //set rabbitmq prefetch count
                        q.PrefetchCount = 200;

                        //set message retry policy
                        q.UseMessageRetry(r => r.Interval(3, 100));

                        q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);
                        EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);

                    });

                    mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>
                    {
                        //set rabbitmq prefetch count
                        q.PrefetchCount = 50;
                        //q.UseRateLimit(100, TimeSpan.FromSeconds(1));
                        //q.UseConcurrencyLimit(2);

                        //set message retry policy
                        q.UseMessageRetry(r => r.Interval(3, 100));

                        q.Consumer<UserSyncEventConsumer>(provider);
                        EndpointConvention.Map<UserSyncEvent>(q.InputAddress);
                    });

                    mqConfig.ConfigureEndpoints(provider);

                    mqConfig.UseAuditingFilter(provider, o =>
                    {
                        o.ReplaceAuditing = true;
                    });
                });

            // set authtication middleware for user identity
            bus.ConnectAuthenticationObservers(provider);

            return bus;
        });
    });
}

在MassTransit中,使用IBusControl接口 StartAsyncStopAsync 来启动或停止。

使用IPublishEndpoint重新实现IDistributedEventBus接口,实现与abp分布式事件总线集成。

 public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency
    {
        private readonly IPublishEndpoint _publishEndpoint;


        //protected IHybridServiceScopeFactory ServiceScopeFactory { get; }
        protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }

        public MassTransitDistributedEventBus(
            IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
            IPublishEndpoint publishEndpoint)
        {
            //ServiceScopeFactory = serviceScopeFactory;
            _publishEndpoint = publishEndpoint;
            DistributedEventBusOptions = distributedEventBusOptions.Value;
            //Subscribe(distributedEventBusOptions.Value.Handlers);
        }

        /*
        *  Not Implementation
        */
      

        public Task PublishAsync<TEvent>(TEvent eventData)
            where TEvent : class
        {
            return _publishEndpoint.Publish(eventData);
        }

        public Task PublishAsync(Type eventType, object eventData)
        {
            return _publishEndpoint.Publish(eventData, eventType);
        }
    }


到此,我们实现了MassTransit与Abp集成。

事件消息传递User Claims

在实际业务实现过程中,我们会用消息队列实现“削峰填谷”的效果。异步消息队列中传递用户身份信息如何实现呢?

我们先看看abp在WebApi中,如何确定当前用户?

ICurrentUser 提供当前User Claims抽象。而ICurrentUser依赖于ICurrentPrincipalAccessor,在Asp.Net core中利用HttpContext User 来记录当前用户身份。

在MassTransit中,利用IPublishObserver > IConsumeObserver 生产者/消费端的观察者,来实现传递已认证的用户Claims。

    /// <summary>
    /// 生产者传递当前用户Principal
    /// </summary>
 public class AuthPublishObserver : IPublishObserver
    {
        private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;
        private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;

        public AuthPublishObserver(
            ICurrentPrincipalAccessor currentPrincipalAccessor,
            IClaimsPrincipalFactory claimsPrincipalFactory)
        {
            _currentPrincipalAccessor = currentPrincipalAccessor;
            _claimsPrincipalFactory = claimsPrincipalFactory;
        }

        public Task PrePublish<T>(PublishContext<T> context) where T : class
        {
            var claimsPrincipal = _claimsPrincipalFactory
                .CreateClaimsPrincipal(
                    _currentPrincipalAccessor.Principal
                    );

            if (claimsPrincipal != null)
            {
                context.Headers.SetAuthenticationHeaders(claimsPrincipal);
            }


            return TaskUtil.Completed;

        }
        public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;
        public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;

    }


    /// <summary>
    /// 消费端从MqMessage Heads 中获取当前用户Principal,并赋值给HttpContext
    /// </summary>
 public class AuthConsumeObserver : IConsumeObserver
    {
        private readonly IHttpContextAccessor _httpContextAccessor;
        private readonly IServiceScopeFactory _factory;


        public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory)
        {
            _httpContextAccessor = httpContextAccessor;
            _factory = factory;
        }

        public Task PreConsume<T>(ConsumeContext<T> context) where T : class
        {
            if (_httpContextAccessor.HttpContext == null)
            {
                _httpContextAccessor.HttpContext = new DefaultHttpContext
                {
                    RequestServices = _factory.CreateScope().ServiceProvider
                };
            }

            var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();

            if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated)
            {
                var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();

                _httpContextAccessor.HttpContext.User = claimsPrincipal;
                Thread.CurrentPrincipal = claimsPrincipal;
            }

            return TaskUtil.Completed;
        }

        public Task PostConsume<T>(ConsumeContext<T> context) where T : class
        {
            _httpContextAccessor.HttpContext = null;

            return TaskUtil.Completed;
        }

        public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
        {
            _httpContextAccessor.HttpContext = null;

            return TaskUtil.Completed;
        }
    }

使用Asp.Net Core Web Host 作消费端Consumer宿主

基于以下几点原因,我们使用Asp.Net Core Web Host 作为消息端Consumer宿主

  1. 部署在Linux环境下,Asp.Net Core Web Host 通常使用守护进程来启动服务实例,这样可以保证服务不被中断。
  2. 根据abp vnext DDD 项目分层,最大程度利用Application层应用方法,复用abp vnext 框架机制。

MassTransit 深入研究

  1. 延迟消息
  2. 限流熔断降级
  3. 批量消费
  4. Saga
References
  1. abp vnext disctributed event bus
  2. MassTransit

免责声明:文章转载自《基于Abp VNext框架设计》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Cmd不能运行,窗口闪一下就消失点双连通分量与割点下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

由于出现操作系统错误 3,进程无法读取文件D:XXXXX.pre (源: MSSQL_REPL,错误号: MSSQL_REPL20024)

最近着手做SqlServer2008的订阅发布,起初使用推送订阅很顺利,后来改成请求订阅出现了以下问题,折腾好长时间终于搞定,留下此文备日后查阅,或供遇相同问题的道友参考: 首先阐述以下问题: 1. 错误消息: 由于出现操作系统错误 3,进程无法读取文件“C:Program FilesMicrosoft SQL ServerMSSQL10_50.MSSQ...

javascript书籍集合

对于前端书籍的搜集,我是有强迫症的,就一个态度:我全都要! 下面是干货,关注公众号:撩撩前端,回复相应消息码,就可以获取对应电子书。 JavaScript.DOM高级程序设计 关注公众号"撩撩前端",回复消息:026557,获取电子书籍 JavaScript网页特效范例宝典 关注公众号"撩撩前端",回复消息:408289,获取电子书籍 JavaScript...

redis的pub/sub命令

Redis 发布订阅 Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。 Redis 客户端可以订阅任意数量的频道。 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系: 当有新消息通过 PUBLISH...

RocketMQ源码 — 八、 RocketMQ消息重试

RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。 producer发送消息重试 producer在发送消息的时候如果发送失败了,RocketMQ会自动重试。 private SendResult sendDefaultImpl( Message msg, final Communication...

重新整理 .net core 实践篇—————中间件[十九]

前言 简单介绍一下.net core的中间件。 正文 官方文档已经给出了中间件的概念图: 和其密切相关的是下面这两个东西: IApplicationBuilder 和 RequestDelegate(HttpContext context) IApplicationBuilder : public interface IApplicationBuilde...

filebeat对接kafka

工作中遇到了,filebeat对接kafka,记下来,分享一下,也为了防止自己忘记 对于filebeat是运行在客户端的一个收集日志的agent,filebeat是一个耳朵进一个耳朵出,进的意思是监听哪个日志文件,出的意思是监听的日志内容输出到哪里去,当然,这里我们输出到kafka消息队列中,而kafka就是一个消息队列,为什么要用kafka?因为现在用的...