Java中的Semaphore源码分析

 更新时间:2023年11月20日 09:59:00   作者:进击的猫  
这篇文章主要介绍了Java中的Semaphore源码分析,Semaphore是一个访问公共资源的线程数量如限流、停车等,它是一个基于AQS实现的共享锁,主要是通过控制state变量来实现,需要的朋友可以参考下

(一)概念简介

Semaphore是一个访问公共资源的线程数量如限流、停车等,它是一个基于AQS实现的共享锁,主要是通过控制state变量来实现。

其内部结构关系为:Semaphore内部是通过一个内部核心成员变量sync去调用AQS中父类的方法,NoneFairSync/FairSync继承于内部类Sync,Sync继承于AQS,在使用Semaphore构造方法进行实例化时可指定公平或非公平,其内部主要是靠acquire和release方法进行阻塞或释放。

(二)使用场景

当主线程进行执行时,利用构造方法初始化一个公平或非公平的线程访问总数,子线程调用acquire尝试获取访问资源即访问令牌,待到线程访问总数不够分配即分配出现负数时则进行阻塞,当其他占用资源被释放时,会调用release方法进行唤醒阻塞中的线程。

(1)经典停车位或餐厅,因位置有限无法容纳更多。

(2)数据库连接数限制或控制系统并发如限流;

(三)特点

(1)子线程调用acquire方法去资源总数中分配,如果分配成功则不会阻塞,否则会被阻塞,等待被唤醒;

(2)当一个子线程任务执行结束,会通过release方法去唤醒阻塞中的线程。

Semaphore简单使用

public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(5);
    for (int i = 0; i < 10; i++){
        int num = i;
        new Thread(()->{
            System.out.println("线程"+num+"初始化!");
            try {
                semaphore.acquire();
                System.out.println("线程"+num+"拿到了锁执行权!");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("------------------线程"+num+"被唤醒执行!-----------------");
            semaphore.release();
        }).start();
    }
    System.out.println("main 线程执行!");
}

(四)Semaphore源码分析

(1)构造函数

 /**
  * Semaphore一个参数值容量,使用非公平NonfairSync类去实例化Sync继承的AQS中的原子变量state值
  */
  public Semaphore(int permits) {
      sync = new NonfairSync(permits);
  }
  /**
  * Semaphore采用两个参数,一个容量和是否使用公平或非公平标记来初始化Sync继承AQS中的原子变量state值
  */
  public Semaphore(int permits, boolean fair) {
      sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }

(2)acquire方法(核心)

/**
* 暴露给外部用于是否阻塞调用的API
* 其内部是Semaphore内部类(Sync)变量sync去调用acquireSharedInterruptibly
*/
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

/**
* AQS类中确定获取共享锁和是否阻塞
*/
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())//判断执行线程是否有中断标记
        throw new InterruptedException();
    /**
    * AQS定义了共享锁方法并强制子类去重写该方法(模板模式)
    * 由其子类NonfairSync继承内部类Sync,Sync继承于AQS类,最终由NoneFairSync去重写
    * NoneFairSync又调用了父类Sync中的nonfairTryAcquireShared方法确定是否需要阻塞
    */
    if (tryAcquireShared(arg) < 0)//尝试获取共享锁
        doAcquireSharedInterruptibly(arg);//在未标记中断前提下,是否需要真正阻塞
}
NoneFairSync类:继承于Semaphore中的内部类Sync
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
Sync类:
//自旋的模式对state进行-1并返回state-1后的值
final int nonfairTryAcquireShared(int acquires) {
    /**
    * (1)自旋保证本次对state进行-1
    * (2)state-1后的值小于0的情况下对state进行CAS设置新的值
    */
    for (;;) {
        int available = getState();//获取state值
        int remaining = available - acquires;//当前线程对其-1
        //如果state-1后的值小于0代表无容量可用
        //如果state-1后的值大于等于0,则代表共享锁还有位置可用,并设置新的state值
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;//返回state-1后的值
    }
}

(3)doAcquireSharedInterruptibly方法(核心)

/**
 * AQS中定义的是否阻塞方法
 * 如果在自旋过程中获取到了共享锁,则不进行阻塞,这也是非公平的原因之一
 * 如果在自旋中未获取到共享锁则shouldParkAfterFailedAcquire进行更改等待状态
 * 在成功改变等待线程信号量后再调用parkAndCheckInterrupt是否阻塞
 * 上述的parkAndCheckInterrupt在被唤醒之前的一段时间内,如果存在中断标记则会抛出异常,否则正常执行
 */
 private void doAcquireSharedInterruptibly(int arg)
         throws InterruptedException {
     final Node node = addWaiter(Node.SHARED);//以共享锁模式创建队列
     boolean failed = true;
     try {
         for (;;) {//自旋获取共享锁或阻塞
             final Node p = node.predecessor();//获取当前线程的前继节点
             if (p == head) {//前继节点是否为头节点
                 int r = tryAcquireShared(arg);//尝试再次获取共享锁
                 if (r >= 0) {//获取到了共享锁
                     setHeadAndPropagate(node, r);//设置新的头节点和唤醒后继节点
                     p.next = null; // help GC
                     failed = false;//不需要执行娄底的cancelAcquire方法
                     return;
                 }
             }
             /**
             * shouldParkAfterFailedAcquire改变前继节点的等待状态信号量
             * parkAndCheckInterrupt真正阻塞该线程,使用LockSupport的park方法
             */
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())//改变等前继节点的等待状态信号量和阻塞该线程
                 throw new InterruptedException();
         }
     } finally {
         if (failed)//是否需要执行保底方法
         //当该线程有中断标记或不需要阻塞时发生异常取消获取锁和过滤超时节点
             cancelAcquire(node);
     }
 }

 private void setHeadAndPropagate(Node node, int propagate) {
   Node h = head; // Record old head for check below
   setHead(node);
   /**
    * propagate > 0表示可以尝试唤醒node结点的后继结点,可能是-3那个线程进行导致其大于0
    * (h = head) == null || h.waitStatus < 0(此时的h被重新赋值)可能会引起没必要的唤醒操作
    * 比如线程A任务结束后释放许可,但是线程B任务还没结束,此时线程C获取到许可走到这里
    * 执行完上面的setHead后,然后 h = head 即h执行线程C的结点,
    * 而线程C对应的结点的 waitStatus = SIGNAL,所以也会执行doReleaseShared唤醒线程D
    * 线程D唤醒后接着去执行 doAcquireSharedInterruptibly 中的for循环,
    * 执行tryAcquireShared去拿许可证的时候发现是小于0,后面继续走挂起方法去挂起线程D
    */
   if (propagate > 0 || h == null || h.waitStatus < 0 ||
       (h = head) == null || h.waitStatus < 0) {
       Node s = node.next;
       if (s == null || s.isShared())
           doReleaseShared();//唤醒等待中的线程
   }
 }

(4)release方法(核心)

/**
 * 暴露给外部调用释放API
 * Semaphore内部核心成员变量sync调用AQS中的releaseShared方法进行释放唤醒
 */
 public void release() {
      sync.releaseShared(1);//调用AQS的释放并唤醒方法
  }
  AQS类:
  public final boolean releaseShared(int arg) {
      if (tryReleaseShared(arg)) {//强制子类Sync重写该方法,模板模式
          doReleaseShared();//调用AQS自身释放共享锁
          return true;
      }
      return false;
  }
  Sync类:强制重写父类AQS中的尝试释放锁方法
  protected final boolean tryReleaseShared(int releases) {
      for (;;) {//自旋操作来唤醒
          int current = getState();//获取state值
          int next = current + releases;//将state值+1
          if (next < current) //Integer.MAX是否达到最大,防止溢出如最大数+1变为负数
              throw new Error("Maximum permit count exceeded");
          if (compareAndSetState(current, next))//CAS改变state值
              return true;
      }
  }

(5)doReleaseShared方法(核心)

/**
 * 尝试唤醒等待线程
 * (1)正常唤醒头结点的后继节点线程
 * (2)可能伴随着多个共享锁被释放,为了防止空闲许可浪费,会唤醒头节点的后继节点的后继节点
 */
 private void doReleaseShared() {
     for (;;) {
         Node h = head;//获取队列中的头结点
         if (h != null && h != tail) {//判断队列中是否还有等待线程
             int ws = h.waitStatus;//获取头结点的等待状态
             if (ws == Node.SIGNAL) {//可唤醒状态
                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//将头结点等待状态设置为0
                     continue;//改变头结点等待状态失败,则跳过本次操作,自旋再次设置
                 unparkSuccessor(h);//在可唤醒状态下且改变头节点等待状态成功的前提下进行唤醒后继节点
             }
             /**
             * ws等于0,然后设置waitStatus为Node.PROPAGATE,表示在自旋过程同时有多个锁都在被释放
             * 线程A执行上面的cas操作将头结点等待状态设置为0
             * 此时的线程B刚好在执行上面if不满足时则执行else if逻辑,将头结点状态设置为-3,
             * 主要是方便被唤醒线程自旋执行doAcquireSharedInterruptibly中的setHeadAndPropagate方法
             * 即表示要唤醒head后继结点的后继结点
             * waitStatus=PROPAGATE就表示要唤醒head后继结点的后继结点
             * 线程A和B都释放许可了,如果有多个等待线程在等待唤醒,在setHeadAndPropagate方法中会有逻辑判断
             * 防止已经有2张许可却只有线程C拿到许可,线程D还在傻乎乎的等线程C释放许可来唤醒线程D
             else if (ws == 0 &&
                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                 continue;//CAS设置失败,自旋重新设置
         }
         if (h == head)//结束死循环
             break;
     }
 }

到此这篇关于Java中的Semaphore源码分析的文章就介绍到这了,更多相关Semaphore源码分析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java编程线程间通信与信号量代码示例

    Java编程线程间通信与信号量代码示例

    这篇文章主要介绍了Java编程线程间通信与信号量代码示例,具有一定借鉴价值,需要的朋友可以参考下。
    2017-12-12
  • SpringCache之 @CachePut的使用

    SpringCache之 @CachePut的使用

    这篇文章主要介绍了SpringCache之 @CachePut的使用,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02
  • springboot打包部署到linux服务器的方法

    springboot打包部署到linux服务器的方法

    这篇文章主要介绍了springboot打包部署到linux服务器的方法,通过实例代码相结合的形式给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-06-06
  • Java中的Lombok使用及工作原理详解

    Java中的Lombok使用及工作原理详解

    这篇文章主要介绍了Java中的Lombok使用及工作原理详解,Lombok是一个Java库,能自动插入编辑器并构建工具,简化Java开发,通过添加注解的方式,不需要为类编写getter或eques方法,同时可以自动化日志变量,需要的朋友可以参考下
    2023-10-10
  • Springboot中整合knife4j接口文档的过程详解

    Springboot中整合knife4j接口文档的过程详解

    knife4j就swagger的升级版API文档的一个框架,但是用起来比swagger方便多了,UI更加丰富,这篇文章主要介绍了Springboot中整合knife4j接口文档,需要的朋友可以参考下
    2022-04-04
  • java实现左旋转字符串

    java实现左旋转字符串

    这篇文章主要为大家详细介绍了java实现左旋转字符串,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-03-03
  • 详解Java从工厂方法模式到 IOC/DI思想

    详解Java从工厂方法模式到 IOC/DI思想

    工厂方法(Factory Method)模式的意义是定义一个创建产品对象的工厂接口,将实际创建工作推迟到子类当中。核心工厂类不再负责产品的创建,这样核心类成为一个抽象工厂角色,仅负责具体工厂子类必须实现的接口。本文将详细介绍Java从工厂方法模式到 IOC/DI思想。
    2021-06-06
  • java项目中classpath的理解

    java项目中classpath的理解

    这篇文章介绍了java项目中classpath的理解,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-12-12
  • Hibernate管理Session和批量操作分析

    Hibernate管理Session和批量操作分析

    这篇文章主要介绍了Hibernate管理Session和批量操作的技巧,包括Hibernate管理Session、批量处理数据等的常用技巧及注意事项,具有一定的参考借鉴价值,需要的朋友可以参考下
    2014-12-12
  • springcloud整合到项目中无法启动报错Failed to start bean 'eurekaAutoServiceRegistration'

    springcloud整合到项目中无法启动报错Failed to start bean&n

    这篇文章主要介绍了springcloud整合到项目中无法启动报错Failed to start bean 'eurekaAutoServiceRegistration'问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01

最新评论