ForkJoin全解2:forkjoin实际工作流程与实现

摘要:
稍后,任务通过ForkJoinThread执行,因此任务中的fork是一个内部操作,它调用push将任务提交到工作队列。在forkjoin框架中,worker更准确地引用具有所有者的workQueue。在forkjoin中,通过externalPush创建的workQueue没有所有者。

1、相关概念解释
 

1.1 “内部”和外部
 

当一个操作是在非ForkjoinThread的线程中进行的,则称该操作为外部操作。比如我们前面执行pool.invoke,invoke内又执行externalPush。由于invoke是在非ForkjoinThread线程中进行的(这里是在main线程中进行),所以是一个外部操作,调用的是externalPush。之后task的执行是通过ForkJoinThread来执行的,所以task中的fork就是内部操作,调用的是push,把任务提交到工作队列。其实fork的实现是类似下面这样的:

if(Thread.currentThread() instanceof ForkJoinThread){
         push(this)

}else{
         externaPush(this)

}

即fork会根据执行自身的线程是否是ForkJoinThread的实例来判断是处于外部还是内部。那为何要区分内外部?

任何线程都可以使用ForkJoin框架,但是对于非ForkJoinThread的线程,它到底是怎样的,ForkJoin无法控制,也无法对其优化。因此区分出内外部,这样方便ForkJoin框架对任务的执行进行控制和优化

1.2 Worker
从之前的叙述中,我们很可能会把线程跟worker等同起来,所以这里要明确指出两者是不同的。forkjoin框架中,worker更确切的指的是有owner的workQueue。forkjoin中,通过externalPush创建的workQueue是没有owner的。

总之,不是所有workQueue都有owner,而有owner的workQueue就是worker。

2、整体工作流程图


3、关键属性解释
3.1 pool的config属性
config是int类型的,其高16位存储mode,低16位存储你指定的并发度。mode有2个值:LIFO_QUEUE = 0,即高16位为0;以及FIFO_QUEUE  = 1 << 16,即高16位为1。mode主要用于控制用什么方法来获取任务,如果是先进先出,则用poll方法获取任务,如果是后进先出则用pop获取任务。源码示例:

(config & FIFO_QUEUE) == 0 ? pop() : poll();

当你new ForkJoinPool时,可以指定你要的并发度(parallelism),这个并发度将存储在config的低16位中。

3.2 workQueues属性
workQueues是pool的属性,它是WorkQueue类型的数组。externalPush和externalSubmit所创建的workQueue没有owner(即不是worker),且会被放到workQueues的偶数位置;而createWorker创建的workQueue(即worker)有owner,且会被放到workQueues的奇数位置。

3.3 workQueue的config属性
这是WorkQueue的config,高16位跟pool的config值保持一致,而低16位则是workQueue在workQueues数组的位置。

从workQueues属性的介绍中,我们知道,不是所有workQueue都有worker,没有worker的workQueue称为公共队列(shared queue),config的第32位就是用来判断是否是公共队列的。在externalSubmit创建工作队列时,有:

q.config = k | SHARED_QUEUE;

其中q是新创建的workQueue,k就是q在workQueues数组中的位置,SHARED_QUEUE=1<<31,注意这里config没有保留mode的信息。

而在registerWorker中,则是这样给workQueue的config赋值的:

w.config = i | mode;

w是新创建的workQueue,i是其在workQueues数组中的位置,没有设置SHARED_QUEUE标记位

3.4 scanstate属性
scanState是workQueue的属性,是int类型的。scanState的低16位可以用来定位当前worker处于workQueues数组的哪个位置。每个worker在被创建时会在其构造函数中调用pool的registerWorker,而registerWorker会给scanState赋一个初始值,这个值是奇数,因为worker是由createWorker创建,并会被放到WorkQueues的奇数位置,而createWorker创建worker时会调用registerWorker。

简言之,worker的scanState初始值是奇数,非worker的scanstate初始值=INACTIVE=1<<31,小于0(非worker的workQueue在externalSubmit中创建)。

当每次调用signalWork(或tryRelease)唤醒worker时,worker的高16位就会加1

另外,scanState<0表示worker未激活,当worker调用runtask执行任务时,scanState会被置为偶数,即设置scanState的最右边一位为0。

3.5 pool的ctl属性
pool中ctl是long类型的,主要作用有2点:

存储worker数目与目标并发度的关系
存储休眠的worker
ctl是64位的,每16位就是一个子属性,从高位到低位,4个子属性如下:

AC:最高的16位,值为活跃的worker数减去目标并行度
TC:AC之后的的6位,值为总的worker数减去目标并行度。所以,如果TC<0,表示worker数小于目标并行度,此时就要添加worker了。换言之,最高位可用来表示worker数是否足够,如果是1表示worker数不足够,要添加worker。ForkJoinPool专门定义一个变量ADD_WORKER=1<<47来表示TC的最高位
SS:等待队列中,最顶部的线程的版本数
ID:在无锁栈中等待的线程的poolIndex
其中低32位又称为SP(其实是工作队列的stackPred属性的简写),高32位称为UC。那么ctl是如何存储休眠的worker以及如何唤醒worker的?

3.6 ctl,stackPred,与scanState实现worker休眠栈
 

worker休眠时,是这样存储的

int ctlHigh32=ctl >>>32;

int ctlLow32=(int)ctl;

ctl=ctlHigh32+worker.scanState

worker.preStack=ctlLow32

worker的唤醒类似这样:

for(worker : pool.workQueues){
         if(worker.scanState==(int)ctl){
                  唤醒worker

                  worker.scanState的高16位加1

                  ctl的低32位=worker.preStack

                  退出循环

}

}

在worker休眠的4行伪码中,让ctl的低32位的值变为worker.scanState,这样下次就可以通过scanState唤醒该worker。唤醒该worker时,把该worker的preStack设置为ctl低32位的值,这样下下次唤醒的worker就是scanState等于该preStack的worker。

这里通过preStack保存下一个worker,这个worker比当前worker更早地在等待,所以形成一个后进先出的栈。

3.7 ctl,stackPred,与scanState实现worker休眠栈
runState是int类型的值,控制整个pool的运行状态和生命周期,有下面几个值(可以好几个值同时存在):

RSLOCK     = 1;

RSIGNAL    = 1 << 1;

STARTED    = 1 << 2;

STOP       = 1 << 29;

TERMINATED = 1 << 30;

SHUTDOWN   = 1 << 31;

如果runState值为0,表示pool尚未初始化。

RSLOCK表示锁定pool,当添加worker和pool终止时,就要使用RSLOCK锁定整个pool。如果由于runState被锁定,导致其他操作等待runState解锁(通常用wait进行等待),当runState设置了RSIGNAL,表示runState解锁,并通知(notifyAll)等待的操作。

剩下4个值都跟runState生命周期有关,都可以顾名思义:

当需要停止时,设置runState的STOP值,表示准备关闭,这样其他操作看到这个标记位,就不会继续操作,比如tryAddWorker看到STOP就不会再创建worker:

if(!stop){
         createWorker()

}

而tryTerminate对这些生命周期状态的处理则是这样的:

首先设置runState的SHUTDOWN,这样isShutdown等方法可以使用这个状态。然后判断查看是否设置了stop,如果否,则会通过信息worker等方式加快任务的执行,让任务尽快执行完毕,如果是则不会这样。之后开始终止pool,最后设置runState为TERMINATED。

要修改runState值,需要先调用lockRunstate,锁定runstate。lockRunstate是线程安全的,如果锁定失败,线程会调用wait等待。如果锁定成功,则使用unLockRunstate(oldRunstate,newRunstate),修改runstate值。unLockRunstate执行成功会调用notify唤醒那些在lockRunstate中等待的线程。

3.8 ctl,stackPred,与scanState实现worker休眠栈
当前top和base的初始值为 INITIAL_QUEUE_CAPACITY >>>1= (1 << 13)>>>1 = 8192/2。然后push一个task之后,top+=1,也就是说,top对应的位置是没有task的,最近push进来的task在top-1的位置。而base的位置则能对应到task,base对应最先放进队列的task,top-1对应最后放进队列的task。

3.9 workQueue的qlock
qlock值含义:1: locked, < 0: terminate; else 0

即当qlock值位0时,可以正常操作,值=1时,表示锁定

4、相关算法解释
4.1 求偶算法
int SQMASK=0x007e,则任何整数跟SQMASK位与后,得到的数就是偶数。

证明:

注意这里化为二进制是0111 1110,尤其注意最右边第一位是0,任何数跟最右边第一位是0的数位与后,得到的数就是偶数,因为位与之后,第一位就是0,比如s=A&SQMASK,A可以是任意整数,然后把s按二进制进行多项式展开,则有s=2^n1+2^n2 ……+2^nn,这里n≥1,所以s可以被2整除,即s是偶数。

所以一个数是奇数还是偶数,看其最右边第一位即可。

4.2 workQueue的hint属性与“奇数自加散列”算法
我们知道workQueue有externalPush创建的和createWorker创建的worker,两种方式创建的workQueue,其放置到workQueues的位置是不同的,前者放到workQueue的偶数位置,而后者则放到奇数位置。不同workQueue找到自己在workQueues的位置的算法有点不同。

下面看一下forkjoin框架获取workQueues中的偶数位置的workQueue的算法:

int r=ThreadLocalRandom.getProbe()

int m=workQueues.length-1,这里workQueues.length是2的指数幂

int SQMASK=0x007e

workQueue=workQueues[m & r & SQMASK]

这样就能获取workQueues的偶数位置的workQueue。m保证m & r & SQMASK这整个运算结果不会超出workQueues的下标,SQMASK保证取到的是偶数位置的workQueue。这里有一个有趣的现象,假设0到workQueues.length-1之间有n个偶数,m & r & SQMASK每次都能取到其中一个偶数,而且连续n次取到的偶数不会出现重复值,散列性非常好。而且是循环的,即1到n次取n个不同偶数,n+1到2n也是取n次不同偶数,此时n个偶数每个都被重新取一次。下面分析下r值有什么秘密,为何能保证这样的散列性

ThreadLocalRandom内有一常量PROBE_INCREMENT = 0x9e3779b9,以及一个静态的probeGenerator =new AtomicInteger() ,然后每个线程的probe= probeGenerator.addAndGet(PROBE_INCREMENT)所以第一个线程的probe值是0x9e3779b9,第二个线程的值就是0x9e3779b9+0x9e3779b9,第三个线程的值就是0x9e3779b9+0x9e3779b9+0x9e3779b9以此类推,整个值是线性的,可以用y=kx表示,其中k=0x9e3779b9,x表示第几个线程。这样每个线程的probe可以保证不一样,而且具有很好的离散性。

实际上,可以不用0x9e3779b9这个值,用任意一个奇数都是可以的,比如1。如果用1的话,probe+=1,这样每个线程的probe就都是不同的,而且具有很好的离散性。也就是说,假设有限制条件probe<n,超过n则产生溢出。则probe自加n次后才会开始出现重复值,n次前probe每次自加的值都不同。实际上用任意一个奇数,都可以保证probe自加n次后才会开始出现重复值,有兴趣可看本文最后附录部分。由于奇数的离散性,所以只要线程数小于m或者SQMASK两者中的最小值,则每个线程都能唯一地占据一个ws中的一个位置

externalPush等外部操作创建的workQueue:使用上面介绍的方法来获取偶数
createWorker:与externalPush不同的是,pool内部有一个静态常量SEED_INCREMENT=0x9e3779b9,以及一个普通属性indexSeed=0
int r=indexSeed += SEED_INCREMENT,所以获取的值跟externalPush是差不多的

无论哪种方式,最终都是workQueue.hint=r,即workQueue.hint的值就是用来定位workQueue所用的r。

5、任务提交
5.1 externalPush
根据执行任务提交的线程不同

示例中,forkJoinPool.invoke(task)是把任务放入工作队列,并等待任务执行。源码如下:

public <T> T invoke(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task.join();
}
这里externalPush负责任务提交,externalPush源码如下:

final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();

// runState有如下状态值:

// RSLOCK     = 1;

// RSIGNAL    = 1 << 1;

// STARTED    = 1 << 2;

// STOP       = 1 << 29;

// TERMINATED = 1 << 30;

// SHUTDOWN   = 1 << 31;

//如果runstate为0,表示无状态,即当前没有其他线程在执行其他操作,比如锁定线程池,终止线程池等,小于0

//表示处于SHUTDOWN状态

int rs = runState;

// workQueues!=null说明已经初始化过
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&

    //q = ws[m & r & SQMASK]就是取ws中的偶数的队列。另外m的值也要注意,m= ws.length - 1,

    //ws数组长度需要是2的指数幂,这样m的值有效位就都是1,比如假设ws长度是8,则m=7,7的二进制有效位是

    //0111,注意后面三位全是1。因此保证ws长度是2的指数幂可以让m的有效位都是1,从而位与的结果基本由r决

    //定,而m的作用就是用来限制位与后的结果不会超过m。

        (q = ws[m & r & SQMASK]) != null && r != 0

        //rs>0说明线程池在运行中,如果处于0,说明线程池还未初始化

        && rs > 0 &&

        //实现无锁锁定,跟锁不同的是,执行CAS失败的线程无法进入if内部,而是直接跳过if,执行if之外的代码

       U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);

       //解锁。因为前面锁定了代码,所以这里必须解锁
            U.putIntVolatile(q, QLOCK, 0);

       //n <= 1说明此队列任务之前被处理完毕,处理此队列任务的线程可能处于等待状态,故可能需要将这些线程唤

       //醒。如果这些线程已经终结,则要看当前活动线程数是否足够,不足则需要添加线程
            if (n <= 1)
                signalWork(ws, q);
            return;
        }

    //解锁
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}
5.2 externalSubmit
externalSubmit是externalPush的完整版本,从externalPush源码中可以看到,它把很多“疑难杂症”都交给externalSubmit处理,自己仅处理简单的情况。前面externalPush中的第一个if有这样一个条件:&&U.compareAndSwapInt(q, QLOCK, 0, 1),如果执行CAS失败,难道就要把任务抛弃?显然不是,所以看一下externalSubmit是怎么处理这种情况的。

从externalPush的注释中,我们知道每个线程有自己的probe,通过probe,每个线程跟一个队列绑定。externalSubmit的策略比较简单,就是之前在externalPush中由于执行CAS失败而无法push的任务,在externalSubmit再执行一次CAS,如果成功则把任务放入线程对应的队列,如果失败说明该队列比较繁忙,所以externalSubmit就给该线程换一个probe,从而给该线程换一个队列。显然,这个过程是循环的,即每次执行CAS失败externalSubmit就会给该线程换一个队列,直到执行CAS成功为止。

private void externalSubmit(ForkJoinTask<?> task) {

    int r;                                    // initialize caller's probe

    if ((r = ThreadLocalRandom.getProbe()) == 0) {

        ThreadLocalRandom.localInit();

        r = ThreadLocalRandom.getProbe();

    }

    for (;;) {

        WorkQueue[] ws; WorkQueue q; int rs, m, k;

        boolean move = false;

       // runstate小于0,此时runState对应的状态是terminate

        if ((rs = runState) < 0) {

            tryTerminate(false, false);     // help terminate

            throw new RejectedExecutionException();

        }

       // 如果runState的STARTED对应的位是0,表示pool未启动,所以要进行初始化,并启动它

        else if ((rs & STARTED) == 0 ||    

                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {

            int ns = 0;

            rs = lockRunState();

            try {

                if ((rs & STARTED) == 0) {

                    U.compareAndSwapObject(this, STEALCOUNTER, null,

                                           new AtomicLong());

                    // workQueues的 array的大小必须是2的指数幂

                    int p = config & SMASK; // ensure at least 2 slots

                    int n = (p > 1) ? p - 1 : 1;

                  // 这一堆让人眼花缭乱的位移操作,是保证从n的最左边的1开始,一直到最右边的所有位都是1

                  //举个例子,假设n的二进制为0000 0000 1000 0000,则n |= n >>> 1后,n变为

//0000 0000 1100 0000,注意原先的1后面一位变成了1,n |= n >>> 2后,变为

//0000 0000 1111 0000,注意,原先2个1,然后这2个1后面多了2个1。以此类推,n |= n >>> 4

//则让n的最右边的1的后面4位变为1,即n变为了0000 0000 1111 1111。剩下的操作就没任何效果了

//因为后面没有位数了。之后再加1,得到的数就是2的指数幂。

                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;

                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;

                    workQueues = new WorkQueue[n];

                    ns = STARTED;

                }

            } finally {

                unlockRunState(rs, (rs & ~RSLOCK) | ns);

            }

        }

       //externalPush中,由于CAS失败而提交失败的任务,在这里会被再次提交进去

        else if ((q = ws[k = r & m & SQMASK]) != null) {

            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {

                ForkJoinTask<?>[] a = q.array;

                int s = q.top;

                boolean submitted = false; // initial submission or resizing

                try {                      // locked version of push

                    if ((a != null && a.length > s + 1 - q.base) ||

                        (a = q.growArray()) != null) {

                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;

                        U.putOrderedObject(a, j, task);

                        U.putOrderedInt(q, QTOP, s + 1);

                        submitted = true;

                    }

                } finally {

                    U.compareAndSwapInt(q, QLOCK, 1, 0);

                }

                if (submitted) {

                    signalWork(ws, q);

                    return;

                }

            }

            move = true;                   // move on failure

        }

       //创建新队列

        else if (((rs = runState) & RSLOCK) == 0) {

            q = new WorkQueue(this, null);

            q.hint = r;

            q.config = k | SHARED_QUEUE;

            q.scanState = INACTIVE;

            rs = lockRunState();           // publish index

            if (rs > 0 &&  (ws = workQueues) != null &&

                k < ws.length && ws[k] == null)

                ws[k] = q;                 // else terminated

            unlockRunState(rs, rs & ~RSLOCK);

        }

        else

            move = true;                   // move if busy

        if (move)

            r = ThreadLocalRandom.advanceProbe(r);

    }

}
5.3 任务提交中用到的同步机制
externalSubmit和externalPush都使用一个CAS操作来保证同步:

U.compareAndSwapInt(q, QLOCK, 0, 1)

其实forkjoinPool内workQueues数组已经设置的比较大了,为2^16=65536,外部线程对任务的提交只用到其中的偶数部分,但也有32768。通常线程数上百已经算比较多的了,但是相对于32786而言,还是比较小的,所以会发生冲突的几率就很小。这也是为何使用CAS来简单地保证同步的原因。

6、创建和唤醒worker
6.1 唤醒和创建worker:signalWork
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
// ctl<0表示活跃worker少于目标并行度
while ((c = ctl) < 0L) {
//为0表示没有正在等待的worker,即没有空闲worker
if ((sp = (int)c) == 0) {
// 表示总线程数少于目标并发度,需要添加worker
if ((c & ADD_WORKER) != 0L)
tryAddWorker(c);
break;
}
// 线程池未开始运行,即未初始化或者已经终止
if (ws == null)
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
6.2 添加worker:tryAddWorker与createWorker
tryAddWorker通过createWorker创建worker

private void tryAddWorker(long c) {
boolean add = false;
do {
//AC和TC加1
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
if (stop != 0)
break;
if (add) {
createWorker();
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
//这里启动了线程
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
//出现异常,取消注册
deregisterWorker(wt, ex);
return false;
}
6.3 registerWorker
registerWorker在ForkJoinThread的构造函数中被调用,这是ForkJoinThread向pool注册自身的一个方法。

final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
//配置worker为守护worker,所以worker会随main的结束而自动结束
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
6.3 ForkJoinThread的run
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
//从这里开始,worker开始执行任务
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
7、任务执行
线程启动后会执行pool的runWorker:

final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
if ((t = scan(w, r)) != null)
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
//这里因为runWorker窃取了一个任务,所以需要++nsteals
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
7.1 任务扫描(任务窃取)
scan的作用是扫描workQueues,窃取一个task,因为worker刚刚创建,所以worker的工作队列是空的,必须从外面扫一个task来执行。如果扫描来的task在被执行时调用了task.fork,生成新的task,则新的task就会被push到这个worker的工作队列。之后worker会执行自身工作队列中的task。当自身工作队列执行完毕,进入下一次循环,再次扫描task。如果扫描不到task,说明worker,多余了,即现有的worker已经足够执行workQueues中的任务了,此时worker应当休眠,在产生新task时,可能会被唤醒。如果workQueues已经没有多余任务可以执行,说明pool要终止了,则应当终止worker。worker应当休眠还是终止,这个逻辑在awaitWork实现,可查看后续章节

// w是一个worker,r是一个随机种子,用于获取workQueues中的一个workQueue
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
//如果获取到的队列不为空,开始扫描task
if ((q = ws[k]) != null) {
//如果q有task
if ((n = (b = q.base) - q.top) < 0 &&(a = q.array) != null) { // non-empty
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
//如果能从q获取到task。
if ((t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))) != null &&
//这里q.base == b是保证base不变,如果base变了,说明有另一个线程获取到这个base
//对应的任务且很可能被执行了,所以如果base变了,获取到的task就无效了
q.base == b) {
//如果w是激活的
if (ss >= 0) {
//如果CAS失败,说明有其他线程在操作a,此时应换一个队列,即更改r值并执行continue
//进行新一轮的循环
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
//如果q剩余的任务数大于1,则尝试唤醒或创建worker,加速任务的处理
if (n < -1) // signal others
signalWork(ws, q);
return t;
}
}
// 如果w未激活,但是这时候又能获得task,说明任务可能比较多,现有的活动的worker可能
//处理不过来,所以尝试激活worker,增加活动的worker
else if (oldSum == 0 &&
w.scanState < 0)
//尝试唤醒休眠的worker或创建worker
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
//如果获取不到task,且w未激活
if (ss < 0) // refresh
//把w.scanState值重新赋予ss,因为这期间可能有其他线程激活了w。
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
//origin,scan是从origin对应的位置扫描队列的,即q=workQueues[origin]但是如果
//队列q竞争比较大,即有多个worker在操作它,则需要换一个队列重新扫描,即执行下面的
// r ^= r << 1; r ^= r >>> 3; r ^= r << 10;和 origin = k = r & m,continue重新扫描。
//所以origin对应的队列是非空,且有task,而且竞争度低的队列
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
// 判断是否要讲worker设置为未激活。然后分析下怎样的worker应该设置为失活。首先
//(k = (k + 1) & m) == origin,要满足这个条件必须保证origin竞争度低,且worker是激活状态
//由于k每次加1,所以当k==origin,此时说明workQueues中的元素被遍历了一遍。但仍然获取不到task
if ((k = (k + 1) & m) == origin) { // continue until stable
//如果worker是激活状态则直接进入if内部把worker设置为失活。注意这里的失活是伪失活,因为
//仅仅是把当前worker的scanstate设置到ctl,而没有真正让线程休眠。线程设置为失活后,还会继续
//扫描任务,因为此时还未退出循环。当代码再次来到这里时,此时worker是伪失活状态,且
//workQueues又被遍历了一遍。这时候判断ss == (ss = w.scanState))就是判断worker有没有
//被其他线程或者外部线程调用signalWork或者tryRelease进行唤醒。如果满足则判断
// oldSum == (oldSum = checkSum),这个条件要满足,当前取到的q必须为空,如果q不为空,要么
// checkSum被更改,导致不相等,要么就进入重新扫描,代码到不了这里。由于伪失活后,循环没有
//结束,仍然在继续扫描。如果伪失活后,扫到的队列一直非空,说明任务比较多,现有的活动的worker
//可能处理不过来,所以伪失活worker会继续扫描。如果扫到为空的队列,则退出循环,然后交由
//awaitWork来真正让worker休眠或者终止worker
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
//worker伪失活后,重新扫描
checkSum = 0;
}
}
}
return null;
}
7.2 worker休眠与终结
如果返回false表示worker需要终结。

private boolean awaitWork(WorkQueue w, int r) {
//qlock值的含义:1: locked, < 0: terminate; else 0,所以这里小于0表示终止
if (w == null || w.qlock < 0) // w is terminating
return false;
for (int pred = w.stackPred, spins = SPINS, ss;;) {
//表示worker被重新激活了,跳出循环返回true
if ((ss = w.scanState) >= 0)
break;
//空转,当前SPINS值为0,表示不空转。空转表示执行的代码啥也不敢,就是拖拖时间,看空转期间相关的状态是否
//被其他线程更改
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
if (r >= 0 && --spins == 0) { // randomize spins
WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
if (pred != 0 && (ws = workQueues) != null &&
(j = pred & SMASK) < ws.length &&
(v = ws[j]) != null && // see if pred parking
(v.parker == null || v.scanState >= 0))
spins = SPINS; // continue spinning
}
}
else if (w.qlock < 0) // recheck after spins
return false;
else if (!Thread.interrupted()) {
long c, prevctl, parkTime, deadline;
int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
//这里由于采用了非阻塞策略,所以tryTerminate中调用Thread.interrupt并不会让当前线程
//抛出中断异常而终止,所以需要做一下判断
if ((ac <= 0 && tryTerminate(false, false)) ||
(runState & STOP) != 0) // pool terminating
return false;
//当前没有活跃worker,且当前worker处于worker等待栈的顶部
if (ac <= 0 && ss == (int)c) { // is last waiter
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
int t = (short)(c >>> TC_SHIFT); // shrink excess spares
//因为当前已经没有活跃线程了,这意味着任务已经执行完毕,且没有调用pool的shutdown关闭
//pool,所以留下一部分线程,这样当有新任务来时,可以复用这些线程。但是留下的线程也不能太多
//毕竟目前已经没有要处理的任务了。t>2表示已经预留了足够多的线程了,而使用CAS则是因为CTL有可能
//被其他线程更改,为何还会有其他线程更改呢?有这几种情况,1是因为其他线程在执行tryTerminate把
//自己设置为未激活状态,从而让自己处于worker等待栈的顶部,而当前worker则不再处于worker等待栈
//的顶部,栈中元素得一个一个出才行,不能出栈任意位置的元素,否则会有线程安全问题。另外则是可能
//“来活了”,即有新任务来了,所以有线程被激活或有新线程被创建。所以必须满足下面if的两个条件才能
//终止当前worker
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // else use timed wait
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
w.parker = wt;
if (w.scanState < 0 && ctl == c) // recheck before park
U.park(false, parkTime);
//如果线程等待了一段时间发现还没有任务要执行,则终止自己,否则线程自动醒来执行任务
U.putOrderedObject(w, QPARKER, null);
U.putObject(wt, PARKBLOCKER, null);
if (w.scanState >= 0)
break;
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // shrink pool
}
}
return true;
}
8、任务分割与等待:fork和join
fork和join是ForkJoinTask的方法,也是整个框架的设计灵魂:fork把任务切分为小任务,join则等待任务结果。

8.1 fork
fork的实现异常简单:

public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
//放到当前worker的workQueue
((ForkJoinWorkerThread)t).workQueue.push(this);
else
//common是一个静态的,ForkJoinPool实例,不可关闭,所以即使你不new一个ForkJoinPool
//直接调用fork,此时任务就被提交到common了
ForkJoinPool.common.externalPush(this);
return this;
}
8.2 join
public final V join() {
int s;
//doJoin是一个等待任务执行完毕的方法,当任务执行完毕就会返回
//这里虽然等待,但是却并不一定阻塞
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;


//status表示任务执行状态,有如下值:
// static final int NORMAL = 0xf0000000; // 必须为负数
// static final int CANCELLED = 0xc0000000; // 必须 < NORMAL
// static final int EXCEPTIONAL = 0x80000000; // 必须 < CANCELLED
// static final int SIGNAL = 0x00010000; //必须 >= 1 << 16
//所以status<0表示任务已经完成,或取消,或抛异常,无论怎样,就是任务结束了
//下面这段代码的意思是,任务是否结束,是则返回,否则看任务执行线程是否是ForkJoinWorkerThread实例
//否则externalAwaitDone()等待,是则尝试让任务出队并执行,如果不能出队或者执行无法完成,则使用
// wt.pool.awaitJoin(w, this, 0L)等待。externalAwaitDone()基本就是wait和notify,而awaitJoin
//就复杂点,需要帮助窃取当前任务的worker(称为stealer)尽快执行任务,即会窃取stealer的任务来执行
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s : //尝试从自身队列中获取当前任务来执行
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}


private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
// currentJoin这个属性基本仅在这个方法内使用,这里先把currentJoin保存在prevJoin
//而把当前任务引用赋予currentJoin,所以这里可以看出,任务之间必须是有向无环图,比如任务A和B
//不能A等待B完成,然后B又等待A完成,这样会造成死锁。不过真遇到这种情况,也有解决方法,就是
//设置等待超时时间,所以下面的wait即internalWait的等待是有超时时间的
ForkJoinTask<?> prevJoin = w.currentJoin;
U.putOrderedObject(w, QCURRENTJOIN, task);
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>)task : null;
for (;;) {
//如果task完成
if ((s = task.status) < 0)
break;
if (cc != null)
helpComplete(w, cc, 0);
//如果当前工作队列没有任务,且task未完成,则task被人窃取了
else if (w.base == w.top || w.tryRemoveAndExec(task))
//帮助stealer执行任务,毕竟当前自身队列的任务已经执行完了,所以帮助stealer执行任务,
//以尽快能够返回
helpStealer(w, task);
if ((s = task.status) < 0)
break;
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
//尝试增加一个线程作为补偿,因为当前线程准备进入等待,能到达这里,表示
//任务未完成,且工作队列还有任务
if (tryCompensate(w)) {
task.internalWait(ms);
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}


/**
* 从队列中获取指定task来执行,或其他已经取消的task。此方法仅被awaitJoin使用
* @return true 如果队列为空,且task不知是否完成时
*/
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 &&
task != null) {
while ((n = (s = top) - (b = base)) > 0) {
for (ForkJoinTask<?> t;;) { // traverse from s to b
//从top的前一个位置,即第一个元素开始遍历
long j = ((--s & m) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
//如果获取到的task为空,s+1==top为true,表示当前队列为空,否则非空
//如果队列非空,说明队列内的元素都遍历过了
return s + 1 == top; // shorter than expected
else if (t == task) {
boolean removed = false;
if (s + 1 == top) { // pop
if (U.compareAndSwapObject(a, j, task, null)) {
U.putOrderedInt(this, QTOP, s);
removed = true;
}
}
//如果到这里,说明获取的t是在base和top-2之间的。base == b表示任务没有被偷,
//因为如果此时有任务被偷,可能被偷的任务就是t,所以加一层判断
else if (base == b) // replace with proxy
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask());
if (removed)
task.doExec();
break;
}
else if (t.status < 0 && s + 1 == top) {
if (U.compareAndSwapObject(a, j, t, null))
U.putOrderedInt(this, QTOP, s);
break; // was cancelled
}
if (--n == 0)
return false;
}
if (task.status < 0)
return false;
}
}
return true;
}
/**
* 主要帮助stealer执行任务。当执行此方法时当前worker处于join等待子任务执行完毕,但子任务被其他worker
* 窃取所以一定有一个stealer,可以通过遍历所有worker,看它们的currentSteal是否跟等待的task匹配,如果
* 匹配的话,该worker就是stealer。然后窃取该stealer的任务来执行。借此加速自身的子任务执行,因为当
* stealer任务执行完毕,意味着worker当前执行的任务的join将有返回结果。在执行窃取的任务期间,也可能会
* 产生新的子任务以及发生对子任务的join,这个过程类似这样:
* 主任务join ->执行窃取的任务,导致生成子任务 -> 子任务join -> 子任务执行后返回
* 返回过程则反过来
* 子任务执行后返回-> 子任务join结束并返回->窃取的任务join结束,并返回->主任务join结束
* 由于每个worker自身就能处理fork产生的子任务,且任务没有循环依赖,所以整个过程不会出现死锁的情况
* 另外,worker也可能找不到stealer。如果找不到stealer,则表示当前task已经被执行,如果task的状态是
* 还未执行完毕,则执行这个task的子任务的stealer也肯定处于join,且在窃取任务来执行。而worker则因为
* 找不到stealer,要么等待,要么空转.整个实现思想是:执行任务,如果有子任务,则join时处理子任务,因为只有
* 子任务处理完,任务才能完成,所以要先帮助子任务执行。如果子任务被窃取,则找到stealer,帮助stealer执行
* 从而帮助子任务的执行。但是如果任务被窃取,又找不到stealer,说明子任务正在其他worker上执行,所以当前
* worker就只能等待或空转了
*/
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
WorkQueue[] ws = workQueues;
int oldSum = 0, checkSum, m;
if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
task != null) {
do { // restart point
checkSum = 0; // for stability check
ForkJoinTask<?> subtask;
// j是等待task完成,要进行join的worker,v是task的stealer
WorkQueue j = w, v;
descent: for (subtask = task; subtask.status >= 0; ) {
//这个for是查找窃取subtask的stealer,但无论是否能找到这个worker,最终都会返回v
for (int h = j.hint | 1, k = 0, i; ; k += 2) {
if (k > m) // can't find stealer
break descent;
if ((v = ws[i = (h + k) & m]) != null) {
if (v.currentSteal == subtask) {
j.hint = i;
break;
}
checkSum += v.base;
}
}
//第一轮循环如果能找到stealer,且当前处于join,则窃取stealer的task来执行。
//第二轮循环,如果stealer处于join,则stealer将窃取stealer的stealer的task来执行
//第三轮就是处理stealer的stealer,以此类推。
for (;;) { // help v or descend
ForkJoinTask<?>[] a; int b;
checkSum += (b = v.base);
ForkJoinTask<?> next = v.currentJoin;
//subtask执行完毕
if (subtask.status < 0 ||
//第一轮循环的时候这不可能发生,因为j就是传进来的worker,而subtask就是task
//但是第二轮的时候,j是stealer,而subtask就是stealer之前的currentjoin,这表示
//stealer把之前currentJoin的任务执行完了,而这个currentJoin的任务可能是当前task
//的子任务所以退出循环,看当前task是否已经执行完。而且这个for本来就是帮助stealer
//在处于join时,窃取别人任务来执行的,现在currentJoin变了,就没有继续的必要
j.currentJoin != subtask ||
//找不到stealer。找不到的原因是,上一个for中没找到,或者上一个for中找到了
//但由于stealer在执行窃取的任务,且这时候也在执行此方法,因此改了currentSteal
//具体看下面执行窃取任务的部分的代码。无论什么原因,最终都等价于找不到stealer
v.currentSteal != subtask) // stale
break descent;
if (b - v.top >= 0 || (a = v.array) == null) {
if ((subtask = next) == null)
break descent;
j = v;
break;
}
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
ForkJoinTask<?> t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i));
//准备执行窃取的任务
if (v.base == b) {
if (t == null) // stale
break descent;
if (U.compareAndSwapObject(a, i, t, null)) {
v.base = b + 1;
ForkJoinTask<?> ps = w.currentSteal;
int top = w.top;
do {
U.putOrderedObject(w, QCURRENTSTEAL, t);
t.doExec(); // clear local tasks too
} while (task.status >= 0 &&
w.top != top &&
(t = w.pop()) != null);
U.putOrderedObject(w, QCURRENTSTEAL, ps);
if (w.base != w.top)
return; // can't further help
}
}
}
}
} while (task.status >= 0 && oldSum != (oldSum = checkSum));
}
}
9、pool终止
/**
* Possibly initiates and/or completes termination.
*
* @param now 如果true,则无条件终止,否则会等待没有work且没有活动worker时才终止
* @param enable 表示是否允许关闭,为false表示当runstate为非SHUTDOWN状态时,将不会关闭。若为true,且
* now为false,则如果发现还有活动的worker或未执行完的任务,则不会关闭
* @return true 表示正在关闭或者已经关闭
*/
private boolean tryTerminate(boolean now, boolean enable) {
int rs;
if (this == common) // cannot shut down
return false;
if ((rs = runState) >= 0) {
if (!enable)
return false;
rs = lockRunState(); // enter SHUTDOWN phase
unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
}
//这里主要是检测是否有其他线程执行了tryTerminate,并且已经进入stop阶段。因为其他线程也会调用
//awaitWork并因此调用tryTerminate。如果没有设置STOP,则进入SHUTDOWN阶段。整个终结过程被分为
// SHUTDOWN和stop两个阶段,SHUTDOWN之后就是stop。SHUTDOWN主要是对剩余任务的处理,如果now为false
//则SHUTDOWN阶段会尽快把剩余任务执行完毕,否则忽略这些任务进入stop阶段。且设置SHUTDOWN后,pool
//不再允许外部推送任务。
if ((rs & STOP) == 0) {
if (!now) { // check quiescence
for (long oldSum = 0L;;) { // repeat until stable
WorkQueue[] ws; WorkQueue w; int m, b; long c;
long checkSum = ctl;
if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
return false; // still active workers
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
break; // check queues
//遍历队列,如果队列有task,则尝试激活一个worker来执行task
for (int i = 0; i <= m; ++i) {
if ((w = ws[i]) != null) {
if ((b = w.base) != w.top || w.scanState >= 0 ||
w.currentSteal != null) {
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
return false; // arrange for recheck
}
//如果队列不为空,checksum就会变
checkSum += b;
if ((i & 1) == 0)
w.qlock = -1; // try to disable external
}
}
//如果所有队列都为空,则break。分析下oldSum == (oldSum = checkSum)成立的条件:
//刚刚开始checksum=ctl,oldSum=0,如果此时ctl=0,条件将成立,但这是不可能的,因为
//ctl=0,则ctl的SP部分为0,AC为0,TC也为0,但是AC为0表示还有活跃的worker,所以
//在前面会return false,代码到不了这里。所以第一次到达这里时,条件一定不成立,一定会
//进行新一轮循环。从第二次循环开始,如果发现不为空的队列,checksum就会改变,再次导致条件
//不成立。所以当这个条件成立时,意味着至少执行了2次循环,且最后一次循环时,所有队列都为空
if (oldSum == (oldSum = checkSum))
break;
}
}
if ((runState & STOP) == 0) {
rs = lockRunState(); // enter STOP phase
unlockRunState(rs, (rs & ~RSLOCK) | STOP);
}
}
//pass表示循环执行了几轮,第一轮是0,第二轮pass是1,以此类推。如果没有退出循环,
//第一轮仅仅是设置所有工作队列的qlock为-1,表示terminate,然后进入第二轮,取消所有队列的任务
//并尝试激活所有休眠中的worker,然后进入第三轮,中断worker的owner对应的线程。
//第四轮结束tryTerminate
int pass = 0; // 3 passes to help terminate
for (long oldSum = 0L;;) { // or until done or stable
WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
long checkSum = ctl;
if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
(ws = workQueues) == null || (m = ws.length - 1) <= 0) {
if ((runState & TERMINATED) == 0) {
rs = lockRunState(); // done
unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
synchronized (this) { notifyAll(); } // for awaitTermination
}
break;
}

for (int i = 0; i <= m; ++i) {
if ((w = ws[i]) != null) {
checkSum += w.base;
w.qlock = -1; // try to disable
if (pass > 0) {
w.cancelAll(); // clear queue
if (pass > 1 && (wt = w.owner) != null) {
if (!wt.isInterrupted()) {
try { // unblock join
wt.interrupt();
} catch (Throwable ignore) {
}
}
if (w.scanState < 0)
//这里之所以要把线程唤醒,是因为有些很多操作采用非阻塞的策略,所以调用
// interrupt可能是无法真正中断线程的,故需要将线程唤醒,让其看到中断
//标记自行终结
U.unpark(wt); // wake up
}
}
}
}
//当前面存在不为空的工作队列时,就会checkSum != oldSum
if (checkSum != oldSum) { // unstable
oldSum = checkSum;
pass = 0;
}
else if (pass > 3 && pass > m) // can't further help
break;
else if (++pass > 1) { // try to dequeue
long c; int j = 0, sp; // bound attempts
while (j++ <= m && (sp = (int)(c = ctl)) != 0)
tryRelease(c, ws[sp & m], AC_UNIT);
}
}
return true;

————————————————
版权声明:本文为CSDN博主「花公子丶」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/yu766588220/article/details/106870608

免责声明:文章转载自《ForkJoin全解2:forkjoin实际工作流程与实现》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇小白自制Linux开发板 六. SPI TFT屏幕修改与移植MDK5.29,5.30,5.31和各种软件包镜像下载(2020-07-03)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

随便看看

ubuntu网卡配置

网卡配置文件采用YAML格式,必须以/etc/netplan/XXX.yaml文件命名方式存放可以每个网卡对应一个单独的配置文件,也可以将所有网卡都放在一个配置文件里自动获取IProot@ubuntu1804:~#cat/etc/netplan/01-netcfg.yaml#Thisfiledescribesthenetworkinterfacesavail...

Jmeter中获取返回结果中的值

在jmeter的测试中,通常需要在下一个请求中使用上一个请求的返回值。如何获得返回值非常重要。插件下载地址为:http://jmeter-plugins.org/wiki/JSONPathExtractor/下载后,将lib文件夹放在jmeter目录中。...

C#探秘系列(十)WPF:打开文件选择器选择文件并保存

//此为点击按钮的监听事件,点击按钮弹出文件选择器privatevoidimageButton_Click(objectsender,RoutedEventArgse){vardialog=newOpenFileDialog();dialog.Filter=".jpg|*.jpg|.png|*.png|.jpeg|*.jpeg";if(dialog.Show...

CentOS7 复制文件夹和移动文件夹

CentOS7在Linux中复制、移动和删除文件的命令有:cp、mv、rm I。文件复制命令cp命令格式:cp[-adfilprsu]源文件(source)目标文件(destination)cp[option]source1source2source3…directory参数描述:-a:指存档,即复制所有目录-d:如果源文件是连接文件(linkfile...

fiddler抓包+雷电模拟器 完成手机app抓包的配置

找到系统应用,点击设置,点击无线网络WLAN—˃左键常按点击已连接网络—˃修改网络鼠标左键长按在桌面找到下面这个文件之后双击打开上面证书弄完之后。可以说本机已经安装过证书了,如果你能在模拟器上找到这个证书就不用将这个证书再拉入模拟器了在模拟器中打开系统应用—˃设置—˃安全—˃从SD卡安装。找到FiddlerRoot.cer文件,按提示导入即可,注意在此过程需...

【转】设置:ftl模板文件编辑器

在学习了Freemarker如何生成Word文件的知识后,博客作者使用Eclipse进行开发,并使用内置JSP框架编辑ftl文件。在编辑ftl文件之前,如果您的Eclipse没有指定ftl的编辑方法,则会提示您安装ftl模板的编辑器或使用TextEditor进行编辑。我没有选择在这里为ftl模板安装编辑器,所以我跳过了。...