Java中的Semaphore原理解析

 更新时间:2024年01月22日 10:28:32   作者:我不是欧拉_  
这篇文章主要介绍了Java中的Semaphore原理解析,Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,需要的朋友可以参考下

1. Semaphore是什么?

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore一般用于流量的控制,特别是公共资源有限的应用场景。例如数据库的连接,假设数据库的连接数上线为10个,多个线程并发操作数据库可以使用Semaphore来控制并发操作数据库的线程个数最多为10个。

2. 类图

通过类图可以看到,Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。 

3. 实现原理

3.1 使用示例

    // 定义一个资源池类
    class Pool {
        // 可用资源数100
        private static final int MAX_AVAILABLE = 100;
        // 定义信号量100
        private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
        // 获取资源
        public Object getItem() throws InterruptedException {
            // 尝试获取
            available.acquire();
            // 返回可用资源
            return getNextAvailableItem();
        }
        // 释放资源
        public void putItem(Object x) {
            // 如果资源标记为未被使用
            if (markAsUnused(x))
            // 释放资源
            available.release();
        }
        // Not a particularly efficient data structure; just for demo
        // 定义资源类型,可以是满足业务的任何类型
        protected Object[] items = new Object[MAX_AVAILABLE] ... whatever kinds of items being managed
        // 是否被使用标记
        protected boolean[] used = new boolean[MAX_AVAILABLE];
        // 获取下一个可用资源
        protected synchronized Object getNextAvailableItem() {
            // 循环遍历
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                // 如果未被使用
                if (!used[i]) {
                    // 使用标记设置为true
                    used[i] = true;
                    // 返回当前的资源
                    return items[i];
                }
            }
            return null; // not reached
        }
        // 标记资源为未被使用
        protected synchronized boolean markAsUnused(Object item) {
            // 循环遍历
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                // 找到需要释放的资源
                if (item == items[i]) {
                    // 如果是被使用中
                    if (used[i]) {
                    // 使用标记设置为false
                    used[i] = false;
                    // 返回true表示标记成功
                    return true;
                } else
                    // 返回false表示标记失败
                    return false;
                }
            }
            return false;
        }
    }

3.2 Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        // 构造方法,调用父类AQS的setState方法,给共享变量state赋值
        // 即通过构造方法给锁的数量附初始值
        Sync(int permits) {
            setState(permits);
        }
        // 获取锁,也叫许可
        final int getPermits() {
            return getState();
        }
        // 共享模式下的非公平获取
        // 此方法也体现出与ReentrantLock中Sync的实现不同
        // ReentrantLock中Sync是独占模式下的获取
        // 具体实现的不同体现在int remaining = available - acquires;
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                // 获取锁的可用数量
                int available = getState();
                // 可用数量 - 请求的数量(acquires默认值为1) = 剩余量
                int remaining = available - acquires;
                // 如果remaining < 0即请求的锁大于可用的数量,马上返回负数,表示获取锁失败
                if (remaining < 0 ||
                    // 否则通过CAS的方式将可用数量换成剩余量,并返回剩余量
                    // 自旋 + CAS 保证线程安全,线程不用排队体现出非公平性
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        // 共享模式下释放锁
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // CAS修改锁数量,成功则返回,失败则继续自旋
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        // 根据指定的缩减量减小可用锁的数目
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        // 获取并返回立即可用的所有锁
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

3.3 NonfairSync

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        // 构造方法初始化锁数量
        NonfairSync(int permits) {
            super(permits);
        }
        // 直接调用nonfairTryAcquireShared方法,走非公平策略
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

3.4 FairSync

static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        // 构造方法初始化锁数量
        FairSync(int permits) {
            super(permits);
        }
        // 共享模式下的公平策略获取
        // 与非公平策略唯一的不同体现在线程是否需要排队
        // 即是否调用hasQueuedPredecessors()方法进行判断
        // 如果需要排队则立即返回继续排队
        // 否则通过CAS方式获取锁并返货锁的剩余量,结束自旋
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

通过分析代码发现,Semaphore与ReentrantLock的内部类的结构相同,具体实现的不同体现在 int remaining = available - acquires这行代码上。

ReentrantLock对于锁的控制是 int c = getState(); if (c == 0){....}。体现为一种独占的控制。

Semaphore对锁的控制是 for (;;) { int available = getState(); int remaining = available - acquires;......}。即所有线程都可以进入自旋,只要锁有剩余量都可以尝试获取锁,体现为一种共享的控制。

3.5 Semaphore

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    // 同步队列
    private final Sync sync;
    // 构造方法初始话锁数量
    // 默认采用非公平策略
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    // 构造方法,带一个布尔参数,true表示采用公平策略,false表示采用非公平策略
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
}

3.5.1 acquire() 方法解析

// Semaphore
public void acquire() throws InterruptedException {
        // 调用sync的acquireSharedInterruptibly,即响应中断的获取
        // 因为sync继承AbstractQueuedSynchronizer
        // 即调用AQS的acquireSharedInterruptibly
        sync.acquireSharedInterruptibly(1);
    }
// 进入AQS
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 如果线程被中断,则响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 否则调用tryAcquireShared,如果获取的锁小于0即获取锁失败则调用doAcquireSharedInterruptibly方法,进入同步队列排队
        // 如果获取锁成功则不排队,走业务逻辑
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
// Semaphore 中tryAcquireShared的实现
// 公平策略
protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
// 非公平策略
protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
// 前面已经解析过,不在赘述
 

3.5.2 release() 方法解析

// Semaphore
public void release() {
        sync.releaseShared(1);
    }
// 进入AQS
public final boolean releaseShared(int arg) {
        // 尝试释放锁, 如果释放锁成功
        if (tryReleaseShared(arg)) {
            // 线程出同步队列,返回true
            doReleaseShared();
            return true;
        }
        // 否则返回false
        return false;
    }
// Semaphore 中tryReleaseShared实现
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

3.5.3 其他方法

方法说明调用
acquire(int permits)获取信号量,指定获取许可的个数,响应中断sync.acquireSharedInterruptibly(permits)
acquireUninterruptibly()获取信号量,默认获取1个许可,不响应中断sync.acquireShared(1)
acquireUninterruptibly(int permits)获取信号量,指定获取许可的个数,不响应中断sync.acquireShared(permits)
release(int permits)释放信号量,指定释放许可的个数sync.releaseShared(permits);
tryAcquire()尝试获取许可,如果获取成功返回true,否则返回false,不会阻塞线程,而且不响应中断sync.nonfairTryAcquireShared(1)
tryAcquire(int permits)同上,可以指定获取许可的个数sync.nonfairTryAcquireShared(permits)
tryAcquire(long timeout, TimeUnit unit)共享式超时获取sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
tryAcquire(int permits, long timeout, TimeUnit unit)同上,可以指定获取许可的个数sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout))
availablePermits()获取可用许可数sync.getPermits()
drainPermits()将剩下的信号量一次性消耗光,并且返回所消耗的信号量sync.drainPermits()
reducePermits(int reduction)减少信号量的总数,不会导致任何线程阻塞,调用该方法可能会导致信号量最终为负数sync.reducePermits(reduction)
isFair()是否采用公平策略
hasQueuedThreads()是否是已排队的线程
getQueueLength()获取排队线程的长度
getQueuedThreads()获取排队线程

4. 总结

Semaphore是一个有效的流量控制工具,它基于AQS共享锁实现。我们常常用它来控制对有限资源的访问。使用步骤

每次使用资源前,先申请一个信号量,如果资源数不够,就会阻塞等待;每次释放资源后,就释放一个信号量。

到此这篇关于Java中的Semaphore原理解析的文章就介绍到这了,更多相关Semaphore原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • springboot+redis过期事件监听实现过程解析

    springboot+redis过期事件监听实现过程解析

    这篇文章主要介绍了springboot+redis过期事件监听实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • Java中的构造方法(构造函数)与普通方法区别及说明

    Java中的构造方法(构造函数)与普通方法区别及说明

    这篇文章主要介绍了Java中的构造方法(构造函数)与普通方法区别及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-03-03
  • mybatis中的count()按条件查询方式

    mybatis中的count()按条件查询方式

    这篇文章主要介绍了mybatis中的count()按条件查询方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • SpringBoot项目部署在weblogic中间件的注意事项说明

    SpringBoot项目部署在weblogic中间件的注意事项说明

    这篇文章主要介绍了SpringBoot项目部署在weblogic中间件的注意事项说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • java与php的区别浅析

    java与php的区别浅析

    在本篇文章里小编给大家整理了关于java与php的区别以及相关知识点,有兴趣的朋友们学习下。
    2019-03-03
  • Java编程二项分布的递归和非递归实现代码实例

    Java编程二项分布的递归和非递归实现代码实例

    这篇文章主要介绍了Java编程二项分布的递归和非递归实现代码实例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
    2018-01-01
  • Java入门之集合框架详解

    Java入门之集合框架详解

    本文介绍了Java集合框架体系结构,包括List、Set和Map接口及其常用实现类,重点讲解了ArrayList、LinkedList、HashSet、TreeSet和HashMap等类的使用方法,并详细介绍了泛型的定义和作用,最后,还讨论了集合的遍历和并发修改异常处理
    2025-01-01
  • Java结构型设计模式中代理模式示例详解

    Java结构型设计模式中代理模式示例详解

    代理模式(Proxy Parttern)为一个对象提供一个替身,来控制这个对象的访问,即通过代理对象来访问目标对象。本文将通过示例详细讲解一下这个模式,需要的可以参考一下
    2022-09-09
  • java 设计模式之依赖倒置实例详解

    java 设计模式之依赖倒置实例详解

    这篇文章主要介绍了java 设计模式之依赖倒置,结合实例形式详细分析了依赖倒置的相关概念、原理、使用技巧及相关操作注意事项,需要的朋友可以参考下
    2019-11-11
  • Java overload和override的区别分析

    Java overload和override的区别分析

    方法的重写(Overriding)和重载(Overloading)是Java多态性的不同表现,想要了解更多请参考本文
    2012-11-11

最新评论