Java并行流的使用及速度比较

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

前言

当我第一次听到流这个概念时,对他模糊不清,后来逐步使用后,才知晓流的含义,而当再次听说还有并行流的时候,那就更是一头雾水,所以今天来介绍一下并行流,首先还需要知道什么是流,用一段话概括就是:依次处理集合中的数据,这里的依次不是说依次遍历集合中的元素,然后处理,而是依次安不同的方式处理集合中的数据,比如操作某个集合,首先找出大于5的数,对其构建出新的集合,并且打印出来,那么使用流就可以这样做:

fun main() {
    val randoms = Stream
    .generate { Random().nextInt(10) }
    .limit(10)
    .collect(Collectors.toList())
    
    randoms.stream()
    .filter { it>5 }
    .collect(Collectors.toList())
    .forEach (System.out::println)
}

这是两段代码,第一段用来生成10个0-9的随机数,第二个将完成上述我们的需求。

而并行流,可以理解为多线程处理流中的数据。

基本用法

我们还拿上面的例子来说,转换为并行流也比较简单,调用parallel即可。

randoms.stream()
.parallel()
.filter { it>5 }
.collect(Collectors.toList())
.forEach (System.out::println)

但是貌似这样做,体会不到有什么不一样的地方呀,我们可以打印出他的线程名称,

 randoms.stream()
 .parallel()
 .filter {
     println(Thread.currentThread().name);
     it > 5
 }
 .collect(Collectors.toList())
 .forEach(System.out::println)

输出结果如下:

main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-6
6
7
7
8
9

可以发现在通过filter过滤的时候,会有很多线程来处理,而且也包括main线程。

而并行流是使用ForkJoin来管理线程的,默认线程数是处理器的数量,可以通过下面方式获取,在我电脑上,他是8。

println(Runtime.getRuntime().availableProcessors())

除此之外,还可以通过下面参数更改,但他是全局性的,我们无法对不同流设置设置不同线程数量。

-D java.util.concurrent.ForkJoinPool.common.parallelism=4

速度测试

ForkJoin的思想是将一个大任务拆分成若干个小任务,直到不可再拆时,再将一个个小的任务运算的结果进行汇总,这也是ForkJoin名称的由来,Fork是分支的意思,Join则是汇总。

下面我们看下他们的速度,由于10个太小了,换为1百万个。

fun main() {
    val randoms = Stream.generate { Random().nextInt(10) }.limit(1000000).collect(Collectors.toList())

    runTest {
        randoms.stream().filter { it > 5 }.collect(Collectors.toList())
    }

    runTest {
        var mutableListOf = mutableListOf<Int>()
        randoms.forEach {
            if (it > 5) {
                mutableListOf.add(it)
            }
        }
    }

    runTest {
        randoms.stream().parallel().filter { it > 5 }.collect(Collectors.toList())
    }
}

fun runTest(function: () -> Unit) {
    val start = System.currentTimeMillis()
    function()
    println(System.currentTimeMillis() - start)

}
普通流 并行流 普通方法
20 52 25

这都是运行好几次取他们最大的,可以发现,普通流最快,并行流最慢,并行流慢的原因也很简单,原因是并行流解决问题总是要比顺序流的工作多。并行流开销涉到将工作分配到多个线程,并且进行结果和并。

另一方面使用多个线程处理时,还需要初始化线程池。此外,如果有竞争,并行处理的性能可能会持续下降。

但如果再把数据量扩大几倍,结果就会反过来,比如当随机数是1亿时,结果如下。

普通流 并行流 普通方法
957 789 1082

另外,当我们操作网络时候,这有是另一回事了,比如下面这样。

object TestStream {
    @JvmStatic
    fun download(url: String) {
        var bytes = URL(StringBuffer(url).append("?id="+Random().nextInt(10000)).toString()).openStream().readBytes()
        Files.write(Paths.get("/home/HouXinLin/pic", UUID.randomUUID().toString()), bytes)
    }
}

fun main() {
    val randoms = mutableListOf(
        "https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g7/M00/01/00/ChMkLGJGYbyIbjIrABQ1DysNhHMAACEDwLHNV8AFDUn770.jpg",
        "https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g7/M00/01/00/ChMkK2JGYcOIPDZqAB868hHvfqcAACEEAB3iKYAHzsK046.jpg",
        "https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g5/M00/0C/05/ChMkJ14dLNeIfJBuAAZKuwc_TagAAwWUAJPWVAABkrT442.jpg",
        "https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g5/M00/0C/05/ChMkJl4dNbmIS-1MAAYHKJ-tub0AAwWXACrfHkABgdA472.jpg",
        "https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g7/M00/01/00/ChMkLGJGYbyIbjIrABQ1DysNhHMAACEDwLHNV8AFDUn770.jpg",
        "https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g7/M00/01/00/ChMkK2JGYcOIPDZqAB868hHvfqcAACEEAB3iKYAHzsK046.jpg",
    )

    var threadPoolExecutor = ThreadPoolExecutor(441, TimeUnit.HOURS, LinkedBlockingQueue())

    runTest {
        randoms.stream().parallel().forEach(TestStream::download)
    }

    runTest {
        randoms.stream().parallel().forEach {
            threadPoolExecutor.execute { TestStream.download(it) }
        }
        threadPoolExecutor.shutdown()
        threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS)
    }

    runTest {
        randoms.stream().forEach(TestStream::download)
    }

}

fun runTest(function: () -> Unit) {
    val start = System.currentTimeMillis()
    function()
    println(System.currentTimeMillis() - start)

}

这其实就是比那种方法管理的线程池最合适了,肯定最后一种方式最慢,但这个测试还需要网络原因,家里网不稳定,一会快一会慢,没办法给出并行流快还是自定义线程池快,另外这也是需要看情况的,用不用并行流或者还是使用自定义线程池也说不准,最好办法还是多测试。

这也与任务类型有关,比如是CPU密集型还是IO密集型,总之,并行编程,难!

- END -


原文始发于微信公众号(十四个字节):Java并行流的使用及速度比较