【JAVA多线程】JDK线程同步工具:Semaphore、CountDownLatch、CyclicBarrier

_BugMan 2024-08-30 16:05:01 阅读 55

目录

1.可能会遇到的线程协作场景

2.Semaphore

3.CountDownLatch

4.CyclicBarrier


1.可能会遇到的线程协作场景

在并发编程中,线程除了独自向前运行,还可能相互之间要进行协作,以保证完成最终总的目标。可能会遇到的几种任务之间的协作:

情景1:限定任务数

由于资源有限,限制最多有多少个线程进行工作

情景2:任务之间有依赖关系

一个线程依赖于其它线程的执行结果,这个线程就必须等待其它线程执行完成才能继续往下走

情景3:任务分阶段

批量线程分阶段执行,每一个阶段是一个同步点,执行完的线程必须阻塞在同步点上等待同批的其它线程也执行完,再进入下一个阶段。由于阶段可能有多个,所以要用condition来实现。

情景4:动态调整线程数量

不管是情景2也好还是情景3也好,都有可能有动态调整线程的可能性

2.Semaphore

semaphore,信号量,用来解决情景1。

业务情景:

我们有一个资源,只允许最多 3 个线程同时访问。我们将使用 Semaphore 来实现这一功能。

代码示例:

<code>import java.util.concurrent.Semaphore;

import java.util.concurrent.TimeUnit;

public class SemaphoreExample {

  private static final int MAX_CONCURRENT_ACCESS = 3;

  private static final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ACCESS);

  public static void main(String[] args) {

      for (int i = 0; i < 10; i++) {

          new Thread(() -> {

              try {

                  // 获取许可

                  semaphore.acquire();

                  System.out.println(Thread.currentThread().getName() + " 开始访问资源");

                  // 模拟耗时操作

                  TimeUnit.SECONDS.sleep(2);

                  System.out.println(Thread.currentThread().getName() + " 结束访问资源");

              } catch (InterruptedException e) {

                  Thread.currentThread().interrupt();

                  System.err.println(Thread.currentThread().getName() + " 被中断");

              } finally {

                  // 释放许可

                  semaphore.release();

              }

          }).start();

      }

  }

}

semaphore的实现底层其实就是AQS,没抢到资源的线程阻塞在队列中,也分了公平锁和非公平锁,其实现了AQS的共享模式tryAcquireShared(),每次去抢占资源的时候就对state做CAS减法。

acquire()去抢资源,没抢到或者抢失败了就把线程阻塞进CLH队列中:

最终调用到的是tryAcquireShared(),每次去抢占资源的时候就对state做CAS减法:

释放release()就不展开了,也是很简单的。

3.CountDownLatch

CountDownLatch,栅栏,用来解决情景2。

业务场景:

1个主线程需要等待10Worker线程完成工作才能退出。

这时候就要用CountDownLatch:

<code>CountDownLatch countDownLatch=new CountDownLatch(10);

countDownLatch.await();//主线程阻塞在这里

其余Worker线程各自去:

countDownLatch.countDown();//每调用一次countDown,计数就会减1,减到0的时候主线程会被唤醒

countDownLatch也有一个继承AQS的Sync,countDown会去调用AQS共享模式的释放方法releaseShared()

releaseShared会CAS去对state进行-1,当发现state减到0后,会用doReleaseShared唤醒躺在CLH队列中的调用过await()的主线程:

4.CyclicBarrier

业务场景:

一共有10个线程,分阶段执行任务,每一个阶段必须所有10个线程都执行后,才能一同去执行下一个阶段的任务。

代码示例:

<code>import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo {

  public static void main(String[] args) throws InterruptedException {

      // 创建一个 CyclicBarrier 实例

      CyclicBarrier barrier = new CyclicBarrier(10, () -> {

          System.out.println("All threads have arrived at the barrier. Moving to the next phase.");

      });

      // 启动 10 个线程

      for (int i = 0; i < 10; i++) {

          new Thread(() -> work(barrier)).start();

      }

      // 等待所有线程完成第一个阶段

      TimeUnit.SECONDS.sleep(2);

      // 等待所有线程完成第二个阶段

      TimeUnit.SECONDS.sleep(2);

      // 等待所有线程完成第三个阶段

      TimeUnit.SECONDS.sleep(2);

  }

  private static void work(CyclicBarrier barrier) {

      try {

          // 模拟工作

          TimeUnit.SECONDS.sleep(1); // 暂停一段时间

          // 到达屏障

          barrier.await();

          // 模拟第二阶段的工作

          TimeUnit.SECONDS.sleep(1); // 暂停一段时间

          // 到达屏障

          barrier.await();

          // 模拟第三阶段的工作

          TimeUnit.SECONDS.sleep(1); // 暂停一段时间

          // 到达屏障

          barrier.await();

      } catch (InterruptedException | BrokenBarrierException e) {

          Thread.currentThread().interrupt();

          System.err.println(Thread.currentThread().getName() + ": Interrupted or barrier broken.");

      }

  }

}

上面的代码模拟了三阶段的任务,没执行完一个阶段的任务线程就会调用CyclicBarrier的await()来等待其它的合作伙伴线程,要大家都达到后才会继续向下执行。可以看到CyclicBarrier的await()是线程同步的核心方法。一起来看看源码:

await()里面调用了doawait(),所以doawait才是核心方法:

来看看doawait()的源码:

<code>private int dowait(boolean timed, long nanos)

      throws InterruptedException, BrokenBarrierException,

              TimeoutException {

  final ReentrantLock lock = this.lock;

  lock.lock();

  try {

      // 获取当前的 Generation 对象

      final Generation g = generation;

      // 如果屏障已经损坏,则抛出 BrokenBarrierException

      if (g.broken)

          throw new BrokenBarrierException();

      // 如果线程被中断,则破坏屏障并抛出 InterruptedException

      if (Thread.interrupted()) {

          breakBarrier();

          throw new InterruptedException();

      }

      // 减少 count 计数器的值,表示有一个线程到达了屏障

      int index = --count;

       

      // 如果 count 变为 0,这意味着所有线程都已经到达屏障

      if (index == 0) { // tripped

          boolean ranAction = false;

          try {

              // 如果设置了屏障动作(回调函数),则执行该动作

              final Runnable command = barrierCommand;

              if (command != null)

                  command.run();

              ranAction = true;

               

              // 开始新的屏障周期

              nextGeneration();

               

              // 返回 0 表示当前线程触发了屏障动作

              return 0;

          } finally {

              // 如果未执行屏障动作(回调函数),则破坏屏障

              if (!ranAction)

                  breakBarrier();

          }

      }

      // 循环等待其他线程到达

      for (;;) {

      //阻塞在condition上(trip是个condition),这样就能将lock释放出来,后面的线程可以继续争抢

          try {

              // 如果没有设置超时时间,则等待所有线程到达

              if (!timed)

                  trip.await();

              // 如果设置了超时时间,则等待所有线程到达或超时

              else if (nanos > 0L)

                  nanos = trip.awaitNanos(nanos);

          } catch (InterruptedException ie) {

              // 如果线程被中断且屏障仍然有效,则破坏屏障并抛出 InterruptedException

              if (g == generation && ! g.broken) {

                  breakBarrier();

                  throw ie;

              } else {

                  // 如果线程即将完成等待,即使它被中断,也会忽略中断标志并将中断标记传递给后续执行

                  Thread.currentThread().interrupt();

              }

          }

          // 如果屏障已经损坏,则抛出 BrokenBarrierException

          if (g.broken)

              throw new BrokenBarrierException();

          // 如果屏障周期已经改变,则返回当前线程的索引

          if (g != generation)

              return index;

          // 如果设置了超时并且超时时间已到,则破坏屏障并抛出 TimeoutException

          if (timed && nanos <= 0L) {

              breakBarrier();

              throw new TimeoutException();

          }

      }

  } finally {

      // 释放锁

      lock.unlock();

  }

}

其实可以看到上面的逻辑很简单,要是没到齐就先阻塞等待,要是到齐了就调用nextGeneration()去刷新轮次,这个方法里也就是一些资源的重置。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。