服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|JavaScript|易语言|

服务器之家 - 编程语言 - Java教程 - 彻底了解java中ReentrantLock和AQS的源码

彻底了解java中ReentrantLock和AQS的源码

2021-09-02 12:39cy刘皇叔 Java教程

这篇文章主要介绍了彻底了解java中ReentrantLock和AQS的源码,想了解锁机制的同学,一定要参考下

一.前言

首先在聊ReentrantLock之前,我们需要知道整个JUC的并发同步的基石,currrent里面所有的共享变量都是由volatile修饰的,我们知道volatile的语义有2大特点,可见性以及防止重排序(内存屏障,volatie写与volatile读)
1、当第二个操作为volatile写操做时,不管第一个操作是什么(普通读写或者volatile读写),都不能进行重排序。这个规则确保volatile写之前的所有操作都不会被重排序到volatile之后;

2、当第一个操作为volatile读操作时,不管第二个操作是什么,都不能进行重排序。这个规则确保volatile读之后的所有操作都不会被重排序到volatile之前;

3、当第一个操作是volatile写操作时,第二个操作是volatile读操作,不能进行重排序。
而cas操作同时包含了volatile写/读语义,这二者的完美结合就组成了current的基石

二.ReentrantLock的基础用法

1.

  1. public class ReentrantLockText {
  2.  
  3. public static void main(String[] args) {
  4. Lock lock = new ReentrantLock();
  5.  
  6. Thread t1 = new Thread(()->{
  7. try {
  8. lock.lock();
  9. System.out.println("t1 start");
  10. TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  11. System.out.println("t1 end");
  12. } catch (InterruptedException e) {
  13. System.out.println("interrupted!");
  14. } finally {
  15. lock.unlock();
  16. }
  17. });
  18. t1.start();
  19.  
  20. Thread t2 = new Thread(()->{
  21. try {
  22. //lock.lock();
  23. lock.lockInterruptibly(); //可以对interrupt()方法做出响应
  24. System.out.println("t2 start");
  25. TimeUnit.SECONDS.sleep(5);
  26. System.out.println("t2 end");
  27. } catch (InterruptedException e) {
  28. System.out.println("interrupted!");
  29. } finally {
  30. lock.unlock();
  31. }
  32. });
  33. t2.start();
  34.  
  35. try {
  36. TimeUnit.SECONDS.sleep(1);
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. t2.interrupt(); //打断线程2的等待
  41.  
  42. }
  43. }

运行结果

彻底了解java中ReentrantLock和AQS的源码

reentrantlock用于替代synchronized

  • 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
  • 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
  • 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
  • 使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应
  • 在一个线程等待锁的过程中,可以被打断

2.ReentrantLock还有一个tryLock(time),可以指定时间,如果指定时间内没有获得锁,则放弃,可以通过其返回值来决定是否继续等待

3.还有就是Condition了(我个人觉得这是最灵活的一个地方)

  1. public class Lock_condition {
  2.  
  3. public static void main(String[] args) {
  4.  
  5. char[] aI = "1234567".toCharArray();
  6. char[] aC = "ABCDEFG".toCharArray();
  7.  
  8. Lock lock = new ReentrantLock();
  9. Condition conditionT1 = lock.newCondition();
  10. Condition conditionT2 = lock.newCondition();
  11.  
  12. new Thread(()->{
  13. try {
  14. lock.lock();
  15.  
  16. for(char c : aI) {
  17. System.out.print(c);
  18. conditionT2.signal();
  19. conditionT1.await();
  20. }
  21. conditionT2.signal();
  22.  
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. } finally {
  26. lock.unlock();
  27. }
  28.  
  29. }, "t1").start();
  30.  
  31. new Thread(()->{
  32. try {
  33. lock.lock();
  34.  
  35. for(char c : aC) {
  36. System.out.print(c);
  37. conditionT1.signal();
  38. conditionT2.await();
  39. }
  40.  
  41. conditionT1.signal();
  42.  
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. } finally {
  46. lock.unlock();
  47. }
  48.  
  49. }, "t2").start();
  50. }
  51. }

彻底了解java中ReentrantLock和AQS的源码

这是condition结合lock(独占锁)的用法
condition目前只实现了独占锁,关于condition的源码理解,后续也会继续更新,暂时我们只需要知道类似object的wait与notifiy

三.原理+源码

我们现在知道了基本用法,那么我们就可以开始探究源码了

彻底了解java中ReentrantLock和AQS的源码

1.AQS
我们知道JUC里面的核心类就是AQS,那么AQS究竟是个啥东西呢?
1)先上内部类NODE源码

  1. static final class Node {
  2. static final Node SHARED = new Node();
  3. static final Node EXCLUSIVE = null;
  4. static final int CANCELLED = 1;
  5. static final int SIGNAL = -1;
  6. static final int PROPAGATE = -3;
  7. volatile int waitStatus;
  8. volatile Node next;
  9. volatile Node prev;
  10. volatile Thread thread;
  11. Node nextWaiter;

不知道大家在看到这个 Node next;Node prev;的时候是啥感觉,反正我当时是激动坏了,这不就是一个双向链表嘛,
再看volatile Thread thread;这个属性,这是一个管理线程的双向链表,换句话说就是将线程打包成立节点放入AQS的链表中
基础的结构清楚之后。
SHARED 与EXCLUSIVE 代表是独占节点还是共享节点
2)再上AQS属性源码

  1. private transient volatile Node head;
  2. private transient volatile Node tail;
  3. private volatile int state;

Node0 head与tail不用说,这是来管理节点的,
这里我们要核心介绍一个属性是state,这也是AQS这个类的灵魂,
1.再独占锁中这个state是1或者0,(如果大于1则表示锁重入,这个稍后会有源码分析)
2在共享锁中代表还有多少共享锁资源,
3.在读写锁中,高16位代表写锁是否被占用,低16位代表有多少读锁,
4.在CountDownLatch中,通过构造参数代码门闸剩余个数
5.在Semaphore中,同样通过构造参数代表信号灯个数

2.ReentrantLock获取锁源码(独占锁)
首先公平锁与非公平锁,分别继承与Sync
FairSync NoFairSync ,默认是非公平锁,可以在构造方法上指定

  1. ReentrantLock lock = new ReentrantLock(true);//ture则是公平锁

公平锁故名思意,在AQS中管理着一个线程队列,如果这时候 有一个线程过来抢这把锁,如果是公平锁,那么会判断队列是不是存在不同与当前线程的等待队列(FIFO),如果存在则去排队,非公平锁则是直接去排队
(2.1.非公平锁获取锁)

  1. final void lock() {
  2. if (compareAndSetState(0, 1))//cas原子操作尝试去修改值,如果修改成功说明成功获取到了锁
  3. setExclusiveOwnerThread(Thread.currentThread());
  4. else
  5. acquire(1);
  6. }

首先cas原子操作尝试去修改值,如果修改成功说明成功获取到了锁,进入setExclusiveOwnerThread()方法

  1. protected final void setExclusiveOwnerThread(Thread thread) {
  2. exclusiveOwnerThread = thread;
  3. }

将当前线程记录,实现偏向锁,一行代码便完美实现了偏向锁!!

如果失败则,调用acquire();这个方法调用的实际上是子类的nonfairTryAcquire方法

  1. final boolean nonfairTryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. if (compareAndSetState(0, acquires)) {
  6. setExclusiveOwnerThread(current);
  7. return true;
  8. }
  9. }
  10. else if (current == getExclusiveOwnerThread()) {
  11. int nextc = c + acquires;
  12. if (nextc < 0) // overflow
  13. throw new Error("Maximum lock count exceeded");
  14. setState(nextc);
  15. return true;
  16. }
  17. return false;
  18. }

首先,获取state的值,判断是否为0,如果为0,则说明锁没有被占有,(可能是刚刚被释放)那么cas操作开始尝试获取锁,
**(注意注意注意)**重要的事情说三遍,这里仅仅尝试获取一次,没有自旋!!这是独占锁与共享锁的区别之一,因为如果state>=0(对于共享锁来说state代表剩余数量),那么共享锁会不断尝试自旋获取锁,之道state<0,因为只要》0那么就可能共享到锁
接下来的else if就是重入锁的操作了,判断当先线程是不是记录的线程,如果是,每次重入state+1,如果不是就返回flase,直接拜拜
(2.2公平锁获取锁)
刚刚介绍了公平锁的意义,所有直接上源码,公平锁比非公平锁多了一个公平判断

  1. public final boolean hasQueuedPredecessors() {
  2. // The correctness of this depends on head being initialized
  3. // before tail and on head.next being accurate if the current
  4. // thread is first in queue.
  5. Node t = tail; // Read fields in reverse initialization order
  6. Node h = head;
  7. Node s;
  8. return h != t &&
  9. ((s = h.next) == null || s.thread != Thread.currentThread());
  10. }

我们可以看到hasQueuedPredecessors是用来判断队列是否有不同于当前线程的节点等待,这里重点讨论一个情况,
h != t返回true,(s = h.next) == null返回true
首先可以知道队列中有2个节点,但是头节点没有后继结点,在这里列举一种情况,有另一个线程已经执行到初始化队列的操作了,介于compareAndSetHead(new Node())与tail = head之间,也就是之后说的enq自旋方法,请继续往下看

继续如果非公平锁没有获取到锁,那么会调用acquireQueued和addwaiter方法

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

首先来看addwaiter方法

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail;
  5. if (pred != null) {
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) {
  8. pred.next = node;
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }

首先将,当前线程封装成一个,Node,然后通过判断尾结点是不是空的方式,判断队列是不是空的,如果存在尾结点,那么直接进先驱后继的改造,放入双向链表,完成链表结构,那么如果是空的呢?这么调用了一个enq的自旋方法

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

我们可以看到这个一个自旋方法,第一次循环:t为null,那么cas就new出来一个新结点,头尾都指向这个新结点,
第二次循环,将传进来的这个线程结点的前驱指向刚刚new的这个结点,然后cas操作进行,将这个线程结点替换为尾部结点,然后head后继指向线程结点,返回head
经过二次循环,得到了一个由2个节点组成的队列,head-》node,head是假节点(里面不包括线程为null),node才是真正的线程结点(addwrite封装好的传进来的线程结点)
问题1.为什么一定要用cas操作,因为防止别的线程修改了该队列

好,现在我们继续,看acquireQueued

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

这是一个最最核心的方法,从队列中取线程
首先这是一个自旋,判断该节点的前驱节点是不是head,因为(FIFO)先进先出队列。
如果不是直接拜拜进入shouldParkAfterFailedAcquire,继续上源码

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;
  3. if (ws == Node.SIGNAL)
  4. /*
  5. * This node has already set status asking a release
  6. * to signal it, so it can safely park.
  7. */
  8. return true;
  9. if (ws > 0) {
  10. /*
  11. * Predecessor was cancelled. Skip over predecessors and
  12. * indicate retry.
  13. */
  14. do {
  15. node.prev = pred = pred.prev;
  16. } while (pred.waitStatus > 0);
  17. pred.next = node;
  18. } else {
  19. /*
  20. * waitStatus must be 0 or PROPAGATE. Indicate that we
  21. * need a signal, but don't park yet. Caller will need to
  22. * retry to make sure it cannot acquire before parking.
  23. */
  24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  25. }
  26. return false;
  27. }

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

  1. CANCELLED:因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收;
  2. SIGNAL:表示这个结点的继任结点被阻塞了,到时需要通知它;
  3. CONDITION:表示这个结点在条件队列中,因为等待某个条件而被阻塞;
  4. PROPAGATE:使用在共享模式头结点有可能牌处于这种状态,表示锁的下一次获取可以无条件传播;
  5. 0:None of the above,新结点会处于这种状态。

首先说明一下waitStatus这个属性,为什么之前不提呢,因为之前没有对waitStatus进行操作,我们在new节点与封装结点的时候没有考虑这个属性,所以我们现在当成一个新属性,值为0来看待,

shouldParkAfterFailedAcquire这个代码的逻辑意义是说明呢?
如果是SIGNAL那么,直接ture,
如果大于0,则是CANCELLED,被取消了,直接剔除队列
如果都不是,那么将其前驱结点设为SIGNAL,也就是可以安心睡觉了,定好闹钟了,可以被等着唤醒了,
我们现在很明显是第三种,因为之前啥都没干,就是0,
所以本来状态

彻底了解java中ReentrantLock和AQS的源码

进行shouldParkAfterFailedAcquire之后,
那么现在链表中,对线程1的前驱设闹钟,0变成-1

彻底了解java中ReentrantLock和AQS的源码

假设这时候又来了个线程2,那么同理,对线程而的先驱设闹钟0变成-1

彻底了解java中ReentrantLock和AQS的源码

在调用了shouldParkAfterFailedAcquire()之后,调用parkAndCheckInterrupt方法用于阻塞,

这里提一下,关于parkAndCheckInterrupt,lock里面用于阻塞都是基于lockSupper.park()与lockSupper.unpark()完成了,而lockSupper调用的又是unsafe这个类,我们知道java是基于jvm实现的,并不能和c++一样直接对os进行操作,所以jvm给我们提供了一个梯子,这个梯子就是unsafe这个类,直接将线程的的交个操作系统阻塞

四,释放锁

终于到了释放锁,独占锁的释放锁的逻辑相对与共享锁来说比较简单,后续我也会继续更新共享锁的源码

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

首先我们来看tryRelease方法,

  1. protected final boolean tryRelease(int releases) {
  2. int c = getState() - releases;
  3. if (Thread.currentThread() != getExclusiveOwnerThread())
  4. throw new IllegalMonitorStateException();
  5. boolean free = false;
  6. if (c == 0) {
  7. free = true;
  8. setExclusiveOwnerThread(null);
  9. }
  10. setState(c);
  11. return free;
  12. }

首先获取,int c = getState() - releases;这里可能c是>0的,因为独占锁的重入锁(上面以及说明了独占锁的重入的源码操作),所以有可能是需要进行多次解锁的,继续
判断当前线程是不是独占线程,如果不是则报IllegalMonitorStateException异常
一直解锁到c=0的时候,那么线程已经解锁,则设setExclusiveOwnerThread=null
设置当前独占线程为null,然后设置state为0

继续回到release,如果头节点不是null而且h.waitStatus != 0,说明是-1,说明设置闹钟了,需要唤醒aqs队列中的阻塞结点,调用的是unparkSuccessor方法,继续看源码

  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * If status is negative (i.e., possibly needing signal) try
  4. * to clear in anticipation of signalling. It is OK if this
  5. * fails or if status is changed by waiting thread.
  6. */
  7. int ws = node.waitStatus;
  8. if (ws < 0)
  9. compareAndSetWaitStatus(node, ws, 0);
  10.  
  11. /*
  12. * Thread to unpark is held in successor, which is normally
  13. * just the next node. But if cancelled or apparently null,
  14. * traverse backwards from tail to find the actual
  15. * non-cancelled successor.
  16. */
  17. Node s = node.next;
  18. if (s == null || s.waitStatus > 0) {
  19. s = null;
  20. for (Node t = tail; t != null && t != node; t = t.prev)
  21. if (t.waitStatus <= 0)
  22. s = t;
  23. }
  24. if (s != null)
  25. LockSupport.unpark(s.thread);
  26. }

这里首先明确,这里传进来的node是啥?是头节点!!!!,一定要明确这个,博主就是因为刚开始没明确这个,半天没明白,因为唤醒一定是唤醒头节点之后的waitStatus不为1的结点

首先判断头节点,如果是-1则设置为0,这个中间状态,表示有结点被唤醒了,
然后拿到,head的后继节点,进行判断,如何是null或者waitStatus >0,(就是1,CANCELLED,代表被取消了),这个时候从尾部开始遍历,剔除waitStatus >0的节点,找到第一个waitStatus <=0的节点,用LockSupport.unpark(s.thread);将其唤醒,唤醒之后的线程回到acquireQueued方法中,

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. //被阻塞的线程,被唤醒后在进行循环,
  7. //然后通过return interrupted;
  8. final Node p = node.predecessor();
  9. if (p == head && tryAcquire(arg)) {
  10. setHead(node);
  11. p.next = null; // help GC
  12. failed = false;
  13. return interrupted;
  14. }
  15. if (shouldParkAfterFailedAcquire(p, node) &&
  16. parkAndCheckInterrupt())
  17. interrupted = true;
  18. }
  19. } finally {
  20. if (failed)
  21. cancelAcquire(node);
  22. }
  23. }

首先,将该结点设置为head,然后将老head指向null,帮助gc回收,然后return返回,至此线程自由
那么此时该队列为

彻底了解java中ReentrantLock和AQS的源码

五,总结

写了很久,如果有啥不对的地方,欢迎大家指正

以上就是一篇彻底看懂ReentrantLock,AQS的源码的详细内容,更多关于一篇彻底看懂ReentrantLock,AQS的源码的资料请关注服务器之家其它相关文章!

原文链接:https://blog.csdn.net/weixin_44213940/article/details/115432633

延伸 · 阅读

精彩推荐