ThreadPoolExecutor 核心实现原理和源码解析

摘要: 本文将详细分析 ThreadPoolExecutor 的实现原理,并结合源码介绍 ThreadPoolExecutor 的重要操作,对理解 ThreadPoolExecutor 非常有帮助。本文中源码基于 JDK1.7
前面的文章已经详细分析了 Executor 框架及其家族的各个成员,为介绍本文做了铺垫,因此分析 ThreadPoolExecutor 核心实现原理可谓千呼万唤使出来啊,直奔主题吧!

首先从 ThreadPoolExecutor 的构造方法开始。

ThreadPoolExecutor 的构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

构造器中各个参数的含义:

corePoolSize:核心池的大小,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到工作队列当中。只有当工作队列满了的情况下才会创建超出这个数量的线程。如果某个线程的空闲时间超过了活动时间,那么将标记为可回收,并且只有当线程池的当前大小超过 corePoolSize 时该线程才会被终止。用户可调用 prestartAllCoreThreads()或者 prestartCoreThread() 方法预先创建线程,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。

maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;当大于了这个值就会将 Thread 由一个丢弃处理机制来处理。

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0;

Unit:参数 keepAliveTime 的时间单位,有 7 种取值,在 TimeUnit 类中有 7 种静态属性。

workQueue:一个阻塞队列,用来存储等待执行的任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中。

threadFactory:线程工厂,主要用来创建线程;

handler:表示当拒绝处理任务时的策略,也就是参数 maximumPoolSize 达到后丢弃处理的方法。有以下四种取值:

ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出 RejectedExecutionException 异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
用户也可以实现接口 RejectedExecutionHandler 定制自己的策略。

下面将深入剖析线程池的实现原理:

线程池状态
JDK1.7 中使用原子变量 ctl 来控制线程池的状态,其中 ctl 包装了以下两个 field:

workerCount:表示有效的线程数
runState:表示线程池的状态,是否运行,关闭等

由于 workerCount 和 runState 被保存在一个 int 中,因此 workerCount 限制为(2 ^ 29)-1(约 5 亿)线程。其使用 shift / mask 常数来计算 workerCount 和 runState 的值。源码如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

workerCount 是已经启动但没有停止的 worker 线程数量。

runState 用于控制线程池的生命周期状态,主要包含以下几个值:

RUNNING 接收新任务,并且处理任务队列中的任务,当创建线程池后,初始时,线程池处于 RUNNING 状态
SHUTDOWN 不接收新任务,但是处理任务队列的任务
STOP 不接收新任务,不处理任务队列,同时中断所有进行中的任务
TIDYING 所有任务已经被终止,工作线程数量为 0,到达该状态会执行 terminated()
TERMINATED terminated() 已经完成
各状态的转换关系:

RUNNING -> SHUTDOWN:shutdown() 被调用

(RUNNING or SHUTDOWN) -> STOP:shutdownNow() 被调用

SHUTDOWN -> TIDYING:队列和池均为空

STOP -> TIDYING:池为空

TIDYING -> TERMINATED:钩子方法 terminated() 已经完成。

d753edfd7c1b4493a37c3fca94ea0507_image.png

当线程池状态为 TERMINATED 时,调用 awaitTermination() 的线程将从等待中返回。

Worker
考虑到将 Worker 实现分析加入本文将导致文章太长,不宜阅读,关于 Worker 的核心实现以及 ThreadPoolExecutor 的核心方法 runWorker、getTask 和 processWorkerExit 的功能分析和源码解读请参见 ThreadPoolExecutor 核心实现原理和源码解析 < 二 > https://my.oschina.net/7001/blog/889770。

addWorker
addWorker 用于添加并启动工作线程,先看其流程图:

5c2e07d17ff248fa898c597a1fb16c41_image.png

源码解析如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /** 这里返回false有以下可能:
          * 1 线程池状态大于SHUTDOWN
          * 2 线程池状态为SHUTDOWN,但firstTask不为空,也就是说线程池已经SHUTDOWN,拒绝添加新任务
          * 3 线程池状态为SHUTDOWN且firstTask为空,但workQueue为空,即无任务需要执行
          */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            /** 返回false有以下可能:
              * 1 工作线程数量超过最大容量
              * 2 core为true,工作线程数量超过边界corePoolSize
              * 3 core为false,工作线程数量超过边界maximumPoolSize
              */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;//直接跳出最外层循环
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)//线程池状态发生改变则从最外层循环重新开始
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    Worker w = new Worker(firstTask);
    Thread t = w.thread;

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 持有锁之后需要重新检查线程池状态,防止ThreadFactory返回失败或线程池在加锁之前被关闭
        int c = ctl.get();
        int rs = runStateOf(c);
         /** 返回false有以下可能:
           * 1 t为null,说明ThreadFactory创建线程失败,可能发生OutOfMemoryError
           * 2 线程池状态大于SHUTDOWN
           * 3 线程池状态为SHUTDOWN,但firstTask不为空
           */
        if (t == null ||
            (rs >= SHUTDOWN &&
             ! (rs == SHUTDOWN &&
                firstTask == null))) {
            decrementWorkerCount();
            tryTerminate();
            return false;
        }

        workers.add(w);

        int s = workers.size();
        if (s > largestPoolSize)
            largestPoolSize = s;
    } finally {
        mainLock.unlock();
    }

    t.start();
    // 在线程池变为stop期间,线程可能已经被添加到workers,但还未被启动(该现象不太可能发生,这可能
    // 导致罕见的丢失中断,因为Thread.interrupt不能保证对非启动状态的线程有效
    if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
        t.interrupt();

    return true;
}

addWorker 首先会检查当前线程池的状态和给定的边界是否可以创建一个新的 worker,在此期间会对 workers 的数量进行适当调整;如果满足条件,将创建一个新的 worker 并启动,以参数中的 firstTask 作为 worker 的第一个任务。

任务的执行
ThreadPoolExecutor 类中,最核心的任务提交方法是 execute()方法,使用 submit() 提交任务最终调用的也是 execute() 方法,先看看流程图:

7511f3b49b2f407fa6a8226d8f45afcf_image.png

源码如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * 分3步处理:
     *
     * 1. 当前工作线程数 < corePoolSize,直接创建新的工作线程执行任务(调用addWorker)
     *
     * 2. 当前工作线程数 >=corePoolSize,线程池状态为RUNNING,且任务加入工作队列成功,
     * 再次检查线程池当前状态是否处于RUNNING,如果不是,从队列移除任务,移除成功则拒绝任务
     * 如果为RUNNING,判断当前工作线程数量是否为 0,如果为 0,就增加一个工作线程
     *
     * 3. 线程池状态不是RUNNING或任务入队失败,尝试开启普通线程执行任务,失败就拒绝该任务
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

从上面的分析可以总结出线程池运行任务的四个阶段:

poolSize < corePoolSize 且队列为空,此时会新建线程来处理提交的任务
poolSize == corePoolSize,提交的任务进入工作队列,工作线程从队列中获取任务执行,此时队列不为空且未满。
poolSize == corePoolSize,并且工作队列已满,此时也会新建线程来处理提交的任务,但是 poolSize < maxPoolSize
poolSize == maxPoolSize,并且队列已满,此时会触发拒绝策略。
当再次检查线程池当前状态不是 RUNNING 时,不仅从任务队列移除任务,同时会尝试终止线程池。

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}

尝试终止线程池
tryTerminate 在很多地方都有调用,那么这个方法作用是什么呢?

场景分析:当调用线程池的 shutDown()方法后,会调用 interruptIdleWorkers 尝试中断工作线程,而工作线程只有在 getTask() 期间才会有被中断的机会。假设 interruptIdleWorkers 成功设置的多个线程的中断状态,若此时任务队列非空,由于线程池状态为 SHUTDOWN,getTask()将会从任务队列成功获取到任务;在 runWorker 执行任务时,线程池状态为 SHUTDOWN(小于 STOP),那么当前工作线程的中断状态将会被清除。当中断状态被清除后,从工作队列取任务将不会响应中断,直到工作队列为空,此时之前被成功设置中断状态的工作线程都可能会阻塞在 workQueue.take(),由于 SHUTDOWN 状态的线程池不会接收新任务,工作线程将一直阻塞下去,永不会退出。怎么办呢?tryTerminate 这时将派上用场,Doug Lea 大神巧妙的在所有可能导致线程池产终止的地方安插了 tryTerminated() 尝试线程池终止的逻辑,由 tryTerminated 来终止空闲的线程,直到无空闲线程,然后终止线程池。

下面看看 tryTerminated 的具体实现:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //由之前的状态转换可知,RUNNING不能直接跳到TERMINATED,因此返回
        //状态已经为TERMINATED,无需再调用terminated(),返回
        //状态为SHUTDOWN且队列不空,队列中的任务仍需要处理,不能调用terminated(),返回
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // 符合终止条件
            interruptIdleWorkers(ONLY_ONE);//一次仅中断一个线程
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));//将状态设为TERMINATED,且设置workerCount为0
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
//中断空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

由源码可知,以下情况将线程池变为 TERMINATED 终止状态:

1 线程池状态为 SHUTDOWN,并且线程池中工作线程数量为 0,工作队列为空

2 线程池状态为 STOP,并且线程池中工作线程数量为 0

关闭线程池
可使用 shutdown()和 shutdownNow() 关闭线程池,但是效果和实现方法各不相同;同时也可调用 awaitTermination(long timeout, TimeUnit unit) 等待线程池终止。理解关闭线程池逻辑可能需要参照文章https://my.oschina.net/7001/blog/889770 中介绍的 runWorker 和 getTask() 逻辑。

shutdown()
使用 shutdown 关闭线程池时,之前提交的任务都会被执行完成,但是拒绝接收新任务。shutdown 不会等待之前提交的任务执行结束,该情况下可以使用 awaitTermination()。源码如下:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//权限校验
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();//中断所有空闲线程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
//更新线程池状态为SHUTDOWN,使用自旋保证完成
private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

本文不止一次提到过空闲线程,那么线程池中什么才是空闲线程?

空闲 worker:正在从 workQueue 阻塞队列中获取任务的 worker;

运行中 worker:正在使用 runWorker 执行任务的 worker。

阻塞在 getTask()获取任务的 worker 在被中断后,会抛出 InterruptedException,不再阻塞获取任务。继续进入自旋操作,此时线程池已经是 shutdown 状态,且 workQueue.isEmpty(),getTask() 返回 null,进入 worker 退出逻辑。

shutdownNow()
shutdownNow 表示立即关闭线程池,它会尝试停止所有活动的正在执行的任务,并停止处理任务队列中的任务,该方法将返回正在等待被执行的任务列表。shutdownNow 尽力尝试停止运行中的任务,没有任何保证。取消任务是通过 Thread.interrupt()发出中断信号来实现的。由 runWorker 源码可知,已经进入加锁区的任务并不会响应中断,因此只有工作线程执行完当前任务,进入 getTask() 才会感知线程池状态为 STOP,开始处理退出逻辑。

shutdownNow 对所有线程立即发出中断信号是为了阻止从任务队列取任务,让这些线程尽快进入退出逻辑;而那些正在执行 runWorker 加锁区中代码的线程,将在执行完当前任务后立即检测到线程池的状态,进入退出逻辑。源码如下:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);//将线程池状态修改为STOP
        interruptWorkers();//中断所有工作线程
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
/**
  * 使用drainTo方法一次性将工作队列中的任务加入taskList ,并从工作队列移除;
  * 如果队列是DelayQueue或任何其他类型的队列,poll或drainTo可能无法删除某些元素,则会逐个删除它们
  */
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    List<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

下面看看 interruptWorkers 的具体实现:

 private void interruptWorkers() {
	final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            try {
                w.thread.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    } finally {
        mainLock.unlock();
    }
}

比较 interruptIdleWorkers 源码可知,interruptWorkers 不需要等待持有 Worker 上的锁才中断线程,调用 interruptWorkers 会立即中断所有工作线程,interruptIdleWorkers 则需要首先持有 Worker 上的锁才能进行中断。interruptWorkers 目前只用在 shutdownNow 中。

awaitTermination
awaitTermination()会循环线程池是否 terminated 或是否已经超过超时时间,每次判断不满足条件则使用 Condition 对象 termination 阻塞指定时间。termination.awaitNanos() 是通过 LockSupport.parkNanos(this, nanosTimeout) 实现的阻塞等待。调用 shutdown 之后,在以下情况发生之前,awaitTermination() 都会被阻塞:

1 所有任务正常完成,线程池正常变为 TERMINATED

2 任务仍未完成,到达超时时间

3 当前线程被中断

阻塞等待过程中发生以下具体情况会解除阻塞:

1 任务正常完成,线程池正常变为 TERMINATED,此时会调用 termination.signalAll() 唤醒所有阻塞等待的线程

2 到达超时时间,nanos <= 0 条件满足,返回 false

3 当前线程被中断,termination.awaitNanos() 将从阻塞中唤醒,并向上抛出 InterruptException 异常。

源码如下:

 public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);//将超时时限分片
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))//状态>=TERMINATED
                return true;
            if (nanos <= 0)//达到超时时间
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

常用的几种线程池
通常情况下,我们使用 Executors 的静态工厂方法来创建线程池,下面看创建几种常用线程池的方法:

/** newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务就创建一个线程,
  * 直到达到线程池的最大数量,这时线程池的规模不再变化
  * (如果某个线程由于发生未预期的Exception而终止,线程池将补充一个新线程)
  */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
/** newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求,
  * 那么将回收空闲的线程,当需求增加时,则可以创建另外一个线程,线程池规模不存在任何限制
  */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
/** newSingleThreadExecutor是一个单线程的Executor,只创建一个线程来执行任务,
  * 如果线程异常结束,会创建新的线程来替代。
  * newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行(如:FIFO,LIFO,优先级)
  */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
/** 
  * newScheduledThreadPool创建一个固定长度的线程池,而且以延时或定时的方式来执行任务
  */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}