Will parallel stream work fine with distinct operation?


I was reading about statelessness and came across this in the docs:

Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline.

Now suppose I have a list of strings (say strList) and try to remove the duplicate strings from it using parallel streams in the following way:

List<String> resultOne = strList.parallelStream().distinct().collect(Collectors.toList()); 

or, if we want it to be case-insensitive:

List<String> result2 = strList.parallelStream()
                              .map(String::toLowerCase)
                              .distinct()
                              .collect(Collectors.toList());

Can this code have any problems, given that parallel streams split the input, and being distinct within one chunk does not necessarily mean being distinct across the whole input?


Roughly pointing out the relevant parts of the doc:

Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements

Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering
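Note that the warning quoted in the question is about stateful lambdas passed as behavioral parameters, not about stateful operations such as distinct itself. Here is a minimal sketch of the difference, loosely modelled on the example in the package documentation. The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StatefulLambdaDemo {

    // Stateful lambda: the result for an element depends on the shared 'seen'
    // set, which changes while the pipeline runs. In parallel, which occurrence
    // of a repeated value counts as "first" can differ from run to run.
    public static List<Integer> markRepeatsStateful(List<Integer> input) {
        Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
        return input.parallelStream()
                    .map(e -> seen.add(e) ? 0 : e)
                    .collect(Collectors.toList());
    }

    // Stateless lambda: each element is processed independently of the others,
    // so the result is the same whether the stream is sequential or parallel.
    public static List<Integer> doubleAll(List<Integer> input) {
        return input.parallelStream()
                    .map(e -> e * 2)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 1, 3, 2, 1);
        // The position of the zeros may vary between runs.
        System.out.println(markRepeatsStateful(input));
        System.out.println(doubleAll(Arrays.asList(1, 2, 3))); // [2, 4, 6]
    }
}
```

Neither pipeline in the question passes a lambda like the first one: String::toLowerCase is stateless, and distinct takes no behavioral parameter at all.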

If you read further down (section on ordering):

Streams may or may not have a defined encounter order. Whether or not a stream has an encounter order depends on the source and the intermediate operations. Certain stream sources (such as List or arrays) are intrinsically ordered, whereas others (such as HashSet) are not. Some intermediate operations, such as sorted(), may impose an encounter order on an otherwise unordered stream, and others may render an ordered stream unordered, such as BaseStream.unordered(). Further, some terminal operations may ignore encounter order, such as forEach().
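To make the dependence on the source concrete, one way to check whether a collection reports an encounter order is to look at the ORDERED characteristic of its spliterator. This is just a small sketch; EncounterOrderDemo and isOrdered are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

public class EncounterOrderDemo {

    // True if the collection's spliterator reports a defined encounter order.
    public static boolean isOrdered(Collection<?> source) {
        return source.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("b", "a", "b");
        System.out.println(isOrdered(list));                // true
        System.out.println(isOrdered(new HashSet<>(list))); // false
        System.out.println(isOrdered(new TreeSet<>(list))); // true (sorted order)
    }
}
```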


For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.

In conclusion,

  • distinct will work fine with parallel streams: the result is distinct across the whole input, not just within each chunk. On an ordered parallel stream, however, it may need to buffer a significant amount of data before it can pass elements downstream.
  • If the source of the items is an unordered collection (such as HashSet), or the stream has been made unordered with unordered(), then distinct does not have to preserve encounter order in its output and can therefore be implemented more efficiently.
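A quick way to convince yourself of the first point is to compare the parallel result with the sequential one. The Javadoc for distinct says that for ordered streams the first occurrence in encounter order is kept, so with a List source both results should be identical. A minimal sketch, with hypothetical class and method names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelDistinctDemo {

    // The first pipeline from the question.
    public static List<String> distinctParallel(List<String> strList) {
        return strList.parallelStream()
                      .distinct()
                      .collect(Collectors.toList());
    }

    // The case-insensitive pipeline from the question.
    public static List<String> distinctIgnoreCaseParallel(List<String> strList) {
        return strList.parallelStream()
                      .map(String::toLowerCase)
                      .distinct()
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 100,000 strings with only 1,000 different values, so duplicates
        // are certain to be spread over many chunks of the parallel split.
        List<String> strList = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            strList.add("s" + (i % 1000));
        }
        List<String> sequential = strList.stream()
                                         .distinct()
                                         .collect(Collectors.toList());
        List<String> parallel = distinctParallel(strList);
        System.out.println(parallel.size());             // 1000
        System.out.println(parallel.equals(sequential)); // true
    }
}
```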

The solution is to add .unordered() to the stream pipeline if you do not care about the order and would like better performance.

List<String> result2 = strList.parallelStream()
                              .unordered()
                              .map(String::toLowerCase)
                              .distinct()
                              .collect(Collectors.toList());
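Keep in mind that once the stream is unordered, the order of the resulting list is no longer guaranteed, so compare results as sets rather than as lists. A small sketch of that, assuming a hypothetical wrapper class around the pipeline above:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class UnorderedDistinctDemo {

    // Same pipeline as above; the element order of the result is unspecified.
    public static List<String> distinctIgnoreCaseUnordered(List<String> strList) {
        return strList.parallelStream()
                      .unordered()
                      .map(String::toLowerCase)
                      .distinct()
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> strList = Arrays.asList("Foo", "foo", "BAR", "bar", "Baz");
        List<String> result = distinctIgnoreCaseUnordered(strList);

        // Still no duplicates, but do not rely on the positions.
        Set<String> expected = new HashSet<>(Arrays.asList("foo", "bar", "baz"));
        System.out.println(result.size() == expected.size());        // true
        System.out.println(new HashSet<>(result).equals(expected));  // true
    }
}
```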

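Java has no class literally named ConcurrentHashSet, but a concurrent set is available via ConcurrentHashMap.newKeySet() (since Java 8) or Collections.newSetFromMap(new ConcurrentHashMap<>()), so distinct does not have to be implemented in a blocking fashion on top of a regular Java set. As far as I can tell from the OpenJDK sources, an unordered parallel distinct does collect into a ConcurrentHashMap, while an ordered one merges per-chunk LinkedHashSets; treat that as an implementation detail that may change. Whether a parallel distinct actually pays off depends on the amount of data and on how expensive the rest of the pipeline is, so measure before relying on it.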
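The ConcurrentHashMap route mentioned above can be sketched as follows. This only illustrates that a thread-safe hash set exists in the JDK; it is not a claim about how distinct is implemented, and ConcurrentSetDemo and distinctViaConcurrentSet are names invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentSetDemo {

    // Collects the distinct elements of strList into a concurrent set.
    // ConcurrentHashMap.newKeySet() (Java 8+) returns a thread-safe Set, so
    // several threads can add to it at once without external locking.
    public static Set<String> distinctViaConcurrentSet(List<String> strList) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        strList.parallelStream().forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        Set<String> result = distinctViaConcurrentSet(Arrays.asList("b", "a", "b", "c", "a"));
        System.out.println(result.size()); // 3
    }
}
```

In real code, plain distinct() (or collecting with Collectors.toSet()) is the simpler choice; the sketch relies on a side effect in forEach, which is exactly the kind of thing the stream documentation advises keeping to a minimum.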

