1、基本介绍
1、创建方式
1、Array的Stream创建
1、直接创建
// mainStream stream = Stream.of("a", "b", "c"); String [] strArray = new String[] {"a", "b", "c"}; stream = Stream.of(strArray);
// Stream.of()@SafeVarargs@SuppressWarnings("varargs") // Creating a stream from an array is safepublic static<T> Stream<T> of(T... values) { return Arrays.stream(values); }
2、直接使用Arrays.stream工具创建
// mainString [] strArray = new String[] {"a", "b", "c"}; stream = Arrays.stream(strArray);
下面是Arrays.stream的具体实现
// Arrays.stream()public static <T> Stream<T> stream(T[] array) { return stream(array, 0, array.length); }
/** * Arrays.stream() * @param startInclusive 起始坐标 * @param endExclusive 最终坐标 */public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) { return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false); }
StreamSupport.stream的实现使用的是ReferencePipeline.Head<>这个方法,注意这个方法,这个方法是Stream流水线解决方案的核心之一
// StreamSupport.stream()public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
注意这里的Spliterator,这个类是Stream实现并行的核心类。这里Array生成的spliterator的特征值是ordered和immutable。(目前没看到关于特征值的相关操作,具体解释可以看源码的注释)
/** * ReferencePipeline.Head<>() * 默认生成一个ordered、immutable的Spliterator */public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) { return Spliterators.spliterator(array, startInclusive, endExclusive, Spliterator.ORDERED | Spliterator.IMMUTABLE); }
2、Collection的Stream创建
// mainList<Integer> integers = new ArrayList<>(); integers.stream();
Collection的Stream的创建使用的是Collection.stream方法
// Collection.stream()default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); }
这个spliterator(),创建的是Spliterator下的特增值为sized、subSized的Spliterator
// Collection.spliterator()@Override default Spliterator<E> spliterator() { return Spliterators.spliterator(this, 0); }
// Spliterators.spliterator()public static <T> Spliterator<T> spliterator(Collection<? extends T> c, int characteristics) { return new IteratorSpliterator<>(Objects.requireNonNull(c), characteristics); }
/** * IteratorSpliterator<>() * 默认生成一个sized、subSized的Spliterator */public IteratorSpliterator(Collection<? extends T> collection, int characteristics) { this.collection = collection; this.it = null; this.characteristics = (characteristics & Spliterator.CONCURRENT) == 0 ? characteristics | Spliterator.SIZED | Spliterator.SUBSIZED : characteristics; }
最后还是将Spliterator放入ReferencePipeline.Head<>方法创建了Stream
// ReferencePipeline.Head<>public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
3、其他创建方式
1、Stream.iterate()
Stream.iterate(1,i->i++)
该方法放入一个seed值作为种子值,使用第二个参数方法生成一个无限大小的Stream,特征值与Array的Stream特征值相同。
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) { Objects.requireNonNull(f); final Iterator<T> iterator = new Iterator<T>() { @SuppressWarnings("unchecked") T t = (T) Streams.NONE; @Override public boolean hasNext() { return true; } @Override public T next() { return t = (t == Streams.NONE) ? seed : f.apply(t); } }; return StreamSupport.stream(Spliterators.spliteratorUnknownSize( iterator, Spliterator.ORDERED | Spliterator.IMMUTABLE), false); }
2、Stream.generate()
Stream.generate(Math::random)
该方法没有放入种子值,放入的是一个Supplier,该类就是java1.8以后加入的函数式接口,该接口只有一个方法就是get()方法,用于提供生成Stream需要的每一个的数据,最后生成长度最大为9223372036854775807L(2的63次方-1)的Stream。
public static<T> Stream<T> generate(Supplier<T> s) { Objects.requireNonNull(s); return StreamSupport.stream( new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false); }
@FunctionalInterfacepublic interface Supplier<T> { /** * Gets a result. * * @return a result */ T get(); }
2、中间操作(intermediate operation)
每一个流能有多个中间操作,中间操作的作用就是将原始的流转化为需要的流,并且为惰性操作,关于惰性操作后面会有具体介绍。并且中间操作可分为有状态和无状态两种,两种不同的操作在构成Stream流水线时会使用不同的创建方式和操作。
1、有状态操作(statefulOp)
1、Stream<T> distinct();
// 除去流种重复的元素
2、Stream<T> limit(long maxSize);
// 只取前几个元素
3、Stream<T> skip(long n);
// 跳过前几个元素
4、Stream<T> sorted();
// 根据自然排序对流排序
5、Stream<T> sorted(Comparator<? super T> comparator);
// 根据自己实现的排序对流排序
2、无状态操作(statelessOp)
1、Stream<T> filter(Predicate<? super T> predicate);
// 根据过滤规则过滤流种的元素
2、<R> Stream<R> map(Function<? super T, ? extends R> mapper);
// 将每一个元素映射成另一个元素
3、<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
// 和上一个map映射不同的是,扁平映射会将流中的最基础的元素映射出来
4、Stream<T> peek(Consumer<? super T> action);
// 每一个元素都要做一下这个action
5、IntStream mapToInt(ToIntFunction<? super T> mapper);
// 映射为IntStream
6、LongStream mapToLong(ToLongFunction<? super T> mapper);
// 映射为LongStream
7、DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
// 映射为DoubleStream
2、结束操作(terminal operation)
每个流只能有一个结束操作,结束操作会将和之前的中间操作一同起作用,在使用了结束操作后该流便被消费掉了,不能再次使用。由于流可以是无限大的,所以也会有短路操作,当无限大的流使用了短路操作并且满足了短路条件时便会直接结束。
1、非短路操作
1、void forEach(Consumer<? super T> action);
// 每一个元素都要做一下这个aciton
2、void forEachOrdered(Consumer<? super T> action);
// 确保并行时保持顺序执行这个action
3、Object[] toArray();
// 转化成Object数组
4、<A> A[] toArray(IntFunction<A[]> generator);
// 转化成自己定义的数组
5、T reduce(T identity, BinaryOperator<T> accumulator);
// 汇聚,有起始值,操作
6、Optional<T> reduce(BinaryOperator<T> accumulator);
// 汇聚,无起始值,返回的是Optional对象
7、<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> comb iner);
// 汇聚,有起始值,操作,合并
8、<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator,BiConsumer<R, R> combiner);
// 可变汇聚,自己实现汇聚,容器、操作、合并操作
9、<R, A> R collect(Collector<? super T, A, R> collector);
// 可变汇聚,Collectors有封装工具
10、Optional<T> min(Comparator<? super T> comparator);
// 封装了reduce,使用自己的比较器找到最小的
11、Optional<T> max(Comparator<? super T> comparator);
// 封装了reduce,使用自己的比较器找到最大的
12、long count();
// 封装了reduce,把每个数变成1再求和
2、短路操作(short-circuiting)
1、boolean anyMatch(Predicate<? super T> predicate);
// 有一个符合判断
2、boolean noneMatch(Predicate<? super T> predicate);
// 没有一个符合判断
3、Optional<T> findFirst();
// 有序的,找到第一个
4、Optional<T> findAny();
// 不要求有序的,找到一个
5、boolean (Predicate<? super T> predicate);
// 全部符合判断
2、Stream流水线解决方案
1、ReferencePipeLine
Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。
下面是源码,Head表示Source stage,例如Collection.stream(),这里面没有对数据的操作,StatelessOp和StatefuleOp分别对应无状态和有状态的中间操作
/** * Source stage of a ReferencePipeline. * * @param <E_IN> type of elements in the upstream source * @param <E_OUT> type of elements in produced by this stage * @since 1.8 */ static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> @Override final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) { throw new UnsupportedOperationException(); }
/** * Base class for a stateless intermediate stage of a Stream. * * @param <E_IN> type of elements in the upstream source * @param <E_OUT> type of elements in produced by this stage * @since 1.8 */ abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>
/** * Base class for a stateful intermediate stage of a Stream. * * @param <E_IN> type of elements in the upstream source * @param <E_OUT> type of elements in produced by this stage * @since 1.8 */ abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>
通过之前就可以看到的ReferencePipeline.Head<>生成第一个Source stage,紧接着调用一系列的中间操作,不断产生新的Stream。这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是Stream记录操作的方式。
2、Sink
上面讲到的是Stream流水线如何流水操作计算,但是如何组合就要看Sink这个接口了,该接口包含以下方法
方法名 | 作用 |
---|---|
void begin(long size) | 开始遍历元素之前调用该方法,通知Sink做好准备。 |
void end() | 所有元素遍历完成之后调用,通知Sink没有更多的元素了。 |
boolean cancellationRequested() | 是否可以结束操作,可以让短路操作尽早结束。 |
void accept(T t) | 遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。 |
通过以上方法,各个Stage之间的调用就实现了,每个Stage将自己的操作封装到Sink中,然后只需要访问下一个Stage的accept方法即可。
下面分别举无状态中间操作和有状态中间操作对这几个方法的实现。
首先是map的实现
// Stream.map() @Override @SuppressWarnings("unchecked") public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireN
转载请标明出处:http://www.cnblogs.com/MoEee/p/6490573.html
http://www.cnblogs.com/MoEee/p/7229073.htm
l