Java 8 Generics: Reducing a Stream of Consumers to a single Consumer

How can I write a method for combining a Stream of Consumers into a single Consumer using Consumer.andThen(Consumer)?

My first version was:

    <T> Consumer<T> combine(Stream<Consumer<T>> consumers) {
        return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
    }

    <T> Consumer<T> noOpConsumer() {
        return value -> { /* do nothing */ };
    }

This version compiles with JavaC and Eclipse. But it is too specific: the stream cannot be a Stream<SpecialConsumer>, and it cannot be used if the consumers accept a supertype of T rather than exactly T:

    Stream<? extends Consumer<? super Foo>> consumers = ...;
    combine(consumers);

That won't compile, rightfully. The improved version would be:

    <T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
        return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
    }

But neither Eclipse nor JavaC compiles that.

Eclipse (4.7.3a):

    The type Consumer does not define andThen(capture#7-of ? extends Consumer<? super T>, capture#7-of ? extends Consumer<? super T>) that is applicable here

JavaC (1.8.0_172):

    error: incompatible types: invalid method reference
        incompatible types: Consumer<CAP#1> cannot be converted to Consumer<? super CAP#2>
      where T is a type-variable:
        T extends Object declared in method <T>combine(Stream<? extends Consumer<? super T>>)
      where CAP#1,CAP#2 are fresh type-variables:
        CAP#1 extends Object super: T from capture of ? super T
        CAP#2 extends Object super: T from capture of ? super T

But it should work: every subtype of Consumer can be used as a Consumer, and every Consumer of a supertype of X can consume Xs, too. I tried adding explicit type parameters to each line of the stream version, but that did not help. If I write it as a traditional loop, however, it compiles:

    <T> Consumer<T> combine(Collection<? extends Consumer<? super T>> consumers) {
        Consumer<T> result = noOpConsumer();
        for (Consumer<? super T> consumer : consumers) {
            result = result.andThen(consumer);
        }
        return result;
    }

(Filtering out the null values is left out for conciseness.)

Therefore, my question is: How can I convince JavaC and Eclipse that my code is correct? Or, if it is not correct: why is the loop version correct but not the stream version?


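The method Optional<T> reduce(BinaryOperator<T> accumulator) is the stricter variant: its accumulator must take two values of the stream's element type T and return a value of that same type. Here the element type is a captured wildcard type, and Consumer::andThen does not fit that shape.

Use the three-argument version of Stream.reduce(...) instead:

    <U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);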
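As a small illustration of this signature (not taken from the question), the following sketch reduces a stream of strings to the sum of their lengths, so the result type U (Integer) differs from the element type T (String). The class name ThreeArgReduceDemo and the method totalLength are made up for the example.

```java
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.Stream;

final class ThreeArgReduceDemo {

    // Hypothetical helper: the element type is String, the result type is Integer.
    static int totalLength(Stream<String> words) {
        // Accumulator: folds one element (String) into the partial result (Integer).
        BiFunction<Integer, String, Integer> accumulator = (sum, word) -> sum + word.length();
        // Combiner: merges two partial results of type Integer.
        BinaryOperator<Integer> combiner = Integer::sum;
        return words.reduce(0, accumulator, combiner);
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Stream.of("a", "bb", "ccc"))); // prints 6
    }
}
```

The one-argument reduce could not express this, because its result would have to be a String.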

The BiFunction<U, ? super T, U> accumulator takes the partial result (of type U) and a stream element (of type T), which may be two different types, so it is more suitable for your situation. Therefore, you can write:

    <T> Consumer<T> combine(Stream<? extends Consumer<T>> consumers) {
        Consumer<T> identity = t -> {};

        return consumers.filter(Objects::nonNull)
                        .reduce(identity, Consumer::andThen, Consumer::andThen);
    }

The third argument, BinaryOperator<U> combiner, is only called for parallel streams, but it is still wise to provide a correct implementation of it.

For a better understanding you can rewrite the above code as follows:

    <T> Consumer<T> combine(Stream<? extends Consumer<T>> consumers) {
        Consumer<T> identity = t -> {};
        BiFunction<Consumer<T>, Consumer<T>, Consumer<T>> accumulator = Consumer::andThen;
        BinaryOperator<Consumer<T>> combiner = Consumer::andThen;

        return consumers.filter(Objects::nonNull)
                        .reduce(identity, accumulator, combiner);
    }

? extends Consumer<T> guarantees that each element will behave as a Consumer<T>.
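The question asked for the wider parameter type Stream<? extends Consumer<? super T>>. A minimal sketch of how the same three-argument reduction might be written for that type follows. It assumes that typing the accumulator's second parameter as Consumer<? super T> is acceptable, which matches what Consumer.andThen itself takes. The class name ConsumerCombiner and the sample consumers are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class ConsumerCombiner {

    // Variant of the code above with the wider parameter type from the question.
    static <T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
        // Explicit types fix U = Consumer<T> and keep inference out of the picture.
        Consumer<T> identity = t -> { };
        BiFunction<Consumer<T>, Consumer<? super T>, Consumer<T>> accumulator = Consumer::andThen;
        BinaryOperator<Consumer<T>> combiner = Consumer::andThen;

        return consumers.filter(Objects::nonNull)
                        .reduce(identity, accumulator, combiner);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Consumer<Object> logObject = o -> log.add("object:" + o);
        Consumer<CharSequence> logLength = s -> log.add("length:" + s.length());

        // Consumers of supertypes of String, plus a null entry that is filtered out.
        Stream<Consumer<? super String>> consumers =
                Stream.<Consumer<? super String>>of(logObject, null, logLength);
        Consumer<String> combined = combine(consumers);

        combined.accept("abc");
        System.out.println(log); // prints [object:abc, length:3]
    }
}
```

The explicitly typed accumulator is the same trick as in the rewritten version above; whether the shorter form with inline method references is accepted for this parameter type depends on the compiler's inference and is not claimed here.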

