CompletableFuture学习
最近在学习SimBot、HttpClient和Jsoup时遇到了CompletableFuture,所以决定学习一下
初识CompletableFuture
CompletableFuture
是java.util.concurrent
库在java 8
中新增的主要工具,同传统的Future
相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性
CompletableFuture
实现了CompletionStage
接口和Future
接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future
组合处理的能力,使Java
在处理多任务的协同工作时更加顺畅便利
开启一个异步任务
supplyAsync
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
public class SmallThread {
/**
* 睡眠
*
* @param millis
*/
public static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 输出线程信息
*
* @param tag
*/
public static void printTimeAndThread(String tag) {
String result = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
public static void main(String[] args) {
printTimeAndThread("小白进入餐厅");
printTimeAndThread("小白点餐");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("初识炒菜");
sleepMillis(200);
printTimeAndThread("初始打饭");
sleepMillis(100);
return "饭做好了";
});
printTimeAndThread("小白打游戏");
printTimeAndThread(cf1.join() + "小白吃饭");
}
}
运行结果:
1697419197424 | 1 | main | 小白进入餐厅
1697419197425 | 1 | main | 小白点餐
1697419197485 | 1 | main | 小白打游戏
1697419197485 | 12 | ForkJoinPool.commonPool-worker-1 | 初识炒菜
1697419197688 | 12 | ForkJoinPool.commonPool-worker-1 | 初始打饭
1697419197798 | 1 | main | 饭做好了小白吃饭
.runAsync
与supplyAsync方法的区别是:没有返回值
连接两个异步任务
.thenCompose
thenCompose链式调用表示完成这个任务之后的下一个任务
传入参数是Function<T,R>
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;
public class SmallThread {
/**
* 睡眠
*
* @param millis
*/
public static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 输出线程信息
*
* @param tag
*/
public static void printTimeAndThread(String tag) {
String result = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
public static void main(String[] args) {
printTimeAndThread("小白进入餐厅");
printTimeAndThread("小白点餐");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("厨师炒菜");
sleepMillis(200);
return "菜做好了";
}).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
printTimeAndThread("服务员打饭");
sleepMillis(100);
return s + ", 饭打好了";
}));
printTimeAndThread("小白打游戏");
printTimeAndThread(cf1.join() + "小白吃饭");
}
}
运行结果:
1697420176242 | 1 | main | 小白进入餐厅
1697420176242 | 1 | main | 小白点餐
1697420176294 | 12 | ForkJoinPool.commonPool-worker-1 | 厨师炒菜
1697420176294 | 1 | main | 小白打游戏
1697420176505 | 12 | ForkJoinPool.commonPool-worker-1 | 服务员打饭
1697420176616 | 1 | main | 菜做好了, 饭打好了小白吃饭
.thenComposeAsync
有点难描述(以后再补充)
合并两个异步任务
.thenCombine
thenCombine链式调用表示开启再开启一个线程去做别的任务,并且之后要对这几个线程的结果做处理。
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
public class SmallThread {
/**
* 睡眠
*
* @param millis
*/
public static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 输出线程信息
*
* @param tag
*/
public static void printTimeAndThread(String tag) {
String result = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
public static void main(String[] args) {
printTimeAndThread("小白进入餐厅");
printTimeAndThread("小白点餐");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("厨师炒菜");
sleepMillis(200);
return "菜做好了";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
printTimeAndThread("服务员蒸饭");
sleepMillis(100);
return "饭蒸好了";
}), (s, s2) -> s + s2);
printTimeAndThread("小白打游戏");
printTimeAndThread(cf1.join() + "小白吃饭");
}
}
运行结果:
1697420701948 | 1 | main | 小白进入餐厅
1697420701949 | 1 | main | 小白点餐
1697420701998 | 12 | ForkJoinPool.commonPool-worker-1 | 厨师炒菜
1697420701999 | 13 | ForkJoinPool.commonPool-worker-2 | 服务员蒸饭
1697420701999 | 1 | main | 小白打游戏
1697420702206 | 1 | main | 菜做好了饭蒸好了小白吃饭
thenAccetpBoth
与thenCombine的区别:接收前面两个任务的结果,但处理后没有返回值
runAfterBoth
与thenCombine的区别:不关心前两个任务的结果,并且没有返回值
部分总结
任务的后置处理
.thenApply
虽然分为thenApply上下两部分代码,但是在CompletableFuture看来,这两个任务是一个线程要完成的任务,所以CompletableFuture会把这段代码封装成一个任务去交给一个线程去运行
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
public class SmallThread {
/**
* 睡眠
*
* @param millis
*/
public static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 输出线程信息
*
* @param tag
*/
public static void printTimeAndThread(String tag) {
String result = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
public static void main(String[] args) {
printTimeAndThread("小白吃完了");
printTimeAndThread("小白结账,要求开发票");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("服务员收款");
sleepMillis(100);
return "500元";
}).thenApply(s -> {
printTimeAndThread("服务员开发票");
sleepMillis(200);
return "500元发票";
});
printTimeAndThread("小白打电话");
printTimeAndThread("小白拿到" + cf1.join());
}
}
运行结果:
1697425830251 | 1 | main | 小白吃完了
1697425830252 | 1 | main | 小白结账,要求开发票
1697425830327 | 12 | ForkJoinPool.commonPool-worker-1 | 服务员收款
1697425830328 | 1 | main | 小白打电话
1697425830427 | 12 | ForkJoinPool.commonPool-worker-1 | 服务员开发票
1697425830642 | 1 | main | 小白拿到500元发票
.thenApplyAsync
CompletableFuture会把两段代码当成两个任务,交给不同的线程去运行,只不过第二个任务会等第一个任务执行完毕再去运行
thenAccept
与thenApply的区别是:会接收前面任务的结果,但自己的任务不需要提供返回值
thenRun
与thenApply的区别是:不会接收前面任务的结果,也不会提供返回值
获取最先完成的任务
.applyToEither
package ink.luckycat;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
public class SmallThread {
/**
* 睡眠
*
* @param millis
*/
public static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 输出线程信息
*
* @param tag
*/
public static void printTimeAndThread(String tag) {
String result = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
public static void main(String[] args) {
printTimeAndThread("小白吃完了");
printTimeAndThread("小白坐公交");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("700");
sleepMillis(300);
return "700";
}).applyToEither(CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
printTimeAndThread("800");
sleepMillis(200);
return "800";
}
}), s -> s);
printTimeAndThread("小白坐" + cf2.join() + "号车");
}
}
运行结果:
1697426313225 | 1 | main | 小白吃完了
1697426313225 | 1 | main | 小白坐公交
1697426313279 | 12 | ForkJoinPool.commonPool-worker-1 | 700
1697426313279 | 13 | ForkJoinPool.commonPool-worker-2 | 800
1697426313494 | 1 | main | 小白坐800号车
acceptEither
与applyToEither的区别:会得到最快完成的任务的结果,但是没有返回值
runAfterEither
与applyToEither的区别:既不接收其他任务的结果,也没有返回值
处理异常
.exceptionally
package ink.luckycat;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
public class SmallThread {
/**
* 睡眠
*
* @param millis
*/
public static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 输出线程信息
*
* @param tag
*/
public static void printTimeAndThread(String tag) {
String result = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
public static void main(String[] args) {
printTimeAndThread("小白吃完了");
printTimeAndThread("小白坐公交");
// CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
// printTimeAndThread("服务员收款");
// sleepMillis(300);
//
// return "500元";
// }).thenApplyAsync(s -> {
// printTimeAndThread("服务员开发票");
// sleepMillis(200);
// return s + "发票";
// });
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("700");
sleepMillis(300);
return "700";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
printTimeAndThread("800");
sleepMillis(200);
return "800";
}), s -> {
if ("800".equals(s)){
throw new RuntimeException("撞树上了");
}
return s;
}).exceptionally(throwable -> {
printTimeAndThread("坐出租车");
return "出租车";
});
// printTimeAndThread("小白打电话");
printTimeAndThread("小白坐" + cf2.join() + "号车");
}
}
运行结果:
1697426613784 | 1 | main | 小白吃完了
1697426613784 | 1 | main | 小白坐公交
1697426613832 | 12 | ForkJoinPool.commonPool-worker-1 | 700
1697426613832 | 13 | ForkJoinPool.commonPool-worker-2 | 800
1697426614047 | 13 | ForkJoinPool.commonPool-worker-2 | 坐出租车
1697426614047 | 1 | main | 小白坐出租车号车
handle
和exception的区别:如果前面的程序正常执行,则会继续正常执行;如果前面的程序出现异常,那么handle则会捕获异常;无论前面的程序是否正常执行,handle都会执行自己的最后一部分代码,返回一个结果,让后面的程序继续执行
whenComplete
和handle类似,区别就是不提供返回值