中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云-RRPC通讯)

摘要:
物联网平台开发了一套基于MQTT协议的请求和响应同步机制,可以在不改变MQTT协议情况下实现同步通信。术语解释RRPC:恢复RPC。RRPC可以实现服务器向设备侧请求并使设备侧能够响应的功能。RRPC响应消息:设备回复云的消息。RRPC订阅主题:设备订阅RRPC消息时传递的主题,包括通配符。RRPC原理的具体流程如下:物联网平台在接收到来自用户服务器的RRPC呼叫后,向设备发送RRPC请求消息。消息体是用户传入的数据。主题是由物联网平台定义的主题,其中包含唯一的RRPC消息ID。

B站:https://space.bilibili.com/309103931

中移4G模块-ML302专栏:https://blog.csdn.net/qq_33259323/category_10453372.html

中移4G模块-ML302文集:https://www.bilibili.com/read/readlist/rl328642

1.中移4G模块-ML302-OpenCpu开发-(固件编译和烧录)

https://blog.csdn.net/qq_33259323/article/details/108586847

https://www.bilibili.com/read/cv7876504

2.中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云)

https://blog.csdn.net/qq_33259323/article/details/108638945

https://www.bilibili.com/read/cv7876527

2.1中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云-订阅主题)

https://blog.csdn.net/qq_33259323/article/details/108960540

https://www.bilibili.com/read/cv7879954

2.2中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云-接收和发送数据)

https://blog.csdn.net/qq_33259323/article/details/108964810

https://www.bilibili.com/read/cv7886836

2.3中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云-RRPC通讯)

https://blog.csdn.net/qq_33259323/article/details/108965071

https://www.bilibili.com/read/cv7888259

3.中移4G模块-ML302-OpenCpu开发-串口开发

https://blog.csdn.net/qq_33259323/article/details/108974888

https://www.bilibili.com/read/cv7888865

4.中移4G模块-ML302-OpenCpu开发-51单片机串口转I2C

https://blog.csdn.net/qq_33259323/article/details/109020642

https://www.bilibili.com/read/cv7922942

5.中移4G模块-ML302-OpenCpu开发-MCP23017输入/输出

https://blog.csdn.net/qq_33259323/article/details/109109136

https://www.bilibili.com/read/cv7969395

7.中移4G模块-ML302-OpenCpu开发-PCF8591测量电压

https://blog.csdn.net/qq_33259323/article/details/109109266

https://www.bilibili.com/read/cv7969365

8.中移4G模块-ML302-OpenCpu开发-GPIO

https://blog.csdn.net/qq_33259323/article/details/108960947

https://www.bilibili.com/read/cv7877192

9.中移4G模块-ML302-OpenCpu开发-ADC

https://blog.csdn.net/qq_33259323/article/details/109020864

https://www.bilibili.com/read/cv7922958

10.中移4G模块-ML302-OpenCpu开发-CJSON

https://blog.csdn.net/qq_33259323/article/details/109020898

https://www.bilibili.com/read/cv7923020

11.中移4G模块-ML302-OpenCpu开发-HTTP

https://blog.csdn.net/qq_33259323/article/details/109020904

https://www.bilibili.com/read/cv7923054

中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云-RRPC通讯)

什么是RRPC通信

MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端返回结果的场景。物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。物联网平台提供API给服务端,设备端只需要按照固定的格式回复PUB消息,服务端使用API,即可同步获取设备端的响应结果。

名词解释

  • RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客户机/服务器模式,用户不需要了解底层技术协议,即可远程请求服务。RRPC则可以实现由服务端请求设备端并能够使设备端响应的功能。
  • RRPC 请求消息:云端下发给设备端的消息。
  • RRPC 响应消息:设备端回复给云端的消息。
  • RRPC 消息ID:云端为每次RRPC调用生成的唯一消息ID。
  • RRPC 订阅Topic:设备端订阅RRPC消息时传递的Topic,含有通配符。

RRPC原理

RRPC

具体流程如下:

  1. 物联网平台收到来自用户服务器的RRPC调用,下发一条RRPC请求消息给设备。消息体为用户传入的数据,Topic为物联网平台定义的Topic,其中含有唯一的RRPC消息ID。
  2. 设备收到下行消息后,按照指定Topic格式(包含之前云端下发的唯一的RRPC消息ID)回复一条RRPC响应消息给云端,云端提取出Topic中的消息ID,和之前的RRPC请求消息匹配上,然后回复给用户服务器。
  3. 如果调用时设备不在线,云端会给用户服务器返回设备离线的错误;如果设备没有在超时时间内(8秒内)回复RRPC响应消息,云端会给用户服务器返回超时错误。

RRPC通信相关Topic

RRPC通信相关Topic格式如下:

  • RRPC请求消息Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/${messageId}
  • RRPC响应消息Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/response/${messageId}
  • RRPC订阅Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/+

以上内容来自https://help.aliyun.com/document_detail/90567.html

由上面可以看出需要在topic中获得messageId,然后在发送回去

如果你的ML302模块还没有连上阿里云可以看一下这个,中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云)

https://blog.csdn.net/qq_33259323/article/details/108638945

https://www.bilibili.com/read/cv7876527

1.订阅RRPC example_subscribe_rrpc函数

int example_subscribe_rrpc(void *handle){
    int res = 0;
    const char *fmt = "/sys/%s/%s/rrpc/request/+";
    char *topic = NULL;
    int topic_len = 0;

    topic_len = strlen(fmt) + strlen(DEMO_PRODUCT_KEY) + strlen(DEMO_DEVICE_NAME) + 1;
    topic = HAL_Malloc(topic_len);
    if (topic == NULL) {
        cm_printf("[ALIYUN]: memory not enough
");
        return -1;
    }
    memset(topic, 0, topic_len);
    HAL_Snprintf(topic, topic_len, fmt, DEMO_PRODUCT_KEY, DEMO_DEVICE_NAME);

    cm_printf("topic:%s 
",topic);

    res = IOT_MQTT_Subscribe(handle, topic, IOTX_MQTT_QOS0, example_message_arrive_rrpc, NULL);
    if (res < 0) {
        cm_printf("[ALIYUN]: subscribe failed
");
        HAL_Free(topic);
        return -1;
    }

    HAL_Free(topic);
    return 0;
}

  2.接收数据 example_message_arrive_rrpc函数

char DEMO_RRPC_SessionId[19];

void example_message_arrive_rrpc(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg){
    iotx_mqtt_topic_info_t     *topic_info = (iotx_mqtt_topic_info_pt) msg->msg;
    char * sendMessage = NULL;
    const char * sendMessage_fmt = "%d";
    int sendMessage_len = 20;
    int cm_test_read_gpio = 0;

    cm_printf("example_message_arrive_rrpc 
");

    switch (msg->event_type) {
        case IOTX_MQTT_EVENT_PUBLISH_RECEIVED:
            /* print topic name and topic message */
            cm_printf("[ALIYUN]: Message Arrived:");
            cm_printf("Topic  : %.*s", topic_info->topic_len, topic_info->ptopic);
            cm_printf("Payload: %.*s", topic_info->payload_len, topic_info->payload);
            cm_printf("
");

            // 提取sessionId
            strncpy(DEMO_RRPC_SessionId,topic_info->ptopic+46,19);
            cm_printf("[ALIYUN]: sessionId: %s",DEMO_RRPC_SessionId);


            //接收到的数据:topic_info->payload
            //干啥干啥干啥

            // 发送
            example_publish_rrpc(sendMessage);

            break;
        default:
            break;
    }
}

3.发送数据 example_publish_rrpc函数

int example_publish_rrpc(unsigned char * payload)
{
    int             res = 0;
    const char     *fmt = "/sys/%s/%s/rrpc/response/%s";
    char           *topic = NULL;
    int             topic_len = 0;
    //char           *payload = "{"message":"hello!"}";]

    cm_printf("MQTT发送信息:%s",payload);

    topic_len = strlen(fmt) + strlen(DEMO_PRODUCT_KEY) + strlen(DEMO_DEVICE_NAME) + 1 +strlen(DEMO_RRPC_SessionId);
    topic = HAL_Malloc(topic_len);
    if (topic == NULL) {
        cm_printf("[ALIYUN]: memory not enough
");  
        return -1;
    }
    memset(topic, 0, topic_len);
    HAL_Snprintf(topic, topic_len, fmt, DEMO_PRODUCT_KEY, DEMO_DEVICE_NAME,DEMO_RRPC_SessionId);

    cm_printf("[ALIYUN]: example_publish_rrpc: %s",topic);


    res = IOT_MQTT_Publish_Simple(0, topic, IOTX_MQTT_QOS0, payload, strlen(payload));
    if (res < 0) {
        cm_printf("[ALIYUN]: publish failed, res = %d
", res);
        HAL_Free(topic);
        return -1;
    }

    HAL_Free(topic);
    return 0;
}

免责声明:文章转载自《中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云-RRPC通讯)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇【微信小程序】再次授权地理位置getLocation+openSetting使用Atlantis(POJ1151+线段树+扫描线)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

emqx ws转成wss

主要是在nginx里加 把8083转成443 下面是完整nginx配置 server { listen 443; server_name localhost; root html; index...

kafka中的消费组

一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少。最近Kafka社区邮件组已经在讨论是否应该正式使用新版本consumer替换老版本,笔者也觉得时机成熟了,于是写下这篇文章讨论并总结一下新版本consumer的些许设计理念,希望能把consumer这点事说清楚,从而对广大使用者有所帮助。 在开始之前,...

kafka服务端和客户端均无法消费

一直报错: 1 partitions have leader brokers without a matching listener, including [cloudeasy.cmdb.cancel.manage-0] 查看zk服务端,报错如下: 原因是 watch 太多且路径太长导致重连的时候超过了 ZK maxBuffer 的限制 把 客户端从 3...

scanf函数的使用

函数原型编辑 1 intscanf( constchar*format, ... ); scanf()函数是格式化输入函数,它从标准输入设备(键盘) 读取输入的信息。 其调用格式为: scanf("<格式化字符串>",<地址表>); 函数 scanf() 是从标准输入流 stdio 中读内容的通用子程序,可以读入全部固...

CString的GetBuffer用法,GetBuffer本质,GetBuffer常见问题解决方法 .

char *GetBuffer(n)当n大于0时,是为CString变量分配一个长度为n的字节数组,返回值是这个数组的地址当n等于0时,返回CString变量本身拥有的字符串数组的头ReleaseBuffer一般用在GetBuffer,因为在调用了GetBuffer后变量本身会给自己上锁,于是所有能改变自身值的函数都不能用(如果Left,Mid),要用Re...

Keil Debug (printf) Viewer

Debug (printf) Viewer Home » µVision Windows » Debug (printf) Viewer The Debug (printf) Viewer window displays data streams that are transmitted sequentially through the ITM Stimu...