啥?用了并行流还更慢了

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

前言

Java 8给大家带来了一个非常便捷的多线程工具:并行流,一改往日Java多线程繁琐的编程规范,只需要一行代码,就可以让一个多线程跑起来,似乎让很多人忘记了被多线程支配的恐惧,这篇文章给大家分享一个真实的生产故障,由于在消费消息的处理器中使用了Java 8的并行流,导致集群消费消息的能力急速下降,造成线上消息堆积,引发故障。可能有朋友会好奇,到底是什么场景让并行流起了反作用?

并行流执行速度一定比串行快吗?

答案是:不一定

我把产生线上问题的代码抽象成下面的示例代码:

void testParallelStream() throws InterruptedException {
  ExecutorService threadPool = new ThreadPoolExecutor(5020020, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000),
    new ThreadFactoryBuilder().setNameFormat("test-parallel-thread").build(), new ThreadPoolExecutor.CallerRunsPolicy());

  Long time1 = System.currentTimeMillis();
  // 1. 多线程+foreach执行时长
  for (int i = 0; i < ARRAY_LENGTH; i++) {
    CommonExecutor commonExecutor = new CommonExecutor();
    commonExecutor.array = arrays[i];
    threadPool.submit(commonExecutor);
  }
  commonCountDownLatch.await();
  Long time2 = System.currentTimeMillis();
  System.out.println("for循环耗时: " + (time2 - time1));

  threadPool = new ThreadPoolExecutor(5020020, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000),
    new ThreadFactoryBuilder().setNameFormat("test-parallel-thread").build(), new ThreadPoolExecutor.CallerRunsPolicy());

  // 2. 多线程+并行流执行时长
  for (int i = 0; i < ARRAY_LENGTH; i++) {
    ParallelStreamExecutor parallelStreamExecutor = new ParallelStreamExecutor();
    parallelStreamExecutor.array = arrays[i];
    threadPool.submit(parallelStreamExecutor);
  }
  parallelCountDownLatch.await();
  Long time3 = System.currentTimeMillis();
  System.out.println("并行流耗时: " + (time3 - time2));
}

其中两次提交给线程池的执行器如下所示:

@Data
private static class CommonExecutor implements Runnable {

private long[] array;

  @Override
  public void run() {
   // 选择排序法进行排序
    for (int i = 0; i < array.length; i++) {
     array[i] = i * i;
    }
    commonCountDownLatch.countDown();
  }
}
@Data
private static class ParallelStreamExecutor implements Runnable {

private long[] array;

   @Override
   public void run() {
    // 选择排序法进行排序
    IntStream.range(0, array.length).parallel().forEach(i -> array[i] = i * i);
    parallelCountDownLatch.countDown();
   }
 }

这段代码的思路非常简单,就是对一个二维数组arrays的每一行,计算其列下标的平方数,并且回填到数组中,只不过这个过程是通过线程池去完成的,提交给线程池的执行器有两种,一种是普通的for循环,通过游标遍历每一个元素的下标,并计算平方数。另一种使用了并行流去完成同样的事情。简单起见,我们把这段代码循环执行10次,并统计了每次两种实现方式的耗时(单位是毫秒),大家可以猜猜看,到底哪种实现方式更快。

下面是真实的耗时记录:

执行次数 1 2 3 4 5 6 7 8 9 10 平均
for循环耗时 18 17 13 18 17 13 13 16 20 16 16.1
并行流耗时 32 41 38 59 51 34 53 57 49 47 46.1

执行的结果竟然是并行流的执行速度明显慢于for循环,到底是哪里出现问题了呢?

并行流的实现原理

其实问题就出现在并行流的实现上,同一个进程中提交给并行流的Action都会被同一个公共的线程池处理。也就是说上文构造的代码无论线程池threadPool的线程数开到多大,最终实际处理Action的线程数都由并行流的公共线程池大小决定,这一点我们可以从并行流的源码上看个大概:

@Override
@SuppressWarnings("unchecked")
public final S parallel() {
  sourceStage.parallel = true;
  return (S) this;
}

调用parallel只会将java.util.stream.AbstractPipeline中的sourceStage.parallel置为true,到调用foreach的时候,会调用到下面这个方法

final <R> evaluate(TerminalOp<E_OUT, R> terminalOp) {
  assert getOutputShape() == terminalOp.inputShape();
  if (linkedOrConsumed)
    throw new IllegalStateException(MSG_STREAM_LINKED);
  linkedOrConsumed = true;

  return isParallel()
    ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
    : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

这里的isParallel()就会判断上面设置的sourceStage.parallel字段,从而使程序的执行流程走到terminalOp.evaluateParallel这个分支,再往后跟的话会发现最终任务会提交到ForEachTask

@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<S> spliterator)
 
{
  if (ordered)
    new ForEachOrderedTask<>(helper, spliterator, this).invoke();
  else
    new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
  return null;
}

ForEachTask这里稍微提一嘴,是java 1.7引入的一个轻量级多线程任务,逻辑还是比较多的,后面有机会我们再看下它的实现原理,通过断点跟进去,发现最后提交的任务都会调用到ForEachTaskcompute方法

public void compute() {
  // 以我们初始提交的任务为例spliterator的类型是一个RangeIntSpliterator,其中from = 0, upTo = 10000, last = 0
  Spliterator<S> rightSplit = spliterator, leftSplit;
  // estimateSize = upTo - from + last
  long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
  if ((sizeThreshold = targetSize) == 0L)
    // 目标大小会根据上文提到的公共线程池计算,值等于 sizeEstimate/线程池大小 * 4
    targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
  boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
  boolean forkRight = false;
  Sink<S> taskSink = sink;
  ForEachTask<S, T> task = this;
  // 【核心逻辑】进入任务切分逻辑,
  while (!isShortCircuit || !taskSink.cancellationRequested()) {
    // 切分直至子任务的大小小于阈值
    if (sizeEstimate <= sizeThreshold ||
        // trySplit()会将rightSplit等比例切分,并返回切分的第一个子任务,切分比例跟待切分的任务总数相关
        // 如果待切分的子任务大小小于等于1,则返回null,停止切分
        (leftSplit = rightSplit.trySplit()) == null) {
      task.helper.copyInto(taskSink, rightSplit);
      break;
    }
    ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
    task.addToPendingCount(1);
    ForEachTask<S, T> taskToFork;
    // 这里通过forkRight控制本线程切割任务的顺序是
    // 左->右->左->右->左->右直至子任务大小满足阈值,这样可以让整个任务执行更离散
    // 关于这样做的好处也欢迎大家在评论区讨论
    if (forkRight) {
      forkRight = false;
      rightSplit = leftSplit;
      taskToFork = task;
      task = leftTask;
    }
    else {
      forkRight = true;
      taskToFork = leftTask;
    }
    // 通过fork将将切分好的子任务提交到线程池
    taskToFork.fork();
    sizeEstimate = rightSplit.estimateSize();
  }
  task.spliterator = null;
  task.propagateCompletion();
}

这段代码是整个并行流实现的核心逻辑,其本质就是将刚开始提交的串行大任务切分成更小的任务提交到线程池,并行流的秘密就藏在这段代码中:

public final ForkJoinTask<V> fork() {
  Thread t;
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
    ForkJoinPool.common.externalPush(this);
  return this;
}

这里涉及到ForkJoinPool的一个设计,为了避免常规线程池中各个线程访问任务队列产生竞争,ForkJoinPool除了有一个公共的任务队列之外,每个线程自身还持有一个任务队列,外部线程需要提交任务到公共队列,线程池线程切分的更小的任务则直接提交到自身的工作队列中,因此就有了上面看到的这段逻辑。整个ForkJoinPool的逻辑如下图所示:

  • 其中提交给共享队列的线程会被内部工作线程偷取
  • 私有工作队列中的任务通过fork切分成小任务后会将子任务push回私有队列
  • 如果工作线程有空闲,他还可以去偷取其他工作队列的任务
啥?用了并行流还更慢了

至于ForkJoinPool.common,就是我们上文一直提及的公共线程池,其初始化方法在ForkJoinPool的静态调用块中调用

private static ForkJoinPool makeCommonPool() {

        final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
                new CommonPoolForkJoinWorkerThreadFactory();
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                // 可以通过设置这个值来改变公共线程池的大小
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = commonPoolForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            // 获取线程池线程数量,其值等于当前可用处理器减一
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

这段代码比较简单,就是返回了一个固定的线程池,线程池大小默认等于可用处理器减一,这是因为在ForkJoinPool的设计中外部线程也是可以参与到执行子任务的,这个看似巧妙的设计其实很容易误用,尤其是遇到跟线程状态相关的全局变量时。

并行流比串行更慢的原因

在了解了并行流的实现原理后我们也就能理解为什么在文章开头,针对同一段逻辑,并行流的执行反而比串行慢了。

啥?用了并行流还更慢了
 
当在多线程场景下使用并行流时,由于并行流使用的是一个公共的线程池,所以无论外部有多少个线程,这些线程都会把任务提交给同一个线程池,所以你会发现,无论你怎么调整外面线程池的大小,都不能使任务加速。回到文章刚开始的例子,采用并行流的实现中真实的线程数为7,而采用串行的实现中真实的线程数为100,由于线程数差别巨大,因此造成了最终的耗时也有很明显的差距。

总结

并行流的设计是比较讨巧的,其中有三个地方容易踩坑

  • 同一个进程提交给并行流的任务都会被同一个公共线程池处理,因此,如果在多线程的环境中使用了并行流,反而会降低并发,使得处理变慢
  • 并行流的公共线程池大小为可用处理器减一,并且并行流会使用外部线程去处理内部子任务,搭配ThreadLocal使用的时候务必要慎重,在一些与ThreadLocal强耦合的场景,可能会导致ThreadLocal误清理,其他线程相关的全局变量同理
  • 并行流的设计是为了应对计算密集型的场景的,如果有较多的IO场景,比如常见的RPC调用,在高并发的场景下会导致外部线程阻塞,引起外部线程数增多,且这类问题在测试的时候不容易发现,极易引起生产故障。

END

推荐好文

>>【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能

>>分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!

>>能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

啥?用了并行流还更慢了

原文始发于微信公众号(Java知音):啥?用了并行流还更慢了