详解Java 信号量Semaphore

 更新时间:2020年09月14日 11:16:07   作者:java小新人  
这篇文章主要介绍了Java 信号量Semaphore的相关资料,帮助大家更好的理解和学习Java并发,感兴趣的朋友可以了解下

  Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数;

一.简单使用

  同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行:

package com.example.demo.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Study0217 {
  //创建一个信号量的实例,信号量初始值为0
  static Semaphore semaphore = new Semaphore(0);
  
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    pool.submit(()->{
      System.out.println("Thread1---start");
      //信号量加一
      semaphore.release();
    });
    
    pool.submit(()->{
      System.out.println("Thread2---start");
      //信号量加一
      semaphore.release();
    });
    pool.submit(()->{
      System.out.println("Thread3---start");
      //信号量加一
      semaphore.release();
    });
    //等待两个子线程执行完毕就放过,必须要信号量等于2才放过
    semaphore.acquire(2);
    System.out.println("两个子线程执行完毕");
    
    //关闭线程池,正在执行的任务继续执行
    pool.shutdown();

  }

}

这个信号量也可以复用,类似CyclicBarrier:

package com.example.demo.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Study0217 {
  //创建一个信号量的实例,信号量初始值为0
  static Semaphore semaphore = new Semaphore(0);
  
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    pool.submit(()->{
      System.out.println("Thread1---start");
      //信号量加一
      semaphore.release();
    });
    
    pool.submit(()->{
      System.out.println("Thread2---start");
      //信号量加一
      semaphore.release();
    });
    
    //等待两个子线程执行完毕就放过,必须要信号量等于2才放过
    semaphore.acquire(2);
    System.out.println("子线程1,2执行完毕");
    
    pool.submit(()->{
      System.out.println("Thread3---start");
      //信号量加一
      semaphore.release();
    });
    pool.submit(()->{
      System.out.println("Thread4---start");
      //信号量加一
      semaphore.release();
    });
    
    semaphore.acquire(2);
    System.out.println("子线程3,4执行完毕");
    
    //关闭线程池,正在执行的任务继续执行
    pool.shutdown();

  }

}

二.信号量原理 

  看看下面这个图,可以知道信号量Semaphore还是根据AQS实现的,内部有个Sync工具类操作AQS,还分为公平策略和非公平策略;

构造器:

//默认是非公平策略
public Semaphore(int permits) {
  sync = new NonfairSync(permits);
}
//可以根据第二个参数选择是公平策略还是非公平策略
public Semaphore(int permits, boolean fair) {
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire(int permits)方法:

public void acquire(int permits) throws InterruptedException {
  if (permits < 0) throw new IllegalArgumentException();
  sync.acquireSharedInterruptibly(permits);
}

//AQS中的方法
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
  if (Thread.interrupted()) throw new InterruptedException();
  //这里根据子类是公平策略还是非公平策略
  if (tryAcquireShared(arg) < 0)
    //获取失败会进入这里,将线程放入阻塞队列,然后再尝试,还是失败的话就调用park方法挂起当前线程
    doAcquireSharedInterruptibly(arg);
}
//非公平策略
protected int tryAcquireShared(int acquires) {
  return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
  //一个无限循环,获取state剩余的信号量,因为每调用一次release()方法的话,信号量就会加一,这里将
  //最新的信号量减去传进来的参数比较,比如有两个线程,其中一个线程已经调用了release方法,然后调用acquire(2)方法,那么
  //这里remaining的值就是-1,返回-1,然后当前线程就会被丢到阻塞队列中去了;如果另外一个线程也调用了release方法,
  //那么此时的remaining==0,所以在这里的if中会调用CAS将0设置到state
  //
  for (;;) {
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 || compareAndSetState(available, remaining))
      return remaining;
  }
}
//公平策略
//和上面非公平差不多,只不过这里会查看阻塞队列中当前节点前面有没有前驱节点,有的话直接返回-1,
//就会把当前线程丢到阻塞队列中阻塞去了,没有前驱节点的话,就跟非公平模式一样的了
protected int tryAcquireShared(int acquires) {
  for (;;) {
    if (hasQueuedPredecessors())
      return -1;
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 ||compareAndSetState(available, remaining))
      return remaining;
  }
}

再看看release(int permits)方法:

//这个方法的作用就是将信号量加一
public void release(int permits) {
  if (permits < 0) throw new IllegalArgumentException();
  sync.releaseShared(permits);
}
//AQS中方法
public final boolean releaseShared(int arg) {
  //tryReleaseShared尝试释放资源
  if (tryReleaseShared(arg)) {
    //释放资源成功就调用park方法唤醒唤醒AQS队列中最前面的节点中的线程
    doReleaseShared();
    return true;
  }
  return false;
}

protected final boolean tryReleaseShared(int releases) {
  //一个无限循环,获取state,然后加上传进去的参数,如果新的state的值小于旧的state,说明已经超过了state的最大值,溢出了
  //没有溢出的话,就用CAS更新state的值
  for (;;) {
    int current = getState();
    int next = current + releases;
    if (next < current) // overflow
      throw new Error("Maximum permit count exceeded");
    if (compareAndSetState(current, next))
      return true;
  }
}

private void doReleaseShared() {
  
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      //ws==Node.SIGNAL表示节点中线程需要被唤醒
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;      // loop to recheck cases
        //调用阻塞队列中线程的unpark方法唤醒线程
        unparkSuccessor(h);
      }
      //ws == 0表示节点中线程是初始状态
      else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;        // loop on failed CAS
    }
    
    if (h == head)          // loop if head changed
      break;
  }
}

  以最上面的例子简单说一下,其实不是很难,首先线程1和线程2分别去调用release方法,这个方法里面会将AQS中的state加一,但是在执行这个操作之前,主线程肯定会先到acquire(2),在这个方法里面,假如默认使用非公平策略,首先获取当前的信号量state(state的初始值是0),用当前信号量减去2,如果小于0,那么当前主线程就会丢到AQS队列中阻塞;

  这个时候线程1的release方法执行了,于是就把信号量state加一(此时state==1),CAS更新state为一,成功的话,就调用doReleaseShared()方法唤醒AQS阻塞队列中最先挂起的线程(这里就是因为调用acquire方法而阻塞的主线程),主线程唤醒之后又会去获取最新的信号量,与2比较,发现还是小于0,于是又会阻塞;

  线程2此时的release方法执行完成,重复线程一的操作,主线程唤醒之后(此时state==2),又去获取最新的信号量发现是2,减去acquire方法的参数2等于0,于是就用CAS更新state的值,然后acquire方法也就执行完毕,主线程继续执行后面的代码;

  其实信号量还是很有意思的,记得在项目里,有人利用信号量实现了一个故障隔离,什么时候我可以把整理之后的代码贴出来分享一下,还是很有意思的,就跟springcloud的熔断机制差不多,场景是:比如你在service的一个方法调用第三方的接口,你不知道调不调得通,而且你不希望每次前端过来都会去调用,比如当调用失败的次数超过100次,那么五分钟之后才会再去实际调用这个第三方服务!这五分钟内前调用这个服务,就会触发我们这个故障隔离的机制,向前端返回一个特定的错误码和错误信息!

以上就是详解Java 信号量Semaphore的详细内容,更多关于Java 信号量Semaphore的资料请关注脚本之家其它相关文章!

相关文章

  • 解决Java项目中request流只能获取一次的问题

    解决Java项目中request流只能获取一次的问题

    Java项目开发中可能存在以下几种情况,你需要在拦截器中统一拦截请求和你项目里可能需要搞一个统一的异常处理器,这两种情况是比较常见的,本文将给大家介绍如何解决Java项目中request流只能获取一次的问题,需要的朋友可以参考下
    2024-02-02
  • 如何使用java给局域网的电脑发送开机数据包

    如何使用java给局域网的电脑发送开机数据包

    这篇文章主要为大家详细介绍了如何使用java给局域网的电脑发送开机数据包,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2025-09-09
  • 看过就懂的java零拷贝及实现方式详解

    看过就懂的java零拷贝及实现方式详解

    这篇文章主要为大家详细的介绍了什么是零拷贝,传统的IO执行流程,零拷贝相关的知识点回顾,零拷贝实现的几种方式及java提供的零拷贝方式相关内容,有需要的朋友可以借鉴参考下
    2022-01-01
  • Java中实体与Map之间的相互转换代码示例

    Java中实体与Map之间的相互转换代码示例

    生活中经常用到map数据与实体类的转换,下面这篇文章主要给大家介绍了关于Java中实体与Map之间相互转换的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2023-12-12
  • 在Java中基于Geotools对PostGIS数据库的空间查询实践教程

    在Java中基于Geotools对PostGIS数据库的空间查询实践教程

    本文将深入探讨这一实践,从连接配置到复杂空间查询操作,包括点查询、区域范围查询以及空间关系判断等,全方位展示如何在 Java 环境下借助 Geotools 驾驭 PostGIS 数据库,实现高效精准的空间数据检索,为相关领域开发者提供实用的技术路径,助力空间数据应用的创新拓展
    2025-05-05
  • Eclipse+Maven构建Hadoop项目的方法步骤

    Eclipse+Maven构建Hadoop项目的方法步骤

    这篇文章主要介绍了Eclipse+Maven构建Hadoop项目的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-02-02
  • JDK21无法导入TimeUnit类的解决办法

    JDK21无法导入TimeUnit类的解决办法

    这篇文章主要给大家介绍了关于JDK21无法导入TimeUnit类的解决办法,TimeUnit是java.util.concurrent包下面的一个类,TimeUnit提供了可读性更好的线程暂停操作,通常用来替换Thread.sleep(),需要的朋友可以参考下
    2024-01-01
  • SpringBoot快速通关自动配置应用

    SpringBoot快速通关自动配置应用

    在进行项目编写前,我们还需要知道一个东西,就是SpringBoot对我们的SpringMVC还做了哪些配置,包括如何扩展,如何定制,只有把这些都搞清楚了,我们在之后使用才会更加得心应手
    2022-07-07
  • 一次线上websocket返回400问题排查的实战记录

    一次线上websocket返回400问题排查的实战记录

    最近项目中有端对端通信场景,实时性要求较高,考虑后选用了websocket 这一通信协议,下面这篇文章主要给大家介绍了一次线上websocket返回400问题排查的实战记录,需要的朋友可以参考下
    2022-04-04
  • 关于java.lang.IllegalArgumentException异常的正确解决方法

    关于java.lang.IllegalArgumentException异常的正确解决方法

    java.lang.IllegalArgumentException 是 Java 编程语言中的一个运行时异常,通常表示向方法传递了一个不合法或不适当的参数,本文给大家分享有关java.lang.IllegalArgumentException异常的正确解决方法,感兴趣的朋友跟随小编一起看看吧
    2025-09-09

最新评论