synchronized synchronized就是加锁,一旦加锁以后,就只能当前线程访问和修改加锁的变量,其他线程只能阻塞等待,这就保证了原子性。
synchronized可以锁两种对象,一种是对某个实例对象进行加锁,一种是对类进行加锁。对类加锁,本质上也是对实例加锁,只不过是对class对象进行加锁。
如果用synchronized修饰普通方法,那就是对当前类的实例进行加锁。
1 2 3 4 5 6 7 8 9 synchronized void increment () { } synchronized (myObject) { } synchronized (this ) { }
如果用synchronized修饰静态方法,那就是对当前类的class对象进行加锁。
1 2 3 4 5 6 synchronized static void increment () { } synchronized (Demo.class) { }
synchronized涉及到jvm底层的两个指令,分别是monitorenter和monitorexit两个指令。
1 2 3 monitorenter monitorexit
每个对象都有一个关联的monitor属性,比如一个对象实例就有一个monitor,一个类的class对象也有一个monitor,如果要对这个对象进行加锁,那么必须获取这个对象关联的monitor的lock锁。
原理大概就是,monitor里有一个从0开始的计数器,如果一个线程要获取monitor的锁,就看他的计数器是不是0,如果是0,说明没人获取锁,然后拿到锁以后就加1。
同时,monitor是支持重入锁的
1 2 3 4 5 6 synchronized (myObject) { synchronized (myObject) { } }
这里就对同一个对象,加了两次锁,猜一下就知道,他原理肯定是每次进到synchronized(myObject)代码的时候,先看对象的monitor里的计数器是不是0,如果是0,就直接获取锁,并把对象里的monitor的计数器加1,如果不是0,看加锁的时候是不是同一个线程,如果是,就再加1,如果不是同一个线程呢,那就阻塞等待吧。
然后执行代码代码块退出的时候,会执行monitorexit的指令,此时获取锁的线程就会对那个对象的monitor的计数器减1,如果有多次重入加锁就会对应多次减1,直到最后,计数器是0。
然后之前阻塞的线程,再次竞争锁,但是只有一个线程可以拿到锁。
wait¬ify 对于有锁的代码,也就离不开wait和notify的使用,在加锁的代码中,可以wait挂起线程,并释放锁,除非其他线程对同一个对象调用notify,线程被唤醒,然后再次尝试获取锁。
wait与notify,跟synchronized是同样的原理,都是monitor,对象的monitor里,除了计数器,还有一个叫wait set的变量,加锁的时候,会对对象的monitor的计数器+1,如果调用wait方法,会将当前线程的id之类的东西,加入到monitor的wait set中,如果对这个对象调用notifyAll(),就会清空wait set,并同时唤醒所有线程。
面试有个经常问的问题,wait与sleep的区别:前者释放锁,后者不释放锁
wait(),必须是有人notify唤醒他
wait(timeout),阻塞一段时间,然后自己唤醒,继续争抢锁
wait与notify,必须在synchronized代码块中使用,因为必须是拥有monitor lock的线程才可以执行wait与notify操作
因此wait与notify,必须与synchornized一起,对同一个对象进行使用,这样他们对应的monitor才是一样的
notify()与notifyall():前者就唤醒block状态的一个线程,后者唤醒block状态的所有线程
synchronized能不能保证可见性 synchronized是可以保证可见性的,那么单例模式双检锁的volatile,是做什么用的呢?看下面的解释
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class DoubleCheckSingleton { private volatile static DoubleCheckSingleton instance; private Socket socket; private DoubleCheckSingleton () { this .socket = new Socket(); } public static DoubleCheckSingleton getInstance () { if (instance == null ) { synchronized (DoubleCheckSingleton.class) { if (instance == null ) { instance = new DoubleCheckSingleton(); } } } return instance; } }
加锁的原理 加锁的原理,和Lock加锁的原理类似,只不过它是基于C++的程序来实现的。
Java对象分为对象头和实例变量两块,其中实例变量就是平时看到的对象里的那些变量数据。然后对象头包含了两块东西,一个是Mark Word(包含hashCode、锁数据、GC数据,等等),另一个是Class Metadata Address(包含了指向类的元数据的指针)。
在Mark Word里有一个指针,指向了这个对象实例关联的monitor的地址,这个monitor是c++实现的,不是java实现的。这个monitor实际上是c++实现的一个ObjectMonitor对象,里面包含了一个_owner
指针,指向了持有锁的线程。
ObjectMonitor里有一个entrylist,想要加锁的线程全部先进入这个entrylist等待获取机会尝试加锁,实际有机会加锁的线程,就会设置_owner
指针指向自己,然后对_count计数器累加1次。
各个线程尝试竞争进行加锁,此时竞争加锁是在JDK 1.6以后优化成了基于CAS来进行加锁,理解为跟之前的Lock API的加锁机制是类似的,CAS操作,操作_count
计数器,比如说将__count
值尝试从0变为1。
如果成功了,那么加锁成功了;如果失败了,那么加锁失败了。
然后释放锁的时候,先是对_count
计数器递减1,如果为0了就会设置_owner
为null,不再指向自己,代表自己彻底释放锁。
如果获取锁的线程执行wait,就会将计数器递减,同时_owner
设置为null,然后自己进入waitset中等待唤醒,别人获取了锁执行notify的时候就会唤醒waitset中的线程竞争尝试获取锁。特别在JDk 1.6之后,对synchronized内的加锁机制做了大量的优化,优化为CAS加锁。
如果把ReentrantLock底层的源码都读懂了,AQS的机制都读懂了之后,那么synchronized底层的实现差不多的,synchronized的ObjectMonitor的地位就跟ReentrantLock里的AQS是差不多的。
内存屏障 java的并发技术底层很多都对应了内存屏障的使用,包括synchronized,他底层也是依托于各种不同的内存屏障来保证可见性和有序性的。
synchronized(this) { -> monitorenter
Load内存屏障
Acquire内存屏障
int a = b;
c = 1; => synchronized代码块里面还是可能会发生指令重排
Release内存屏障
} -> monitorexit
Store内存屏障
可见性 按照可见性来划分的话,内存屏障可以分为Load屏障和Store屏障。
Load屏障的作用是执行refresh处理器缓存的操作,说白了就是对别的处理器更新过的变量,从其他处理器的高速缓存(或者主内存)加载数据到自己的高速缓存来,确保自己看到的是最新的数据。
Store屏障的作用是执行flush处理器缓存的操作,说白了就是把自己当前处理器更新的变量的值,都刷新到高速缓存(或者主内存)里去。
在monitorexit指令之后,会有一个Store屏障,让线程把自己在同步代码块里修改的变量的值都执行flush处理器缓存的操作,刷到高速缓存(或者主内存)里去;然后在monitorenter指令之后会加一个Load屏障,执行refresh处理器缓存的操作,把别的处理器修改过的最新值加载到自己高速缓存里来
所以说通过Load屏障和Store屏障,就可以让synchronized保证可见性。
有序性 按照有序性保障来划分的话,还可以分为Acquire屏障和Release屏障。
在monitorenter指令之后,Load屏障之后,会加一个Acquire屏障,这个屏障的作用是禁止读操作和读写操作之间发生指令重排序。在monitorexit指令之前,会加一个Release屏障,这个屏障的作用是禁止写操作和读写操作之间发生重排序。
所以说,通过 Acquire屏障和Release屏障,就可以让synchronzied保证有序性,只有synchronized内部的指令可以重排序,但是绝对不会跟外部的指令发生重排序。
synchronized:
(1)原子性:加锁和释放锁,ObjectMonitor
(2)可见性:加了Load屏障和Store屏障,释放锁flush数据,加锁会refresh数据
(3)有序性:Acquire屏障和Release屏障,保证同步代码块内部的指令可以重排,但是同步代码块内部的指令和外面的指令是不能重排的
锁优化 锁消除 锁消除是JIT编译器对synchronized锁做的优化,在编译的时候,JIT会通过逃逸分析技术,来分析synchronized锁对象,是不是只可能被一个线程来加锁,没有其他的线程来竞争加锁,这个时候编译就不用加入monitorenter和monitorexit的指令。
如果只有一个线程在获取锁,就可以消除这个锁了,不涉及到多个线程来竞争。
锁粗化 JIT编译器如果发现有代码里连续多次加锁释放锁的代码,会给合并为一个锁,就是锁粗化,把一个锁给搞粗了,避免频繁多次加锁释放锁。
偏向锁 monitorenter和monitorexit是要使用CAS操作加锁和释放锁的,开销较大,因此如果发现大概率只有一个线程会主要竞争一个锁,那么会给这个锁维护一个偏好(Bias),后面他加锁和释放锁,基于Bias来执行,不需要通过CAS。性能会提升很多。
但是如果有偏好之外的线程来竞争锁,就要收回之前分配的偏好。可能只有一个线程会来竞争一个锁,但是也有可能会有其他的线程来竞争这个锁,但是其他线程唉竞争锁的概率很小。如果有其他的线程来竞争这个锁,此时就会收回之前那个线程分配的那个Bias偏好。
轻量级锁 如果偏向锁没能成功实现,就是因为不同线程竞争锁太频繁了,此时就会尝试采用轻量级锁的方式来加锁,就是将对象头的Mark Word里有一个轻量级锁指针,尝试指向持有锁的线程,然后判断一下是不是自己加的锁。如果是自己加的锁,那就执行代码就好了。如果不是自己加的锁,那就是加锁失败,说明有其他人加了锁,这个时候就是升级为重量级锁。
适应性锁 这是JIT编译器对锁做的另外一个优化,如果各个线程持有锁的时间很短,那么一个线程竞争锁不到,就会暂停,发生上下文切换,让其他线程来执行。但是其他线程很快释放锁了,然后暂停的线程再次被唤醒。也就是说在这种情况下,线程会频繁的上下文切换,导致开销过大。所以对这种线程持有锁时间很短的情况,是可以采取忙等策略的,也就是一个线程没竞争到锁,进入一个while循环不停等待,不会暂停不会发生线程上下文切换,等到机会获取锁就继续执行好了。
ReentrantLock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class ReentrantLockDemo { static int i = 0 ; private static ReentrantLock lock = new ReentrantLock(); public static void main (String[] args) { new Thread() { @Override public void run () { for (int j = 0 ; j < 10 ; j++) { lock.lock(); i++; System.out.println(i); lock.unlock(); } } }.start(); new Thread() { @Override public void run () { for (int j = 0 ; j < 10 ; j++) { lock.lock(); i++; System.out.println(i); lock.unlock(); } } }.start(); } }
Java除了使用关键字synchronized外,还可以使用ReentrantLock实现独占锁的功能。而且ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。而且ReentrantLock还支持读写锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class ReentrantLockDemo { static int i = 0 ; private static ReentrantLock lock = new ReentrantLock(); public static void main (String[] args) { new Thread() { @Override public void run () { for (int j = 0 ; j < 10 ; j++) { lock.lock(); i++; System.out.println(i); lock.unlock(); } } }.start(); new Thread() { @Override public void run () { for (int j = 0 ; j < 10 ; j++) { lock.lock(); i++; System.out.println(i); lock.unlock(); } } }.start(); } }
翻一下ReentrantLock的源码,发现它是基于AbstractQueuedSynchronizer 做的,也就是我们平常说的AQS。
1 2 3 4 public ReentrantLock () { sync = new NonfairSync(); }
ReentrantLock的lock方法, 也是直接调用了sync的lock()方法,ReentrantLock在加锁的时候,就是直接基于Sync来实现的lock操作
1 2 3 public void lock () { sync.lock(); }
而Sync,是一个抽象的静态类,他继承自AbstractQueuedSynchronizer
1 2 3 abstract static class Sync extends AbstractQueuedSynchronizer { ... }
AQS AbstractQueuedSynchronizer ,抽象队列同步器。
ReentractLock和ReadWriteReentractLock,这个锁的API都是基于AQS来实现的,我们一般也没有直接用AQS的API来做开发,但是并发包里的很多类,都是基于AQS来实现的。截图看下,除了锁,还有Semaphore,CountDownLatch,ThreadPoolExecutor这些。
AQS是一个双向链表,或者说一个双端队列,原理大致如图,通过state核心变量控制是否加锁成功,并记录加锁的线程,等待锁的线程,放入队列中
lock加锁 AQS中使用了大量的CAS操作来实现加锁和释放锁的代码,AQS里有一个核心变量,state,代表了锁的状态,0就是没线程加锁,1就是有线程加锁了。
1 2 3 4 private volatile int state;
所以加锁的代码其实很简单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } }
如何实现可重入锁? acquire(1);
1 2 3 4 5 6 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
会调用到NonfairSync的tryAcquire()方法,通过不复杂的代码,实现了锁的重入,如果加锁的线程和当前线程是同一个线程,就state+1,并返回true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
返回false以后,就要执行addWaiter,构造AQS等待队列里的等待节点了
入队等待 1 2 3 4 5 6 7 8 9 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
看一下Node的数据结构
1 2 3 4 5 6 7 8 9 10 11 12 volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;
加锁失败的话,就将当前线程封装成一个Node,并且有prev何next指针,所以如果有多个处于阻塞等待状态的线程,封装后的Node组成了一个双向链表,也就成了一个队列。
接着看一下入队的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
这就是**addWaiter(Node.EXCLUSIVE)**的逻辑,构造完Node节点后,其实节点就已经入队了,接下来就再次获取锁,如果不能获取,就挂起阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
非公平锁 公平锁就是,排队,一个一个的获取到锁,非公平锁就是抢占式的。
而ReentrantLock,默认就是非公平锁,看下构造方法就知道,不同的锁,是不同是Sync类实现的。
1 2 3 4 5 6 public ReentrantLock () { sync = new NonfairSync(); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
在源码里面,判断是不是非公平锁,就一行代码。我们看一下FairSync的tryAcquire方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
release释放锁 加锁的过程看完了,接下来是释放锁,释放锁是release方法,释放锁以后修改status变量,然后唤醒队列里等待的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
那么唤醒之后呢?别忘了acquireQueued方法里,线程阻塞的地方,是一个死循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
ReentrantReadWriteLock 1 2 3 4 5 6 7 8 9 10 11 12 13 public class ReadWriteLockDemo { public static void main (String[] args) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); lock.writeLock().lock(); lock.writeLock().unlock(); lock.readLock().lock(); lock.readLock().unlock(); } }
读写锁,读锁和写锁是分开的。
读锁和写锁是互斥的,加了读锁之后,就不能加写锁;如果加了写锁,就不能加读锁。
如果有人加了读锁之后,别人可以同时加读锁。
如果有人在读数据,就不能有人写数据,读锁 -> 写锁 -> 互斥
如果有人在写数据,别人不能写数据,写锁 -> 写锁 -> 互斥;如果有人在写数据,别人也不能读数据,写锁 -> 读锁 > 互斥
在源码里,两个方法对应两把锁,默认是非公平锁的实现。
1 2 3 4 5 6 7 8 9 10 public ReentrantReadWriteLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this ); writerLock = new WriteLock(this ); } public ReentrantReadWriteLock.WriteLock writeLock () { return writerLock; }public ReentrantReadWriteLock.ReadLock readLock () { return readerLock; }
写锁 加锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
基本跟之前看到的是一样的,如果加写锁的话,state += 1,锁占用线程
可重入锁 如果之前加过锁了,必须是同一个线程再次加锁,实现了可重入锁
特别要注意这个高低16位的细节
在已经有锁的情况下,线程2来加锁呢,那么tryAcquire方法肯定就是返回false。
然后就会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
这里和之前加锁的逻辑,没有太大区别
释放锁 释放锁的代码,和普通的锁释放也没有什么区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; }
读锁 加锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); } final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null ) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0 ) readHolds.remove(); } } if (rh.count == 0 ) return -1 ; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null ) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1 ; } } }
释放锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 protected final boolean tryReleaseShared (int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
Condition Condition可以实现wait和notify的效果,需要在加锁以后,进行阻塞操作,在底层源码里也是基于AQS来实现的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class ConditionWaitDemo { private static ReentrantLock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); public static void main (String[] args) throws InterruptedException { new Thread() { @Override public void run () { lock.lock(); System.out.println("第一个线程加锁" ); System.out.println("第一个线程阻塞" ); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一个线程被唤醒" ); lock.unlock(); System.out.println("第一个线程释放锁" ); } }.start(); Thread.sleep(3000 ); new Thread() { @Override public void run () { lock.lock(); System.out.println("第二个线程加锁" ); System.out.println("第二个线程唤醒第一个线程" ); condition.signal(); lock.unlock(); System.out.println("第二个线程释放锁" ); } }.start(); } }
阻塞 加锁成功以后,将自己加入condition等待队列、释放锁、挂起自己。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
唤醒 signal方法的唤醒,是把condition等待队列中的元素,转化为一个加锁等待队列中的元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
Condition的源码, 相对来说还是比较简单。
总结一下,ReentrantLock是基于AQS来实现的,而AQS是一个双端的队列,加锁后需要在队列里排队,这里运用了大量的CAS操作来保证并发安全。读写锁分离,就是利用了state变量的高低16位,高16位,是读锁,低16位,是写锁。
Condition,是一个单独的Condition队列,在调用await挂起后,会入队,调用signal唤醒后,会入队到加锁的队列里,重新去竞争锁。