0%

并发源码-锁

synchronized

synchronized就是加锁,一旦加锁以后,就只能当前线程访问和修改加锁的变量,其他线程只能阻塞等待,这就保证了原子性。

synchronized可以锁两种对象,一种是对某个实例对象进行加锁,一种是对类进行加锁。对类加锁,本质上也是对实例加锁,只不过是对class对象进行加锁。

如果用synchronized修饰普通方法,那就是对当前类的实例进行加锁。

1
2
3
4
5
6
7
8
9
synchronized void increment() {

}
synchronized(myObject) {

}
synchronized(this) {

}

如果用synchronized修饰静态方法,那就是对当前类的class对象进行加锁。

1
2
3
4
5
6
synchronized static void increment() {

}
synchronized(Demo.class) {

}

synchronized涉及到jvm底层的两个指令,分别是monitorenter和monitorexit两个指令。

1
2
3
monitorenter
// 代码对应的指令
monitorexit

每个对象都有一个关联的monitor属性,比如一个对象实例就有一个monitor,一个类的class对象也有一个monitor,如果要对这个对象进行加锁,那么必须获取这个对象关联的monitor的lock锁。

原理大概就是,monitor里有一个从0开始的计数器,如果一个线程要获取monitor的锁,就看他的计数器是不是0,如果是0,说明没人获取锁,然后拿到锁以后就加1。

同时,monitor是支持重入锁的

1
2
3
4
5
6
synchronized(myObject) {
// 一大堆的代码
synchronized(myObject) {
// 一大堆的代码
}
}

这里就对同一个对象,加了两次锁,猜一下就知道,他原理肯定是每次进到synchronized(myObject)代码的时候,先看对象的monitor里的计数器是不是0,如果是0,就直接获取锁,并把对象里的monitor的计数器加1,如果不是0,看加锁的时候是不是同一个线程,如果是,就再加1,如果不是同一个线程呢,那就阻塞等待吧。

然后执行代码代码块退出的时候,会执行monitorexit的指令,此时获取锁的线程就会对那个对象的monitor的计数器减1,如果有多次重入加锁就会对应多次减1,直到最后,计数器是0。

然后之前阻塞的线程,再次竞争锁,但是只有一个线程可以拿到锁。

image-20201022162819834

wait&notify

对于有锁的代码,也就离不开wait和notify的使用,在加锁的代码中,可以wait挂起线程,并释放锁,除非其他线程对同一个对象调用notify,线程被唤醒,然后再次尝试获取锁。

wait与notify,跟synchronized是同样的原理,都是monitor,对象的monitor里,除了计数器,还有一个叫wait set的变量,加锁的时候,会对对象的monitor的计数器+1,如果调用wait方法,会将当前线程的id之类的东西,加入到monitor的wait set中,如果对这个对象调用notifyAll(),就会清空wait set,并同时唤醒所有线程。

image-20201022163420129

面试有个经常问的问题,wait与sleep的区别:前者释放锁,后者不释放锁

wait(),必须是有人notify唤醒他

wait(timeout),阻塞一段时间,然后自己唤醒,继续争抢锁

wait与notify,必须在synchronized代码块中使用,因为必须是拥有monitor lock的线程才可以执行wait与notify操作

因此wait与notify,必须与synchornized一起,对同一个对象进行使用,这样他们对应的monitor才是一样的

notify()与notifyall():前者就唤醒block状态的一个线程,后者唤醒block状态的所有线程

synchronized能不能保证可见性

synchronized是可以保证可见性的,那么单例模式双检锁的volatile,是做什么用的呢?看下面的解释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DoubleCheckSingleton {

// volatile在这里的作用是为了防止指令重排序
private volatile static DoubleCheckSingleton instance;

private Socket socket;

private DoubleCheckSingleton() {
this.socket = new Socket();
}

public static DoubleCheckSingleton getInstance() {

if (instance == null) {
// synchronized本身就是可以保证可见性的
synchronized (DoubleCheckSingleton.class) {
if (instance == null) {
instance = new DoubleCheckSingleton();
// 对象的初始化是分为几个步骤的:初始化一块内存空间,给对象里的变量进行初始化,执行构造方法等等
// 有可能会出现指令重排,有可能会导致DoubleCheckSingleton里的字段还没有在构造函数里初始化,比如socket,还是null
// 但是内存空间已经分配好了,指针也分配了,也就是instance变量不是null
// 此时另外一个线程来调用的时候,发现instance不是null,然后返回,马上进行相关调用,结果发现socket是null,调用失败
// volatile有内存屏障,写操作后有storeload屏障,会保证写完之前不能被读到
}
}
}
return instance;
}
}

加锁的原理

加锁的原理,和Lock加锁的原理类似,只不过它是基于C++的程序来实现的。

Java对象分为对象头和实例变量两块,其中实例变量就是平时看到的对象里的那些变量数据。然后对象头包含了两块东西,一个是Mark Word(包含hashCode、锁数据、GC数据,等等),另一个是Class Metadata Address(包含了指向类的元数据的指针)。

在Mark Word里有一个指针,指向了这个对象实例关联的monitor的地址,这个monitor是c++实现的,不是java实现的。这个monitor实际上是c++实现的一个ObjectMonitor对象,里面包含了一个_owner指针,指向了持有锁的线程。

ObjectMonitor里有一个entrylist,想要加锁的线程全部先进入这个entrylist等待获取机会尝试加锁,实际有机会加锁的线程,就会设置_owner指针指向自己,然后对_count计数器累加1次。

各个线程尝试竞争进行加锁,此时竞争加锁是在JDK 1.6以后优化成了基于CAS来进行加锁,理解为跟之前的Lock API的加锁机制是类似的,CAS操作,操作_count计数器,比如说将__count值尝试从0变为1。

如果成功了,那么加锁成功了;如果失败了,那么加锁失败了。

然后释放锁的时候,先是对_count计数器递减1,如果为0了就会设置_owner为null,不再指向自己,代表自己彻底释放锁。

如果获取锁的线程执行wait,就会将计数器递减,同时_owner设置为null,然后自己进入waitset中等待唤醒,别人获取了锁执行notify的时候就会唤醒waitset中的线程竞争尝试获取锁。特别在JDk 1.6之后,对synchronized内的加锁机制做了大量的优化,优化为CAS加锁。

如果把ReentrantLock底层的源码都读懂了,AQS的机制都读懂了之后,那么synchronized底层的实现差不多的,synchronized的ObjectMonitor的地位就跟ReentrantLock里的AQS是差不多的。

内存屏障

java的并发技术底层很多都对应了内存屏障的使用,包括synchronized,他底层也是依托于各种不同的内存屏障来保证可见性和有序性的。

synchronized(this) { -> monitorenter

Load内存屏障

Acquire内存屏障

int a = b;

c = 1; => synchronized代码块里面还是可能会发生指令重排

Release内存屏障

} -> monitorexit

Store内存屏障

可见性

按照可见性来划分的话,内存屏障可以分为Load屏障和Store屏障。

Load屏障的作用是执行refresh处理器缓存的操作,说白了就是对别的处理器更新过的变量,从其他处理器的高速缓存(或者主内存)加载数据到自己的高速缓存来,确保自己看到的是最新的数据。

Store屏障的作用是执行flush处理器缓存的操作,说白了就是把自己当前处理器更新的变量的值,都刷新到高速缓存(或者主内存)里去。

在monitorexit指令之后,会有一个Store屏障,让线程把自己在同步代码块里修改的变量的值都执行flush处理器缓存的操作,刷到高速缓存(或者主内存)里去;然后在monitorenter指令之后会加一个Load屏障,执行refresh处理器缓存的操作,把别的处理器修改过的最新值加载到自己高速缓存里来

所以说通过Load屏障和Store屏障,就可以让synchronized保证可见性。

有序性

按照有序性保障来划分的话,还可以分为Acquire屏障和Release屏障。

在monitorenter指令之后,Load屏障之后,会加一个Acquire屏障,这个屏障的作用是禁止读操作和读写操作之间发生指令重排序。在monitorexit指令之前,会加一个Release屏障,这个屏障的作用是禁止写操作和读写操作之间发生重排序。

所以说,通过 Acquire屏障和Release屏障,就可以让synchronzied保证有序性,只有synchronized内部的指令可以重排序,但是绝对不会跟外部的指令发生重排序。

synchronized:

(1)原子性:加锁和释放锁,ObjectMonitor

(2)可见性:加了Load屏障和Store屏障,释放锁flush数据,加锁会refresh数据

(3)有序性:Acquire屏障和Release屏障,保证同步代码块内部的指令可以重排,但是同步代码块内部的指令和外面的指令是不能重排的

锁优化

锁消除

锁消除是JIT编译器对synchronized锁做的优化,在编译的时候,JIT会通过逃逸分析技术,来分析synchronized锁对象,是不是只可能被一个线程来加锁,没有其他的线程来竞争加锁,这个时候编译就不用加入monitorenter和monitorexit的指令。

如果只有一个线程在获取锁,就可以消除这个锁了,不涉及到多个线程来竞争。

锁粗化

JIT编译器如果发现有代码里连续多次加锁释放锁的代码,会给合并为一个锁,就是锁粗化,把一个锁给搞粗了,避免频繁多次加锁释放锁。

偏向锁

monitorenter和monitorexit是要使用CAS操作加锁和释放锁的,开销较大,因此如果发现大概率只有一个线程会主要竞争一个锁,那么会给这个锁维护一个偏好(Bias),后面他加锁和释放锁,基于Bias来执行,不需要通过CAS。性能会提升很多。

但是如果有偏好之外的线程来竞争锁,就要收回之前分配的偏好。可能只有一个线程会来竞争一个锁,但是也有可能会有其他的线程来竞争这个锁,但是其他线程唉竞争锁的概率很小。如果有其他的线程来竞争这个锁,此时就会收回之前那个线程分配的那个Bias偏好。

轻量级锁

如果偏向锁没能成功实现,就是因为不同线程竞争锁太频繁了,此时就会尝试采用轻量级锁的方式来加锁,就是将对象头的Mark Word里有一个轻量级锁指针,尝试指向持有锁的线程,然后判断一下是不是自己加的锁。如果是自己加的锁,那就执行代码就好了。如果不是自己加的锁,那就是加锁失败,说明有其他人加了锁,这个时候就是升级为重量级锁。

适应性锁

这是JIT编译器对锁做的另外一个优化,如果各个线程持有锁的时间很短,那么一个线程竞争锁不到,就会暂停,发生上下文切换,让其他线程来执行。但是其他线程很快释放锁了,然后暂停的线程再次被唤醒。也就是说在这种情况下,线程会频繁的上下文切换,导致开销过大。所以对这种线程持有锁时间很短的情况,是可以采取忙等策略的,也就是一个线程没竞争到锁,进入一个while循环不停等待,不会暂停不会发生线程上下文切换,等到机会获取锁就继续执行好了。

ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ReentrantLockDemo {
static int i = 0;
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {

new Thread() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
lock.lock();
i++;
System.out.println(i);
lock.unlock();
}
}
}.start();


new Thread() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
lock.lock();
i++;
System.out.println(i);
lock.unlock();
}
}
}.start();
}
}

Java除了使用关键字synchronized外,还可以使用ReentrantLock实现独占锁的功能。而且ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。而且ReentrantLock还支持读写锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ReentrantLockDemo {
static int i = 0;
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {

new Thread() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
lock.lock();
i++;
System.out.println(i);
lock.unlock();
}
}
}.start();


new Thread() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
lock.lock();
i++;
System.out.println(i);
lock.unlock();
}
}
}.start();
}
}

翻一下ReentrantLock的源码,发现它是基于AbstractQueuedSynchronizer做的,也就是我们平常说的AQS。

1
2
3
4
// 默认就构造了一个非公平锁,NonfairSync是基于AQS实现的组件
public ReentrantLock() {
sync = new NonfairSync();
}

ReentrantLock的lock方法, 也是直接调用了sync的lock()方法,ReentrantLock在加锁的时候,就是直接基于Sync来实现的lock操作

1
2
3
public void lock() {
sync.lock();
}

而Sync,是一个抽象的静态类,他继承自AbstractQueuedSynchronizer

1
2
3
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}

AQS

AbstractQueuedSynchronizer,抽象队列同步器。

ReentractLock和ReadWriteReentractLock,这个锁的API都是基于AQS来实现的,我们一般也没有直接用AQS的API来做开发,但是并发包里的很多类,都是基于AQS来实现的。截图看下,除了锁,还有Semaphore,CountDownLatch,ThreadPoolExecutor这些。

image-20201027140545860

AQS是一个双向链表,或者说一个双端队列,原理大致如图,通过state核心变量控制是否加锁成功,并记录加锁的线程,等待锁的线程,放入队列中

image-20201027154725975

lock加锁

AQS中使用了大量的CAS操作来实现加锁和释放锁的代码,AQS里有一个核心变量,state,代表了锁的状态,0就是没线程加锁,1就是有线程加锁了。

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

所以加锁的代码其实很简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// cas操作修改state变量,state=1表示已经有线程获取到了锁
if (compareAndSetState(0, 1))
// 当前获取到锁的线程是谁
setExclusiveOwnerThread(Thread.currentThread());
else
// 没有获取到锁,排队
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

如何实现可重入锁?

acquire(1);

1
2
3
4
5
6
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

会调用到NonfairSync的tryAcquire()方法,通过不复杂的代码,实现了锁的重入,如果加锁的线程和当前线程是同一个线程,就state+1,并返回true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// volatile在JDK源码中得到了大量的运用
int c = getState();
// 同一个线程调用lock方法,所以state不为0
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 并且当前线程和加锁的线程是相同的,那就再次加锁
else if (current == getExclusiveOwnerThread()) {
// 所以state + 1 = 2
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 修改state,volatile保证了可见性
setState(nextc);
return true;
}
// 其他线程,就走到这了,加锁失败了,返回false
return false;
}

返回false以后,就要执行addWaiter,构造AQS等待队列里的等待节点了

入队等待

1
2
3
4
5
6
7
8
9
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
// 第一个加锁失败
// 调用acquireQueued入队
if (!tryAcquire(arg) &&
// addWaiter(Node.EXCLUSIVE),表示这是一个独占锁的等待节点,另外的还有共享锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// AbstractQueuedSynchronizer.java
private Node addWaiter(Node mode) {
// 将当前线程封装成了一个Node,mode = EXCLUSIVE(排他锁,尝试获取一个排他锁,但是失败了)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// tail刚开始是null,把当前队列的尾巴,设置当前节点的pred
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 第一个入队的线程直接到这里来了
enq(node);
return node;
}

看一下Node的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
// 如果一个线程无法获取到锁的话,会进入一个阻塞等待的状态
// 卡住不动,线程挂起,阻塞状态又细分为很多种不同的阻塞状态:
// CANCELED、SIGNAL、CONDITION、PROPAGATE
volatile int waitStatus;
// 一个节点可以有上一个节点,prev指针,指向了Node的上一个Node
volatile Node prev;
// 一个节点还可以有下一个节点,next指针,指向了Node的下一个Node
volatile Node next;
// Node里面封装了一个线程
volatile Thread thread;
// 可以认为是下一个等待线程
Node nextWaiter;

加锁失败的话,就将当前线程封装成一个Node,并且有prev何next指针,所以如果有多个处于阻塞等待状态的线程,封装后的Node组成了一个双向链表,也就成了一个队列。

AQS原理-双向链表

接着看一下入队的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// AbstractQueuedSynchronizer.java
private Node enq(final Node node) {
for (;;) {
// 1:第一个入队的Node,tail是null
// 3: 循环第二次,tail=空Node
Node t = tail;
if (t == null) { // Must initialize
// 2:走到这里,用cas将head设置为空节点,然后tail也是空节点
if (compareAndSetHead(new Node()))
// 循环没有退出,继续执行循环
tail = head;
} else {
// 4:node.prev=空Node
node.prev = t;
// 5:用cas将当前节点设置为tail
if (compareAndSetTail(t, node)) {
// 6:讲当前节点设置为之前tail的next
t.next = node;
return t;
}
}
}
}

这就是**addWaiter(Node.EXCLUSIVE)**的逻辑,构造完Node节点后,其实节点就已经入队了,接下来就再次获取锁,如果不能获取,就挂起阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取到node的前一个节点
final Node p = node.predecessor();
// 再尝试一次加锁,如果加锁成功了,那当前节点就变成head了
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 再次尝试加锁也失败了,判断当前线程是否需要挂起,阻塞等待
// 如果需要,就调用park方法挂起等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// pred是一个空node,那么ws就是空或者0
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 空node的waitStatus设置为Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
// 将当前线程挂起,阻塞了,必须由别的线程调用unpark操作唤醒
LockSupport.park(this);
return Thread.interrupted();
}

非公平锁

公平锁就是,排队,一个一个的获取到锁,非公平锁就是抢占式的。

而ReentrantLock,默认就是非公平锁,看下构造方法就知道,不同的锁,是不同是Sync类实现的。

1
2
3
4
5
6
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

在源码里面,判断是不是非公平锁,就一行代码。我们看一下FairSync的tryAcquire方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 就是这个hasQueuedPredecessors方法,加锁之前判断一下,队列里是不是有在等待的的线程,如果有,就先不尝试加锁了,直接入队
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t 说明队列不为空
// h.next 不为空,拿到排队的s节点,s不是自己,返回true
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

release释放锁

加锁的过程看完了,接下来是释放锁,释放锁是release方法,释放锁以后修改status变量,然后唤醒队列里等待的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public final boolean release(int arg) {
// 先释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 然后唤醒队列里的node
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// state就是加锁的数量
int c = getState() - releases;
// 必须得是同一个线程来进行锁的释放
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// cas操作,设置state
setState(c);
return free;
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 找到当前节点的下一个节点唤醒
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 否则从队列尾部,往前找node,然后进行唤醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

那么唤醒之后呢?别忘了acquireQueued方法里,线程阻塞的地方,是一个死循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 重新唤醒后,再次尝试加锁,会加锁成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 那么在这里被唤醒以后,重新执行for循环
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

ReentrantReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ReadWriteLockDemo {
public static void main(String[] args) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();


lock.writeLock().lock();
lock.writeLock().unlock();


lock.readLock().lock();
lock.readLock().unlock();
}
}

读写锁,读锁和写锁是分开的。

读锁和写锁是互斥的,加了读锁之后,就不能加写锁;如果加了写锁,就不能加读锁。

如果有人加了读锁之后,别人可以同时加读锁。

如果有人在读数据,就不能有人写数据,读锁 -> 写锁 -> 互斥

如果有人在写数据,别人不能写数据,写锁 -> 写锁 -> 互斥;如果有人在写数据,别人也不能读数据,写锁 -> 读锁 > 互斥

在源码里,两个方法对应两把锁,默认是非公平锁的实现。

1
2
3
4
5
6
7
8
9
10
// ReentrantReadWriteLock.java
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
// 写锁
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
// 读锁
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

写锁

加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// ReentrantReadWriteLock.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
// 默认是非公平锁
Thread current = Thread.currentThread();
// 用来控制锁状态的state变量
int c = getState();
// 这个方法,将数字拆成了32位的二进制
// 二进制里的高低16位,分别代表了读锁和写锁
// 这里是获取写锁的数量,也就是获取的是state的低16位的值,代表了写锁的状态
int w = exclusiveCount(c);
// c!=0,说明有人加过锁
if (c != 0) {
// 如果写锁是0,说明肯定至少有一个读锁,就不能再加写锁了,线程会排队阻塞
// 如果写锁不是0,那么当前线程必须和占用写锁的线程是同一个线程,这里就是可重入锁的判断了!
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 锁的数量是否超过限制
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 将
setState(c + acquires);
return true;
}
// 这里c=0,如果是非公平锁,所以直接加锁
// 如果是公平锁,此时会判断如果队列中有等待线程,就不加锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

基本跟之前看到的是一样的,如果加写锁的话,state += 1,锁占用线程

可重入锁

如果之前加过锁了,必须是同一个线程再次加锁,实现了可重入锁

特别要注意这个高低16位的细节

image-20201222162725152

在已经有锁的情况下,线程2来加锁呢,那么tryAcquire方法肯定就是返回false。

然后就会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

这里和之前加锁的逻辑,没有太大区别

释放锁

释放锁的代码,和普通的锁释放也没有什么区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 锁释放成功后,将队列里的node,唤醒
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 判断一下当前线程和加锁的线程是否是同一个线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 本次释放锁后,判断是否写锁已经全部释放完毕,然后清空占用锁的线程
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

读锁

加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// ReentrantReadWriteLock.java
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();

// 读写锁互斥,如果有写锁,就不能加读锁了
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取高16位的值来获取读锁的数量
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
// cas操作成功,即认为读锁加成功
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

// 否则就进入死循环,加锁成功为止
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 还是利用cas加锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// AbstractQueuedSynchronizer.java
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();

if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 依然是利用cas,释放锁
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒队列里等待的线程,也就是等待写锁的线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

1748.png

Condition

Condition可以实现wait和notify的效果,需要在加锁以后,进行阻塞操作,在底层源码里也是基于AQS来实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ConditionWaitDemo {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();

public static void main(String[] args) throws InterruptedException {

new Thread() {
@Override
public void run() {
lock.lock();
System.out.println("第一个线程加锁");
System.out.println("第一个线程阻塞");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一个线程被唤醒");
lock.unlock();
System.out.println("第一个线程释放锁");
}
}.start();

Thread.sleep(3000);


new Thread() {
@Override
public void run() {
lock.lock();
System.out.println("第二个线程加锁");
System.out.println("第二个线程唤醒第一个线程");
condition.signal();
lock.unlock();
System.out.println("第二个线程释放锁");
}
}.start();
}
}

阻塞

加锁成功以后,将自己加入condition等待队列、释放锁、挂起自己。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入condition等待队列
Node node = addConditionWaiter();
// 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 挂起自己
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤醒以后,还要继续竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

1758

唤醒

signal方法的唤醒,是把condition等待队列中的元素,转化为一个加锁等待队列中的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 关键代码就在 transferForSignal
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 修改node的waitStatus从CONDITION -> 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 这个就是将Node入队到,加锁队列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 在入队以后,将阻塞的线程唤醒,然后继续去竞争写锁
LockSupport.unpark(node.thread);
return true;
}

Condition的源码, 相对来说还是比较简单。

总结一下,ReentrantLock是基于AQS来实现的,而AQS是一个双端的队列,加锁后需要在队列里排队,这里运用了大量的CAS操作来保证并发安全。读写锁分离,就是利用了state变量的高低16位,高16位,是读锁,低16位,是写锁。

Condition,是一个单独的Condition队列,在调用await挂起后,会入队,调用signal唤醒后,会入队到加锁的队列里,重新去竞争锁。