【JAVA多线程】JDK线程同步工具:Phaser

_BugMan 2024-09-07 10:05:06 阅读 70

目录

1.干什么的?

2.代码示例

2.1.线程间的协作

2.2.协作API

2.3.动态调整线程数

3.树形结构

4.state

5.Treiber Stack

6.核心源码


1.干什么的?

线程的同步无非就Semaphore、CountDownLatch、CyclicBarrier去应付的三大类情况了,其中线程之间有协作关系的是CountDownLatch、CyclicBarrier去对付的情况。线程间有协作关系的场景里线程数量是可能需要动态调整的,尤其是CyclicBarrier要面对的分阶段执行的场景,这个阶段可能是10条线程,下个阶段可能需要8条线程......Phaser就是用来干这事儿的。

Phaser:

可以实现CountDownLatch、CyclicBarrier需要应付的线程之间协作的场景,也就是说可以实现这两者的效果。

可以动态调整线程数量(核心能力)

除了以上外,Phaser还有一个核心点:

支持任务之间存在前后依赖关系,比如B任务依赖于A任务的结果,支持B任务在A任务之前执行。

2.代码示例

2.1.线程间的协作

phaser实现CountDownLatch的效果,一条线程等待其它线程执行完:

<code>public static void main(String[] args) throws InterruptedException {

       phaserDemo();

  }

   private static void phaserDemo1() {

       Phaser phaser = new Phaser(10);

       Thread threadParent = new Thread(() -> {

           //主线调用awaitAdvance程阻塞在当前轮次

           System.out.println(Thread.currentThread().getName()+"......awaitAdvance");

           phaser.awaitAdvance(phaser.getPhase());

           System.out.println(Thread.currentThread().getName()+"......wakeUp");

      });

       Thread threadChild = new Thread(() -> {

           //10条子线程随机到达

           ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 2L, TimeUnit.MINUTES, new ArrayBlockingQueue(10));

           for (int i = 0; i < threadPoolExecutor.getCorePoolSize(); i++) {

               threadPoolExecutor.execute(new Runnable() {

                   @Override

                   public void run() {

                       try {

                           Thread.sleep(1000);

                           System.out.println(Thread.currentThread().getName() + "......arrive");

                           //子线程到达

                           phaser.arrive();

                      } catch (InterruptedException e) {

                           throw new RuntimeException(e);

                      }

                  }

              });

          }

           //关闭线程池,不然程序不会退出

           threadPoolExecutor.shutdown();

      });

       threadParent.start();

       threadChild.start();

  }

执行结果:

phaser实现CyclicBarrier的效果,线程之间相互协作:

<code>private static void phaserDemo2(){

      Phaser phaser = new Phaser(10);

      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 2L, TimeUnit.MINUTES, new ArrayBlockingQueue(10));

      for(int i=0;i<threadPoolExecutor.getCorePoolSize();i++){

          threadPoolExecutor.execute(new Runnable() {

              @Override

              public void run() {

                  try {

                      Thread.sleep(1000);

                      //线程之间相互到达、等待

                      System.out.println("phase"+phaser.getPhase()+"......"+Thread.currentThread().getName()+"......arriveAndAwaitAdvance");

                      phaser.arriveAndAwaitAdvance();

                      System.out.println(Thread.currentThread().getName()+"......wakeUp");

                  } catch (InterruptedException e) {

                      throw new RuntimeException(e);

                  }

              }

          });

      }

      for(int i=0;i<threadPoolExecutor.getCorePoolSize();i++){

          threadPoolExecutor.execute(new Runnable() {

              @Override

              public void run() {

                  try {

                      Thread.sleep(1000);

                      //线程之间相互到达、等待

                      System.out.println("phase"+phaser.getPhase()+"......"+Thread.currentThread().getName()+"......arriveAndAwaitAdvance");

                      phaser.arriveAndAwaitAdvance();

                      System.out.println(Thread.currentThread().getName()+"......wakeUp");

                  } catch (InterruptedException e) {

                      throw new RuntimeException(e);

                  }

              }

          });

      }

      threadPoolExecutor.shutdown();

  }

执行结果:

2.2.协作API

arrive和awaitAdvance

arrive,线程已到达

awaitAdvance,线程阻塞在当前轮次

arriveAndAwaitAdvance

线程到达并等待其它参与协作的线程全部到达

arriveAndDeregister

线程到达,并从phaser中注销,即协作的线程数量-1

2.3.动态调整线程数

phaser可以动态的调整参与协作的线程数量是其核心能力,调整的方式有如下:

注册新参与者: 使用register() 方法来将协作线程数+1

取消注册参与者: 使用arriveAndDeregister() 方法来表示当前线程到达并且phaser的协作线程数-1

手动调整: 使用 bulkRegister(int parties) 方法一次注册多个参与者。 使用 bulkArriveAndDeregister(int parties) 方法一次取消注册多个参与者。

查询参与者状态: 使用 getRegisteredParties() 获取注册的参与者数量。 使用 getArrivedParties() 获取已到达当前阶段的参与者数量。

以上都是Phaser的API,可以在任何阶段来进行线程的调整。

3.树形结构

前面我们说过了Phaser有一个核心能力是支持任务之间的前后依赖关系,如B任务依赖于A任务,那么A任务在B任务之前执行。

再说清楚一点:

Phaser支持在单轮次里用依赖关系来控制任务的执行顺序!

如何实现喃?其实很容易能想到,链表关系,但是链表关系很明显只支持单依赖,没办法支持多依赖,用树形结构就能支持多依赖了,B任务和C任务依赖于A任务,那么A任务是父节点,B任务和C任务是子节点。

Phaser允许组成树形结构:

<code>Phaser phaser0=new Phaser(1);

Phaser phaser1 = new Phaser(phaser0,1);

Phaser phaser2 = new Phaser(phaser0,1);

Phaser phaser3 = new Phaser(phaser1,1);

Phaser phaser4 = new Phaser(phaser2,1);

Phaser phaser5 = new Phaser(phaser2,1);

再推深一点:

先从构造方法进去,可以看到所有构造本质上都是调用的一个构造方法:

<code>public Phaser(int parties) {

  this(null, parties);

}

这个构造方法里会完成state和Treiber Stack的初始化:

这里我们可以看到Phaser是允许传一个父Phaser进构造方法来组成树形结构的,不管组成树状结构与否,全局都用的一个Treiber Stack和state。

所以我们更能理解了,树形结构的存在就是为了以依赖关系来控制执行顺序,父子之间只有执行顺序的差异,资源都是共用的一套。

4.state

其中轮数叫phase。

Phaser给出了一堆方法用来获取这四部分的值:

5.Treiber Stack

phaser并没有依托于AQS来实现,所以自己实现了一套完整的线程阻塞唤醒逻辑,被阻塞的线程放在Treiber Stack中。

Treiber Stack是R.Kent Treibe r在其于1986年发表的论文Systems Programming:Coping with Para llelism 中首次提出。说白了就是一个用链表实现的栈。

Phaser内部实现了Treiber Stack。因为是链表结构,所以有Node节点,节点里面存线程:

由于是栈结构,所以Phaser里面只有一个头指针,永远的指向栈顶,只是为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Sta ck。当phase为奇数轮的时候,阻塞线程放在oddQ里面;当phase为偶 数轮的时候,阻塞线程放在evenQ里面:

6.核心源码

核心源码无非就是阻塞等待的awaitAdvance方法和线程到达的arrive方法。

先来看arrive方法,arrive方法会去调用doArrive方法,默认传参ONE_ARRIVAL为1:

叫大模型帮我们给doArrive方法加上注释来读一下:

<code>private int doArrive(int adjust) {

   final Phaser root = this.root;

   for (;;) {

       // 如果当前对象是根节点,则直接获取 state 字段;

       // 否则调用 reconcileState() 方法来获取正确的状态。

       long s = (root == this) ? state : reconcileState();

       // 提取当前阶段的值。

       int phase = (int)(s >>> PHASE_SHIFT);

       // 如果阶段小于 0,这通常意味着发生了错误,直接返回阶段值。

       if (phase < 0)

           return phase;

       // 获取状态中的计数部分。

       int counts = (int)s;

       // 计算未到达的参与者数量。

       int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);

       // 如果没有未到达的参与者,则抛出异常。

       if (unarrived <= 0)

           throw new IllegalStateException(badArrive(s));

       // 使用 CAS 更新状态。adjust 参数决定了状态应该如何被调整。

       if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {

           // 如果只有一个未到达的参与者,则执行以下逻辑:

           if (unarrived == 1) {

               // 获取下一阶段的基础状态。

               long n = s & PARTIES_MASK;

               // 提取下一阶段未到达的参与者数量。

               int nextUnarrived = (int)n >>> PARTIES_SHIFT;

               // 如果当前对象是根节点,则进行以下操作:

               if (root == this) {

                   // 如果 onAdvance 返回 true,则设置终止标志。

                   if (onAdvance(phase, nextUnarrived))

                       n |= TERMINATION_BIT;

                   // 如果下一阶段没有未到达的参与者,则设置空闲标志。

                   else if (nextUnarrived == 0)

                       n |= EMPTY;

                   // 否则,设置下一阶段未到达的参与者数量。

                   else

                       n |= nextUnarrived;

                   // 更新下一阶段的阶段号。

                   int nextPhase = (phase + 1) & MAX_PHASE;

                   n |= (long)nextPhase << PHASE_SHIFT;

                   // 再次使用 CAS 更新状态。

                   UNSAFE.compareAndSwapLong(this, stateOffset, s, n);

                   // 调用 releaseWaiters 方法释放等待者。

                   releaseWaiters(phase);

              }

               // 如果当前对象不是根节点且下一阶段没有未到达的参与者,则递归调用父节点的 doArrive 方法,并设置空闲标志。

               else if (nextUnarrived == 0) {

                   phase = parent.doArrive(ONE_DEREGISTER);

                   UNSAFE.compareAndSwapLong(this, stateOffset,

                                             s, s | EMPTY);

              }

               // 否则,递归调用父节点的 doArrive 方法。

               else

                   phase = parent.doArrive(ONE_ARRIVAL);

          }

           // 返回当前阶段号。

           return phase;

      }

  }

}

对上面的源码进行一下总结:

未到达者unarrived==1,说明当前这条线程就是最后一条未到达线程,所以会进行一些列的资源操作,推进到下一轮。

其中releaseWaiters方法会去唤醒阻塞在当前轮次的线程,也就是调用awaitAdvance方法的线程。

先去让父Phaser去doArrive,因为Phaser的树形结构的存在是为了满足任务之间有依赖的情况,设计上是子Phaser对父Phaser有依赖,在组成树形结构的时候,依赖者为子,被依赖者为父。这样能在单轮次里面绝对控制住依赖关系的先后执行。

awaitAdvance的逻辑也不复杂,核心就是将线程加入Treiber Stack,然后阻塞该线程:

<code>private int internalAwaitAdvance(int phase, QNode node) {

  // assert root == this; // 确保当前对象是根节点

  releaseWaiters(phase-1);         // 清理旧的等待队列

  boolean queued = false;           // 表示节点是否已经被加入到等待队列中

  int lastUnarrived = 0;           // 用于增加自旋次数的变量

  int spins = SPINS_PER_ARRIVAL;   // 初始自旋次数

  long s;

  int p;

  while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {

      // 如果 node 为空,则表示当前线程正在以不可中断模式自旋等待

      if (node == null) {

          int unarrived = (int)s & UNARRIVED_MASK;

          if (unarrived != lastUnarrived &&

              (lastUnarrived = unarrived) < NCPU)

              spins += SPINS_PER_ARRIVAL; // 增加自旋次数

          boolean interrupted = Thread.interrupted(); // 检查线程是否被中断

          if (interrupted || --spins < 0) { // 如果线程被中断或自旋次数耗尽

              node = new QNode(this, phase, false, false, 0L);

              node.wasInterrupted = interrupted; // 创建 QNode 并记录中断状态

          }

      }

      // 如果 node 不为空并且可以被释放,则退出循环

      else if (node.isReleasable())

          break;

      // 如果 node 还没有被加入到队列中

      else if (!queued) {

          AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; // 获取当前阶段的队列头

          QNode q = node.next = head.get(); // 将 node 加入到队列中

          if ((q == null || q.phase == phase) &&

              (int)(state >>> PHASE_SHIFT) == phase) // 避免加入过时的节点

              queued = head.compareAndSet(q, node); // 尝试将 node 设置为队列头

      }

      // 如果 node 已经加入到队列中,则阻塞当前线程

      else {

          try {

              ForkJoinPool.managedBlock(node);

          } catch (InterruptedException ie) {

              node.wasInterrupted = true; // 如果线程被中断,则记录中断状态

          }

      }

  }

  // 如果 node 不为空

  if (node != null) {

      if (node.thread != null)

          node.thread = null; // 清理 thread 引用

      if (node.wasInterrupted && !node.interruptible)

          Thread.currentThread().interrupt(); // 如果线程被中断且不可中断,则重新设置中断标志

      if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)

          return abortWait(phase); // 如果阶段没有变化,则可能需要清理

  }

  releaseWaiters(phase); // 清理等待队列

  return p; // 返回当前阶段号

}

注意:

在前面聊Treiber的时候只说了它的栈的结构,但是没有说线程的阻塞,这里可以看到线程是通过调用ForkJoinPool.managedBlock来阻塞的,这里用ForkJoinPool来阻塞线程,并不是为了用到ForkJoinPool的高效调度能力,毕竟在这里所有线程都不是放在一个ForkJoinPool里的,只是用managedBlock将线程封装成了ForkJoinWorkerThread类型而已,ForkJoinWorkerThread能支持对中断进行响应,仅此而已,这里不要被迷惑了,觉得用到了ForkJoinPool核心目的是为了加快资源调度,其实不是这样的。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。