Semaphore介绍
-
Semaphore是一个计数信号量,它的本质是一个"共享锁";
-
Semaphore 主要用于限量控制并发执行代码的工具类, 其内部通过一个permit【int型】来进行定义并发执行的数量, 其实可以理解为一个限制数量的读锁(java.util.concurrent.locks.ReadWriteLock#readLock)获取;
-
信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
Semaphore 主要特点
- Semaphore 方法的实现通过 Sync(AQS的继承类)代理来实现
- 支持公平与非公平模式, 都是在AQS的子类里面进行, 主要区分在 tryAcquire 里面
Semaphore源码
继承关系
Semaphore实现了Serializable 接口,同时含有一个私有Sync对象字段
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;
}
内部类Sync的实现
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
NonfairSync的实现
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
FairSync的实现
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
构造函数
维护了一个信号量最大许可数
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
成员方法
// 释放信号量,将信号量数量返回给Semaphore
public void release()
{
sync.releaseShared(1);
}
public void release(int paramInt)
{
if (paramInt < 0) {
throw new IllegalArgumentException();
}
sync.releaseShared(paramInt);
}
// 获取信号量,直到只有一个可以用或者出现中断
public void acquire()
throws InterruptedException
{
sync.acquireSharedInterruptibly(1);
}
public void acquire(int paramInt)
throws InterruptedException
{
if (paramInt < 0) {
throw new IllegalArgumentException();
}
sync.acquireSharedInterruptibly(paramInt);
}
// 如果获取了锁立即返回true,如果别的线程正持有锁,立即返回false
public boolean tryAcquire()
{
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(long paramLong, TimeUnit paramTimeUnit)
throws InterruptedException
{
return sync.tryAcquireSharedNanos(1, paramTimeUnit.toNanos(paramLong));
}
Semaphore用例
- 源码
class SDTask extends Thread {
private Semaphore semaphore;
public SDTask(Semaphore semaphore, String name) {
super(name);
this.semaphore = semaphore;
}
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 尝试获取3个信号!!!");
semaphore.acquire(3);
System.out.println(Thread.currentThread().getName() + " 获取了3个信号!!!");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " 释放了3个信号!!!");
semaphore.release(3);
}
}
}
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(8);
for (int i = 0; i < 3; i++) {
new SDTask(semaphore, "thread" + i).start();
}
}
}
- 运行结果
thread0 尝试获取3个信号!!!
thread0 获取了3个信号!!!
thread2 尝试获取3个信号!!!
thread2 获取了3个信号!!!
thread1 尝试获取3个信号!!!
thread0 释放了3个信号!!!
thread2 释放了3个信号!!!
thread1 获取了3个信号!!!
thread1 释放了3个信号!!!
优秀文档:
- https://blog.csdn.net/carson0408/article/details/79475723