随着多核处理器的出现,提升应用程序处理速度最有效的方式是编写能充分发挥多核能力的软件。可以通过切分大型的任务,让每个子任务并行运行,在Java中目前有直接使用线程的方式、使用Fork/Join框架和JDK8中的并行流来达到这一目的。
这段代码演示了分段求和用线程方式的实现。
分别开启2个线程,给予不同范围的数值进行求和,最后调用join()方法等到线程执行完毕,将2个线程的结果相加得到结果。
Thread方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import java.util.Random;public class ThreadJoinTest {static class Computer extends Thread { private int start; private int end; private int result; private int []array; public Computer (int []array , int start , int end) { this .array = array; this .start = start; this .end = end; } public void run () { for (int i = start; i < end ; i++) { result += array[i]; if (result < 0 ) result &= Integer.MAX_VALUE; } } public int getResult () { return result; } } private final static int COUNTER = 100000001 ; public static void main (String []args) throws InterruptedException { int []array = new int [COUNTER]; Random random = new Random(); for (int i = 0 ; i < COUNTER ; i++) { array[i] = Math.abs(random.nextInt()); } long start = System.currentTimeMillis(); Computer c1 = new Computer(array , 0 , COUNTER / 2 ); Computer c2 = new Computer(array , (COUNTER / 2 ) + 1 , COUNTER); c1.start(); c2.start(); c1.join(); c2.join(); System.out.println(System.currentTimeMillis() - start); System.out.println((c1.getResult() + c2.getResult()) & Integer.MAX_VALUE); }
Fork/Join 关于Fork/Join框架的介绍的详细介绍,还可以查看方腾飞老师的文章http://ifeve.com/talk-concurrency-forkjoin/,介绍得非常详细。
Fork/Join的实现是要继承RecursiveTask 类,实现compute()**方法,如例子所示,同样也是求和的计算,用继承的类,将要求和的范围传入,在 compute()**方法内部进行任务拆分,最后将每个任务的结果进行相加。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public class ForkJoinSumCalculator extends RecursiveTask <Long > { private final long [] numbers; private final int start; private final int end; public static final long THRESHHOLD = 10_100 ; public ForkJoinSumCalculator (long [] numbers) { this (numbers, 0 , numbers.length); } private ForkJoinSumCalculator (long [] numbers, int start, int end) { this .numbers = numbers; this .start = start; this .end = end; } @Override protected Long compute () { int length = end - start; if (length < THRESHHOLD) { return computeSequentially(); } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2 ); leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2 , end); Long rightResult = rightTask.compute(); Long leftResult = leftTask.join(); return leftResult + rightResult; } private Long computeSequentially () { long sum = 0 ; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static void main (String[] args) { long [] numbers = LongStream.rangeClosed(1 , 1000000 ).toArray(); ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); Long sum = new ForkJoinPool().invoke(task); System.out.println(sum); } }
Java8-parallelStream Java8做这种求和的操作很简洁,一行代码就可以解决。
1 IntStream.rangeClosed(0 , 100000000 ).parallel().sum()
之前在网上了解到说Java 8 并行流采用共享线程池,对性能造成了严重影响。可以包装流来调用自己的线程池解决性能问题。
如果要使用线程池,可采用以下方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ForkJoinPool forkJoinPool = new ForkJoinPool(3 ); forkJoinPool.submit(() -> { firstRange.parallelStream().forEach((number) -> { try { Thread.sleep(5 ); } catch (InterruptedException e) { } }); }); ForkJoinPool forkJoinPool2 = new ForkJoinPool(3 ); forkJoinPool2.submit(() -> { secondRange.parallelStream().forEach((number) -> { try { Thread.sleep(5 ); } catch (InterruptedException e) { } }); });
CompletableFuture Future接口 在介绍CompletableFuture 之前,必须要说一下Future 接口,Future 接口在Java5中被引入,它建模了一种**异步计算** ,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。这样就不需要等到方法结果,调用线程可以接着去做其他事情,这样就完成了异步调用,方法结果只能通过get()方法获取,get()方法是阻塞的,会阻塞直到异步方法运行完成返回结果。打个比方,把衣服放进洗衣机清洗,这个时候我们可以去做点别的事情,等到洗衣机清洗完成后,我们才会去取干净的衣服。下面展示一个简单的示例,只需要将操作封装到Callable 接口中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class FutureTaskTest { static class CallImpl implements Callable <String > { private String input; public CallImpl (String input) { this .input = input; } public String call () { try { Random random = new Random(); Thread.sleep((random.nextInt() + this .input.hashCode()) & 10000 ); } catch (InterruptedException e) { e.printStackTrace(); } return "输入参数:" + input; } } public static void main (String []args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(2 ); @SuppressWarnings("unchecked") List<Future<String>> list = Arrays.asList( executorService.submit(new CallImpl("t1" )) , executorService.submit(new CallImpl("t2" )) , executorService.submit(new CallImpl("t3" )) ); for (Future<String> future : list) { String result = future.get(); System.out.println(result + "\t" + System.currentTimeMillis()); } executorService.shutdown(); } }
Future接口的局限性 Future提供了isDone()方法检测异步计算是否已经结束,get()方法等待异步操作结束,以及获取计算的结果。但是这些操作依然很难实现:等到所有Future任务完成,通知线程获取结果并合并 。下面就一起来看看JDK8中新引入的CompletableFuture 。
使用CompletableFuture构建异步应用 CompletableFuture 实现了Future 接口,它的complete()**方法就相当于结束 CompletableFuture**对象的执行,并设置变量的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 public class Shop { private static final Random random = new Random(); private String name; public Shop (String name) { this .name = name; } public String getPrice (String product) { double price = calculatePrice(product); Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)]; return String.format("%s:%.2f:%s" , name, price, code); } public Future<Double> getPriceAsync1 (String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { try { double price = calculatePrice(product); futurePrice.complete(price); } catch (Exception e) { futurePrice.completeExceptionally(e); } }).start(); return futurePrice; } public Future<Double> getPriceAsync (String product) { return CompletableFuture.supplyAsync(() -> calculatePrice(product)); } private double calculatePrice (String product) { randomDelay(); return random.nextDouble() * product.charAt(0 ) + product.charAt(1 ); } public static void delay () { try { Thread.sleep(1000L ); } catch (InterruptedException e) { throw new RuntimeException(e); } } public static void randomDelay () { int delay = 500 + random.nextInt(2000 ); try { Thread.sleep(delay); } catch (InterruptedException e) { throw new RuntimeException(e); } } public static void main (String[] args) { Shop shop = new Shop("BestShop" ); long start = System.nanoTime(); Future<Double> futurePrice = shop.getPriceAsync("my favorte product" ); long invocationTime = (System.nanoTime() - start) / 1_000_000 ; System.out.println("Invocation returned after " + invocationTime + " msecs" ); try { double price = futurePrice.get(); System.out.printf("Price is %.2f%n" , price); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } long retrievalTime = ((System.nanoTime() - start) / 1_000_000 ); System.out.println("Price returned after " + retrievalTime + " msecs" ); } public String getName () { return name; } }
上面的例子来自于《Java8实战》,模拟了一个耗时的操作,然后通过CompletableFuture 包装成异步方法进行调用。注意代码里演示了两种方式,一种自己new一个线程再调用complete方法,还有就是用CompletableFuture 自身提供的工厂方法,CompletableFuture.supplyAsync ,它能更容易地完成整个流程,还不用担心实现的细节。
现在看来好像和Future 方式也没有什么区别,都是包装一下最后通过get()**方法获取结果,但是 CompletableFuture**配合Java8用起来就非常厉害了,它提供了很多方便的方法,下面进行一个演示。
同样是价格查询,我们现在接到一个需求,就是获取一个商品在不同商店的报价,一般来说用传统的方式就是写一个for
循环,遍历商店然后获取价格,要想效率快一点我们也可以用并行流的方式来查询,但是并行流返回的结果是无序的。下面再将异步也引入,我们可以实现有序的并行操作:
1 2 3 4 5 private static List<String> findPrices_1 (String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor)).collect(Collectors.toList()); return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()); }
这里创建CompletableFuture 和调用**join()**方法是两个不同的流是有原因的,如果只在一个流里,那么就没有异步的效果了,下一个Future必须等到上一个完成后才会被创建和执行。上面的代码执行效率并不会比并行流的效率差。
默认情况下,并行流和CompletableFuture默认都是用固定数目的线程,都是取决于Runtime. getRuntime().availableProcessors()**的返回值。并行流的线程池并不好控制,其本质是内部隐含使用了ForkJoinPool线程池,最大并发数可以通过系统变量设置。所以 CompletableFuture也就具有了优势,它允许配置自定义的线程池,这也可以为实际应用程序带来性能上的提升(并行流无法提供的API), CompletableFuture.supplyAsync(Supplier supplier,Executor executor)**提供了重载的方法来指定执行器使用自定义的线程池。
1 2 3 4 5 6 7 8 9 10 private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100 ), new ThreadFactory() { @Override public Thread newThread (Runnable r) { Thread t = new Thread(r); t.setDaemon(true ); return t; } });
并行–使用流还是CompletableFutures? 现在我们知道对集合进行并行计算有两种方式:
转化为并行流,利用map开展工作。
取出每一个元素,创建线程,在CompletableFuture 内对其进行操作
后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助我们确保整体的计算不会因为线程都在等待I/O而发生阻塞。
那么如何选择呢,建议如下:
进行计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
如果并行操作设计等到I/O的操作(网络连接,请求等),那么使用CompletableFuture 灵活性更好,通过控制线程数量来优化程序的运行。
CompletableFuture还提供了了一些非常有用的操作例如,thenApply(),thenCompose(),thenCombine()等
thenApply()是操作完成后将结果传入进行转换
thenCompose()是对两个异步操作进行串联,第一个操作完成时,对第一个CompletableFuture 对象调用thenCompose,并向其传递一个函数。当第一个CompletableFuture 执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture 的返回做输入计算出第二个CompletableFuture 对象。
thenCombine()会异步执行两个CompletableFuture 任务,然后等待它们计算出结果后再进行计算。
1 2 3 4 5 6 7 8 9 10 11 12 private static List<String> findPrices (String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))) .collect(Collectors.toList()); return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()); }
通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务都是由不同线程处理的,例如thenApplyAsync(),thenComposeAsync()等。
最后看一段利用**thenAccept()**来使用异步计算结果的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static Stream<CompletableFuture<String>> findPricesStream(String product) { return shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor))); } public static void main (String[] args) { long start = System.nanoTime(); CompletableFuture[] futures = findPricesStream("myPhone27S" ) .map(f -> f.thenAccept(s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000 ) + " msecs)" ))) .toArray(CompletableFuture[]::new ); CompletableFuture.anyOf(futures).join(); long duration = (System.nanoTime() - start) / 1_000_000 ; System.out.println("Done in " + duration + " msecs" ); }
这样就几乎无需等待findPricesStream 的调用,实现了一个真正的异步方法。