ThreadLocal 1 2 3 4 5 6 7 public class ThreadLocalDemo { public static void main (String[] args) { ThreadLocal<Long> requestId = new ThreadLocal<Long>(); requestId.set(1L ); System.out.println(requestId.get()); } }
ThreadLocal在并发编程里,非常常用,每个线程执行的时候,都保存一个变量的副本,每个线程自己用自己的,互不影响,经常用于保存一些上下文信息,或者请求的id之类的。
在Thread里,有个变量 ThreadLocal.ThreadLocalMap threadLocals
,是一个map,这个就可以用于保存,每一个线程独有的一份数据。可以保存多个ThreadLocal的变量副本。
这个map的key就是ThreadLocal,value就是对应的变量副本。
内存泄露问题 有时候面试可能会问到,说ThreadLocal有内存泄漏的问题。
ThreadLocal的内部是ThreadLocalMap。ThreadLocalMap内部是由一个Entry数组组成。Entry类的构造函数为 Entry(弱引用的ThreadLocal对象, Object value对象)。因为Entry的key是一个弱引用的ThreadLocal对象,所以在 垃圾回收 之前,将会清除此Entry对象的key。那么, ThreadLocalMap 中就会出现 key 为 null 的 Entry,就没有办法访问这些 key 为 null 的 Entry 的 value。这些 value 被Entry对象引用,所以value所占内存不会被释放。
所以一定要记得调用remove方法,及时清除不用的对象,并且也会清理key为null的value。
CountDownLatch 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class CountDownLatchDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2 ); new Thread(() -> { try { Thread.sleep(1000 ); System.out.println("线程1开始执行,休眠2秒" ); Thread.sleep(1000 ); System.out.println("线程1准备执行countDown操作" ); countDownLatch.countDown(); System.out.println("线程1完成countDown操作" ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Thread.sleep(1000 ); System.out.println("线程2开始执行,休眠2秒" ); Thread.sleep(1000 ); System.out.println("线程2准备执行countDown操作" ); countDownLatch.countDown(); System.out.println("线程2完成countDown操作" ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); System.out.println("main线程准备执行countDownLatch的await操作,将同步阻塞等待" ); countDownLatch.await(); System.out.println("所有线程完成countDown操作,阻塞等待结束" ); } }
CountDownLatch还是基于AQS来实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
state != 0
将main线程封装为一个node,加入AQS的等待队列
调用LockSupport.park()操作,挂起main线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public void countDown () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
countDown,就是把state -1,直到减为0的时候。就调用doReleaseShared(),从队列里将阻塞的Node唤醒。
await(),触发了一个线程入队阻塞等待
countDown(),如果state == 0,唤醒队列里等待的所有的线程
所有线程被唤醒,发现state == 0,就从await()方法里退出
CyclicBarrier 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class CyclicBarrierDemo { public static void main (String[] args) { CyclicBarrier barrier = new CyclicBarrier(3 , new Runnable() { @Override public void run () { System.out.println("所有线程都完成了自己的任务,现在可以合并结果" ); } }); new Thread(() -> { try { System.out.println("线程1执行一部分自己工作" ); barrier.await(); System.out.println("最终结果合并完成,线程1可以退出" ); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { System.out.println("线程2执行一部分自己工作" ); barrier.await(); System.out.println("最终结果合并完成,线程2可以退出" ); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { System.out.println("线程2执行一部分自己工作" ); barrier.await(); System.out.println("最终结果合并完成,线程3可以退出" ); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } }
构造方法源码:
1 2 3 4 5 6 public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException(); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; }
核心逻辑都在await方法里。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } private void nextGeneration () { trip.signalAll(); count = parties; generation = new Generation(); }
这个源码看下来,还是比较简单,就基于lock就实现了和condition就实现了。
线程1:
ReentrantLock,加锁,保证多线程并发安全
count = 3,–count = 2
Condition.await(),底层,其实是释放了当前的lock锁,触发了把当前线程加入condition等待队列里,挂起当前线程
线程2:
ReentrantLock,加锁,保证多线程并发安全
count = 2,–count = 1
Condition.await(),底层,其实是释放了当前的lock锁,触发了把当前线程加入condition等待队列里,挂起当前线程
线程3:
ReentrantLock,加锁,保证多线程并发安全
count = 1,–count = 0
Condition.await(),底层,其实是释放了当前的lock锁,触发了把当前线程加入condition等待队列里,挂起当前线程。然后执行构造方法传入的action,并唤醒所有阻塞的线程,也就是把condition队列里的线程全部唤醒。
Semaphore 这个是等待指定的线程完成任务,触发退出条件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class SemaphoreDemo { public static void main (String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(0 ); new Thread(() -> { try { Thread.sleep(1000 ); System.out.println("线程1执行一个计算任务" ); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Thread.sleep(1000 ); System.out.println("线程2执行一个计算任务" ); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); semaphore.acquire(1 ); System.out.println("等待一个线程完成任务即可" ); } }
只要有一个线程完成,阻塞的acquire代码就会得到继续执行。
new Semaphore(0);
这行代码,在就是给state传了了一个0。
然后acquire,是获取一个读锁,最后是调用到了acquireSharedInterruptibly方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
现在,就是相当于有一个线程,在等待获取锁。然后在线程release的时候,会把state +1,然后唤醒阻塞的线程。
因为有死循环,被唤醒的线程,再次获取锁,此时
int remaining = available - acquires;
-> 1 - 1 = 0
,拿到了锁。程序继续执行。
ConcurrentHashMap HashMap在并发的情况下,会有bug,比如说死循环,然后导致数据丢失。所以在并发情况下,一定要去使用ConcurrentHashMap来编程。
那这个ConcurrentHashMap的解决方案,实际上就是分段加锁,HashMap底层源码不是数组么,如果每次操作都加锁的话,肯定性能不好,所以ConcurrentHashMap就提供了分段加锁的方案,把一份数据拆分为多个segment,对每个段设置一把小锁,put操作只对某个段的segment进行加锁。然后其他线程操作其他的数据,是没有锁竞争的,大大的提高了安全性和并发的效率。
put 分析一下最核心的put方法,看看ConcurrentHashMap的源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; } static final int spread (int h) { return (h ^ (h >>> 16 )) & HASH_BITS; }
在treeifyBin方法中,还包含了对是否需要扩容的判断,扩容后,数组的size必须是原来的2的倍数,这样hash运算才能能定位到对应的位置。
读 同时,get方法和size方法是不需要加锁的,因为都是通过valotile的方式去读取的值,有了可见性的保证,线程在读取数据的时候,load屏障,因为有MESI机制的存在,会先嗅探一下无效队列,如果某个数据被其他线程修改了,此时马上过期掉本地高速缓存里的缓存数据,invalid(I),然后再读的时候,就需要发送read消息到总线,从其他线程修改修改这个值的线程的高速缓存里,必须这个加载到最新的值,不需要加锁。
CopyOnWriteArrayList ArrayList是线程不安全的,取而代之是的CopyOnWriteArrayList,写时复制的ArrayList。
1 2 3 4 5 6 7 8 9 10 11 public class CopyOnWriteArrayListDemo { public static void main (String[] args) { CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); list.add("zhangsan" ); list.add("lisi" ); list.set(0 , "123" ); list.get(0 ); list.remove(0 ); list.iterator(); } }
构造方法:
1 2 3 4 5 6 7 8 9 10 final transient ReentrantLock lock = new ReentrantLock();private transient volatile Object[] array;public CopyOnWriteArrayList () { setArray(new Object[0 ]); }
CopyOnWriteArrayList,底层维护了一个volatile的数组变量,保证了多线程读写的可见性,只要有一个线程修改了这个数组,其他线程马上能感知到变化。
每一个数组,还维护了一把ReentrantLock锁,用独占锁来保证,在修改数组里的数组的时候,只有一个线程获取到锁进行操作。所以CopyOnWriteArrayList的并发写性能不好,只能有一个线程可以进行写操作。
add 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
set 接下来看修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public E set (int index, E element) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); E oldValue = get(elements, index); if (oldValue != element) { int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len); newElements[index] = element; setArray(newElements); } else { setArray(elements); } return oldValue; } finally { lock.unlock(); } }
remove 删除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public E remove (int index) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1 ; if (numMoved == 0 ) setArray(Arrays.copyOf(elements, len - 1 )); else { Object[] newElements = new Object[len - 1 ]; System.arraycopy(elements, 0 , newElements, 0 , index); System.arraycopy(elements, index + 1 , newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { lock.unlock(); } }
get 读操作的源码很简单,这里没有加锁,因为读的就是volatile的变量,增删改,有独占锁来保证只有一个线程会修改,并且每次都是复制一个新的数组,修改完毕后再设置到数组变量中,同时因为数组变量是volatile修饰的,所以读操作的线程每次都能及时的读到变化,或者修改操作还没有来得及完成写入变量,我读到的也是老数组的值,不存在锁竞争的操作,性能很高。
1 2 3 public E get (int index) { return get(getArray(), index); }
Iterator 迭代器的方法较多,就不全部复制了,他的理念和读操作是类似的,在获取迭代器的时候,将老数组的值作为一个snapshot存下来,进行循环迭代,所以这个snapshot,是不允许修改或者删除的,只能遍历。
1 2 3 4 5 6 7 8 9 10 11 12 13 public Iterator<E> iterator () { return new COWIterator<E>(getArray(), 0 ); } static final class COWIterator <E > implements ListIterator <E > { private final Object[] snapshot; private int cursor; private COWIterator (Object[] elements, int initialCursor) { cursor = initialCursor; snapshot = elements; }
总结 它的核心理念是弱一致性提升读并发,多个线程并发读写这个list,中间肯定是有复制后的数组被修改好了,但是还没有来得及写入到array变量中,这个时候读到的肯定就是老数组里的数据,在这个过程中,多个线程读到的数据可能是不一样的,但是数据的最终是一致的。
优点:读和写不互斥的,写和写互斥,同一时间就一个线程可以写,但是写的同时可以允许其他所有人来读;读和读也是并发的;比读写锁机制还要好;他也不涉及到Unsafe.getObjectVolatile
使用场景:多线程并发安全性,可以选用他;尽可能是读多写少 的场景,大量的读是不被影响的;可能有一个线程刚刚发起了写,此时别的线程读到的还是旧的数据,也有这种可能
缺点:空间换时间,写的时候,经常内存里会出现复制出来的一模一样的副本,对内存消耗过大,副本机制保证了保证读写并发优化,大量的并发读不需要锁互斥,list如果很大,要考虑在线上运行的时候,内存占用会是list大小的几倍。
ConcurrentLinkedQueue 线程安全的链表队列,有链表,底层肯定就是基于Node的链表,在源码中大量用到了cas和volatile变量
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ConcurrentLinkedQueueDemo { public static void main (String[] args) { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); queue.offer("1" ); queue.offer("2" ); queue.offer("3" ); queue.poll(); queue.peek(); queue.remove("1" ); queue.size(); System.out.println(queue); } }
最关键的Node数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private static class Node <E > { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this , itemOffset, item); } boolean casItem (E cmp, E val) { return UNSAFE.compareAndSwapObject(this , itemOffset, cmp, val); } void lazySetNext (Node<E> val) { UNSAFE.putOrderedObject(this , nextOffset, val); } boolean casNext (Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this , nextOffset, cmp, val); }
offer 入队
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public boolean offer (E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null ) { if (p.casNext(null , newNode)) { if (p != t) casTail(t, newNode); return true ; } } else if (p == q) p = (t != (t = tail)) ? t : head; else p = (p != t && t != (t = tail)) ? t : q; } }
poll 出队
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public E poll () { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null )) { if (p != h) updateHead(h, ((q = p.next) != null ) ? q : p); return item; } else if ((q = p.next) == null ) { updateHead(h, p); return null ; } else if (p == q) continue restartFromHead; else p = q; } } }
peek peek,也是获取队头的数据,但是他并不出队,就是获取了看一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public E peek () { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null || (q = p.next) == null ) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
remove 删除,删除队列里某一个元素,这个是需要遍历整个队列,如果发现相同的元素,就利用cas设置为null,然后通过指针变换,将这个元素删除掉,在使用队列的场景,删除是比较少用的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public boolean remove (Object o) { if (o == null ) return false ; Node<E> pred = null ; for (Node<E> p = first(); p != null ; p = succ(p)) { E item = p.item; if (item != null && o.equals(item) && p.casItem(item, null )) { Node<E> next = succ(p); if (pred != null && next != null ) pred.casNext(p, next); return true ; } pred = p; } return false ; }
size 获取队列的大小,将队列进行遍历,然后计算得出一个队列的大小,所以说这个大小并不是实时的,完全有可能在遍历的过程中,数据就发生了很大的变化,大部分的并发工具类,为了并发的效率,都在一定程度上舍弃了数据的一致性,只保留了最终一致性。
1 2 3 4 5 6 7 8 9 public int size () { int count = 0 ; for (Node<E> p = first(); p != null ; p = succ(p)) if (p.item != null ) if (++count == Integer.MAX_VALUE) break ; return count; }
contains 查询是否包含某个数据,也是一样的遍历,有可能在遍历过程中,数据发生了改变
1 2 3 4 5 6 7 8 9 public boolean contains (Object o) { if (o == null ) return false ; for (Node<E> p = first(); p != null ; p = succ(p)) { E item = p.item; if (item != null && o.equals(item)) return true ; } return false ; }
总结 大多数情况下,我们直接用并发包的工具就行了,如果想要数据完全保证一致性的数据结构,只能是自己采用加锁的方式去使用。
LinkedBlockingQueue ConcurrentLinkedQueue是无界队列,他是单向链表,不停往里面塞,可能会导致内存溢出。
LinkedBlockingQueue是有界队列,也是链表,但是限制了链表的长度。
1 2 3 4 5 6 7 8 9 10 11 12 13 public class LinkedBlockingQueueDemo { public static void main (String[] args) throws InterruptedException { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); queue.put("1" ); queue.put("2" ); queue.put("3" ); queue.take(); queue.remove("1" ); queue.size(); queue.iterator(); System.out.println(queue); } }
put LinkedBlockingQueue,大量运用了锁的API来进行阻塞和唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }
take 出队也是一样,锁+阻塞和唤醒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
size AtomicInteger维护的size,本来就是线程安全的,而且由于put和take都是基于锁来操作的,所以这个size是实时的,并且是准确的。
1 2 3 public int size () { return count.get(); }
ArrayBlockingQueue 这个也是有界队列,但是是基于数组实现的,队列的长度,就是数组的长度
1 2 3 4 5 6 7 8 9 public class ArrayBlockingQueueDemo { public static void main (String[] args) throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10 ); queue.put("zhangsan" ); queue.take(); queue.size(); queue.iterator(); } }
put 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
take 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue () { final Object[] items = this .items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
size和iterator size和iterator,是直接加独占锁,此时此刻是没有任何一个线程可以出队或者是入队的
1 2 3 4 5 6 7 8 9 public int size () { final ReentrantLock lock = this .lock; lock.lock(); try { return count; } finally { lock.unlock(); } }