EMQ X 规则引擎系列(十四)- 桥接消息到 Pulsar

摘要:
Pulsar消息系统简介Apache Pulsar是一个企业级发布和订阅消息系统。Pulsar旨在取代Apache Kafka多年来的主导地位。有关Pulsar的安装和使用,请参见Pulsar官方网站。有关Pulsar的更多信息以及桥接方案的比较,请参见以下内容:Match Kafka,新的大数据分析明星Pulsar。描述需要在EMQX指定的主题下将合格消息桥接至Pulsar的场景。添加响应操作。在PulsarSQL条件的输入和输出中桥接消息正确后,我们继续添加相应的操作,配置SQL语句的写入,并将过滤结果桥接到Pulsar。

Pulsar 消息系统介绍

Apache Pulsar 是一个企业级的发布订阅(pub-sub)消息系统,Pulsar 旨在取代 Apache Kafka 多年的主宰地位。Pulsar 在很多情况下提供了比 Kafka 更快的吞吐量和更低的延迟,并为开发人员提供了一组兼容的 API。

Pulsar 将高性能的流和灵活的传统队列结合到一个统一的消息模型和 API 中,实现流处理与队列处理同步进行。

Pulsar 的安装与使用详见 Pulsar 官网 ,更多 Pulsar 介绍信息与桥接方案对比详见:比拼 Kafka, 大数据分析新秀 Pulsar 到底好在哪

场景介绍

该场景需要将 EMQ X 指定主题下且满足条件的消息桥接到 Pulsar 。为了便于后续分析检索,消息内容需要进行拆分。

该场景下设备端上报信息如下:

  • 上报主题:cmd/state/:id,主题中 id 代表车辆客户端识别码

  • 消息体:

    {
      "id": "NXP-058659730253-963945118132721-22", // 客户端识别码
      "speed": 32.12, // 车辆速度
      "direction": 198.33212, // 行驶方向
      "tachometer": 3211, // 发动机转速,数值大于 8000 时才需存储
      "dynamical": 8.93, // 瞬时油耗
      "location": { // GPS 经纬度数据
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202 // 上报时间
    }
    

当上报数据发动机转速数值大于 8000 时,存储当前信息以便后续分析用户车辆使用情况。

准备工作

创建Pulsar主题

创建 emqx_rule_engine_output 主题:

./bin/pulsar-admin topics create-partitioned-topic -p 5 emqx_rule_engine_output

配置说明

创建资源

打开 EMQ X Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,键入 Pulsar 服务器信息进行资源创建。

WX201907181431432x.png

EMQ X 集群中节点所在网络环境可能互不相同,资源创建成功后点击列表中 状态按钮,查看各个节点资源连接状况,如果节点上资源不可用,请检查配置是否正确、网络连通性,并点击 重连 按钮手动重连。

WX201907181432172x.png

创建规则

进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则创建。这里选择触发事件 消息发布,在消息发布时触发该规则进行数据处理。

选定触发事件后,我们可在界面上看到可选字段及示例 SQL:

image20190716174727991.png

筛选所需字段

规则引擎使用 SQL 语句处理规则条件,该业务中我们需要将 payload 中所有字段单独选择出来,使用 payload.fieldName 格式进行选择,还需要消息上下文的 topicqosid 信息,当前 SQL 如下:

SELECT
  payload.id as client_id, payload.speed as speed, 
  payload.tachometer as tachometer,
  payload.ts as ts, id
FROM
  "message.publish"
WHERE
  topic =~ 't/#'

确立筛选条件

使用 SQL 语句 WHERE 字句进行条件筛选,该业务中我们需要定义两个条件:

  • 仅处理 cmd/state/:id 主题,使用主题通配符 =~topic 进行筛选:topic =~ 'cmd/state/+'
  • 仅处理 tachometer > 8000 的消息,使用比较符对 tachometer 进行筛选:payload.tachometer > 8000

组合上一步骤得到 SQL 如下:

SELECT
  payload.id as client_id, payload.speed as speed, 
  payload.tachometer as tachometer,
  payload.ts as ts,
  id
FROM
  "message.publish"
WHERE
  topic =~ 'cmd/state/+'
  AND payload.tachometer > 8000

使用 SQL 测试功能进行输出测试

借助 SQL 测试功能,我们可以实时查看当前 SQL 处理后的数据输出,该功能需要我们指定 payload 等模拟原始数据。

payload 数据如下,注意更改 tachometer 数值大小,以满足 SQL 条件:

{
  "id": "NXP-058659730253-963945118132721-22",
  "speed": 32.12,
  "direction": 198.33212,
  "tachometer": 9001,
  "dynamical": 8.93,
  "location": {
    "lng": 116.296011,
    "lat": 40.005091
  },
  "ts": 1563268202
}

点击 SQL 测试 切换按钮,更改 topicpayload 为场景中的信息,点击 测试 按钮查看数据输出:

image20190716184242159.png

测试输出数据为:

{
  "client_id": "NXP-058659730253-963945118132721-22",
  "id": "589A429E9572FB44B0000057C0001",
  "speed": 32.12,
  "tachometer": 9001,
  "ts": 1563268202
}

测试输出与预期相符,我们可以进行后续步骤。

添加响应动作,桥接消息到 Pulsar

SQL 条件输入输出无误后,我们继续添加相应动作,配置写入 SQL 语句,将筛选结果桥接到 Pulsar。

点击响应动作中的 添加 按钮,选择 桥接数据到Pulsar 动作,选取刚刚选定的资源,Pulsar 主题填写上文创建的emqx_rule_engine_output

WX201907181433432x.png

测试

预期结果

我们成功创建了一条规则,包含一个处理动作,动作期望效果如下:

  1. 设备向 cmd/state/:id 主题上报消息是,当消息中的 tachometer 数值超过 8000 时将命中 SQL,规则列表中 已命中 数字增加 1;
  2. Pulsar 的 emqx_rule_engine_output 主题 将增加一条消息,数值与当前消息一致。

使用 Dashboard 中的 Websocket 工具测试

切换到 工具 --> Websocket 页面,使用任意信息客户端连接到 EMQ X,连接成功后在 消息 卡片发送如下信息:

  • 主题:cmd/state/NXP-058659730253-963945118132721-22

  • 消息体:

    {
      "id": "NXP-058659730253-963945118132721-22",
      "speed": 32.12,
      "direction": 198.33212,
      "tachometer": 8081,
      "dynamical": 8.93,
      "location": {
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202
    }
    

image20190716190238252.png

点击 发送 按钮,查看得到当前规则已命中统计值为 1。

然后通过 Pulsar 命令去查看消息是否生产成功:

./bin/pulsar-client consume emqx_rule_engine_output -s "sub-name" -n 1000
----- got message -----
{"client_id":"NXP-058659730253-963945118132721-22","id":"58DEEDE7CF3D4F440000019CA0003","speed":32.12,"tachometer":8081,"ts":1563268202}

至此,我们通过规则引擎实现了使用规则引擎桥接消息到 Pulsar 的业务开发。

免责声明:文章转载自《EMQ X 规则引擎系列(十四)- 桥接消息到 Pulsar》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇程序实现AutoCAD Map3D 中的图形清理(MapClean)自动化ADO.NET事务处理下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

umeng友盟消息推送功能集成

umeng友盟消息推送功能集成(本人使用的是eclipse开发) 1.首先请自行观看友盟消息推送集成的API文档。   观看地址如下: 1 http://dev.umeng.com/push/android/integration#3_2 2.集成步骤如下    下载sdk         注意:有两种sdk如果用户已经集成支付宝的就下载no-uid版本的...

Kafka之SpringBoot集成Kafka实战

  在spring应用中如果需要订阅kafka消息,通常情况下我们不会直接使用kafka-client, 而是使用更方便的一层封装spring-kafka。  在spring-kafka在运行时会启动两类线程,一类是Consumer线程,另一类是Listener线程。前者用来直接调用kafka-client的poll()方法获取消息,后者才是调用我们代码中...

【转】国内CPU现状

首页 博客 学院 下载 图文课 论坛 APP CSDNCSDN学院 问答 商城 VIP会员 活动 招聘 ITeye GitChat 写博客 小程序 百度APP扫码 关注智能小程序 阅读体验更佳 消息 评论关注点赞回答系统通知 登录注册 我的关注 我的收藏 个人中心 帐号设置 我的博客 管理博客 我的学院 我的...

【阿里云产品公测】消息队列服务MQS使用分享

作者:阿里云用户 wiwi   消息队列MQS,顾名思义,是用于发送接收消息用的。废话不说,直接进入主题。 使用场景:服务添加了一个新功能,主要用于生成图片,本人用的开发语言是PHP,生成图片比较耗服务器性能,如果使用一台服务器生成的话,生成大量图片时速度就会很慢。 由此决定使用3台服务器同时处理,将任务分布到3台服务器中。另外有一台服...

【转】高性能网络编程4--TCP连接的关闭

TCP连接的关闭有两个方法close和shutdown,这篇文章将尽量精简的说明它们分别做了些什么。 为方便阅读,我们可以带着以下5个问题来阅读本文: 1、当socket被多进程或者多线程共享时,关闭连接时有何区别? 2、关连接时,若连接上有来自对端的还未处理的消息,会怎么处理? 3、关连接时,若连接上有本进程待发送却未来得及发送出的消息,又会怎么处理?...

iOS 各种控件默认高度

1.状态栏 状态栏一般高度为20像素,在打手机或者显示消息时会放大到40像素高,注意,两倍高度的状态栏在好像只能在纵向的模式下使用。如下图   用户可以隐藏状态栏,也可以将状态栏设置为灰色,黑色或者半透明的黑色。   如果需要隐藏状态栏可以使用调用: [[UIApplication sharedApplication] setStatusBarHidde...