并发执行n个异步任务并获取它们的返回值,怎么实现?

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

在开发企业应用时,有如下这种常见的业务场景:

假如现在有100个task,平均每个task执行耗时约为50ms,如果是串行执行,则100个task将耗时5s,这对于一个性能要求比较高的接口来说是灾难。


因此,最简单的改造方式是并发执行。但我们要获取到这100个task的执行结果(少一个都不行)才能执行后续操作,然而并发执行是异步的,如果不加以控制的话是不太可能做到这一点的。如果让你去开发这个需求,你如何实现?


我总结了如下四种方式:

  1. 通过Thread+FutureTask+Callable Task

  2. 通过线程池+Callable Task

  3. 通过CountDownLatch+Runnable Task(建议)

  4. 通过CompletionService+Callable Task(强烈建议)


这里我建议采用第3、4两种方式,效果更高一点。而1、2两种方式其实原理是一样的,只不过代码写起来不一样而已,线程池的submit(Callable)方法其实也是利用了FutureTask。

至于1、2两种方式效率低的原因下面会有说明。


1、通过Thread+FutureTask+Callable Task

定义Callable Task:

public class CallableTask implements Callable<String> {
    private int id;
    public CallableTask(int id){
        this.id = id;
    }

    @Override
    public String call() throws Exception {
        Thread.sleep(50);
        return "success"+id;
    }
}


核心代码(并发执行n个异步任务并获取它们的返回值):

public class AsyncTaskByFutureTask {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<FutureTask> ftList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            FutureTask<String> ft = new FutureTask<>(new CallableTask(i));
            new Thread(ft).start();
            ftList.add(ft);
        }

        // 按顺序从FutureTask中拿到返回值并打印
        for (FutureTask ft : ftList) {
            System.out.println(ft.get());
        }
    }
}


输出结果:

success0
success1
success2
success3
success4
success5
success6
success7
success8
success9


这里我们发现,无论执行多少,次,输出结果的顺序都是success0到success9,这个我们不难得出答案:因为FutureTask是按顺序添加到list中的。


但这样会导致:如果5号task比0号task先执行完,而0号task由于还没执行完阻塞在ft.get()方法,这样就白白浪费了等待的时间,这也是1、2两种方式效率低的原因。


而下面要讲的CompletionService,就利用任务完成队列解决了这个问题:任务一完成,就添加到任务完成队列中,然后我们获取结果时,就从任务队列中去取,这样的话谁先完成任务谁就会被优先拿到,不会浪费时间。


2、通过线程池+Callable Task

核心代码(并发执行n个异步任务并获取它们的返回值):

public class AsyncTaskByFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        List<Future> fList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Future<String> f = es.submit(new CallableTask(i));
            fList.add(f);
        }
        for (Future ft : fList) {
            System.out.println(ft.get());
        }
        es.shutdown();
    }
}


这种方式与方式1原理一样,输出结果也一样,但利用了线程池,更加优雅一点。


3、通过CountDownLatch+Runnable Task

定义Runnable Task:

public class RunnableTask implements Runnable {
    private int id;
    private CountDownLatch latch;
    private List<String> list;
    public RunnableTask(int id, CountDownLatch latch, List<String> list){
        this.id = id;
        this.latch = latch;
        this.list = list;
    }

    @Override
    public void run() 
{
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        list.add("success"+id);
        latch.countDown();
    }
}


核心代码(并发执行n个异步任务并获取它们的返回值):

public class AsyncTaskByCountDownLatch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(10);
        ExecutorService es = Executors.newFixedThreadPool(10);

        // 这里必须要用线程安全的List
        List<String> list = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 10; i++) {
            es.submit(new RunnableTask(i,latch,list));
        }
        // 程序将在这里阻塞,当CountDownLatch减为0(即所有任务都执行完毕,并将返回值添加到list中)就被唤醒
        latch.await();
        list.forEach(System.out::println);
        es.shutdown();
    }
}


这种方式是利用了Runnable Task而非Callable Task。


4、通过CompletionService+Callable Task

核心代码(并发执行n个异步任务并获取它们的返回值):

public class AsyncTaskByCompletionService {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        CompletionService<String> completionService = new ExecutorCompletionService<>(es);

        for (int i = 0; i < 10; i++) {
            completionService.submit(new CallableTask(i));
        }
        List<String> res = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Future<String> f = completionService.take();
            String s = f.get();
            res.add(s);
        }
        res.forEach(System.out::println);
        es.shutdown();
    }
}


输出结果:

success8
success6
success5
success4
success0
success2
success7
success1
success9
success3


可以看到输出结果并不是success0到success9,这正是CompletionService不会按顺序去拿任务的返回值的体现。


这里对CompletionService做一个简单的介绍:

ExecutorCompletionService是CompletionService的一个实现,它是Executor和BlockingQueue功能的融合体,Executor完成计算任务,BlockingQueue(即任务完成队列)负责保存异步任务的执行结果。


上面程序中的completionService.take方法将会阻塞,直到有返回值返回。take方法会从CompletionService的任务完成队列中去拿最先完成的任务的返回值,而不是按顺序去取。


原文始发于微信公众号(初心JAVA):并发执行n个异步任务并获取它们的返回值,怎么实现?