Internal changes for limit and unordered stream

Basically this came up while trying to answer another question. Consider this code:

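    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.IntStream;

    public class LimitDemo {
        public static void main(String[] args) {
            AtomicInteger i = new AtomicInteger(0);
            AtomicInteger count = new AtomicInteger(0);
            IntStream.generate(() -> i.incrementAndGet())
                    .parallel()
                    .peek(x -> count.incrementAndGet())
                    .limit(5)
                    .forEach(System.out::println);

            System.out.println("count = " + count);
        }
    }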

I understand that IntStream#generate produces an unordered infinite stream, and for it to finish there has to be a short-circuiting operation (limit in this case). I also understand that the Supplier may be called as many times as the Stream implementation likes before it reaches that limit.
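For contrast, here is the same pipeline run sequentially. The class SequentialLimitDemo and its helper supplierCalls are hypothetical wrappers around the question's code, with .parallel() removed and the printing replaced by a no-op. In a sequential pipeline, limit short-circuits by having the stream check for cancellation before pulling each element, so the supplier (and therefore peek) should run exactly as many times as the limit; the extra calls only appear in the parallel case.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class SequentialLimitDemo {
    // Runs the question's pipeline without .parallel() and reports how many
    // elements passed through peek, i.e. how often the supplier was called.
    static int supplierCalls(int limit) {
        AtomicInteger i = new AtomicInteger(0);
        AtomicInteger count = new AtomicInteger(0);
        IntStream.generate(() -> i.incrementAndGet())
                .peek(x -> count.incrementAndGet())
                .limit(limit)
                .forEach(x -> { });   // discard the values; only count matters here
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println("count = " + supplierCalls(5)); // prints "count = 5"
    }
}
```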

Running this under Java 8 prints count = 512 every time on my machine (perhaps not on every machine).

In contrast, running this under Java 10, the count rarely exceeds 5. So my question is: what changed internally (some code lines would be perfect!) that makes the short-circuiting work so much better? (By the way, I tried, and am still trying, to answer this on my own by diffing the sources...)


The change happened somewhere between Java 9, beta 103 and Java 9, beta 120.

The responsible class is StreamSpliterators.UnorderedSliceSpliterator.OfInt, or rather the common base class it inherits from, StreamSpliterators.UnorderedSliceSpliterator.

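The old version of the class looked like this (the forEachRemaining method shown is the one from the OfRef subclass; the primitive variants used by OfInt follow the same pattern):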

    abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
        static final int CHUNK_SIZE = 1 << 7;

        // The spliterator to slice
        protected final T_SPLITR s;
        protected final boolean unlimited;
        private final long skipThreshold;
        private final AtomicLong permits;

        UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
            this.s = s;
            this.unlimited = limit < 0;
            this.skipThreshold = limit >= 0 ? limit : 0;
            this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
        }

        UnorderedSliceSpliterator(T_SPLITR s,
                                  UnorderedSliceSpliterator<T, T_SPLITR> parent) {
            this.s = s;
            this.unlimited = parent.unlimited;
            this.permits = parent.permits;
            this.skipThreshold = parent.skipThreshold;
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of CHUNK_SIZE
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

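As we can see, it attempts to buffer up to CHUNK_SIZE = 1 << 7 = 128 elements in each spliterator before it acquires any permits, so across all worker threads as many as “number of CPU cores” × 128 elements may be pulled from the source. On a four-core machine that is 4 × 128 = 512, which matches the count you see on Java 8.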
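To see how a figure like 512 can arise, here is a deliberately simplified, single-threaded model. It is an assumption for illustration, not JDK code, and the class LockstepSliceModel and method elementsPulled are hypothetical: W workers run in perfect lockstep, each one first fills a buffer of chunkSize elements from the source, and only then do they take permits from the shared pool one after another. Workers that get no permits simply drop what they buffered, much like sb.forEach(action, acquirePermits(...)) forwarding only the granted elements.

```java
class LockstepSliceModel {
    // Hypothetical lockstep model: returns how many source elements are pulled
    // in total. Assumes limit >= 0 (no "unlimited" case) and no skip.
    static long elementsPulled(int workers, int chunkSize, long limit) {
        if (workers < 1 || chunkSize < 1)
            throw new IllegalArgumentException("workers and chunkSize must be >= 1");
        long permits = limit;              // shared permit pool (an AtomicLong in the JDK)
        long pulled = 0;
        while (permits > 0) {              // every worker still sees MAYBE_MORE
            // Phase 1: each worker optimistically fills a full chunk from the source.
            pulled += (long) workers * chunkSize;
            // Phase 2: workers claim permits in turn; late workers get 0 and
            // discard their buffered elements.
            for (int w = 0; w < workers; w++) {
                permits -= Math.min(permits, chunkSize);
            }
        }
        return pulled;
    }

    public static void main(String[] args) {
        // 4 workers with the old CHUNK_SIZE of 128 and a limit of 5
        System.out.println(elementsPulled(4, 1 << 7, 5)); // prints 512
    }
}
```

Real threads rarely line up this neatly, so the model only shows where the multiplication by 128 comes from; it does not predict exact counts.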

In contrast, the new version looks like this:

    abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
        static final int CHUNK_SIZE = 1 << 7;

        // The spliterator to slice
        protected final T_SPLITR s;
        protected final boolean unlimited;
        protected final int chunkSize;
        private final long skipThreshold;
        private final AtomicLong permits;

        UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
            this.s = s;
            this.unlimited = limit < 0;
            this.skipThreshold = limit >= 0 ? limit : 0;
            this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
                ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
            this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
        }

        UnorderedSliceSpliterator(T_SPLITR s,
                                  UnorderedSliceSpliterator<T, T_SPLITR> parent) {
            this.s = s;
            this.unlimited = parent.unlimited;
            this.permits = parent.permits;
            this.skipThreshold = parent.skipThreshold;
            this.chunkSize = parent.chunkSize;
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of chunkSize
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(chunkSize);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

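So now there is an instance field chunkSize. When there is a limit and the expression ((skip + limit) / AbstractTask.LEAF_TARGET) + 1 evaluates to something smaller than CHUNK_SIZE, that smaller value is used instead, so for small limits the chunk size is much smaller. In your case, with a limit of 5 and no skip, the chunk size is 1 whenever LEAF_TARGET exceeds 5. Since LEAF_TARGET is four times the common pool parallelism, that holds on any machine where the parallelism is 2 or more (with a parallelism of 1, the chunk size would be 2).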
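The chunk-size rule can be tried out in isolation. This is a small sketch that re-implements the constructor's expression outside the JDK; the class ChunkSizeModel and method chunkSize are hypothetical, and because AbstractTask.LEAF_TARGET is package-private, it is modeled here as the common pool parallelism shifted left by 2 (which is how the JDK defines it, stated here as an assumption of the sketch).

```java
import java.util.concurrent.ForkJoinPool;

class ChunkSizeModel {
    static final int CHUNK_SIZE = 1 << 7;   // same constant as in UnorderedSliceSpliterator

    // Mirrors the chunkSize expression of the new constructor. A negative limit
    // means "no limit", which keeps the full CHUNK_SIZE.
    static int chunkSize(long skip, long limit, int parallelism) {
        if (parallelism < 1)
            throw new IllegalArgumentException("parallelism must be >= 1");
        long leafTarget = (long) parallelism << 2;   // assumed AbstractTask.LEAF_TARGET
        return limit >= 0
                ? (int) Math.min(CHUNK_SIZE, ((skip + limit) / leafTarget) + 1)
                : CHUNK_SIZE;
    }

    public static void main(String[] args) {
        int p = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("parallelism " + p + " -> chunkSize " + chunkSize(0, 5, p));
    }
}
```

For example, chunkSize(0, 5, 3) is 1 and chunkSize(0, 100, 3) is 9, while large limits and unlimited slices still buffer the full 128 elements, so throughput for big slices is unaffected.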
