[原创]阿里云RocketMQ踩过的哪些坑

摘要:
大型邮件。目前,默认情况下支持最大256KB的消息,华北地区支持最大4MB的消息2.消息轨迹。通过消息轨迹,用户可以清楚地找到从发布者发送消息并通过MQ服务器传递给消息订阅者的完整链接,这样他们就可以轻松地查找和排除问题。广播消息,允许由消费者ID标识的所有消费者消费一次消息。顺序消息允许消息使用者按照消息的发送顺序使用消息。多协议访问支持HTTP协议:它支持RESTful风格的HTTP协议来发送和接收消息,这可以解决跨语言使用MQ的问题。

 由于公司的最近开始使用RocketMQ来做支付业务处理, 便开启了学习阿里云RocketMQ的学习与实践之路, 其中踩了不少的坑, 大部份是由于没有仔细查看阿里云的技术文档而踩的坑. 但是有一个非常大的坑, 确实是阿里云的技术文档里没有提及, 在文章的最后会给大家提及.

使用过程了封装的RocketMQ类库已开源在github: https://github.com/antaintan/easyrocketmq

公司的一位同事推荐使用RocketMQ, 并给出了几个流行MQ的对比, 资料来源, 阿里云帮助文档

目前RocketMQ已经成为Apache顶级项目, 作为开源软件免费提供. 但是阿里云提供的RocketMQ是收费的, 一个Topic 2元/天, 2元100万消息.具体价格,可以参照阿里云的报价.

详细对比资料地址: https://help.aliyun.com/document_detail/52577.htmlhttps://help.aliyun.com/document_detail/52577.html

(注: 阿里云的技术文档写得不错, 可以学习到很多知识)

[原创]阿里云RocketMQ踩过的哪些坑第1张

  RocketMQ 是一个款非常强大的MQ, 它同时支持, 无序消息, 分区顺序消息, 全局顺序消息, 并支持TCP, Http协议接入.

  客户端支持Java, C++/.Net(.Net客户端是通过封装C++DLL,通过PInvoke调用来实现的, 基于x64位, 所以我们自己项目编译时也要选择x64)

特色功能

  • 事务消息,实现类似 X/Open XA 的分布事务功能,以达到事务最终一致性状态。
  • 定时(延时)消息,允许消息生产者指定消息进行定时(延时)投递,最长支持40天。
  • 大消息,目前默认支持最大 256KB 消息,华北2 地域支持最大 4MB 消息。
  • 消息轨迹,通过消息轨迹,用户能清晰定位消息从发布者发出,经由 MQ 服务端,投递给消息订阅者的完整链路,方便定位排查问题。
  • 广播消息,允许一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。
  • 顺序消息,允许消息消费者按照消息发送的顺序对消息进行消费。
  • 重置消费进度,根据时间重置消费进度,允许用户进行消息回溯或者丢弃堆积消息。

多协议接入

  • 支持 HTTP 协议:支持 RESTful 风格 HTTP 协议完成收发消息,可以解决跨语言使用 MQ 问题。
  • 支持 TCP 协议:区别于 HTTP 简单的接入方式,提供更为专业、可靠、稳定的 TCP 协议的 SDK 接入。

 注意: Java客户端的TCP协议是功能支持最全的, .Net TCP要少一些功能支持如顺序消息, 而Http协议支持的更少, 具体情况, 请详细查阅阿里云的帮助文档.

消息队列的几个核心对象:

  • Topic: 消息主题
  • Consumer: 消息消费者, 消费订阅的主题消息
  • Producer: 消息生产者, 生产不同的消息

  阿里提供的.Net类库几个核心类库:

    NSClient4CPP.lib, c++类库

    ONSClient4CPP.dll, c++类库

    ONSClient4CPP.pdb c++类库

    ons.dll: Net类库, 利用开源软件 SWIG 生成 PINVOKE 封装代码

  如果系统没有c++运行时类库, 还需要安装vc_redist.x64.exe.

下面是生产者示例代码:

private static ProducerClient producerClient = new ProducerClient(AccessKeyId, AccessKeySecret, ProducerId);

private static void Main(string[] args)
{
    producerClient.Start();

    var stopWatch = new Stopwatch();
    stopWatch.Start();

    var taskList = new List<Task>();
    for (int threadIndex = 1; threadIndex <= ProducerThreadCount; threadIndex++)
    {
        // 生产消费
        var task = Task.Factory.StartNew(() => {
            for (int messageIndex = 1; messageIndex <= MessageCountPerThread; messageIndex++)
            {
                string content = "线程ID=" + Thread.CurrentThread.ManagedThreadId + ", 我要测试rocketmq message";
                //producerClient.SendMessage(ShardingKey, Topic, content, Tag);
                producerClient.SendMessage(Topic, content, Tag);

                Console.WriteLine(content);
            }
        }, TaskCreationOptions.LongRunning);

        taskList.Add(task);
    }

    Task.WaitAll(taskList.ToArray());
    stopWatch.Stop();

    // 一定要关闭,不然会有内存泄漏
    producerClient.Shutdown();

    Console.WriteLine("发送消息:{0}条, 使用时间{1}毫秒", MessageCountPerThread * ProducerThreadCount, stopWatch.ElapsedMilliseconds);
    Console.ReadLine();
}


下面是消费者示例代码
private static PushConsumerClient consumerClient = new PushConsumerClient(AccessKeyId, AccessKeySecret, Topic, ConsumerId, SubExpression);

private static int count = 0;

private class MyMsgListener : DefaultMessageListener
{
    public override ons.Action consume(Message message, ConsumeContext context)
    {
        Console.WriteLine("消息序号: {0}, 当前线程ID = {1}, 内容为: {2}", ++count, Thread.CurrentThread.ManagedThreadId, message.getBody());
        return ons.Action.CommitMessage;
    }
}

private static void Main(string[] args)
{
    var listener = new MyMsgListener();
    consumerClient.setMessageListener(listener);
    consumerClient.Start();

    Console.ReadLine();
    consumerClient.Shutdown();
}

  消费者有一个消息监听类, 有个consume方法, 这里趟过一个大坑, 因为为了省去一个类文件, 就用了匿名方法来代替上面的监听类, 结果只要消息一条消息后, 程序就自动死, 没有任何.net错误信息, 只有一个很底层的错误, 得不到任何有价值的信息, 花了很多时间, 才发现是因为匿名方法通过PPInvoke调用会有问题.后面换成监听类, 问题就解决了.

免责声明:文章转载自《[原创]阿里云RocketMQ踩过的哪些坑》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Away3D基础之摄像机buuctf web xss之光下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

阿里云搭建wordpress生产级CMS网站实践

搭建cms内容站点时,wordpress是一个很好的选择,不用做任何开发就可以通过配置、插件获得丰富的功能。用docker容器技术部署运维都非常简单,特别是对于wordpress这种我们无需做任何开发的组件。而出于低成本考虑,公有云都是一个最佳选择,这里我选择阿里云。为了提速,wordpress前会有一个nginx作为负载均衡和web加速服务器,将静态内容...

第一章 基础设施,1.3 阿里视频云ApsaraVideo是怎样让4000万人同时狂欢的(作者:蔡华)

1.3 阿里视频云ApsaraVideo是怎样让4000万人同时狂欢的 前言 在今年的双11中,双11天猫狂欢夜的直播成为一大亮点。 根据官方披露数据,直播总观看人数超4257万,同时观看人数峰值达529万,在云端实现了高计算复杂度的H.265实时转码和窄带高清技术。其实不光是双11,直播已经成为了2016年互联网最火爆的话题。除了内容的大规模涌现,背后其...

【--RocketMQ--】RocketMQ实现事务消息

在RocketMQ4.3.0版本后,开放了事务消息这一特性,对于分布式事务而言,最常说的还是二阶段提交协议,那么RocketMQ的事务消息又是怎么一回事呢,这里主要带着以下几个问题来探究一下RocketMQ的事务消息:   事务消息是如何实现的  我们有哪些手段来监控事务消息的状态  事务消息的异常恢复机制  RocketMQ的事务消息是如何实现的 Roc...

2019 云和数据java面试笔试题 (含面试题解析)

  本人5年开发经验、18年年底开始跑路找工作,在互联网寒冬下成功拿到阿里巴巴、今日头条、云和数据等公司offer,岗位是Java后端开发,因为发展原因最终选择去了云和数据,入职一年时间了,也成为了面试官,之前面试了很多家公司,感觉大部分公司考察的点都差不多,趁空闲时间,将自己的心得记下来,希望能给正在找或者准备找工作的朋友提供一点帮助。   下面提的问题...

RocketMQ多master迁移至多master多slave模式

一、项目背景 由于当前生产环境RocketMQ机器使用年限较长,已经过保,并且其中一台曾经发生过异常宕机事件。并且早期网络规划较乱,生产、开发、测试等网络没有分开,公司决定对当前网络进行规划,区分各个环境网段、机柜,涉及到MQ集群需要迁移,由于物理机比较老旧,使用决定使用新机器替换老机器,并且之前的MQ集群为多master模式,当master宕机是会导...

2019 多益网络java面试笔试题 (含面试题解析)

  本人5年开发经验、18年年底开始跑路找工作,在互联网寒冬下成功拿到阿里巴巴、今日头条、多益网络等公司offer,岗位是Java后端开发,因为发展原因最终选择去了多益网络,入职一年时间了,也成为了面试官,之前面试了很多家公司,感觉大部分公司考察的点都差不多,趁空闲时间,将自己的心得记下来,希望能给正在找或者准备找工作的朋友提供一点帮助。   下面提的问题...