Administrator
Published 2018-09-20

The Pitfalls of ParallelStreams

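Stream is a feature introduced in JDK 8, and Java developers commonly just call it "streams". A ParallelStream is simply a stream that executes in parallel: it runs on the default ForkJoinPool and may speed up your multi-threaded tasks. I personally like the Stream API a lot. Here I record a bug caused by ParallelStreams that took a colleague a whole day to track down. The NPE behaved like Schrödinger's cat: with a breakpoint set it never appeared, and without one it showed up with high probability.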

Straight to the code:

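package com.demo.test;

import java.util.List;

import com.google.common.collect.Lists; // Guava
import cn.hutool.core.bean.BeanUtil;    // assumption: the original does not say which BeanUtil is used
import lombok.Data;                     // Lombok generates the getters and setters

public class ParallelStreamTest {
    public static void main(String[] args) {
        // Build 100 source objects.
        List<ClassA> listA = Lists.newArrayList();
        for (int i = 0; i < 100; i++) {
            ClassA a = new ClassA();
            a.setProperty("ClassA Property " + i);
            listA.add(a);
        }

        // This is the pitfall: listB is a plain ArrayList, which is not thread-safe,
        // yet the parallel stream calls listB.add() from several threads at once.
        List<ClassB> listB = Lists.newArrayList();
        listA.parallelStream().forEach(a -> {
            ClassB b = new ClassB();
            BeanUtil.copyProperties(a, b);
            listB.add(b);
        });

        // Throws an NPE whenever the racing add() calls left a null slot in listB.
        listB.forEach(b -> {
            System.out.println(b.getProperty());
        });
    }

    @Data
    static class ClassA {
        private String property;
    }

    @Data
    static class ClassB {
        private String property;
    }
}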

Yes, just these few lines of code! The idea was to use a parallel stream to populate listB quickly.

Execution results

Result 1
Normal

Result 2

Exception in thread "main" java.lang.NullPointerException
	at com.demo.test.ParallelStreamTest.lambda$main$1(ParallelStreamTest.java:36)
	at java.util.ArrayList.forEach(ArrayList.java:1249)
	at com.demo.test.ParallelStreamTest.main(ParallelStreamTest.java:35)
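
A likely explanation for this NPE is that ArrayList is not thread-safe: when two threads call add() while the backing array is being grown, one thread can publish a stale copy of the array and silently drop the other thread's element, leaving a null slot that still counts toward the size. The sketch below is a hypothetical, simplified model (the class LostSlotSketch and its variables are made up for illustration and are not the JDK source). It replays one unlucky interleaving by hand on a single thread, so the outcome is deterministic. A breakpoint changes thread timing, which would also explain why the bug vanishes under the debugger.

```java
import java.util.Arrays;

public class LostSlotSketch {

    // Replays, step by step, one interleaving of two add() calls on a full list.
    static Object[] simulate() {
        // Simplified stand-in for ArrayList's internal state: a full backing array.
        Object[] elementData = {"a0", "a1"};
        int size = 2;

        // Thread A starts add(): the array is full, so it copies it into a bigger one,
        // but is paused before it publishes the new array.
        Object[] grownByA = Arrays.copyOf(elementData, 4);

        // Thread B runs a complete add() in the meantime: grow, store, increment size.
        elementData = Arrays.copyOf(elementData, 4);
        elementData[size++] = "fromB"; // stored at index 2, size becomes 3

        // Thread A resumes: it publishes its stale copy, which never saw "fromB" ...
        elementData = grownByA;
        // ... and then stores its own element at the current size.
        elementData[size++] = "fromA"; // stored at index 3, size becomes 4

        // Index 2 now holds null, yet it is inside the logical size of the list.
        return Arrays.copyOf(elementData, size);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(simulate())); // prints [a0, a1, null, fromA]
    }
}
```

Iterating over such a list hands a null element to the lambda, and calling b.getProperty() on it throws the NullPointerException seen above.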

Result 3

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
	at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
	at com.demo.test.ParallelStreamTest.main(ParallelStreamTest.java:19)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 10
	at java.util.ArrayList.add(ArrayList.java:459)
	at com.demo.test.ParallelStreamTest.lambda$main$0(ParallelStreamTest.java:22)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
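
The index 10 in this trace is consistent with ArrayList's default capacity of 10: two threads can both pass the capacity check while one slot is free, and the second store then lands past the end of the array. One way to avoid both failures is to stop mutating a shared list from inside forEach and let the stream build the result itself. The following is a minimal sketch under some assumptions: the class name ParallelStreamCollectSketch and the convert helper are made up for illustration, plain getters and setters replace Lombok's @Data, and a direct setter call stands in for BeanUtil.copyProperties so that the example only needs the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamCollectSketch {

    static class ClassA {
        private String property;
        public String getProperty() { return property; }
        public void setProperty(String property) { this.property = property; }
    }

    static class ClassB {
        private String property;
        public String getProperty() { return property; }
        public void setProperty(String property) { this.property = property; }
    }

    // Maps each ClassA to a new ClassB in parallel. collect() gives every worker
    // its own container and merges them, so no shared list is mutated.
    static List<ClassB> convert(List<ClassA> listA) {
        return listA.parallelStream()
                .map(a -> {
                    ClassB b = new ClassB();
                    b.setProperty(a.getProperty()); // stand-in for BeanUtil.copyProperties(a, b)
                    return b;
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ClassA> listA = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            ClassA a = new ClassA();
            a.setProperty("ClassA Property " + i);
            listA.add(a);
        }

        List<ClassB> listB = convert(listA);
        listB.forEach(b -> System.out.println(b.getProperty()));
    }
}
```

Because the source list is ordered, the collected result keeps the original order. Wrapping listB with Collections.synchronizedList would also remove the race while keeping the forEach shape, but the elements would then arrive in a nondeterministic order.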