0%

并发源码-线程池

线程池

JDK为我们提供了4种构造线程池的方式,分别是

  • newFixedThreadPool
    • 固定数量的线程
  • newSingleThreadExecutor
    • 只有一个线程的线程池
  • newCachedThreadPool
    • 线程数量不固定,不停的创建线程,用完了以后又回收
  • newScheduledThreadPool
    • 提交任务后,在指定的时间去执行

我们也可以构造自己的线程池,来实现一些我们想要的一些功能。

newFixedThreadPool

来看看这个fixed线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 20; i++) {

executorService.execute(() -> {
System.out.println("线程池异步执行任务" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

executorService.shutdown();
}
}
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

顺着这个线程池,看一下ThreadPoolExecutor的构造函数,corePoolSize和maximumPoolSize都是指定的值,然后队列是用的无界队列。

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
  • corePoolSize:线程池里应该有多少个线程

  • maximumPoolSize:如果线程池里的线程用完了,并且等待队列满了,可能会增加一些线程,但是最多把线程数量增加到maximumPoolSize指定的数量

  • keepAliveTime + TimeUnit:如果线程数量超出了corePoolSize的话,超出corePoolSize指定数量的线程,就会在空闲keepAliveTime毫秒之后,就会自动被释放掉

  • workQueue:线程池的等待队列

  • threadFactory:在线程池里创建线程的时候,指定一个线程工厂,按照自己的方式创建线程出来

  • RejectedExecutionHandler:如果线程池里的线程都在执行任务,然后等待队列满了,此时增加额外线程也达到了maximumPoolSize指定的数量了,这个时候实在无法承载更多的任务了,此时就会执行这个

在源码中,有一个非常关键的类变量

1
2
3
4
5
6
7
8
// 32位的Integer,前3位表示了线程池的状态,后29位表示了线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Packing and unpacking ctl
// 前三位取状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 后29位取线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

只通过一个字段,和二进制的操作,就实现了线程池状态和数量的维护。

execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 当前线程数量小于corePoolSize,直接创建线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 这里corePoolSize就满了,就需要排队了,根据队列的类型来决定是否入队成功
if (isRunning(c) && workQueue.offer(command)) {
// 入队成功了,再次检查下线程池是否还在运行中,如果没有在运行了,从队列里移除,直接走一下reject
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列也满了,那就再次创建额外的线程,达到maximumPoolSize的数量,第二个参数的true和false就是表示创建的是core线程还是非core线程
else if (!addWorker(command, false))
reject(command);
}

然后就是创建线程并执行的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 这里,就要创建线程了,利用cas将线程数量+1,这里成功了以后,才会走后面的逻辑,去真正的创建线程
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// Worker是AQS的子类
w = new Worker(firstTask);
// 这个t,就是新创建的线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 创建线程的过程,是加锁的,所有的线程都维护在workers这个set里
// 只有一个线程能修改这个数据
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动刚才创建的线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

线程启动了,那么就该执行我们提交到线程池的任务了,这个逻辑肯定就是在Worker里的run方法去执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 这个getTask,就是从队列里获取
while (task != null || (task = getTask()) != null) {
// Worker继承自AQS,所以自带加锁能力
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行提交的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 因为有锁,所以线程安全
w.completedTasks++;
// 解锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

从队列获取任务执行

前面分析的是,直接创建线程执行任务,或者corePoolSize满了以后,将任务入队。

在Worker的runWorker方法里,调用了getTask()方法,这个getTask(),就是从队列里获取任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 这是很关键的一个参数,allowCoreThreadTimeOut,表示是否允许线程在指定时间后销毁
// 然后查看当前线程数量是不是大于corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 如果允许超时的话,就调用poll,并等待一段时间,否则就调用take,一直阻塞
// 在fixed线程中,是走take方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

线程池没有任务执行的时候在干什么

在getTask()方法中

答案就是在fixed线程中,因为timed是false,所以workQueue.take();肯定是会阻塞的,所以队列里没有任务的时候,coreSizePool的线程就会阻塞等待中。

如果是别的线程池,比如Cached线程池,那么timed可能是true,这个要根据当前线程池是否大于corePoolSize来决定,如果当前线程池大于corePoolSize,那么timed就是true,然后就会等待一定的时间,如果没有任务,那么线程执行也就结束了,线程也就被销毁了

线程池关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 拿到锁以后,就不能会再有新线程被创建了
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池的state
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// tryLock,会对AQS本身的state进行设置
// 尝试将Worker的state = 1,如果成功了,就说明这个Worker当前的state = 0
// 说明这个Worker当前是空闲的状态,没有执行任何一个任务,此时就可以中断掉这个Worker,Worker内部的线程就会退出
// 循环加锁并且打断,如果加锁失败了,就放弃了
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}

不停往fixed线程池提交任务会导致内存溢出

因为是无界队列,所以如果堆积了上百万的数据,是有可能会导致内存溢出的。

newCachedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CachedThreadPoolDemo {
public static void main(String[] args) {
// 不用设置线程数量,无论提交多少个任务,都会开辟创建一个新的线程来执行
// 非常适合段时间内突然涌入大量任务的场景
// 大量的线程如果只会空闲了,达到一定时间之后就会释放掉
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {

executorService.execute(() -> {
System.out.println("线程池异步执行任务" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

executorService.shutdown();
}
}
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • corePoolSize:0
  • maximumPoolSize:Integer.MAX_VALUE
  • keepAliveTime:60s
  • workQueue:SynchronousQueue

corePoolSize是0,根据execute的源码,当前线程数量是不小于corePoolSize的,所以他会直接入队

if (workerCountOf(c) < corePoolSize)

执行任务创建非core线程

SynchronousQueue这个队列,offer是不会入队成功的,除非有另外的线程在take。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// corePoolSize == 0
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// workQueue.offer(command) 直接入队失败
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 添加非core线程
else if (!addWorker(command, false))
reject(command);

从队列获取任务

回过来看getTask()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 int wc = workerCountOf(c);
// allowCoreThreadTimeOut:true
// corePoolSize: 0
// 所以timed肯定是true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// timed是true,这里会调用poll方法,如果在等待过程中,有任务进来,会取到任务并执行,如果超时后失败,r==null,紧接着线程被销毁
// 这个时间,是60秒
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;

不停往cached线程池提交任务时会导致CPU负载过高

在系统高峰期大量的线程被创建出来,然后导致机器的CPU负载过高,甚至线程太多导致内存溢出或者导致CPU负载飙升。

触发拒绝任务提交

第一种fixed,是基于有限固定数量的线程处理源源不断涌入的任务,无界队列,所以任务可以无限制的涌入和排队。

第二种cached,是在需要的时候无限制的创建新的线程来处理新的任务,提交的任务几乎是不会排队的,永远能最快速度的得到执行,入队的时候先看看有没有人空闲在poll,如果有立马执行

4核8G,虚拟机,一般来说,线程池开启线程来异步处理任务,200以内,100~200的时候,线程机器的CPU负载就很高了,内存队列排队个几十万个任务,也还好,内存也没撑爆,但是如果你的线程一旦达到四五百个,线上机器的CPU负载过高的报警

newSingleThreadExecutor

  • corePoolSize: 1

  • maximumPoolSize: 1

  • keepAliveTime: 0

  • workQueue:LinkedBlockingQueue无界队列

只有1个线程,不停的处理提交到无界队列的任务

newScheduledThreadPool

  • corePoolSize: 10
  • maximumPoolSize: Integer.MAX_VALUE
  • keepAliveTime: 0
  • workQueue: DelayedWorkQueue

ScheduledThreadPoolExecutor,继承自ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 核心通过延时队列来控制任务的出列,如果符合延时时间的话,就出队执行
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}