[TOC] - `Semaphore` 是一个强大的同步工具,特别适合于需要限制资源并发访问数量的场景。通过合理使用 `Semaphore`,可以有效提高应用程序的性能和稳定性。 - `Semaphore` 通过 `AQS` 提供了一种高效的同步机制,用于控制同时访问特定资源的线程数量。`- - `Semaphore` 的实现分为非公平和公平两种模式,分别适用于不同的应用场景。 ## Java中的Semaphore详解 ### 设计思路 `Semaphore` 是 Java 并发包 `java.util.concurrent` 中提供的一种同步工具,用于控制同时访问特定资源的线程数量。它通过维护一组许可证来管理这些访问权限。当一个线程需要访问共享资源时,它必须先从 `Semaphore` 获取一个许可证;如果此时没有可用的许可证,那么该线程将被阻塞,直到其他线程释放一个许可证。这种机制可以有效地限制并发执行的线程数量,确保系统的稳定性和资源的有效利用。 ### 实现原理 - **许可证概念**:`Semaphore` 内部使用了一个许可集(许可证的数量),每个许可代表一个资源单位。线程调用 `acquire()` 方法请求一个或多个许可,只有当有足够的许可时,线程才能继续执行;否则,线程将被放入等待队列中,直到其他线程调用 `release()` 方法释放相应的许可。 - **公平性选项**:`Semaphore` 支持公平性和非公平性两种模式。公平模式下,按照请求许可的顺序分配许可;非公平模式下,则允许插队,即新来的线程可能会在等待队列中的其他线程之前获得许可。 - **内部实现**:`Semaphore` 的内部实现基于 AQS(AbstractQueuedSynchronizer)框架,这是一个用于构建锁和同步器的基础框架。AQS 维护了一个 FIFO 的等待队列,用于管理线程的排队行为。 [Gitee链接地址,建议收藏,后续我会对专栏进行统一整理,每篇文章进行校正和调整,然后统一存放在gitee仓库中](https://gitee.com/MyCoder4j/coder4j) ### 源码分析 #### 数据结构 `Semaphore` 的主要数据结构包括: - **`sync`**:一个 `Sync` 类型的实例,继承自 `AbstractQueuedSynchronizer`。`Sync` 类有两个子类:`NonfairSync` 和 `FairSync`,分别表示非公平和公平的 `Semaphore`。 - **`permits`**:表示初始的许可数量。 #### 1. 构造方法 `Semaphore` 提供了两个构造方法,用于创建不同类型的 `Semaphore`: - `Semaphore(int permits)`:创建一个具有给定许可数的非公平 `Semaphore`。 - `Semaphore(int permits, boolean fair)`:创建一个具有给定许可数和指定是否为公平的 `Semaphore`。 ```java public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } ``` #### 2. `acquire()` 和 `acquire(int permits)` 这两个方法用于获取一个或多个许可。如果当前没有足够的许可,线程将被阻塞,直到其他线程释放许可。 ```java public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } ``` #### 3. `tryAcquire()` 和 `tryAcquire(int permits)` 这两个方法尝试获取一个或多个许可,如果立即可用则返回 `true`,否则返回 `false`。 ```java public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } ``` #### 4. `release()` 和 `release(int permits)` 这两个方法用于释放一个或多个许可,增加可用的许可数。 ```java public void release() { sync.releaseShared(1); } public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } ``` #### 5. `availablePermits()` 返回当前可用的许可数。 ```java public int availablePermits() { return sync.getPermits(); } ``` #### 6. `drainPermits()` 获取并清除所有可用的许可。 ```java public int drainPermits() { return sync.drainPermits(); } ``` ### 内部机制 #### 1. `AbstractQueuedSynchronizer` (AQS) `Semaphore` 的核心实现依赖于 `AQS`,`AQS` 是一个用于构建锁和同步器的基础框架。`AQS` 使用一个 `int` 类型的 `state` 变量来表示同步状态,对于 `Semaphore` 来说,这个状态就是当前可用的许可数。 - **`getState()`**:返回当前的同步状态。 - **`setState(int newState)`**:设置当前的同步状态。 - **`compareAndSetState(int expect, int update)`**:原子地将同步状态从预期值 `expect` 更新为 `update`。 #### 2. `Sync` 类 `Sync` 类继承自 `AQS`,提供了 `Semaphore` 的核心同步逻辑。`Sync` 类有两个子类:`NonfairSync` 和 `FairSync`,分别实现了非公平和公平的 `Semaphore`。 - **`nonfairTryAcquireShared(int acquires)`**:尝试获取许可,非公平模式下,不需要检查是否有等待的线程。 - **`tryReleaseShared(int releases)`**:尝试释放许可,更新同步状态并唤醒等待的线程。 - **`reducePermits(int reductions)`**:减少许可数,通常用于内部操作。 - **`drainPermits()`**:获取并清除所有可用的许可。 #### 3. 公平性 - **非公平模式**:`NonfairSync` 不检查是否有等待的线程,直接尝试获取许可。 - **公平模式**:`FairSync` 在尝试获取许可前,会检查是否有等待的线程,如果有等待的线程,则当前线程会被阻塞。 ### 与其他同步工具的差异对比 - **与 Lock 的区别**:`Lock` 提供了更细粒度的锁定机制,适用于需要精确控制锁的情况;而 `Semaphore` 更侧重于控制资源的并发访问数量。 - **与 CountDownLatch 的区别**:`CountDownLatch` 用于等待一个或多个工作线程完成操作后继续执行,是一次性的;`Semaphore` 则可以多次获取和释放许可,适用于重复使用的场景。 - **与 CyclicBarrier 的区别**:`CyclicBarrier` 用于多个线程互相等待到达某个屏障点后再一起继续执行,主要用于循环同步;`Semaphore` 主要用于控制并发访问的数量,不限于循环同步。 ### 应用场景 - **资源池管理**:如数据库连接池、线程池等,通过 `Semaphore` 控制同时可使用的资源数量。 - **流量控制**:在网络应用中,可以用来限制同时处理的请求数量,避免服务器过载。 - **并发控制**:在多线程环境中,控制对特定资源的并发访问,确保资源的安全使用。 ## 不同应用场景使用示例 ### 1. 资源池管理 #### 场景描述 假设我们有一个数据库连接池,最多允许 10 个并发连接。我们可以使用 `Semaphore` 来控制同时打开的数据库连接数量。 #### 代码示例 ```java import java.sql.Connection; import java.sql.DriverManager; import java.util.concurrent.Semaphore; public class DatabaseConnectionPool { private final Semaphore semaphore; private final Connection[] connections; public DatabaseConnectionPool(int poolSize) throws Exception { semaphore = new Semaphore(poolSize); connections = new Connection[poolSize]; for (int i = 0; i < poolSize; i++) { connections[i] = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password"); } } public Connection getConnection() throws InterruptedException { semaphore.acquire(); return findFreeConnection(); } public void releaseConnection(Connection connection) { if (connection != null) { // 假设连接已经归还到连接池 semaphore.release(); } } private Connection findFreeConnection() { for (Connection conn : connections) { if (!conn.isClosed()) { return conn; } } return null; // 这里应该不会发生,因为 acquire 已经保证了有空闲连接 } public static void main(String[] args) throws Exception { DatabaseConnectionPool pool = new DatabaseConnectionPool(10); Runnable task = () -> { try { Connection conn = pool.getConnection(); System.out.println(Thread.currentThread().getName() + " acquired connection: " + conn); Thread.sleep(1000); // 模拟数据库操作 pool.releaseConnection(conn); System.out.println(Thread.currentThread().getName() + " released connection: " + conn); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }; for (int i = 0; i < 20; i++) { new Thread(task).start(); } } } ``` ### 2. 流量控制 #### 场景描述 假设我们有一个网络服务,每秒最多处理 5 个请求。我们可以使用 `Semaphore` 来限制每秒的请求数量。 #### 代码示例 ```java import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class RateLimiter { private final Semaphore semaphore; public RateLimiter(int maxRequestsPerSecond) { semaphore = new Semaphore(maxRequestsPerSecond); } public void handleRequest(Runnable request) { try { semaphore.acquire(); request.run(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { semaphore.release(); } } public static void main(String[] args) { RateLimiter rateLimiter = new RateLimiter(5); Runnable request = () -> { System.out.println(Thread.currentThread().getName() + " is handling the request."); try { Thread.sleep(1000); // 模拟处理请求的时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }; for (int i = 0; i < 20; i++) { rateLimiter.handleRequest(request); } } } ``` ### 3. 并发控制 #### 场景描述 假设我们有一个文件系统,最多允许 3 个线程同时写入文件。我们可以使用 `Semaphore` 来控制同时写入文件的线程数量。 #### 代码示例 ```java import java.io.FileWriter; import java.io.IOException; import java.util.concurrent.Semaphore; public class FileWriteController { private final Semaphore semaphore; public FileWriteController(int maxConcurrentWrites) { semaphore = new Semaphore(maxConcurrentWrites); } public void writeFile(String content) { try { semaphore.acquire(); writeToFile(content); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { semaphore.release(); } } private void writeToFile(String content) { try (FileWriter writer = new FileWriter("output.txt", true)) { writer.write(content + "\n"); System.out.println(Thread.currentThread().getName() + " wrote: " + content); Thread.sleep(1000); // 模拟写入时间 } catch (IOException | InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { FileWriteController controller = new FileWriteController(3); Runnable task = () -> { for (int i = 0; i < 5; i++) { controller.writeFile("Line " + i + " by " + Thread.currentThread().getName()); } }; for (int i = 0; i < 10; i++) { new Thread(task, "Thread-" + i).start(); } } } ```