0%

JDK8-Fork/Join和CompletableFuture的使用

随着多核处理器的出现,提升应用程序处理速度最有效的方式是编写能充分发挥多核能力的软件。可以通过切分大型的任务,让每个子任务并行运行,在Java中目前有直接使用线程的方式、使用Fork/Join框架和JDK8中的并行流来达到这一目的。

这段代码演示了分段求和用线程方式的实现。

分别开启2个线程,给予不同范围的数值进行求和,最后调用join()方法等到线程执行完毕,将2个线程的结果相加得到结果。

Thread方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.Random;

public class ThreadJoinTest {

static class Computer extends Thread {
private int start;
private int end;
private int result;
private int []array;

public Computer(int []array , int start , int end) {
this.array = array;
this.start = start;
this.end = end;
}

public void run() {
for(int i = start; i < end ; i++) {
result += array[i];
if(result < 0) result &= Integer.MAX_VALUE;
}
}

public int getResult() {
return result;
}
}

private final static int COUNTER = 100000001;

public static void main(String []args) throws InterruptedException {
int []array = new int[COUNTER];
Random random = new Random();
for(int i = 0 ; i < COUNTER ; i++) {
array[i] = Math.abs(random.nextInt());
}
long start = System.currentTimeMillis();
Computer c1 = new Computer(array , 0 , COUNTER / 2);
Computer c2 = new Computer(array , (COUNTER / 2) + 1 , COUNTER);
c1.start();
c2.start();
c1.join();
c2.join();
System.out.println(System.currentTimeMillis() - start);
//System.out.println(c1.getResult());
System.out.println((c1.getResult() + c2.getResult()) & Integer.MAX_VALUE);
}

Fork/Join

关于Fork/Join框架的介绍的详细介绍,还可以查看方腾飞老师的文章http://ifeve.com/talk-concurrency-forkjoin/,介绍得非常详细。

Fork/Join的实现是要继承RecursiveTask类,实现compute()**方法,如例子所示,同样也是求和的计算,用继承的类,将要求和的范围传入,在compute()**方法内部进行任务拆分,最后将每个任务的结果进行相加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 继承RecursiveTask来创建可以用于fork/join框架的任务
public class ForkJoinSumCalculator extends RecursiveTask<Long> {

// 要求和的数组
private final long[] numbers;
// 子任务处理的数组的起始位置和终止位置
private final int start;
private final int end;

public static final long THRESHHOLD = 10_100;


public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}

private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
// 该任务负责求和的部分的大小
int length = end - start;

// 如果大小小于或等于阈值,顺序计算结果
if (length < THRESHHOLD) {
return computeSequentially();
}


// 创建一个子任务来为数组的前一半求和
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
// 利用另一个ForkJoinPool线程异步执行新创建的子任务
leftTask.fork();
// 创建一个任务为数组的后一半求和
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);

// 同步执行第二个子任务,有可能允许进一步递归划分
Long rightResult = rightTask.compute();
// 读取第一个子任务的结果,如果尚未完成就等待
Long leftResult = leftTask.join();
// 该任务的结果是两个子任务结果的组合
return leftResult + rightResult;
}

private Long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}

return sum;
}


public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 1000000).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
Long sum = new ForkJoinPool().invoke(task);
System.out.println(sum);
}
}

Java8-parallelStream

Java8做这种求和的操作很简洁,一行代码就可以解决。

1
IntStream.rangeClosed(0, 100000000).parallel().sum()

之前在网上了解到说Java 8 并行流采用共享线程池,对性能造成了严重影响。可以包装流来调用自己的线程池解决性能问题。

如果要使用线程池,可采用以下方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ForkJoinPool forkJoinPool = new ForkJoinPool(3);  
forkJoinPool.submit(() -> {
firstRange.parallelStream().forEach((number) -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) { }
});
});

ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
forkJoinPool2.submit(() -> {
secondRange.parallelStream().forEach((number) -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
}
});
});

CompletableFuture

Future接口

在介绍CompletableFuture之前,必须要说一下Future接口,Future接口在Java5中被引入,它建模了一种**异步计算**,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。这样就不需要等到方法结果,调用线程可以接着去做其他事情,这样就完成了异步调用,方法结果只能通过get()方法获取,get()方法是阻塞的,会阻塞直到异步方法运行完成返回结果。打个比方,把衣服放进洗衣机清洗,这个时候我们可以去做点别的事情,等到洗衣机清洗完成后,我们才会去取干净的衣服。下面展示一个简单的示例,只需要将操作封装到Callable接口中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class FutureTaskTest {

static class CallImpl implements Callable<String> {

private String input;

public CallImpl(String input) {
this.input = input;
}
public String call() {
try {//延迟一点点
Random random = new Random();
Thread.sleep((random.nextInt() + this.input.hashCode()) & 10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "输入参数:" + input;
}
}

public static void main(String []args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
@SuppressWarnings("unchecked")
List<Future<String>> list = Arrays.asList(
executorService.submit(new CallImpl("t1")) ,
executorService.submit(new CallImpl("t2")) ,
executorService.submit(new CallImpl("t3"))
);
/*List<Future<String>> list = executorService.invokeAll(Arrays.asList(
new CallImpl("t1"),
new CallImpl("t2"),
new CallImpl("t3")
));*/
for(Future<String> future : list) {
String result = future.get();//如果没返回,会阻塞
System.out.println(result + "\t" + System.currentTimeMillis());
}
executorService.shutdown();
}
}

Future接口的局限性

Future提供了isDone()方法检测异步计算是否已经结束,get()方法等待异步操作结束,以及获取计算的结果。但是这些操作依然很难实现:等到所有Future任务完成,通知线程获取结果并合并。下面就一起来看看JDK8中新引入的CompletableFuture

使用CompletableFuture构建异步应用

CompletableFuture实现了Future接口,它的complete()**方法就相当于结束CompletableFuture**对象的执行,并设置变量的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* @author yangfan
* @date 2017/04/11
*/
public class Shop {

private static final Random random = new Random();

private String name;

public Shop(String name) {
this.name = name;
}


public String getPrice(String product) {
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}

public Future<Double> getPriceAsync1(String product) {
// 创建CompletableFuture对象,它会包含计算的结果
CompletableFuture<Double> futurePrice = new CompletableFuture<>();

// 在另一个线程中以异步方式执行计算
new Thread(() -> {
try {
double price = calculatePrice(product);
// 需长时间计算的任务结束并得出结果时,设置Future的返回值
futurePrice.complete(price);
} catch (Exception e) {
//异常处理
futurePrice.completeExceptionally(e);
}
}).start();

// 无需等待还没结束的计算,直接返回Future对象
return futurePrice;

}

public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}


private double calculatePrice(String product) {
randomDelay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public static void randomDelay() {
int delay = 500 + random.nextInt(2000);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
// 查询商店,试图去的商品的价格
Future<Double> futurePrice = shop.getPriceAsync("my favorte product");
long invocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("Invocation returned after " + invocationTime + " msecs");

// 执行更多任务,比如查询其他商店
//doSomethingElse();
// 在计算商品价格的同时


try {
// 从Future对象中读取价格,如果价格未知,会发生阻塞
double price = futurePrice.get();
System.out.printf("Price is %.2f%n", price);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}

long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");



}

public String getName() {
return name;
}
}

上面的例子来自于《Java8实战》,模拟了一个耗时的操作,然后通过CompletableFuture包装成异步方法进行调用。注意代码里演示了两种方式,一种自己new一个线程再调用complete方法,还有就是用CompletableFuture自身提供的工厂方法,CompletableFuture.supplyAsync,它能更容易地完成整个流程,还不用担心实现的细节。

现在看来好像和Future方式也没有什么区别,都是包装一下最后通过get()**方法获取结果,但是CompletableFuture**配合Java8用起来就非常厉害了,它提供了很多方便的方法,下面进行一个演示。

同样是价格查询,我们现在接到一个需求,就是获取一个商品在不同商店的报价,一般来说用传统的方式就是写一个for循环,遍历商店然后获取价格,要想效率快一点我们也可以用并行流的方式来查询,但是并行流返回的结果是无序的。下面再将异步也引入,我们可以实现有序的并行操作:

1
2
3
4
5
private static List<String> findPrices_1(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor)).collect(Collectors.toList());
return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
}

这里创建CompletableFuture和调用**join()**方法是两个不同的流是有原因的,如果只在一个流里,那么就没有异步的效果了,下一个Future必须等到上一个完成后才会被创建和执行。上面的代码执行效率并不会比并行流的效率差。

默认情况下,并行流和CompletableFuture默认都是用固定数目的线程,都是取决于Runtime. getRuntime().availableProcessors()**的返回值。并行流的线程池并不好控制,其本质是内部隐含使用了ForkJoinPool线程池,最大并发数可以通过系统变量设置。所以CompletableFuture也就具有了优势,它允许配置自定义的线程池,这也可以为实际应用程序带来性能上的提升(并行流无法提供的API),CompletableFuture.supplyAsync(Supplier supplier,Executor executor)**提供了重载的方法来指定执行器使用自定义的线程池。

1
2
3
4
5
6
7
8
9
10
// 创建一个线程池,线程池中线程的数目为100和商店数目二者中较小的一个值
private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
// 使用守护线程---这种方式不会阻止程序关掉
t.setDaemon(true);
return t;
}
});

并行–使用流还是CompletableFutures?

现在我们知道对集合进行并行计算有两种方式:

  1. 转化为并行流,利用map开展工作。
  2. 取出每一个元素,创建线程,在CompletableFuture内对其进行操作

后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助我们确保整体的计算不会因为线程都在等待I/O而发生阻塞。

那么如何选择呢,建议如下:

  • 进行计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
  • 如果并行操作设计等到I/O的操作(网络连接,请求等),那么使用CompletableFuture灵活性更好,通过控制线程数量来优化程序的运行。

CompletableFuture还提供了了一些非常有用的操作例如,thenApply(),thenCompose(),thenCombine()等

  • thenApply()是操作完成后将结果传入进行转换
  • thenCompose()是对两个异步操作进行串联,第一个操作完成时,对第一个CompletableFuture对象调用thenCompose,并向其传递一个函数。当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出第二个CompletableFuture对象。
  • thenCombine()会异步执行两个CompletableFuture任务,然后等待它们计算出结果后再进行计算。
1
2
3
4
5
6
7
8
9
10
11
12
private static List<String> findPrices(String product) {

List<CompletableFuture<String>> priceFutures = shops.stream()
// 以异步方式取得每个shop中指定产品的原始价格
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
// Quote对象存在时,对其返回值进行转换
.map(future -> future.thenApply(Quote::parse))
// 使用另一个异步任务构造期望的Future,申请折扣
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(Collectors.toList());
return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
}

通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务都是由不同线程处理的,例如thenApplyAsync(),thenComposeAsync()等。

最后看一段利用**thenAccept()**来使用异步计算结果的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 这里演示获取最先返回的数据
public static Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)));
}

public static void main(String[] args) {

long start = System.nanoTime();
// System.out.println(findPrices("myPhone27S"));
CompletableFuture[] futures = findPricesStream("myPhone27S")
.map(f -> f.thenAccept(s -> System.out.println(s + " (done in " +
((System.nanoTime() - start) / 1_000_000) + " msecs)")))
.toArray(CompletableFuture[]::new);
// CompletableFuture.allOf(futures).join();
CompletableFuture.anyOf(futures).join();
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");

}

这样就几乎无需等待findPricesStream的调用,实现了一个真正的异步方法。