CompletableFuture学习

最近在学习SimBot、HttpClient和Jsoup时遇到了CompletableFuture,所以决定学习一下

初识CompletableFuture

CompletableFuturejava.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性

CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利

开启一个异步任务

supplyAsync

import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class SmallThread {

    /**
     * 睡眠
     *
     * @param millis
     */
    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 输出线程信息
     *
     * @param tag
     */
    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }

    public static void main(String[] args) {
        printTimeAndThread("小白进入餐厅");
        printTimeAndThread("小白点餐");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("初识炒菜");
            sleepMillis(200);
            printTimeAndThread("初始打饭");
            sleepMillis(100);
            return "饭做好了";
        });

        printTimeAndThread("小白打游戏");
        printTimeAndThread(cf1.join() + "小白吃饭");
    }

}

运行结果

1697419197424 | 1 | main | 小白进入餐厅
1697419197425 | 1 | main | 小白点餐
1697419197485 | 1 | main | 小白打游戏
1697419197485 | 12 | ForkJoinPool.commonPool-worker-1 | 初识炒菜
1697419197688 | 12 | ForkJoinPool.commonPool-worker-1 | 初始打饭
1697419197798 | 1 | main | 饭做好了小白吃饭

.runAsync

与supplyAsync方法的区别是:没有返回值

连接两个异步任务

.thenCompose

thenCompose链式调用表示完成这个任务之后的下一个任务

传入参数是Function<T,R>

import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

public class SmallThread {

    /**
     * 睡眠
     *
     * @param millis
     */
    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 输出线程信息
     *
     * @param tag
     */
    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }

    public static void main(String[] args) {
        printTimeAndThread("小白进入餐厅");
        printTimeAndThread("小白点餐");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("厨师炒菜");
            sleepMillis(200);
            return "菜做好了";
        }).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("服务员打饭");
            sleepMillis(100);
            return s + ", 饭打好了";
        }));

        printTimeAndThread("小白打游戏");
        printTimeAndThread(cf1.join() + "小白吃饭");
    }

}

运行结果:

1697420176242 | 1 | main | 小白进入餐厅
1697420176242 | 1 | main | 小白点餐
1697420176294 | 12 | ForkJoinPool.commonPool-worker-1 | 厨师炒菜
1697420176294 | 1 | main | 小白打游戏
1697420176505 | 12 | ForkJoinPool.commonPool-worker-1 | 服务员打饭
1697420176616 | 1 | main | 菜做好了, 饭打好了小白吃饭

.thenComposeAsync

有点难描述(以后再补充)

合并两个异步任务

.thenCombine

thenCombine链式调用表示开启再开启一个线程去做别的任务,并且之后要对这几个线程的结果做处理。

import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

public class SmallThread {

    /**
     * 睡眠
     *
     * @param millis
     */
    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 输出线程信息
     *
     * @param tag
     */
    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }

    public static void main(String[] args) {
        printTimeAndThread("小白进入餐厅");
        printTimeAndThread("小白点餐");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("厨师炒菜");
            sleepMillis(200);
            return "菜做好了";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("服务员蒸饭");
            sleepMillis(100);
            return "饭蒸好了";
        }), (s, s2) -> s + s2);

        printTimeAndThread("小白打游戏");
        printTimeAndThread(cf1.join() + "小白吃饭");
    }

}

运行结果:

1697420701948 | 1 | main | 小白进入餐厅
1697420701949 | 1 | main | 小白点餐
1697420701998 | 12 | ForkJoinPool.commonPool-worker-1 | 厨师炒菜
1697420701999 | 13 | ForkJoinPool.commonPool-worker-2 | 服务员蒸饭
1697420701999 | 1 | main | 小白打游戏
1697420702206 | 1 | main | 菜做好了饭蒸好了小白吃饭

thenAccetpBoth

与thenCombine的区别:接收前面两个任务的结果,但处理后没有返回值

runAfterBoth

与thenCombine的区别:不关心前两个任务的结果,并且没有返回值

部分总结

image-20231016110214490

任务的后置处理

.thenApply

虽然分为thenApply上下两部分代码,但是在CompletableFuture看来,这两个任务是一个线程要完成的任务,所以CompletableFuture会把这段代码封装成一个任务去交给一个线程去运行

import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

public class SmallThread {

    /**
     * 睡眠
     *
     * @param millis
     */
    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 输出线程信息
     *
     * @param tag
     */
    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }

    public static void main(String[] args) {
        printTimeAndThread("小白吃完了");
        printTimeAndThread("小白结账,要求开发票");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("服务员收款");
            sleepMillis(100);

            return "500元";
        }).thenApply(s -> {
            printTimeAndThread("服务员开发票");
            sleepMillis(200);
            return "500元发票";
        });

        printTimeAndThread("小白打电话");
        printTimeAndThread("小白拿到" + cf1.join());
    }

}

运行结果:

1697425830251 | 1 | main | 小白吃完了
1697425830252 | 1 | main | 小白结账,要求开发票
1697425830327 | 12 | ForkJoinPool.commonPool-worker-1 | 服务员收款
1697425830328 | 1 | main | 小白打电话
1697425830427 | 12 | ForkJoinPool.commonPool-worker-1 | 服务员开发票
1697425830642 | 1 | main | 小白拿到500元发票

.thenApplyAsync

CompletableFuture会把两段代码当成两个任务,交给不同的线程去运行,只不过第二个任务会等第一个任务执行完毕再去运行

thenAccept

与thenApply的区别是:会接收前面任务的结果,但自己的任务不需要提供返回值

thenRun

与thenApply的区别是:不会接收前面任务的结果,也不会提供返回值

获取最先完成的任务

.applyToEither

package ink.luckycat;

import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

public class SmallThread {

    /**
     * 睡眠
     *
     * @param millis
     */
    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 输出线程信息
     *
     * @param tag
     */
    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }

    public static void main(String[] args) {
        printTimeAndThread("小白吃完了");
        printTimeAndThread("小白坐公交");

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("700");
            sleepMillis(300);

            return "700";
        }).applyToEither(CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                printTimeAndThread("800");
                sleepMillis(200);
                return "800";
            }
        }), s -> s);

        printTimeAndThread("小白坐" + cf2.join() + "号车");
    }

}

运行结果:

1697426313225 | 1 | main | 小白吃完了
1697426313225 | 1 | main | 小白坐公交
1697426313279 | 12 | ForkJoinPool.commonPool-worker-1 | 700
1697426313279 | 13 | ForkJoinPool.commonPool-worker-2 | 800
1697426313494 | 1 | main | 小白坐800号车

acceptEither

与applyToEither的区别:会得到最快完成的任务的结果,但是没有返回值

runAfterEither

与applyToEither的区别:既不接收其他任务的结果,也没有返回值

处理异常

.exceptionally

package ink.luckycat;

import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

public class SmallThread {

    /**
     * 睡眠
     *
     * @param millis
     */
    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 输出线程信息
     *
     * @param tag
     */
    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }

    public static void main(String[] args) {
        printTimeAndThread("小白吃完了");
        printTimeAndThread("小白坐公交");

//        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
//            printTimeAndThread("服务员收款");
//            sleepMillis(300);
//
//            return "500元";
//        }).thenApplyAsync(s -> {
//            printTimeAndThread("服务员开发票");
//            sleepMillis(200);
//            return s + "发票";
//        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("700");
            sleepMillis(300);

            return "700";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("800");
            sleepMillis(200);
            return "800";
        }), s -> {
            if ("800".equals(s)){
                throw new RuntimeException("撞树上了");
            }
            return s;
        }).exceptionally(throwable -> {
            printTimeAndThread("坐出租车");
            return "出租车";
        });

//        printTimeAndThread("小白打电话");
        printTimeAndThread("小白坐" + cf2.join() + "号车");
    }

}

运行结果:

1697426613784 | 1 | main | 小白吃完了
1697426613784 | 1 | main | 小白坐公交
1697426613832 | 12 | ForkJoinPool.commonPool-worker-1 | 700
1697426613832 | 13 | ForkJoinPool.commonPool-worker-2 | 800
1697426614047 | 13 | ForkJoinPool.commonPool-worker-2 | 坐出租车
1697426614047 | 1 | main | 小白坐出租车号车

handle

和exception的区别:如果前面的程序正常执行,则会继续正常执行;如果前面的程序出现异常,那么handle则会捕获异常;无论前面的程序是否正常执行,handle都会执行自己的最后一部分代码,返回一个结果,让后面的程序继续执行

whenComplete

和handle类似,区别就是不提供返回值