CountDownLatch实例的await()方法

摘要:
转移自:http://blog.sina.com.cn/s/blog_4bed7e340101atnf.html我两年前写的程序有一个bug,当时我觉得这是无法解释的。这是因为线程偶尔会死掉,当时没有问题。因此,当时的对策是启动一个监控线程,一旦发现线程死机,就重新启动它,并在今天再次查看此代码。发现存在如下漏洞代码:publicvoidstartOneBusiness(finalStringbusinessID

转自:http://blog.sina.com.cn/s/blog_4bed7e340101atnf.html

两年前写的程序,出了一个当时觉得莫名其妙的bug,就是线程偶尔会死掉,当时也看不出有什么
问题所以当时的对策是起了一个监控线程,发现线程死掉就重启一个
今天回头再去看这段代码,发现确实有漏洞

代码如下

public void startOneBusiness(final String businessID) {
final IBusiness business = getOneBusiness(businessID);
business.strat();
Thread thread = new Thread(new Runnable() {
public void run() {
try {
business.setBusinessThread(Thread.currentThread());
while (business.getThreadStat() == IBusiness.START) {
try {
business.setLastRuntime(System.currentTimeMillis());
business.doTask();
Thread.sleep(business.getThreadSleepTime());
} catch (Throwable e) {
LogDebug.debug(business.getBusinessName()
+ " doTask 致命错误", e);
// 发邮件
AppException appException = new AppException(e);
MailUtil.sendMail(business.getBusinessName()
+ " doTask 致命错误",
appException.getMessage(), business
.getAddresses());
}
}
business.end();
} catch (Throwable e) {
LogDebug.debug(business.getBusinessName() + " doTask 致命错误",
e);
// 发邮件
AppException appException = new AppException(e);
MailUtil.sendMail(business.getBusinessName()
+ " doTask 致命错误", appException.getMessage(),
business.getAddresses());
} finally {
business.end();
}
}

});
IBusiness.threadPool.submit(thread);

}

public void doTask() {
System.out
.println("***************************************************");
System.out.println(getBusinessName() + " 开始");

long b = System.currentTimeMillis();
try {
T taskData = getTaskData();
if (taskData != null) {
final CountDownLatch countDownLatch = new CountDownLatch(
getRealThreadCount(taskData));
System.out.println(getBusinessName() + " 线程数:"
+ countDownLatch.getCount() + " 总任务数:"
+ getTaskCount(taskData));
final CountDownLatch startLatch = new CountDownLatch(1);// 确保在所有线程开始执行任务前,所有准备工作都已经完成,一旦准备工作完成了就调用startLatch.countDown()
for (int i = 0; i < countDownLatch.getCount(); i++) {
final T curTaskData = getCurTaskData(i, taskData);
final int m = i;
Runnable thread = new Runnable() {
public void run() {
long b1 = System.currentTimeMillis();
try {
startLatch.await();
dealTaskData(curTaskData);
} catch (Throwable e) {//注意不要跑飞了~~~
LogDebug.debug(
getBusinessName() + " doTask 异常", e);
// 发邮件
AppException appException = new AppException(e);
sendMail(getBusinessName() + " doTask 异常",
appException.getMessage(),
getAddresses());
} finally {
System.out
.println(getBusinessName()
+ " Thread-"
+ (m + 1)
+ " end "
+ " 耗时:"
+ (System.currentTimeMillis() - b1)
+ " 处理任务数:"
+ getTaskCount(curTaskData));
countDownLatch.countDown();
}
}

};
IBusiness.threadPool.submit(thread);// 这里一定要用线程池,否则当线程数超过tomcat限制,会导致线程数小于countDownLatch
}
startLatch.countDown();// 所有线程开始执行任务
countDownLatch.await();
}
} catch (Exception e) {
LogDebug.debug(getBusinessName() + " doTask 异常", e);
// 发邮件
AppException appException = new AppException(e);
sendMail(getBusinessName() + " doTask 异常", appException
.getMessage(), getAddresses());
} catch (Error e) {
LogDebug.debug(getBusinessName() + " doTask 致命错误", e);
// 发邮件
AppException appException = new AppException(e);
sendMail(getBusinessName() + " doTask 致命错误", appException
.getMessage(), getAddresses());
}
finally {
if(isMetrics()){
try{
THREAD_MONITOR = ThreadMonitor.getThreadMonitor();
Calendar calendar=Calendar.getInstance();//必须 Calendar.getInstance(),否则时间都一样
CellInfo cellInfo =new CellInfo();
cellInfo.setApplication(getSystemName());
cellInfo.setKey(getBusinessName());
cellInfo.setServer(getLocal());
cellInfo.setExecuteCount(1);
cellInfo.setCostTime(new Long(System.currentTimeMillis() - b).intValue());
cellInfo.setCreateDate(sdFormat.format(calendar.getTime()));
THREAD_MONITOR.addInfo(cellInfo);
}catch (Throwable e) {
System.out.println("监控异常!!!!!!!!!");
e.printStackTrace();
}
}
System.out.println(getBusinessName() + " 结束 " + " 耗时:"
+ (System.currentTimeMillis() - b));
System.out
.println("*************************************************** ");
}

}


startOneBusiness中的任务是主任务
doTask中的任务是子任务
里面用了两个CountDownLatch实例做主任务和子任务同步,还用到了线程池,
用的这个Executors.newFixedThreadPool固定线程池,
主任务和子任务都放到了线程池中

漏洞:
Executors.newFixedThreadPool这是个有固定活动线程数。当提
交到池中的任务数大于固定活动线程数时,任务就会放到阻塞队列中等待。
主任务和子任务都放到了线程池中,就有下面的情况发生,
主任务调用一次CountDownLatch实例的await()方法时,当前线程就会一直占用一个活动线程,如果多次调用,
那么就会一直占用多个活动线程,如果调用次数大于固定活动线程数,那么就可能造成阻塞队列
中某些子任务一直不被执行,CountDownLatch实例的countDown()的方法一直不被调用,那么对应的
主任务所在线程就会无限等待,与死锁现像一样

解决办法是避免主任务不要和子任务放到同一线程池

另外还有另外一种情况,就是子任务有可以因为其它原因,不去执行CountDownLatch实例的
countDown()方法,造成主任务所在线程无限等待

解决办法是最好不要用CountDownLatch实例的await(),归避长时间阻塞线程的风险,任何多线程应用程序都有死锁风险,改用CountDownLatch实例的await(long timeout, TimeUnit unit),设定超时时间,如果超时,
将返回false,这样我们得知超时后,可以做异常处理,而await()是void类型,没有返回值,
我们无法得知超时信息

另转载如下文字,转自http://www.duote.com/tech/5/12004.html

虽然线程池能大大提高服务器的并发性能,但使用它也会存在一定风险。与所有多线程应用程序一样,用线程池构建的应用程序容易产生各种并发问题,如对共享资源的竞争和死锁。此外,如果线程池本身的实现不健壮,或者没有合理地使用线程池,还容易导致与线程池有关的死锁、系统资源不足和线程泄漏等问题。

1.死锁

任何多线程应用程序都有死锁风险。造成死锁的最简单的情形是,线程A持有对象X的锁,并且在等待对象Y的锁,而线程B持有对象Y的锁,并且在等待对象X的锁。线程A与线程B都不释放自己持有的锁,并且等待对方的锁,这就导致两个线程永远等待下去,死锁就这样产生了。

虽然任何多线程程序都有死锁的风险,但线程池还会导致另外一种死锁。在这种情形下,假定线程池中的所有工作线程都在执行各自任务时被阻塞,它们都在等待某个任务A的执行结果。而任务A依然在工作队列中,由于没有空闲线程,使得任务A一直不能被执行。这使得线程池中的所有工作线程都永远阻塞下去,死锁就这样产生了。

2.系统资源不足

如果线程池中的线程数目非常多,这些线程会消耗包括内存和其他系统资源在内的大量资源,从而严重影响系统性能。

3.并发错误

线程池的工作队列依靠wait()和notify()方法来使工作线程及时取得任务,但这两个方法都难于使用。如果编码不正确,可能会丢失通知,导致工作线程一直保持空闲状态,无视工作队列中需要处理的任务。因此使用这些方法时,必须格外小心,即便是专家也可能在这方面出错。最好使用现有的、比较成熟的线程池。例如,直接使用java.util.concurrent包中的线程池类。

4.线程泄漏

使用线程池的一个严重风险是线程泄漏。对于工作线程数目固定的线程池,如果工作线程在执行任务时抛出 RuntimeException 或Error,并且这些异常或错误没有被捕获,那么这个工作线程就会异常终止,使得线程池永久失去了一个工作线程。如果所有的工作线程都异常终止,线程池就最终变为空,没有任何可用的工作线程来处理任务。

导致线程泄漏的另一种情形是,工作线程在执行一个任务时被阻塞,如等待用户的输入数据,但是由于用户一直不输入数据(可能是因为用户走开了),导致这个工作线程一直被阻塞。这样的工作线程名存实亡,它实际上不执行任何任务了。假如线程池中所有的工作线程都处于这样的阻塞状态,那么线程池就无法处理新加入的任务了。[nextpage]

5.任务过载

当工作队列中有大量排队等候执行的任务时,这些任务本身可能会消耗太多的系统资源而引起系统资源缺乏。

综上所述,线程池可能会带来种种风险,为了尽可能避免它们,使用线程池时需要遵循以下原则。

(1)如果任务A在执行过程中需要同步等待任务B的执行结果,那么任务A不适合加入到线程池的工作队列中。如果把像任务A一样的需要等待其他任务执行结果的任务加入到工作队列中,可能会导致线程池的死锁。

(2)如果执行某个任务时可能会阻塞,并且是长时间的阻塞,则应该设定超时时间,避免工作线程永久的阻塞下去而导致线程泄漏。在服务器程序中,当线程等待客户连接,或者等待客户发送的数据时,都可能会阻塞。可以通过以下方式设定超时时间:

调用ServerSocket的setSoTimeout(int timeout)方法,设定等待客户连接的超时时间;

对于每个与客户连接的Socket,调用该Socket的setSoTimeout(int timeout)方法,设定等待客户发送数据的超时时间。

(3)了解任务的特点,分析任务是执行经常会阻塞的I/O操作,还是执行一直不会阻塞的运算操作。前者时断时续地占用CPU,而后者对CPU具有更高的利用率。预计完成任务大概需要多长时间?是短时间任务还是长时间任务?

根据任务的特点,对任务进行分类,然后把不同类型的任务分别加入到不同线程池的工作队列中,这样可以根据任务的特点,分别调整每个线程池。

(4)调整线程池的大小。线程池的最佳大小主要取决于系统的可用CPU的数目,以及工作队列中任务的特点。假如在一个具有 N 个CPU的系统上只有一个工作队列,并且其中全部是运算性质(不会阻塞)的任务,那么当线程池具有 N 或 N+1 个工作线程时,一般会获得最大的 CPU 利用率。

如果工作队列中包含会执行I/O操作并常常阻塞的任务,则要让线程池的大小超过可用CPU的数目,因为并不是所有工作线程都一直在工作。选择一个典型的任务,然后估计在执行这个任务的过程中,等待时间(WT)与实际占用CPU进行运算的时间(ST)之间的比例WT/ST。对于一个具有N个CPU的系统,需要设置大约N×(1+WT/ST)个线程来保证CPU得到充分利用。

当然,CPU利用率不是调整线程池大小过程中唯一要考虑的事项。随着线程池中工作线程数目的增长,还会碰到内存或者其他系统资源的限制,如套接字、打开的文件句柄或数据库连接数目等。要保证多线程消耗的系统资源在系统的承载范围之内。

(5)避免任务过载。服务器应根据系统的承载能力,限制客户并发连接的数目。当客户并发连接的数目超过了限制值,服务器可以拒绝连接请求,并友好地告知客户:服务器正忙,请稍后再试。

免责声明:文章转载自《CountDownLatch实例的await()方法》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇BZOJ2594 [Wc2006]水管局长数据加强版 【LCT维护最小生成树】归并排序(python实现)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

HBase的Write Ahead Log (WAL) —— 整体架构、线程模型

解决的问题 HBase的Write Ahead Log (WAL)提供了一种高并发、持久化的日志保存与回放机制。每一个业务数据的写入操作(PUT / DELETE)执行前,都会记账在WAL中。 如果出现HBase服务器宕机,则可以从WAL中回放执行之前没有完成的操作。 本文主要探讨HBase的WAL机制,如何从线程模型、消息机制的层面上,解决这些问题: 1...

c#实现多线程代码例子

相信大家都有用过网际快车等下载资源的经历,它里面是可以设置线程数的(近年版本默认是10,曾经默认是5)。它会将文件分成与线程数相同的部分,然后每个线程下载自己的那一部分,这样下载效率就有可能提高。相信大家都有加多线程数,提升下载效率的经历。但细心的用户会发现,在带宽一定的情况下,并不是线程越多,速度越快,而是在某一点达到峰值。在C#中用多线程并不难实现。它...

.net 多线程的使用(Thread)

 上篇 net 同步异步   中篇 多线程的使用(Thread)   下篇 net 任务工厂实现异步多线程 Thread多线程概述  上一篇我们介绍了net 的同步与异步,我们异步演示的时候使用的是委托多线程来实现的。今天我们来细细的剖析下 多线程。 多线程的优点:可以同时完成多个任务;可以使程序的响应速度更快;可以让占用大量处理时间的任务或当前没有进...

synchronized 的真正含义

@synchronized 锁的永远是对象 ,只针对于对象,只能锁对象,常量等是不能加synchronized,一旦加编译也不会通过 @synchronized 锁对象中的非static 就是锁调用该方法的对象,是整个对象 @synchronized 锁对象中的static 方法就是锁整个类 ,和 synchronized(x.class) 和 锁静态代码...

网络编程,从socket到epoll

网络编程,从socket到epoll 参考链接:https://www.bilibili.com/video/BV11Z4y157RY?p=2&spm_id_from=pageDriver socket基本知识: socket分类: socekt提供了流和数据报两种通信机制,即流socket和数据报socket。 简单的socket通信流程: 先...

JUC 并发编程--04 常用的辅助类CountDownLatch , CyclicBarrier , Semaphore , 读写锁 , 阻塞队列,CompletableFuture(异步回调)

CountDownLatch 相当于一个减法计数器, 构造方法指定一个数字,比如6, 一个线程执行一次,这个数字减1, 当变为0 的时候, await()方法,才开始往下执行,, 看这个例子 CyclicBarrier 的用法, 字面意思:循环栅栏, 这是构造方法, 第一个参数parties 是线程数量, 第二个参数是barrierAction:...