可重入锁 public class ReEnterLockDemo { static Object objectLockA = new Object(); public static void m1 () { new Thread(() -> { synchronized (objectLockA){ System.out.println(Thread.currentThread().getName()+"\t" +"------外层调用" ); synchronized (objectLockA){ System.out.println(Thread.currentThread().getName()+"\t" +"------中层调用" ); synchronized (objectLockA) { System.out.println(Thread.currentThread().getName()+"\t" +"------内层调用" ); } } } },"t1" ).start(); } public static void main (String[] args) { m1(); } }
public class ReEnterLockDemo { public synchronized void m1 () { System.out.println("=====外层" ); m2(); } public synchronized void m2 () { System.out.println("=====中层" ); m3(); } public synchronized void m3 () { System.out.println("=====内层" ); } public static void main (String[] args) { new ReEnterLockDemo().m1(); } }
LockSupport 是什么?
官方说明:https://www.apiref.com/java11-zh/java.base/java/util/concurrent/locks/LockSupport.html
LockSupport中的park()和unpark()的作用分别是阻塞线程和解除阻塞线程,相当于线程等待和唤醒机制的加强版。
3种让线程等待和唤醒的方法
方式1: 使用Object中的wait()方法让线程等待, 使用Object中的notify()方法唤醒线程
方式2: 使用JUC包中Condition的await()方法让线程等待,使用signal()方法唤醒线程
方式3: LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程
Object类提供的等待唤醒机制的缺点 正常情况下 public class LockSupportDemo1 { static Object objectLock = new Object(); public static void main (String[] args) { new Thread(() -> { synchronized (objectLock){ System.out.println(Thread.currentThread().getName()+"\t" +"------come in" ); try { objectLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t" +"------被唤醒" ); } },"A" ).start(); new Thread(() -> { synchronized (objectLock) { objectLock.notify(); System.out.println(Thread.currentThread().getName()+"\t" +"------通知" ); } },"B" ).start(); } }
结果:
A ------come in B ------通知 A ------被唤醒 Process finished with exit code 0
异常情况1 去掉同步代码块
public class LockSupportDemo1 { static Object objectLock = new Object(); public static void main (String[] args) { new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t" +"------come in" ); try { objectLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t" +"------被唤醒" ); },"A" ).start(); new Thread(() -> { objectLock.notify(); System.out.println(Thread.currentThread().getName()+"\t" +"------通知" ); },"B" ).start(); } }
结果:
A ------come in Exception in thread "A" Exception in thread "B" java.lang.IllegalMonitorStateException at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at com.youth.guiguthirdquarter.AQS.LockSupportDemo1.lambda$main$0(LockSupportDemo1.java:16) at java.lang.Thread.run(Thread.java:748) java.lang.IllegalMonitorStateException at java.lang.Object.notify(Native Method) at com.youth.guiguthirdquarter.AQS.LockSupportDemo1.lambda$main$1(LockSupportDemo1.java:26) at java.lang.Thread.run(Thread.java:748) Process finished with exit code 0
报错了。
异常情况2 先唤醒,再等待。
public class LockSupportDemo1 { static Object objectLock = new Object(); public static void main (String[] args) { new Thread(() -> { try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (objectLock){ System.out.println(Thread.currentThread().getName()+"\t" +"------come in" ); try { objectLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t" +"------被唤醒" ); } },"A" ).start(); new Thread(() -> { synchronized (objectLock) { objectLock.notify(); System.out.println(Thread.currentThread().getName()+"\t" +"------通知" ); } },"B" ).start(); } }
结果:
B ------通知 A ------come in Process finished with exit code -1
死循环,A无法被唤醒了。
这两点我们之前也说过,Object类提供的wait和notify
1、只能在synchronized同步代码块里使用
2、只能先等待(wait),再唤醒(notify)。顺序一旦错了,那个等待线程就无法被唤醒了。
Condion类提供的等待唤醒机制的缺点 缺点和Object类里的wait,notify一样。
1、只能在lock同步代码块里使用,不然就报错
2、只能先等待(await),再唤醒(signal)。顺序一旦错了,那个等待线程就无法被唤醒了。
但相对于wait,notify改进的一点是,可以绑定lock进行定向唤醒。
LockSupport的优点 有的时候我不需要进入同步代码块,我只是需要让线程阻塞,这个时候LockSupport就发挥作用了。并且还解决了之前的第二个问题,也就是等待必须在唤醒的前面。
static void park () static void unpark (Thread thread)
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit), permit只有两个值1和零,默认是零。可以把许可看成是一种(0,1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1。
public static void park () { UNSAFE.park(false , 0L ); }
LockSupport底层还是UNSAFE(前面讲过)。
permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park方法会被唤醒,然后会将permit再次设置为0并返回。
调用unpark(thread)方法后,就会将thread线程的许可permit设置成1(注意多次调用unpark方法,不会累加,permit值还是1)会自动唤醒thread线程,即之前阻塞中的LockSupport.park()方法会立即返回。
LockSupport和每个使用它的线程都有一个许可(permit)关联。permit相当于1,0的开关,默认是0, 调用一次unpark就将0变成1, 调用一次park会消费permit,也就是将1变成o,同时park立即返回。 如再次调用park会变成阻塞(因为permit为零了会阻塞在这里,一直到permit变为1),这时调用unpark会把permit置为1。 每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累凭证。
形象的理解 线程阻塞需要消耗凭证(permit),这个凭证最多只有1个。 当调用park方法时
如果有凭证,则会直接消耗掉这个凭证然后正常退出;
如果无凭证,就必须阻塞等待凭证可用;
而unpark则相反,它会增加一个凭证,但凭证最多只能有1个,累加无效。
我们用LockSupport来测试下之前的异常场景
异常情况1 无同步代码块
public class LockSupportDemo3 { public static void main (String[] args) { Thread t1 = new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t ----begin-时间:" + System.currentTimeMillis()); LockSupport.park(); System.out.println(Thread.currentThread().getName() + "\t ----被唤醒-时间:" + System.currentTimeMillis()); }, "t1" ); t1.start(); LockSupport.unpark(t1); System.out.println(Thread.currentThread().getName() + "\t 通知t1..." ); } }
结果:
t1 ----begin-时间:1603376148147 t1 ----被唤醒-时间:1603376148147 main 通知t1... Process finished with exit code 0
没有问题
异常情况2 先唤醒,再阻塞(等待)。
public static void main (String[] args) { Thread t1 = new Thread(() -> { try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t ----begin-时间:" + System.currentTimeMillis()); LockSupport.park(); System.out.println(Thread.currentThread().getName() + "\t ----被唤醒-时间:" + System.currentTimeMillis()); }, "t1" ); t1.start(); LockSupport.unpark(t1); System.out.println(Thread.currentThread().getName() + "\t 通知t1..." ); }
结果:
main 通知t1... t1 ----begin-时间:1603376257183 t1 ----被唤醒-时间:1603376257183 Process finished with exit code 0
可以看到,如果你先唤醒了。那么后面的LockSupport.park();
就相当于瞬间被唤醒了,不会和之前一样程序卡死。为什么呢?结合之前分析的流程
1、先执行unpark,将许可证由0变为1
2、然后park来了发现许可证此时为0(也就是有许可证),那么他就不会阻塞,马上就往后执行。同时消耗许可证(也就是将1又变为0)。
AQS AQS是什么? 字面意思: 抽象的队列同步器
技术翻译: 是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石, 通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量state
表示持有锁的状态。
AbstractOwnableSynchronizer AbstractQueuedLongSynchronizer AbstractQueuedSynchronizer
上面几个都是AQS,但是通常地: AbstractQueuedSynchronizer简称为AQS。
AQS是一个抽象的父类,可以将其理解为一个框架。基于AQS这个框架,我们可以实现多种同步器,比如下方图中的几个Java内置的同步器。同时我们也可以基于AQS框架实现我们自己的同步器以满足不同的业务场景需求。
AQS能干嘛? 加锁会导致阻塞:有阻塞就需要排队,实现排队必然需要有某种形式的队列来进行管理
1、抢到资源的线程直接使用办理业务,抢占不到资源的线程的必然涉及一种排队等候机制 ,抢占资源失败的线程继续去等待(类似办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
2、既然说到了排队等候机制 ,那么就一定 会有某种队列形成,这样的队列是什么数据结构呢?
3、如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及LockSuport.park()的方式,维护state变量的状态,使并发达到同步的效果。
AQS独占模式(以ReentrantLock 源码为例) AQS结构 private transient volatile Node head;private transient volatile Node tail;private volatile int state;private transient Thread exclusiveOwnerThread;
Node类结构 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; }
Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已,大家先要有这个概念在心里。
AQS队列基本结构
注意排队队列,不包括head(也就是后文要说的哨兵节点)。
开始 package com.youth.guiguthirdquarter.AQS;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantLock;public class AQSDemo { public static void main (String[] args) { ReentrantLock lock = new ReentrantLock(); new Thread(() -> { lock.lock(); try { System.out.println("-----A thread come in" ); try { TimeUnit.MINUTES.sleep(20 ); }catch (Exception e) {e.printStackTrace();} }finally { lock.unlock(); } },"A" ).start(); new Thread(() -> { lock.lock(); try { System.out.println("-----B thread come in" ); }finally { lock.unlock(); } },"B" ).start(); new Thread(() -> { lock.lock(); try { System.out.println("-----C thread come in" ); }finally { lock.unlock(); } },"C" ).start(); } }
以这样的一个实际例子说明。
非公平锁lock()加锁 lock() static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } protected final void setExclusiveOwnerThread (Thread thread) { exclusiveOwnerThread = thread; }
A线程刚进来的时候,AQS的head和tail节点都还没有被初始化,则会被默认初始化为null。并且state默认初始化为0。
1、A线程进去窗口办理业务,此时state == 0,那么CAS就直接成功了,并且把sate改为1。然后调用下setExclusiveOwnerThread
,就直接结束了。【加锁成功,直接返回】
B线程
1、接着B线程去窗口办理业务,因为之前A线程把state变为了1,那么B线程在进行第一个if-CAS判断就会失败。所以就走到了else分支,调用acquire(1)
方法。
C线程
因为A线程占用着锁,C线程执行逻辑和B一样。(后续假设C进行加锁时间在B后面一点)
acquire()和tryAcquire() public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
B线程
1、B线程最终走进了nonfairTryAcquire()
方法,但是因为A还在占锁(占着处理窗口state),所以此时state为1,B线程走到else if分支进行判断。
2、B线程发现已经占有锁的线程不是自己,说明不是重入锁,也不会进入else if分支。最终返回fasle,回到tryAcquire
,准备挂起线程。
C线程
因为A线程占用着锁,C线程执行逻辑和B一样
addWaiter() private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
之前说了A线程刚进来的时候,AQS的head和tail节点都还没有被初始化,则会被默认初始化为null
B线程
1、B线程进入addWaiter()
,发现pred == null,直接进入enq()
C线程
1、【前面说了C在B后面】,C线程进来后和B不一样,因为B在后面已经设置了tail指针。那么C线程在判断的时候pred 就不是null,就直接进入了if分支
2、C在if逻辑里准备入队,进行相应设置后,变成下面这样。
enq() private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
B线程
第一轮循环
1、B线程进入enq()。首先发现t == tail 依然为null,那么就直接进入if分支。
2、进入if分支后,调用compareAndSetHead(new Node())
准备初始化head节点。注意这里传的参数是new Node()
,说明是一个空的节点(并不是我们B线程封装的节点,这个空节点只作为占位符,称作傀儡节点或者哨兵节点 ),然后将head赋值给tail。
补充:双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。 真正的第一个有数据的节点,是从第二个节点开始的。
此时队列变成了下面的样子:
3、然后if结束之后,继续空的for循环,B线程开始了第二轮循环。
第二轮循环
1、第二次循环再过来的时候,t == tail,但此时tail不再为null,所以进入else分支。
2、node.prev = t
,进入if之后,让B节点的prev指针指向t,然后compareAndSetTail(t, node)
设置尾节点
3、CAS设置尾节点成功之后,执行if里的逻辑
acquireQueued() final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
B线程
1、进入acquireQueued()
后,发现也是一个空循环。首先通过node.predecessor()
得到B节点的前一个节点P,也就是哨兵节点。
2、p == head为true。然后if里再次执行tryAcquire(arg)
拿一次锁【流程前面已经分析过了,不重复了】。因为A线程任然持有锁,所以最终结果B节点tryAcquire
失败。准备挂起线程
shouldParkAfterFailedAcquire() private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
B线程
第一次循环
1、B线程的前驱节点是哨兵节点(ws == 0), 所以最终走了else分支,执行了 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
方法。将哨兵节点的compareAndSetWaitStatus
值变为了-1
2、返回false,返回到acquireQueued()
进行第二次循环【不再赘述】。
第二次循环
1、此时B线程的前驱节点–哨兵节点的ws == -1。那么此方法返回true,准备执行parkAndCheckInterrupt
parkAndCheckInterrupt() private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } }
到这一步,B线程才算真正的入队坐稳了。B线程在这里阻塞,或者说挂起。
非公平锁lock()解锁 然后,就是还需要介绍下唤醒的动作了。我们知道,正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this);
挂起停止,等待被唤醒。
release()和tryRelease() public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
unparkSuccessor() private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
B线程
1、哨兵节点的后一个节点就是B节点,B节点的waitStatus == 0,所以就直接走唤醒线程那一步了。
唤醒之后 唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:
private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
返回这个方法进行第三次循环
final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
1、B线程tryAcquire()
成功之后就占有了state,也就是拿到了锁。
final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final void setExclusiveOwnerThread (Thread thread) { exclusiveOwnerThread = thread; }
2、此时state那里有B线程的引用exclusiveOwnerThread
,队列里也有B线程的引用,需要把队列里的多余引用给GC掉。
3、AQS采用的是将head指向B节点成为新的哨兵节点,旧的哨兵节点因为没有任何引用指向了,慢慢就会被GC掉。
公平锁和非公平锁 看了上面的源码,这个知识点应该是可以很轻松理解的。公平锁和非公平锁在源码层次只有几处不一样。
构造 ReentrantLock 默认采用非公平锁,除非你在构造方法中传入参数 true 。
public ReentrantLock () { sync = new NonfairSync(); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
非公平锁的 lock 方法 static final class NonfairSync extends Sync { final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
公平锁的 lock 方法 static final class FairSync extends Sync { final void lock () { acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
推荐 CLH队列