0%

并发源码-并发工具包

ThreadLocal

1
2
3
4
5
6
7
public class ThreadLocalDemo {
public static void main(String[] args) {
ThreadLocal<Long> requestId = new ThreadLocal<Long>();
requestId.set(1L);
System.out.println(requestId.get());
}
}

ThreadLocal在并发编程里,非常常用,每个线程执行的时候,都保存一个变量的副本,每个线程自己用自己的,互不影响,经常用于保存一些上下文信息,或者请求的id之类的。

在Thread里,有个变量 ThreadLocal.ThreadLocalMap threadLocals,是一个map,这个就可以用于保存,每一个线程独有的一份数据。可以保存多个ThreadLocal的变量副本。

这个map的key就是ThreadLocal,value就是对应的变量副本。

内存泄露问题

有时候面试可能会问到,说ThreadLocal有内存泄漏的问题。

ThreadLocal的内部是ThreadLocalMap。ThreadLocalMap内部是由一个Entry数组组成。Entry类的构造函数为 Entry(弱引用的ThreadLocal对象, Object value对象)。因为Entry的key是一个弱引用的ThreadLocal对象,所以在 垃圾回收 之前,将会清除此Entry对象的key。那么, ThreadLocalMap 中就会出现 key 为 null 的 Entry,就没有办法访问这些 key 为 null 的 Entry 的 value。这些 value 被Entry对象引用,所以value所占内存不会被释放。

所以一定要记得调用remove方法,及时清除不用的对象,并且也会清理key为null的value。

CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);

new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println("线程1开始执行,休眠2秒");
Thread.sleep(1000);
System.out.println("线程1准备执行countDown操作");
countDownLatch.countDown();
System.out.println("线程1完成countDown操作");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println("线程2开始执行,休眠2秒");
Thread.sleep(1000);
System.out.println("线程2准备执行countDown操作");
countDownLatch.countDown();
System.out.println("线程2完成countDown操作");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

System.out.println("main线程准备执行countDownLatch的await操作,将同步阻塞等待");
countDownLatch.await();
System.out.println("所有线程完成countDown操作,阻塞等待结束");

}
}

CountDownLatch还是基于AQS来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// CountDownLatch.java
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
// 这个state,在初始化的时候,指定了就是2,其实countDown,就是把state -1
// 结合上面的demo,new CountDownLatch(2),这里的值应该是2
// 所以肯定是返回-1
return (getState() == 0) ? 1 : -1;
}

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {

final Node node = addWaiter(Node.SHARED);
boolean failed = true;

try {
for (;;) {
final Node p = node.predecessor();
// 这个时候,p应该是空Node
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 直接阻塞挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}


  1. state != 0
  2. 将main线程封装为一个node,加入AQS的等待队列
  3. 调用LockSupport.park()操作,挂起main线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// state - 1
if (tryReleaseShared(arg)) {
// 出队并唤醒线程
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 如果等于0了,说明countDown结束了
return nextc == 0;
}
}
// 从AQS的队列中,将之前阻塞的Node,也就是main线程唤醒。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

countDown,就是把state -1,直到减为0的时候。就调用doReleaseShared(),从队列里将阻塞的Node唤醒。

  1. await(),触发了一个线程入队阻塞等待
  2. countDown(),如果state == 0,唤醒队列里等待的所有的线程
  3. 所有线程被唤醒,发现state == 0,就从await()方法里退出

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class CyclicBarrierDemo {
public static void main(String[] args) {

CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("所有线程都完成了自己的任务,现在可以合并结果");
}
});

new Thread(() -> {
try {
System.out.println("线程1执行一部分自己工作");
barrier.await();
System.out.println("最终结果合并完成,线程1可以退出");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
System.out.println("线程2执行一部分自己工作");
barrier.await();
System.out.println("最终结果合并完成,线程2可以退出");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
System.out.println("线程2执行一部分自己工作");
barrier.await();
System.out.println("最终结果合并完成,线程3可以退出");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();


}
}

构造方法源码:

1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

核心逻辑都在await方法里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 搞了一个锁,保证并发安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 这个是3,所以调用一次await(),这个就会-1
int index = --count;
// 如果都为0,说明await调用满了,会触发动作的执行
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 这里有调用Condition的signalAll方法,唤醒所有阻塞的线程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// 否则就死循环,利用Condition的await,将当前线程阻塞
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();
// 被唤醒以后,发现这个对象已经发生了变化,返回退出了
if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

private void nextGeneration() {
// signal completion of last generation

// 唤醒所有线程
trip.signalAll();
// set up next generation
count = parties;
// 可以重复利用再来一轮
generation = new Generation();
}

这个源码看下来,还是比较简单,就基于lock就实现了和condition就实现了。

线程1:

  1. ReentrantLock,加锁,保证多线程并发安全

  2. count = 3,–count = 2

  3. Condition.await(),底层,其实是释放了当前的lock锁,触发了把当前线程加入condition等待队列里,挂起当前线程

线程2:

  1. ReentrantLock,加锁,保证多线程并发安全

  2. count = 2,–count = 1

  3. Condition.await(),底层,其实是释放了当前的lock锁,触发了把当前线程加入condition等待队列里,挂起当前线程

线程3:

  1. ReentrantLock,加锁,保证多线程并发安全

  2. count = 1,–count = 0

  3. Condition.await(),底层,其实是释放了当前的lock锁,触发了把当前线程加入condition等待队列里,挂起当前线程。然后执行构造方法传入的action,并唤醒所有阻塞的线程,也就是把condition队列里的线程全部唤醒。

Semaphore

这个是等待指定的线程完成任务,触发退出条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SemaphoreDemo {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(0);

new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println("线程1执行一个计算任务");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println("线程2执行一个计算任务");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

semaphore.acquire(1);
System.out.println("等待一个线程完成任务即可");
}
}

只要有一个线程完成,阻塞的acquire代码就会得到继续执行。

new Semaphore(0);这行代码,在就是给state传了了一个0。

然后acquire,是获取一个读锁,最后是调用到了acquireSharedInterruptibly方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 刚调用,就肯定是-1
if (tryAcquireShared(arg) < 0)
// 尝试获取锁,阻塞等待
doAcquireSharedInterruptibly(arg);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 这里是0
int available = getState();
// 0 - 1 = -1,
int remaining = available - acquires;
// 所以返回-1
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

现在,就是相当于有一个线程,在等待获取锁。然后在线程release的时候,会把state +1,然后唤醒阻塞的线程。

因为有死循环,被唤醒的线程,再次获取锁,此时

int remaining = available - acquires; -> 1 - 1 = 0,拿到了锁。程序继续执行。

ConcurrentHashMap

HashMap在并发的情况下,会有bug,比如说死循环,然后导致数据丢失。所以在并发情况下,一定要去使用ConcurrentHashMap来编程。

那这个ConcurrentHashMap的解决方案,实际上就是分段加锁,HashMap底层源码不是数组么,如果每次操作都加锁的话,肯定性能不好,所以ConcurrentHashMap就提供了分段加锁的方案,把一份数据拆分为多个segment,对每个段设置一把小锁,put操作只对某个段的segment进行加锁。然后其他线程操作其他的数据,是没有锁竞争的,大大的提高了安全性和并发的效率。

put

分析一下最核心的put方法,看看ConcurrentHashMap的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 和HashMap一样,将hashcode做一个运算,让高低16位都参与到运算中
// (h ^ (h >>> 16)) & HASH_BITS;
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// i = (n - 1) & hash 位运算,相当于取模
// tabAt,(Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 利用CAS操作对数组进行插入,成功后就break了
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
// 走到这里,表示数组的某个位置已经有Node了,发生了hash冲突,要追加链表,或者往红黑树挂节点
V oldVal = null;
// f就是数组上某个位置链表上的第一个Node,对某一个段进行加锁
// 加锁后的操作都是线程安全的
// 所以默认,是可以16个线程并发操作的
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 对红黑树的处理
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 如果一个链表的元素的数量超过了8,达到了一个阈值之后,就会将链表转换为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

在treeifyBin方法中,还包含了对是否需要扩容的判断,扩容后,数组的size必须是原来的2的倍数,这样hash运算才能能定位到对应的位置。

同时,get方法和size方法是不需要加锁的,因为都是通过valotile的方式去读取的值,有了可见性的保证,线程在读取数据的时候,load屏障,因为有MESI机制的存在,会先嗅探一下无效队列,如果某个数据被其他线程修改了,此时马上过期掉本地高速缓存里的缓存数据,invalid(I),然后再读的时候,就需要发送read消息到总线,从其他线程修改修改这个值的线程的高速缓存里,必须这个加载到最新的值,不需要加锁。

CopyOnWriteArrayList

ArrayList是线程不安全的,取而代之是的CopyOnWriteArrayList,写时复制的ArrayList。

1
2
3
4
5
6
7
8
9
10
11
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("zhangsan");
list.add("lisi");
list.set(0, "123");
list.get(0);
list.remove(0);
list.iterator();
}
}

构造方法:

1
2
3
4
5
6
7
8
9
10
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
/**
* Creates an empty list.
*/
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}

CopyOnWriteArrayList,底层维护了一个volatile的数组变量,保证了多线程读写的可见性,只要有一个线程修改了这个数组,其他线程马上能感知到变化。

每一个数组,还维护了一把ReentrantLock锁,用独占锁来保证,在修改数组里的数组的时候,只有一个线程获取到锁进行操作。所以CopyOnWriteArrayList的并发写性能不好,只能有一个线程可以进行写操作。

add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 独占锁
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 先用Arrays.copyOf复制出来一个数组,然后对复制后的数组,进行添加操作
// 然后再将新数组,设置到volatile修饰的数组中
// 这就是所谓的写时复制
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

set

接下来看修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E set(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
E oldValue = get(elements, index);

if (oldValue != element) {
int len = elements.length;
// 复制一个新数组
Object[] newElements = Arrays.copyOf(elements, len);
// 对新数组的值做修改
newElements[index] = element;
// 设置回去,写时复制
setArray(newElements);
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements);
}
return oldValue;
} finally {
lock.unlock();
}
}

remove

删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
// 如果是删除的最后面的元素,直接只复制前面的部分到新数组
setArray(Arrays.copyOf(elements, len - 1));
else {
// 删除中间的元素,就把该元素前后的部分复制到新数组中
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}

get

读操作的源码很简单,这里没有加锁,因为读的就是volatile的变量,增删改,有独占锁来保证只有一个线程会修改,并且每次都是复制一个新的数组,修改完毕后再设置到数组变量中,同时因为数组变量是volatile修饰的,所以读操作的线程每次都能及时的读到变化,或者修改操作还没有来得及完成写入变量,我读到的也是老数组的值,不存在锁竞争的操作,性能很高。

1
2
3
public E get(int index) {
return get(getArray(), index);
}

Iterator

迭代器的方法较多,就不全部复制了,他的理念和读操作是类似的,在获取迭代器的时候,将老数组的值作为一个snapshot存下来,进行循环迭代,所以这个snapshot,是不允许修改或者删除的,只能遍历。

1
2
3
4
5
6
7
8
9
10
11
12
13
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next. */
private int cursor;

private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}

总结

它的核心理念是弱一致性提升读并发,多个线程并发读写这个list,中间肯定是有复制后的数组被修改好了,但是还没有来得及写入到array变量中,这个时候读到的肯定就是老数组里的数据,在这个过程中,多个线程读到的数据可能是不一样的,但是数据的最终是一致的。

优点:读和写不互斥的,写和写互斥,同一时间就一个线程可以写,但是写的同时可以允许其他所有人来读;读和读也是并发的;比读写锁机制还要好;他也不涉及到Unsafe.getObjectVolatile

使用场景:多线程并发安全性,可以选用他;尽可能是读多写少的场景,大量的读是不被影响的;可能有一个线程刚刚发起了写,此时别的线程读到的还是旧的数据,也有这种可能

缺点:空间换时间,写的时候,经常内存里会出现复制出来的一模一样的副本,对内存消耗过大,副本机制保证了保证读写并发优化,大量的并发读不需要锁互斥,list如果很大,要考虑在线上运行的时候,内存占用会是list大小的几倍。

ConcurrentLinkedQueue

线程安全的链表队列,有链表,底层肯定就是基于Node的链表,在源码中大量用到了cas和volatile变量

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("1");
queue.offer("2");
queue.offer("3");
queue.poll();
queue.peek();
queue.remove("1");
queue.size();
System.out.println(queue);
}
}

最关键的Node数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static class Node<E> {
// 保证可见性
volatile E item;
volatile Node<E> next;
// 利用UNSAFE通过偏移量来设置
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}

boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}

boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

offer

入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
// 重新获取tail节点,继续相同逻辑
Node<E> q = p.next;
if (q == null) {
// cas操作,只有一个线程会成功,没有加锁,保证只有一个线程会入队成功
// 如果另外线程失败了,重新进入循环,变化指针,将指针往后挪,一直重试到成功为止
if (p.casNext(null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}

}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}

poll

出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 同样的,通过cas将队头出队,然后将下一个节点变成队头,只有一个线程会成功
// 如果失败了,就重新获取队头,循环一直到成功为止
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

peek

peek,也是获取队头的数据,但是他并不出队,就是获取了看一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

remove

删除,删除队列里某一个元素,这个是需要遍历整个队列,如果发现相同的元素,就利用cas设置为null,然后通过指针变换,将这个元素删除掉,在使用队列的场景,删除是比较少用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean remove(Object o) {
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null &&
o.equals(item) &&
p.casItem(item, null)) {
Node<E> next = succ(p);
if (pred != null && next != null)
pred.casNext(p, next);
return true;
}
pred = p;
}
return false;
}

size

获取队列的大小,将队列进行遍历,然后计算得出一个队列的大小,所以说这个大小并不是实时的,完全有可能在遍历的过程中,数据就发生了很大的变化,大部分的并发工具类,为了并发的效率,都在一定程度上舍弃了数据的一致性,只保留了最终一致性。

1
2
3
4
5
6
7
8
9
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}

contains

查询是否包含某个数据,也是一样的遍历,有可能在遍历过程中,数据发生了改变

1
2
3
4
5
6
7
8
9
public boolean contains(Object o) {
if (o == null) return false;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item))
return true;
}
return false;
}

总结

大多数情况下,我们直接用并发包的工具就行了,如果想要数据完全保证一致性的数据结构,只能是自己采用加锁的方式去使用。

LinkedBlockingQueue

ConcurrentLinkedQueue是无界队列,他是单向链表,不停往里面塞,可能会导致内存溢出。

LinkedBlockingQueue是有界队列,也是链表,但是限制了链表的长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class LinkedBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
queue.put("1");
queue.put("2");
queue.put("3");
queue.take();
queue.remove("1");
queue.size();
queue.iterator();
System.out.println(queue);
}
}

put

LinkedBlockingQueue,大量运用了锁的API来进行阻塞和唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
// 一个put,独占锁
final ReentrantLock putLock = this.putLock;
// 当前队列的大小
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// capacity就是有界队列的大小,如果以及满了,就阻塞
while (count.get() == capacity) {
// 这个是putLock对应的Condition
notFull.await();
}
// 如果没有满,那么就从队尾入队
enqueue(node);
c = count.getAndIncrement();
// 此时判断一下别的线程有没有可能消费了,队列有空间,唤醒正在阻塞的putLock线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放写锁
putLock.unlock();
}
// c == 0也就是全部被消费完了,说明有线程正在阻塞了,尝试唤醒所有等待takeLock的线程
if (c == 0)
signalNotEmpty();
}

take

出队也是一样,锁+阻塞和唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列空了,阻塞等待
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 队列里还有,再尝试唤醒一个正在等待的线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 队列是满的,说明有put的线程在阻塞等待,take以后尝试唤醒
if (c == capacity)
signalNotFull();
return x;
}

size

AtomicInteger维护的size,本来就是线程安全的,而且由于put和take都是基于锁来操作的,所以这个size是实时的,并且是准确的。

1
2
3
public int size() {
return count.get();
}

ArrayBlockingQueue

这个也是有界队列,但是是基于数组实现的,队列的长度,就是数组的长度

1
2
3
4
5
6
7
8
9
public class ArrayBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
queue.put("zhangsan");
queue.take();
queue.size();
queue.iterator();
}
}

put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 同样也是加锁
lock.lockInterruptibly();
try {
// 直接获取数组的长度用于比较,数组满了以后,就直接阻塞
while (count == items.length)
notFull.await();
// 入队
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 因为有锁,所以可以直接修改数组里的值
items[putIndex] = x;
// 每放入一个元素后,就往后挪一位,直到数组所有索引都被使用过以后,又从头开始
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒正在阻塞take的线程
notEmpty.signal();
}

take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列是空的,take线程会阻塞
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 从0开始往后读取
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 读到队尾后,又从头开始读
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒正在put阻塞的线程
notFull.signal();
return x;
}

size和iterator

size和iterator,是直接加独占锁,此时此刻是没有任何一个线程可以出队或者是入队的

1
2
3
4
5
6
7
8
9
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}