线程池 JDK为我们提供了4种构造线程池的方式,分别是
newFixedThreadPool
newSingleThreadExecutor
newCachedThreadPool
newScheduledThreadPool
我们也可以构造自己的线程池,来实现一些我们想要的一些功能。
newFixedThreadPool 来看看这个fixed线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class FixedThreadPoolDemo { public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3 ); for (int i = 0 ; i < 20 ; i++) { executorService.execute(() -> { System.out.println("线程池异步执行任务" + Thread.currentThread().getName()); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); } }
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
顺着这个线程池,看一下ThreadPoolExecutor的构造函数,corePoolSize和maximumPoolSize都是指定的值,然后队列是用的无界队列。
1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
corePoolSize:线程池里应该有多少个线程
maximumPoolSize:如果线程池里的线程用完了,并且等待队列满了,可能会增加一些线程,但是最多把线程数量增加到maximumPoolSize指定的数量
keepAliveTime + TimeUnit:如果线程数量超出了corePoolSize的话,超出corePoolSize指定数量的线程,就会在空闲keepAliveTime毫秒之后,就会自动被释放掉
workQueue:线程池的等待队列
threadFactory:在线程池里创建线程的时候,指定一个线程工厂,按照自己的方式创建线程出来
RejectedExecutionHandler:如果线程池里的线程都在执行任务,然后等待队列满了,此时增加额外线程也达到了maximumPoolSize指定的数量了,这个时候实在无法承载更多的任务了,此时就会执行这个
在源码中,有一个非常关键的类变量
1 2 3 4 5 6 7 8 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 ));private static int runStateOf (int c) { return c & ~CAPACITY; }private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }
只通过一个字段,和二进制的操作,就实现了线程池状态和数量的维护。
execute 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
然后就是创建线程并执行的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
线程启动了,那么就该执行我们提交到线程池的任务了,这个逻辑肯定就是在Worker里的run方法去执行的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public void run () { runWorker(this ); } final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
从队列获取任务执行 前面分析的是,直接创建线程执行任务,或者corePoolSize满了以后,将任务入队。
在Worker的runWorker方法里,调用了getTask()方法,这个getTask(),就是从队列里获取任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
线程池没有任务执行的时候在干什么 在getTask()方法中
答案就是在fixed线程中,因为timed是false,所以workQueue.take();肯定是会阻塞的,所以队列里没有任务的时候,coreSizePool的线程就会阻塞等待中。
如果是别的线程池,比如Cached线程池,那么timed可能是true,这个要根据当前线程池是否大于corePoolSize来决定,如果当前线程池大于corePoolSize,那么timed就是true,然后就会等待一定的时间,如果没有任务,那么线程执行也就结束了,线程也就被销毁了
线程池关闭 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); }
不停往fixed线程池提交任务会导致内存溢出 因为是无界队列,所以如果堆积了上百万的数据,是有可能会导致内存溢出的。
newCachedThreadPool 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class CachedThreadPoolDemo { public static void main (String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0 ; i < 20 ; i++) { executorService.execute(() -> { System.out.println("线程池异步执行任务" + Thread.currentThread().getName()); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); } }
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
corePoolSize:0
maximumPoolSize:Integer.MAX_VALUE
keepAliveTime:60s
workQueue:SynchronousQueue
corePoolSize是0,根据execute的源码,当前线程数量是不小于corePoolSize的,所以他会直接入队
if (workerCountOf(c) < corePoolSize)
执行任务创建非core线程 SynchronousQueue这个队列,offer是不会入队成功的,除非有另外的线程在take。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command);
从队列获取任务 回过来看getTask()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ;
不停往cached线程池提交任务时会导致CPU负载过高 在系统高峰期大量的线程被创建出来,然后导致机器的CPU负载过高,甚至线程太多导致内存溢出或者导致CPU负载飙升。
触发拒绝任务提交 第一种fixed,是基于有限固定数量的线程处理源源不断涌入的任务,无界队列,所以任务可以无限制的涌入和排队。
第二种cached,是在需要的时候无限制的创建新的线程来处理新的任务,提交的任务几乎是不会排队的,永远能最快速度的得到执行,入队的时候先看看有没有人空闲在poll,如果有立马执行
4核8G,虚拟机,一般来说,线程池开启线程来异步处理任务,200以内,100~200的时候,线程机器的CPU负载就很高了,内存队列排队个几十万个任务,也还好,内存也没撑爆,但是如果你的线程一旦达到四五百个,线上机器的CPU负载过高的报警
newSingleThreadExecutor
只有1个线程,不停的处理提交到无界队列的任务
newScheduledThreadPool
corePoolSize: 10
maximumPoolSize: Integer.MAX_VALUE
keepAliveTime: 0
workQueue: DelayedWorkQueue
ScheduledThreadPoolExecutor,继承自ThreadPoolExecutor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0 ]; if (first == null ) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0 ) return finishPoll(first); first = null ; if (leader != null ) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && queue[0 ] != null ) available.signal(); lock.unlock(); } }