admin管理员组

文章数量:1032173

Java中的信号量semaphore

1. 概述

在本快速教程中,我们将探讨 Java 中信号量和互斥体的基础知识。

2.信号量

我们将从java.util.concurrent.Semaphore开始。我们可以使用信号量来限制访问特定资源的并发线程数。

在下面的示例中,我们将实现一个简单的登录队列来限制系统中的用户数量:

代码语言:javascript代码运行次数:0运行复制
class LoginQueueUsingSemaphore {

    private Semaphore semaphore;

    public LoginQueueUsingSemaphore(int slotLimit) {
        semaphore = new Semaphore(slotLimit);
    }

    boolean tryLogin() {
        return semaphore.tryAcquire();
    }

    void logout() {
        semaphore.release();
    }

    int availableSlots() {
        return semaphore.availablePermits();
    }

}Copy

请注意我们如何使用以下方法:

  • tryAcquire()– 如果许可证立即可用,则返回 true,否则返回 false,但acquire() 获取许可证并阻止,直到一个许可证可用
  • release() – 释放许可证
  • availablePermits() – 返回当前可用许可证的数量

为了测试我们的登录队列,我们将首先尝试达到限制并检查下一次登录尝试是否会被阻止:

代码语言:javascript代码运行次数:0运行复制
@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();

    assertEquals(0, loginQueue.availableSlots());
    assertFalse(loginQueue.tryLogin());
}Copy

接下来,我们将查看注销后是否有任何插槽可用:

代码语言:javascript代码运行次数:0运行复制
@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();
    assertEquals(0, loginQueue.availableSlots());
    loginQueue.logout();

    assertTrue(loginQueue.availableSlots() > 0);
    assertTrue(loginQueue.tryLogin());
}Copy

3. 定时信号量

接下来,我们将讨论Apache CommonsTimedSemaphore。TimedSemaphore允许将多个许可证作为简单的信号量,但在给定的时间段内,在此时间段之后,时间重置并释放所有许可证。

我们可以使用TimedSemaphore来构建一个简单的延迟队列,如下所示:

代码语言:javascript代码运行次数:0运行复制
class DelayQueueUsingTimedSemaphore {

    private TimedSemaphore semaphore;

    DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
        semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
    }

    boolean tryAdd() {
        return semaphore.tryAcquire();
    }

    int availableSlots() {
        return semaphore.getAvailablePermits();
    }

}Copy

当我们使用以一秒作为时间段的延迟队列并在一秒内用完所有插槽后,应该没有一个可用:

代码语言:javascript代码运行次数:0运行复制
public void givenDelayQueue_whenReachLimit_thenBlocked() {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue 
      = new DelayQueueUsingTimedSemaphore(1, slots);
    
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();

    assertEquals(0, delayQueue.availableSlots());
    assertFalse(delayQueue.tryAdd());
}Copy

但是在睡了一段时间后,信号灯应该重置并释放许可证:

代码语言:javascript代码运行次数:0运行复制
@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();

    assertEquals(0, delayQueue.availableSlots());
    Thread.sleep(1000);
    assertTrue(delayQueue.availableSlots() > 0);
    assertTrue(delayQueue.tryAdd());
}Copy

4. 信号量与互斥体

互斥体的作用类似于二进制信号量,我们可以使用它来实现互斥。

在以下示例中,我们将使用简单的二进制信号量来构建计数器:

代码语言:javascript代码运行次数:0运行复制
class CounterUsingMutex {

    private Semaphore mutex;
    private int count;

    CounterUsingMutex() {
        mutex = new Semaphore(1);
        count = 0;
    }

    void increase() throws InterruptedException {
        mutex.acquire();
        this.count = this.count + 1;
        Thread.sleep(1000);
        mutex.release();

    }

    int getCount() {
        return this.count;
    }

    boolean hasQueuedThreads() {
        return mutex.hasQueuedThreads();
    }
}Copy

当许多线程尝试同时访问计数器时,它们只会在队列中被阻止:

代码语言:javascript代码运行次数:0运行复制
@Test
public void whenMutexAndMultipleThreads_thenBlocked()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();

    assertTrue(counter.hasQueuedThreads());
}Copy

当我们等待时,所有线程都将访问计数器,队列中没有线程:

代码语言:javascript代码运行次数:0运行复制
@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();

    assertTrue(counter.hasQueuedThreads());
    Thread.sleep(5000);
    assertFalse(counter.hasQueuedThreads());
    assertEquals(count, counter.getCount());
}Copy

5. 结论

在本文中,我们探讨了 Java 中信号量的基础知识。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-14,如有侵权请联系 cloudcommunity@tencent 删除javasemaphore登录队列线程

Java中的信号量semaphore

1. 概述

在本快速教程中,我们将探讨 Java 中信号量和互斥体的基础知识。

2.信号量

我们将从java.util.concurrent.Semaphore开始。我们可以使用信号量来限制访问特定资源的并发线程数。

在下面的示例中,我们将实现一个简单的登录队列来限制系统中的用户数量:

代码语言:javascript代码运行次数:0运行复制
class LoginQueueUsingSemaphore {

    private Semaphore semaphore;

    public LoginQueueUsingSemaphore(int slotLimit) {
        semaphore = new Semaphore(slotLimit);
    }

    boolean tryLogin() {
        return semaphore.tryAcquire();
    }

    void logout() {
        semaphore.release();
    }

    int availableSlots() {
        return semaphore.availablePermits();
    }

}Copy

请注意我们如何使用以下方法:

  • tryAcquire()– 如果许可证立即可用,则返回 true,否则返回 false,但acquire() 获取许可证并阻止,直到一个许可证可用
  • release() – 释放许可证
  • availablePermits() – 返回当前可用许可证的数量

为了测试我们的登录队列,我们将首先尝试达到限制并检查下一次登录尝试是否会被阻止:

代码语言:javascript代码运行次数:0运行复制
@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();

    assertEquals(0, loginQueue.availableSlots());
    assertFalse(loginQueue.tryLogin());
}Copy

接下来,我们将查看注销后是否有任何插槽可用:

代码语言:javascript代码运行次数:0运行复制
@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();
    assertEquals(0, loginQueue.availableSlots());
    loginQueue.logout();

    assertTrue(loginQueue.availableSlots() > 0);
    assertTrue(loginQueue.tryLogin());
}Copy

3. 定时信号量

接下来,我们将讨论Apache CommonsTimedSemaphore。TimedSemaphore允许将多个许可证作为简单的信号量,但在给定的时间段内,在此时间段之后,时间重置并释放所有许可证。

我们可以使用TimedSemaphore来构建一个简单的延迟队列,如下所示:

代码语言:javascript代码运行次数:0运行复制
class DelayQueueUsingTimedSemaphore {

    private TimedSemaphore semaphore;

    DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
        semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
    }

    boolean tryAdd() {
        return semaphore.tryAcquire();
    }

    int availableSlots() {
        return semaphore.getAvailablePermits();
    }

}Copy

当我们使用以一秒作为时间段的延迟队列并在一秒内用完所有插槽后,应该没有一个可用:

代码语言:javascript代码运行次数:0运行复制
public void givenDelayQueue_whenReachLimit_thenBlocked() {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue 
      = new DelayQueueUsingTimedSemaphore(1, slots);
    
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();

    assertEquals(0, delayQueue.availableSlots());
    assertFalse(delayQueue.tryAdd());
}Copy

但是在睡了一段时间后,信号灯应该重置并释放许可证:

代码语言:javascript代码运行次数:0运行复制
@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();

    assertEquals(0, delayQueue.availableSlots());
    Thread.sleep(1000);
    assertTrue(delayQueue.availableSlots() > 0);
    assertTrue(delayQueue.tryAdd());
}Copy

4. 信号量与互斥体

互斥体的作用类似于二进制信号量,我们可以使用它来实现互斥。

在以下示例中,我们将使用简单的二进制信号量来构建计数器:

代码语言:javascript代码运行次数:0运行复制
class CounterUsingMutex {

    private Semaphore mutex;
    private int count;

    CounterUsingMutex() {
        mutex = new Semaphore(1);
        count = 0;
    }

    void increase() throws InterruptedException {
        mutex.acquire();
        this.count = this.count + 1;
        Thread.sleep(1000);
        mutex.release();

    }

    int getCount() {
        return this.count;
    }

    boolean hasQueuedThreads() {
        return mutex.hasQueuedThreads();
    }
}Copy

当许多线程尝试同时访问计数器时,它们只会在队列中被阻止:

代码语言:javascript代码运行次数:0运行复制
@Test
public void whenMutexAndMultipleThreads_thenBlocked()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();

    assertTrue(counter.hasQueuedThreads());
}Copy

当我们等待时,所有线程都将访问计数器,队列中没有线程:

代码语言:javascript代码运行次数:0运行复制
@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();

    assertTrue(counter.hasQueuedThreads());
    Thread.sleep(5000);
    assertFalse(counter.hasQueuedThreads());
    assertEquals(count, counter.getCount());
}Copy

5. 结论

在本文中,我们探讨了 Java 中信号量的基础知识。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-14,如有侵权请联系 cloudcommunity@tencent 删除javasemaphore登录队列线程

本文标签: Java中的信号量semaphore