不恰当使用线程池处理 MQ 消息引起的故障

摘要:
这种失败实际上是在分析之前遇到的。分析了当时转储的堆后,发现用于处理MQ消息的线程池的队列长度达到了百万级,占用了超过1.3G的内存,无法回收。目前,该程序的实现如下:关联系统将消息推送到MQ,然后我们从MQ中提取消息进行处理;对于每种类型的消息,都有一个线程负责从MQ中提取消息。在提取消息后,封装到线程池中的任务将提交到相应的线程池中执行。这不仅控制了线程池占用的内存,而且当消息处理线程池无法处理消息时,还允许多个线程处理消息。

现状

业务部门反应网站访问特别慢,负责运维监控的同事说MQ消息队列积压了,中间件的说应用服务器内存占用很高,GC 一直回收不了内存,GC 线程占了近 100% 的 CPU,其他的基本上都在等待,数据库很正常,完全没压力。没啥办法,线程、堆 dump 出来后,重启吧,然后应用又正常了。

分析

这种故障之前其实也碰到过了,分析了当时 dump 出来的堆后发现,处理 MQ 消息的线程池的队列长度达百万级别,占用了超过 1.3G 内存,这些内存都是没法回收的。

程序的实现目前是这样的:关联系统把消息推送到 MQ 上,我们再从 MQ 上拉消息下来处理;每种类型的消息都有一个线程负责从 MQ 上拉消息,拉下来后封装成线程池的任务提交给相应的线程池去执行。代码可以简化为:

  package net.coderbee.mq.demo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MQListener {
     public ExecutorService executor = Executors.newFixedThreadPool(8);

     public void onMessage(final Object message) {
          executor.execute(new Runnable() {
               @Override
               public void run() {
                    // 耗时且复杂的消息处理逻辑
                    complicateHanlde(message);
               }
          });
     }

     private void complicateHanlde(Object message) {
     }
}

这个实现就是导致故障的根源, Executors.newFixedThreadPool(8) 创建的线程池的任务队列是无边界的:

  public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
}

当时是关联系统出故障了,他们恢复后,往 MQ 里狂推消息,我们系统里面的 MQListener 不断地从 MQ 拉消息下来,直接塞进线程池里,由于线程池处理消息的速度远远慢于消息进入的速度,所以线程池的队列不断增长,直到把所有的堆内存都占用了,这时不断引发 FullGC,但每次 FullGC 都没法回收到内存,应用也就挂死在那了。

之前那次故障也是线程池队列积压导致的,引起的原因是消息处理逻辑调用了外部接口,由于外部接口的响应非常慢,严重拖慢了消息的处理进度,改成异步调用之后好了些。但问题的根源并没有解决,就像昨天关联系统狂推消息后,我们的系统还是挂了。

解决方法

我的思路其实很简单,MQ 是用来系统间解耦的,也是一个缓冲,目前的实现是把处理消息的线程池又用作一个 MQ 了,消息不能不受控地进入线程池的任务队列,所以,要换成使用定长的阻塞队列,队列满了就暂停拉取消息。把线程池替换成:

  private int nThreads = 8;
private int MAX_QUEUQ_SIZE = 2000;
private ExecutorService executor = new ThreadPoolExecutor(nThreads,
          nThreads, 0L, TimeUnit.MILLISECONDS,
          new ArrayBlockingQueue<Runnable>(MAX_QUEUQ_SIZE),
          new ThreadPoolExecutor.CallerRunsPolicy());

把线程池队列满的时候直接让调用者(也就是 MQListener)执行任务,这样即延缓了消息拉取的速度,当 MQListener 再去拉取消息时,发现线程池有空间时可以提交到线程池,让线程池的工作线程去处理,它继续保持拉取速度。

这样既控制了线程池占用的内存,又可以让消息处理线程池处理不过来时多一个线程处理消息。

由于上面的代码采用调用者执行的方式,那么要考虑消息处理的顺序问题,比如一个订单的处理可能有多个步骤,对应多条 MQ 消息,那么要考虑这些步骤如果乱序了是否可以接受,因为第3步骤的处理消息可能被 MQListener 处理了,而第2步的处理消息还积压在线程池里。

网上看到的,其实也是我遇到的问题! 记录下 !感谢!

免责声明:文章转载自《不恰当使用线程池处理 MQ 消息引起的故障》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇消息队列-推/拉模式学习 &amp;amp; ActiveMQ及JMS学习Android Studio 自定义签名,代码段快捷键下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

第18章-使用WebSocket和STOMP实现消息功能

Spring 4.0为WebSocket通信提供了支持,包括: 发送和接收消息的低层级API; 发送和接收消息的高级API; 用来发送消息的模板; 支持SockJS,用来解决浏览器端、服务器以及代理不支持WebSocket的问题。 1 使用Spring的低层级WebSocket API 按照其最简单的形式,WebSocket只是两个应用之间通信的通道。...

ES6+转ES5

  本人近期接到一个天大的“好消息”:zxbc项目某些客户为保险业等种种原因要支持IE……  2013年,ES6草案冻结,2015年6月,ES6正式通过,成为国际标准。都9102啦,Chrome还好啦,升级到最新版本,大部分ES6还是ok的,但是万恶之源IE呢?作为一个前端开发者,兼容万恶的IE,顿时,胸中万马奔腾,此处省略十万字……  无奈之举,撸起袖子...

基于Abp VNext框架设计

abp 通过IDistributedEventBus接口集成自IEventBus实现分布式事件消息的发布订阅。 IEventBus在什么时机触发PublishAsync? 当前UnitOfWork完成时,触发IEventBus的PublishAsync 在没有事务环境下,同步调用IEventBus的PublishAsync abp 默认实现基于Ra...

Java转码工具native2ascii

背景:在做Java开发的时候,常常会出现一些乱码,或者无法正确识别或读取的文件,比如常见的validator验证用的消息资源(properties)文件就需要进行Unicode重新编码。原因是java默认的编码方式为Unicode,而我们的计算机系统编码常常是GBK等编码。需要将系统的编码转换为java正确识别的编码问题就解决了。 1、native2asc...

C#WebBrowser控件使用教程与技巧收集

C#WebBrowser控件使用教程与技巧收集--苏飞收集 先来看看常用的方法 Navigate(string urlString):浏览urlString表示的网址 Navigate(System.Uri url):浏览url表示的网址 Navigate(string urlString, string targetFrameName, byt...

TWinControl的消息覆盖函数大全(41个WM_函数和31个CM_函数,它的WndProc就处理鼠标(转发)、键盘(取消拖动)、焦点、和WM_NCHITTEST一共4类消息)

注意,这些函数只有Private一种形式(也就是不允许覆盖,但仍在动态表格中): 其中TWinControl对TControl有10个消息进行了覆盖(红色标记),其中有2个是WM_消息,8个是CM_消息。 TWinControl = class(TControl) private //41个windows消息,几乎全部消息都是私有函数(...