Dapr发布/订阅

摘要:
业务系统只需调用跟据Dapr的要求实现订阅发布即可。下图显示了Dapr发布/订阅消息传递堆栈。消息格式要启用消息路由并为每个消息提供附加上下文,Dapr使用CloudEvents1.0规范作为其消息格式。route告诉Dapr将所有主题消息发送到应用程序中的/TestPubSub端点。pub/sub主题作用域限定为每个pub/sub组件定义发布/订阅范围。

前言

前篇文章对Dapr的状态管理进行了解,本篇继续对 订阅/发布 构建块进行了解。

一、定义:

发布订阅的概念来自于事件驱动架构(EDA)的设计思想,这是一种让程序(应用、服务)之间解耦的主要方式,通过发布订阅的思想也可以实现服务之间的异步调用。而大部分分布式应用都会依赖这样的发布订阅解耦模式。

Dapr发布/订阅第1张

步骤:

  1. 发布服务器将消息发送到消息代理。
  2. 订阅服务器将绑定到消息代理上的订阅。
  3. 消息代理将消息的副本转发给感兴趣的订阅。
  4. 订阅服务器从其订阅使用消息。

但是不同的消息中间件之间存在细微的差异,项目使用不同的产品需要实现不同的实现类,虽然是明智的决策,但必须编写和维护抽象及其基础实现。此方法需要复杂、重复且容易出错的自定义代码。

Dapr为了解决这种问题,提供开箱即用的消息传送抽象和实现,封装在 Dapr 构建基块中。业务系统只需调用跟据Dapr的要求实现订阅发布即可。

二、工作原理:

Dapr 发布&订阅构建基块提供了平台无关的 API 框架来发送和接收消息。你的服务将消息发布到一个命名主题(topic)。服务订阅主题(topic)来使用消息。

服务在 Dapr Sidecar上调用 pub/sub API。 然后,Sidecar将调用一个预定义的 Dapr pub/sub 组件来封装特定的消息代理产品。 下图 显示了 Dapr 发布/订阅 消息传递堆栈。

Dapr发布/订阅第2张

三、功能:

  • 发布/订阅API

Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。

服务将消息发布到指定主题, 业务服务订阅主题以使用消息。

服务在 Dapr sidecar 上调用 pub/sub API。然后,sidecar 调用预定义 Dapr pub/sub 组件。

任何编程平台都可以使用 Dapr 本机 API 通过 HTTP 或 gRPC 调用构建基块。若要发布消息,请进行以下 API 调用:

http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>

上述调用中有几个特定于 Dapr 的 URL 段:

    • <dapr-port>提供 Dapr sidecar 侦听的端口号。

    • <pub-sub-name>提供所选 Dapr pub/sub 组件的名称。

    • <topic>提供消息发布到的主题的名称。

  • 消息格式

要启用消息路由并为每个消息提供附加上下文,Dapr 使用CloudEvents 1.0 规范作为其消息格式。 使用 Dapr 应用程序发送的任何信息都将自动包入 Cloud Events 信封中,datacontenttype属性使用Content-Type头部值。

Dapr 实现以下 Cloud Events 字段:

    • id
    • source
    • specversion
    • type
    • datacontenttype(可选)

下面的示例显示了 CloudEvent v1.0 中序列化为 JSON 的 XML 内容:

{
    "specversion" : "1.0",
    "type" : "xml.message",
    "source" : "https://example.com/message",
    "subject" : "Test XML Message",
    "id" : "id-1234-5678-9101",
    "time" : "2020-09-23T06:23:21Z",
    "datacontenttype" : "text/xml",
    "data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"}
  • 订阅消息

Dapr 应用程序可以订阅已发布的 topics。 Dapr 允许您的应用程序有两种方法来订阅 topics:

声明式:其中定义在外部文件中:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: test_topic  //主题
  route: /TestPubSub //路由
pubsubname: pubsub //名称
scopes:
- frontend       //为该应用启用订阅

上面的示例显示了test_topic主题的事件订阅,使用组件pubsub

    • route告诉 Dapr 将所有主题消息发送到应用程序中的/TestPubSub端点。

    • scopes为 frontend 应用启用订阅

编程方式:订阅在用户代码中定义

  • 消息传递

Dapr 保证消息传递 at-least-once 语义。 这意味着,当应用程序使用发布/订阅 API 将消息发布到主题时,Dapr 可确保此消息至少传递给每个订阅者一次(at least once)

  • 消费者群体和竞争行消费者模式

多个消费组、多个应用程序实例使用一个消费组,这些都将由 Dapr 自动处理。 当同一个应用程序的多个实例(相同的 ID) 订阅主题时,Dapr 只将每个消息传递给该应用程序的一个实例。

Dapr发布/订阅第3张

同样,如果两个不同的应用程序 (不同的 ID) 订阅同一主题,那么 Dapr 将每个消息仅传递到每个应用程序的一个实例。

  • Topic作用域:

默认情况下,支持Dapr发布/订阅组件的所有主题 (例如,Kafka、Redis、RabbitMQ) 都可用于配置该组件的每个应用程序。 为了限制哪个应用程序可以发布或订阅 topic,Dapr 提供了 topic 作用域限定。 这使您能够让应用程序允许发布哪些主题以及应用程序允许订阅哪些主题。

pub/sub 主题作用域限定

为每个 pub/sub 组件定义发布/订阅范围。 您可能有一个名为pubsub的 pub/sub 组件,它有一组范围设置,另一个pubsub2另有一组范围设置。

要使用这个主题范围,可以设置一个 pub/sub 组件的三个元数据属性:

    • spec.metadata.publishingScopes
      • 分号分隔应用程序列表& 逗号分隔的主题列表允许该 app 发布信息到主题列表
      • 如果在publishingScopes(缺省行为) 中未指定任何内容,那么所有应用程序可以发布到所有主题
      • 要拒绝应用程序发布信息到任何主题,请将主题列表留空 (app1=;app2=topic2)
      • 例如,app1=topic1;app2=topic2,topic3;app3=允许 app1 发布信息至 topic1 ,app2 允许发布信息到 topic2 和 topic3 ,app3 不允许发布信息到任何主题。
    • spec.metadata.subscriptionScopes
      • 分号分隔应用程序列表& 逗号分隔的主题列表允许该 app 订阅主题列表
      • 如果在subscriptionScopes(缺省行为) 中未指定任何内容,那么所有应用程序都可以订阅所有主题
      • 例如,app1=topic1;app2=topic2,topic3允许 app1 订阅 topic1 ,app2 可以订阅 topic2 和 topic3
    • spec.metadata.allowedTopics
      • 一个逗号分隔的允许主题列表,对所有应用程序。
      • 如果未设置allowedTopics(缺省行为) ,那么所有主题都有效。subscriptionScopespublishingScopes如果存在则仍然生效。
      • publishingScopessubscriptionScopes可用于与allowedTopics的 conjuction ,以添加限制粒度
  • 消息生存时间:

Dapr 可以在每个消息的基础上设置超时。 表示如果消息未从 Pub/Sub 组件读取,则消息将被丢弃。 这是为了防止未读消息的积累。 在队列中超过配置的 TTL 的消息就可以说它挂了。  

四、.NET Core 应用

1、设置Pub/Sub组件:

本机默认下安装了Redis Staram,在 Windows 上打开%UserProfile%\.dapr\components\pubsub.yaml组件文件以验证:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

2、实现发布/订阅功能 :

添加控制器(PubSubController)

[Route("api/[controller]")]
[ApiController]
public classPubSubController : ControllerBase
{
    private readonly ILogger<PubSubController>_logger;
    private readonlyDaprClient _daprClient;
    public PubSubController(ILogger<PubSubController>logger, DaprClient daprClient)
    {
        _logger =logger;
        _daprClient =daprClient;
    }

    /// <summary>
    ///发布消息
    /// </summary>
    /// <returns></returns>
    [HttpGet("pub")]
    public async Task<ActionResult>PubAsync()
    {
        var data = new WeatherForecast() { Summary = "city", Date =DateTime.Now };
        await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data);
        returnOk();
    }

    /// <summary>
    ///消费消息
    /// </summary>
    /// <returns></returns>
    [Topic("pubsub", "test_topic")]
    [HttpPost("sub")]
    public async Task<ActionResult>Sub()
    {
        Stream stream =Request.Body;
        byte[] buffer = new byte[Request.ContentLength.Value];
        stream.Position = 0L;
        await stream.ReadAsync(buffer, 0, buffer.Length);
        string content =Encoding.UTF8.GetString(buffer);
        _logger.LogInformation("testsub" +content);
        returnOk(content);
    }
}

Startup.cs中调整:

public classStartup
{
    publicStartup(IConfiguration configuration)
    {
        Configuration =configuration;
    }
    public IConfiguration Configuration { get; }
    //This method gets called by the runtime. Use this method to add services to the container.
    public voidConfigureServices(IServiceCollection services)
    {
        //注入Dapr
services.AddControllers().AddDapr();
    }
    //This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public voidConfigure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        //使用CoudEvent
app.UseCloudEvents();
        if(env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
        app.Use((context, next) =>{
            context.Request.EnableBuffering();
            returnnext();
        });
        app.UseRouting();
        app.UseAuthorization();
        app.UseEndpoints(endpoints =>{
            endpoints.MapControllers()
            //订阅处理
endpoints.MapSubscribeHandler();;
        });
    }
}

3、dapr运行程序:

dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .\FrontEnd.dll

4、调用发布命令:

http://127.0.0.1:3501/v1.0/invoke/frontend/method/api/pubsub/pub

5、通过Dapr cli 发布消息:

dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'

总结

pub/sub 模式可帮助你分离分布式应用程序中的服务。 Dapr 发布&订阅构建基块简化了在应用程序中实现此行为。

通过 Dapr pub/sub,可以将消息发布到特定主题。构建基块还将查询服务,以确定 (订阅) 主题。

可以通过 HTTP 或特定于语言的 SDK 之一(例如用于 Dapr 的 .NET SDK)本机使用 Dapr pub/sub。 .NET SDK 与 ASP.NET 平台紧密集成。

使用 Dapr,可以将受支持的消息代理产品插入应用程序。 然后,无需更改应用程序的代码,即可交换消息代理。

免责声明:文章转载自《Dapr发布/订阅》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Kali学习笔记36:AVWS10的使用Appium小试下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

使用VBA数组公式——Excel之VBA(11)

一、 认识VBA数组及常用操作 引例:计算所属区域的总金额 Sub test() Dim i, k Dim t t = Timer '获取时间值 For i = 2 To 200000 If Range("g" & i) = Range("n5") Then k = k + Range("j" & i)...

RocketMQ(一)原理和实战!

一、RocketMQ的安装 1、文档 官方网站 http://rocketmq.apache.org GitHub https://github.com/apache/rocketmq 2、下载 wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-relea...

在ASP.NET 2.0中使用样式、主题和皮肤

ASP.NET 2.0的主题和皮肤特性使你能够把样式和布局信息存放到一组独立的文件中,总称为主题(Theme)。接下来我们可以把这个主题应用到任何站点,用于改变该站点内的页面和控件的外观和感觉。通过改变主题的内容,而不用改变站点的单个页面,就可以轻易地改变站点的样式。主题也可以在开发者之间共享。 ASP.NET包含了大量的用于定制应用程序的页面和控件的外观...

Kafka提交offset机制

在kafka的消费者中,有一个非常关键的机制,那就是offset机制。它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。它好比看一本书中的书签标记,每次通过书签标记(offset)就能快速找到该从哪里开始看(消费)。 Kafka对于offset的处理有两种提交方式:(1) 自...

[CSAPP-II] 链接[符号解析和重定位] 静态链接 动态链接 动态链接接口

1 平台 1.1 硬件 Table 1. 硬件(lscpu) Architecture: i686(Intel 80386) Byte Order: Little Endian 1.2 操作系统 Table 2. 操作系统类型 操作系统(cat /proc/version) 位数(uname -a) Linux version 3....

有赞 Flink 实时任务资源优化探索与实践

简介: 目前有赞实时计算平台对于 Flink 任务资源优化探索已经走出第一步。 随着 Flink K8s 化以及实时集群迁移完成,有赞越来越多的 Flink 实时任务运行在 K8s 集群上,Flink K8s 化提升了实时集群在大促时弹性扩缩容能力,更好的降低大促期间机器扩缩容的成本。同时,由于 K8s 在公司内部有专门的团队进行维护, Flink K8s...