Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

ThreadPoolExecutor源码分析

GrimMjx 2019-01-29 13:29:00 阅读数:156 评论数:0 点赞数:0 收藏数:0

前言

在熟练掌握如何使用线程池之后,我们来对ThreadPoolExecutor进行源码分析。希望大家保持对源码的阅读热情,不仅要知其然,也要知其所以然。阅读源码比较苦涩,请养成反复研究琢磨为什么这么写的精神,多推敲。冲鸭!

其实有时候想不通的时候可以看一下英文注释,还是作者解释的精准

 

1 ThreadPoolExecutor类图

 

2 ThreadPoolExecutor重要变量

2.1 ctl

这个变量是整个类的核心,AtomicInteger保证了原子性,这个变量存储了2个内容

  • 线程池的状态
  • 所有工作线程的数量
// int是4个字节,有32位,这里的ctl前3位表示线程池的状态,后29位标识工作线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 容量 000 11111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 运行中状态 111 00000000000000000000000000000 (-536870912) 括号内为十进制的
private static final int RUNNING = -1 << COUNT_BITS;
// 关闭状态 000 00000000000000000000000000000 (0)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 停止状态 001 00000000000000000000000000000 (536870912)
private static final int STOP = 1 << COUNT_BITS;
// 整理状态 010 00000000000000000000000000000 (1073741824)
private static final int TIDYING = 2 << COUNT_BITS;
// 终结状态 011 00000000000000000000000000000 (1610612736)
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 先非然后位与运算符获取线程池运行的状态,也就是前3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 位与运算符获取工作线程数量,也就是后29位
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的状态

  • RUNNING:接收任务,处理workQueue队列里的任务
  • SHUTDOWN:不再接收新的任务,但是处理workQueue队列里的任务
  • STOP:拒绝新任务并且抛弃队列里的任务
  • TIDYING:将要调用terminated方法
  • TERMINATED:终结状态

2.2 Woker静态内部类

Worker实现了Runnable接口,说明可以当做一个可执行的任务。Woker也继承了AbstractQueuedSynchronizer,说明可以实现锁的功能,他是一个简单的不可重入的互斥锁,工作线程执行任务的时候,会先加锁,如果想要中断工作线程,需要先获取锁,否则无法中断,工作线程执行完任务才会释放锁,然后接着从workQueue获取任务继续执行。Worker的主要作用是执行队列的任务,并管理工作线程和统计一些东西。

/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// 工作线程
final Thread thread;
/** Initial task to run. Possibly null. */
// 第一个任务
 Runnable firstTask;
/** Per-thread task counter */
// 该工作线程已经完成任务的数量
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 直到runWorker方法禁止被中断
setState(-1);
this.firstTask = firstTask;
// 从线程工厂获取线程,并把第一个任务给worker
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

 

3 ThreadPoolExecutor重要函数

3.1 execute(Runnable command)

/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 获取线程池状态和线程数
int c = ctl.get();
// 工作线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 添加工作线程
if (addWorker(command, true))
return;
// 添加工作线程失败则重新获取线程池状态和线程数
c = ctl.get();
}
// 线程池正在运行且任务可以将任务放进队列里
if (isRunning(c) && workQueue.offer(command)) {
// 重新检查 - 获取线程池状态和线程数
int recheck = ctl.get();
// 这里重新检查是为了以下2种情况
// 1.当offer方法执行之后,线程池关闭了,回滚之前放入队列的操作并拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.线程池里没有可用的消费线程,比如现在核心线程数就1个,前一个任务抛异常了
// 那么现在就没有可用的消费线程了,所以要判断还有没有Worker,这步很关键
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 新增线程失败则拒绝任务
else if (!addWorker(command, false))
reject(command);
}

3.2 addWorker(Runnable firstTask, boolean core)

/*
* Methods for creating, running and cleaning up after workers
*/
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外循环
for (;;) {
// 获取线程池状态和线程数
int c = ctl.get();
// 线程池状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 这里我做了一个小调整,看着舒服点,以下几种情况会返回false
// 1.线程池状态为STOP,TIDYING,TERMINATED
// 2.线程池状态为SHUTDOWN且工作线程的firstTask不为空
// 3.线程池状态为SHUTDOWN且队列为空
if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
return false;
// 内循环
for (;;) {
// 获取工作线程数
int wc = workerCountOf(c);
// 如果工作线程大于容量或者工作线程大于核心线程数(或者最大线程数)返回false
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 添加工作线程+1
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新获取线程池状态和线程数
c = ctl.get(); // Re-read ctl
// 如果线程池状态变了,那么重新走外循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// 如果CAS操作失败,那么重新走内循环
 }
}
// 线程是否开始工作
boolean workerStarted = false;
// 线程是否添加到工作线程集合
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 利用显式锁加锁添加Worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 如果线程池状态是RUNNING或者是SHUTDOWN&&第一个任务为空
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
// 检查这个线程是否处于活动状态 - RUNNABLE或者RUNNING
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加到工作线程集合
 workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加到工作线程集合则开始工作
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程没有开始工作,那么工作线程数量-1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

3.3 runWorker(Worker w)

/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
// 此处获取的wt就是Worker里的thread 
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 这里为什么要先unlock一下呢?到这一行代码为止,我们没有进行任何的任务处理
// Worker的构造函数中,setState(-1);这一行代码抑制了线程中断,所以这里需要unlock从而允许中断
w.unlock(); // allow interrupts
// 是否是异常终止的标识,默认为true。有2中情况为true
// 1.执行任务抛出了异常
// 2.worker被中断
boolean completedAbruptly = true;
try {
// 获取任务,如果getTask()方法返回null,那么随之worker也要-1,之后有getTask()方法分析
// 只有在等待从workQueue队列里获取任务的时候才能中断。
// 第一次执行传入的任务,之后从workQueue队列里获取任务,如果队列为空则等待keepAliveTime这么久
while (task != null || (task = getTask()) != null) {
// 加锁的目的在于防止在执行任务的时候,中断当前worker
 w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 这个方法比较重要,当线程池正在关闭,确保worker被中断
// 有2次runStateAtLeast(ctl.get(), STOP)方法调用是因为double-check
// 第2次检查Thread.interrupted(),该方法会直接擦除线程的interrupt标识
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
// 执行任务之前的操作,如统计日志等,子类自己实现
 beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 将异常包装成Error抛出
thrown = x; throw new Error(x);
} finally {
// 执行任务之前的操作,如统计日志等,子类自己实现
 afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 解锁,一次任务的执行结束
 w.unlock();
}
}
completedAbruptly = false;
} finally {
// 结束worker的清理工作
 processWorkerExit(w, completedAbruptly);
}
}

3.4 getTask()

/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 当线程池状态是STOP或者SHUTDOWN并且workQueue队列是空的,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed用来判断该工作线程是否有超时控制?
// allowCoreThreadTimeOut参数是是否允许核心线程也有keepAliveTime这么一个属性
// 核心线程默认是没有超时限制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 条件1:如果工作线程大于最大线程数或者超时了
// 条件2:如果工作线程大于1或者workQueue队列为空
// 满足以上2个条件则返回null
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 一个是阻塞方法,一个是非阻塞方法,关键还是看timed这个变量,见上
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

 3.5 shutdown

线程池将不会再接收新的任务,将先前放在队列中的任务执行完成。

/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
// 获取显式锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查shutdown权限
 checkShutdownAccess();
// 将线程池状态改为SHUTDOWN
 advanceRunState(SHUTDOWN);
// 中断空闲worker
// 如果该线程正在工作,则不中断
 interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 保证workQueue里的剩余任务可以执行完
 tryTerminate();
}

 

 

参考资料:

《Java concurrence in practice》

https://www.cnblogs.com/leesf456/p/5585627.html

版权声明
本文为[GrimMjx]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/GrimMjx/p/10284893.html

编程之旅,人生之路,不止于编程,还有诗和远方。
阅代码原理,看框架知识,学企业实践;
赏诗词,读日记,踏人生之路,观世界之行;

支付宝红包,每日可领