/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Function;
import java.lang.invoke.MethodHandles;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.infinispan.BaseCacheStream;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.DoubleCacheStream;
import org.infinispan.IntCacheStream;
import org.infinispan.LongCacheStream;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.AbstractIterator;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.Closeables;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.marshall.core.MarshallableFunctions;
import org.infinispan.reactive.publisher.PublisherReducers;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.AbstractCacheStream;
import org.infinispan.stream.impl.CacheBiConsumers;
import org.infinispan.stream.impl.CacheIntermediatePublisher;
import org.infinispan.stream.impl.DistributedDoubleCacheStream;
import org.infinispan.stream.impl.DistributedIntCacheStream;
import org.infinispan.stream.impl.DistributedLongCacheStream;
import org.infinispan.stream.impl.IntermediateCacheStream;
import org.infinispan.stream.impl.intops.object.DistinctOperation;
import org.infinispan.stream.impl.intops.object.FilterOperation;
import org.infinispan.stream.impl.intops.object.FlatMapOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToDoubleOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToIntOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToLongOperation;
import org.infinispan.stream.impl.intops.object.LimitOperation;
import org.infinispan.stream.impl.intops.object.MapOperation;
import org.infinispan.stream.impl.intops.object.MapToDoubleOperation;
import org.infinispan.stream.impl.intops.object.MapToIntOperation;
import org.infinispan.stream.impl.intops.object.MapToLongOperation;
import org.infinispan.stream.impl.intops.object.PeekOperation;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

public class DistributedCacheStream<Original, R>
extends AbstractCacheStream<Original, R, Stream<R>, CacheStream<R>>
implements CacheStream<R> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final int maxSegment;

    public DistributedCacheStream(Address localAddress, boolean parallel, InvocationContext ctx, long explicitFlags, int distributedBatchSize, Executor executor, ComponentRegistry registry, java.util.function.Function<? super Original, ?> toKeyFunction, ClusterPublisherManager<?, ?> clusterPublisherManager) {
        super(localAddress, parallel, ctx, explicitFlags, distributedBatchSize, executor, registry, toKeyFunction, clusterPublisherManager);
        Configuration configuration = registry.getComponent(Configuration.class);
        this.maxSegment = configuration.clustering().hash().numSegments();
    }

    protected DistributedCacheStream(AbstractCacheStream other) {
        super(other);
        Configuration configuration = this.registry.getComponent(Configuration.class);
        this.maxSegment = configuration.clustering().hash().numSegments();
    }

    /**
     * @return the static logger declared by this class
     */
    @Override
    protected Log getLog() {
        return DistributedCacheStream.log;
    }

    /**
     * @return this very instance, viewed as a {@link CacheStream}
     */
    @Override
    protected CacheStream<R> unwrap() {
        CacheStream<R> self = this;
        return self;
    }

    /**
     * Registers a filter as an intermediate operation of this stream.
     *
     * @param predicate test wrapped in a {@link FilterOperation}
     * @return whatever {@code addIntermediateOperation} returns, viewed as a {@link CacheStream}
     */
    @Override
    public CacheStream<R> filter(Predicate<? super R> predicate) {
        FilterOperation<R> operation = new FilterOperation<R>(predicate);
        return (CacheStream)addIntermediateOperation(operation);
    }

    /**
     * Registers a mapping operation and returns this same stream instance re-typed to the mapped
     * element type.
     *
     * @param mapper function wrapped in a {@link MapOperation}
     * @param <R1> element type after mapping
     * @return this stream, viewed as a {@code CacheStream<R1>}
     */
    @Override
    @SuppressWarnings("unchecked")
    public <R1> CacheStream<R1> map(java.util.function.Function<? super R, ? extends R1> mapper) {
        // Leave an already recorded FLAT_MAP untouched; otherwise mark the pipeline as MAP.
        if (this.iteratorOperation != AbstractCacheStream.IteratorOperation.FLAT_MAP) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        this.addIntermediateOperationMap(new MapOperation<R, R1>(mapper));
        // The same instance is reused for the new element type, so an explicit unchecked cast is
        // required for this to type-check (a bare "return this" is not assignable to CacheStream<R1>).
        return (CacheStream<R1>)this;
    }

    /**
     * Registers a map-to-int operation and continues the pipeline as an {@link IntCacheStream}.
     *
     * @param mapper function wrapped in a {@link MapToIntOperation}
     * @return the stream produced by {@link #intCacheStream()}
     */
    @Override
    public IntCacheStream mapToInt(ToIntFunction<? super R> mapper) {
        // Leave an already recorded FLAT_MAP untouched; otherwise mark the pipeline as MAP.
        boolean flatMapped = iteratorOperation == AbstractCacheStream.IteratorOperation.FLAT_MAP;
        if (!flatMapped) {
            iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        MapToIntOperation<R> operation = new MapToIntOperation<R>(mapper);
        addIntermediateOperationMap(operation);
        return intCacheStream();
    }

    /**
     * Registers a map-to-long operation and continues the pipeline as a {@link LongCacheStream}.
     *
     * @param mapper function wrapped in a {@link MapToLongOperation}
     * @return the stream produced by {@link #longCacheStream()}
     */
    @Override
    public LongCacheStream mapToLong(ToLongFunction<? super R> mapper) {
        // Leave an already recorded FLAT_MAP untouched; otherwise mark the pipeline as MAP.
        boolean flatMapped = iteratorOperation == AbstractCacheStream.IteratorOperation.FLAT_MAP;
        if (!flatMapped) {
            iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        MapToLongOperation<R> operation = new MapToLongOperation<R>(mapper);
        addIntermediateOperationMap(operation);
        return longCacheStream();
    }

    /**
     * Registers a map-to-double operation and continues the pipeline as a {@link DoubleCacheStream}.
     *
     * @param mapper function wrapped in a {@link MapToDoubleOperation}
     * @return the stream produced by {@link #doubleCacheStream()}
     */
    @Override
    public DoubleCacheStream mapToDouble(ToDoubleFunction<? super R> mapper) {
        // Leave an already recorded FLAT_MAP untouched; otherwise mark the pipeline as MAP.
        boolean flatMapped = iteratorOperation == AbstractCacheStream.IteratorOperation.FLAT_MAP;
        if (!flatMapped) {
            iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        MapToDoubleOperation<R> operation = new MapToDoubleOperation<R>(mapper);
        addIntermediateOperationMap(operation);
        return doubleCacheStream();
    }

    /**
     * Registers a flat-map operation, marks the pipeline as {@code FLAT_MAP} and returns this same
     * stream instance re-typed to the flattened element type.
     *
     * @param mapper function wrapped in a {@link FlatMapOperation}
     * @param <R1> element type after flattening
     * @return this stream, viewed as a {@code CacheStream<R1>}
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <R1> CacheStream<R1> flatMap(java.util.function.Function<? super R, ? extends Stream<? extends R1>> mapper) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        this.addIntermediateOperationMap(new FlatMapOperation(mapper));
        // The same instance is reused for the new element type, so an explicit unchecked cast is
        // required for this to type-check (a bare "return this" is not assignable to CacheStream<R1>).
        return (CacheStream<R1>)this;
    }

    /**
     * Registers a flat-map-to-int operation, marks the pipeline as {@code FLAT_MAP} and continues
     * as an {@link IntCacheStream}.
     *
     * @param mapper function wrapped in a {@link FlatMapToIntOperation}
     * @return the stream produced by {@link #intCacheStream()}
     */
    @Override
    public IntCacheStream flatMapToInt(java.util.function.Function<? super R, ? extends IntStream> mapper) {
        iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        FlatMapToIntOperation<R> operation = new FlatMapToIntOperation<R>(mapper);
        addIntermediateOperationMap(operation);
        return intCacheStream();
    }

    /**
     * Registers a flat-map-to-long operation, marks the pipeline as {@code FLAT_MAP} and continues
     * as a {@link LongCacheStream}.
     *
     * @param mapper function wrapped in a {@link FlatMapToLongOperation}
     * @return the stream produced by {@link #longCacheStream()}
     */
    @Override
    public LongCacheStream flatMapToLong(java.util.function.Function<? super R, ? extends LongStream> mapper) {
        iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        FlatMapToLongOperation<R> operation = new FlatMapToLongOperation<R>(mapper);
        addIntermediateOperationMap(operation);
        return longCacheStream();
    }

    /**
     * Registers a flat-map-to-double operation, marks the pipeline as {@code FLAT_MAP} and
     * continues as a {@link DoubleCacheStream}.
     *
     * @param mapper function wrapped in a {@link FlatMapToDoubleOperation}
     * @return the stream produced by {@link #doubleCacheStream()}
     */
    @Override
    public DoubleCacheStream flatMapToDouble(java.util.function.Function<? super R, ? extends DoubleStream> mapper) {
        iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        FlatMapToDoubleOperation<R> operation = new FlatMapToDoubleOperation<R>(mapper);
        addIntermediateOperationMap(operation);
        return doubleCacheStream();
    }

    /**
     * Registers the shared {@link DistinctOperation} on this stream, then applies {@code distinct()}
     * once more on an {@link IntermediateCacheStream} wrapping this stream.
     *
     * @return the stream returned by the wrapper's {@code distinct()}
     */
    @Override
    public CacheStream<R> distinct() {
        addIntermediateOperation(DistinctOperation.getInstance());
        IntermediateCacheStream wrapper = new IntermediateCacheStream(this);
        return wrapper.distinct();
    }

    /**
     * Delegates sorting to an {@link IntermediateCacheStream} wrapping this stream; nothing is
     * registered on this stream itself.
     *
     * @return the stream returned by the wrapper's {@code sorted()}
     */
    @Override
    public CacheStream<R> sorted() {
        IntermediateCacheStream wrapper = new IntermediateCacheStream(this);
        return wrapper.sorted();
    }

    /**
     * Delegates comparator-based sorting to an {@link IntermediateCacheStream} wrapping this stream;
     * nothing is registered on this stream itself.
     *
     * @param comparator ordering handed to the wrapper
     * @return the stream returned by the wrapper's {@code sorted(comparator)}
     */
    @Override
    public CacheStream<R> sorted(Comparator<? super R> comparator) {
        IntermediateCacheStream wrapper = new IntermediateCacheStream(this);
        return wrapper.sorted((Comparator)comparator);
    }

    /**
     * Registers a peek action as an intermediate operation of this stream.
     *
     * @param action consumer wrapped in a {@link PeekOperation}
     * @return whatever {@code addIntermediateOperation} returns, viewed as a {@link CacheStream}
     */
    @Override
    public CacheStream<R> peek(Consumer<? super R> action) {
        PeekOperation<R> operation = new PeekOperation<R>(action);
        return (CacheStream)addIntermediateOperation(operation);
    }

    /**
     * Registers a {@link LimitOperation} on this stream, then applies the same limit once more on an
     * {@link IntermediateCacheStream} wrapping this stream.
     *
     * @param maxSize limit passed to both the registered operation and the wrapper
     * @return the stream returned by the wrapper's {@code limit(maxSize)}
     */
    @Override
    public CacheStream<R> limit(long maxSize) {
        addIntermediateOperation(new LimitOperation(maxSize));
        IntermediateCacheStream wrapper = new IntermediateCacheStream(this);
        return wrapper.limit(maxSize);
    }

    /**
     * Delegates skipping to an {@link IntermediateCacheStream} wrapping this stream; nothing is
     * registered on this stream itself.
     *
     * @param n count handed to the wrapper's {@code skip}
     * @return the stream returned by the wrapper's {@code skip(n)}
     */
    @Override
    public CacheStream<R> skip(long n) {
        IntermediateCacheStream wrapper = new IntermediateCacheStream(this);
        return wrapper.skip(n);
    }

    /**
     * Terminal reduction with an identity value.
     *
     * @param identity value passed to {@code PublisherReducers.reduce(identity, accumulator)}
     * @param accumulator operator used in both functions handed to {@code performPublisherOperation}
     * @return the result of {@code performPublisherOperation}
     */
    @Override
    public R reduce(R identity, BinaryOperator<R> accumulator) {
        return performPublisherOperation(
                PublisherReducers.reduce(identity, accumulator),
                PublisherReducers.reduce(accumulator));
    }

    /**
     * Terminal reduction without an identity value.
     *
     * @param accumulator operator turned into a single reducer function that is passed as both
     *                    arguments of {@code performPublisherOperation}
     * @return the reduced value, or an empty {@link Optional} when the operation yields {@code null}
     */
    @Override
    public Optional<R> reduce(BinaryOperator<R> accumulator) {
        java.util.function.Function<Publisher<R>, CompletionStage<R>> reducer = PublisherReducers.reduce(accumulator);
        R reduced = performPublisherOperation(reducer, reducer);
        return Optional.ofNullable(reduced);
    }

    /**
     * Terminal reduction into a different result type.
     *
     * @param identity value passed to {@code PublisherReducers.reduce(identity, accumulator)}
     * @param accumulator folds one element into a partial result
     * @param combiner operator used to build the second function handed to
     *                 {@code performPublisherOperation}
     * @return the result of {@code performPublisherOperation}
     */
    @Override
    public <U> U reduce(U identity, BiFunction<U, ? super R, U> accumulator, BinaryOperator<U> combiner) {
        return performPublisherOperation(
                PublisherReducers.reduce(identity, accumulator),
                PublisherReducers.reduce(combiner));
    }

    /**
     * Terminal mutable reduction driven by explicit supplier, accumulator and combiner.
     *
     * @param supplier creates a result container
     * @param accumulator adds one element to a container
     * @param combiner merges two containers; wrapped by {@code PublisherReducers.accumulate}
     * @return the result of {@code performPublisherOperation}
     */
    @Override
    public <R1> R1 collect(Supplier<R1> supplier, BiConsumer<R1, ? super R> accumulator, BiConsumer<R1, R1> combiner) {
        return performPublisherOperation(
                PublisherReducers.collect(supplier, accumulator),
                PublisherReducers.accumulate(combiner));
    }

    /**
     * Terminal mutable reduction driven by a {@link Collector}.
     *
     * @param collector supplies the reducer and finalizer functions and, unless it declares
     *                  {@code IDENTITY_FINISH}, the finisher applied to the intermediate result
     * @return the collected result
     */
    @Override
    public <R1, A> R1 collect(Collector<? super R, A, R1> collector) {
        A container = performPublisherOperation(
                PublisherReducers.collectorReducer(collector),
                PublisherReducers.collectorFinalizer(collector));
        boolean identityFinish = collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH);
        if (!identityFinish) {
            return collector.finisher().apply(container);
        }
        // With IDENTITY_FINISH the finisher is skipped and the container is returned as the result.
        return (R1)container;
    }

    /**
     * Terminal operation finding the minimum element.
     *
     * @param comparator ordering handed to {@code PublisherReducers.min}; the resulting function is
     *                   passed as both arguments of {@code performPublisherOperation}
     * @return the minimum, or an empty {@link Optional} when the operation yields {@code null}
     */
    @Override
    public Optional<R> min(Comparator<? super R> comparator) {
        java.util.function.Function<Publisher<? super R>, CompletionStage<? super R>> reducer = PublisherReducers.min(comparator);
        R smallest = performPublisherOperation(reducer, reducer);
        return Optional.ofNullable(smallest);
    }

    /**
     * Terminal operation finding the maximum element.
     *
     * @param comparator ordering handed to {@code PublisherReducers.max}; the resulting function is
     *                   passed as both arguments of {@code performPublisherOperation}
     * @return the maximum, or an empty {@link Optional} when the operation yields {@code null}
     */
    @Override
    public Optional<R> max(Comparator<? super R> comparator) {
        java.util.function.Function<Publisher<? super R>, CompletionStage<? super R>> reducer = PublisherReducers.max(comparator);
        R largest = performPublisherOperation(reducer, reducer);
        return Optional.ofNullable(largest);
    }

    /**
     * Terminal operation built from {@code PublisherReducers.anyMatch(predicate)} and
     * {@code PublisherReducers.or()}.
     *
     * @param predicate test handed to {@code PublisherReducers.anyMatch}
     * @return the boolean produced by {@code performPublisherOperation}
     */
    @Override
    public boolean anyMatch(Predicate<? super R> predicate) {
        return performPublisherOperation(
                PublisherReducers.anyMatch(predicate),
                PublisherReducers.or());
    }

    /**
     * Terminal operation built from {@code PublisherReducers.allMatch(predicate)} and
     * {@code PublisherReducers.and()}.
     *
     * @param predicate test handed to {@code PublisherReducers.allMatch}
     * @return the boolean produced by {@code performPublisherOperation}
     */
    @Override
    public boolean allMatch(Predicate<? super R> predicate) {
        return performPublisherOperation(
                PublisherReducers.allMatch(predicate),
                PublisherReducers.and());
    }

    /**
     * Terminal operation built from {@code PublisherReducers.noneMatch(predicate)} and
     * {@code PublisherReducers.and()}.
     *
     * @param predicate test handed to {@code PublisherReducers.noneMatch}
     * @return the boolean produced by {@code performPublisherOperation}
     */
    @Override
    public boolean noneMatch(Predicate<? super R> predicate) {
        return performPublisherOperation(
                PublisherReducers.noneMatch(predicate),
                PublisherReducers.and());
    }

    /**
     * Implemented purely by delegating to {@link #findAny()}.
     *
     * @return whatever {@link #findAny()} returns
     */
    @Override
    public Optional<R> findFirst() {
        Optional<R> any = findAny();
        return any;
    }

    /**
     * Terminal operation returning some element of the stream, if any.
     *
     * <p>The function from {@code PublisherReducers.findFirst()} is passed as both arguments of
     * {@code performPublisherOperation}.
     *
     * @return the element found, or an empty {@link Optional} when the operation yields {@code null}
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Optional<R> findAny() {
        java.util.function.Function function = PublisherReducers.findFirst();
        // The raw function erases the result type, so cast back to R; without the cast
        // Optional.ofNullable(...) would produce an Optional<Object> that cannot be returned here.
        R value = (R)this.performPublisherOperation(function, function);
        return Optional.ofNullable(value);
    }

    /**
     * Terminal operation built from {@code PublisherReducers.count()} and
     * {@code PublisherReducers.add()}.
     *
     * @return the long produced by {@code performPublisherOperation}
     */
    @Override
    public long count() {
        return performPublisherOperation(
                PublisherReducers.count(),
                PublisherReducers.add());
    }

    /**
     * Builds an iterator over the stream's results.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>pick the transformer: identity when there are no intermediate operations, otherwise a
     *       {@link CacheIntermediatePublisher} over them;</li>
     *   <li>pick the delivery guarantee from {@code rehashAware};</li>
     *   <li>obtain a key publisher when {@code toKeyFunction} is {@code null}, an entry publisher
     *       otherwise;</li>
     *   <li>when a segment completion listener is registered, subscribe with segment notifications
     *       and route them through a {@link CompletionSegmentTracker};</li>
     *   <li>wrap the publisher in a closeable iterator that is also closed via {@code onClose}.</li>
     * </ol>
     *
     * @return the closeable iterator itself, or a wrapper that informs the tracker about each
     *         returned value and about exhaustion when a listener is registered
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Iterator<R> iterator() {
        Publisher publisherToSubscribeTo;
        CompletionSegmentTracker segmentTracker;
        log.tracef("Distributed iterator invoked with rehash: %s", (Object)this.rehashAware);
        java.util.function.Function usedTransformer = this.intermediateOperations.isEmpty() ? MarshallableFunctions.identity() : new CacheIntermediatePublisher(this.intermediateOperations);
        DeliveryGuarantee deliveryGuarantee = this.rehashAware ? DeliveryGuarantee.EXACTLY_ONCE : DeliveryGuarantee.AT_MOST_ONCE;
        SegmentPublisherSupplier publisher = this.toKeyFunction == null ? this.cpm.keyPublisher(this.segmentsToFilter, this.keysToFilter, this.invocationContext, this.explicitFlags, deliveryGuarantee, this.distributedBatchSize, usedTransformer) : this.cpm.entryPublisher(this.segmentsToFilter, this.keysToFilter, this.invocationContext, this.explicitFlags, deliveryGuarantee, this.distributedBatchSize, usedTransformer);
        if (this.segmentCompletionListener != null) {
            // Segment-complete notifications are swallowed by the tracker (mapped to an empty
            // Optional) and only real values travel further downstream.
            segmentTracker = new CompletionSegmentTracker(this.segmentCompletionListener);
            publisherToSubscribeTo = Flowable.fromPublisher(publisher.publisherWithSegments()).mapOptional(segmentTracker);
        } else {
            segmentTracker = null;
            publisherToSubscribeTo = publisher.publisherWithoutSegments();
        }
        // Typed as CloseableIterator<R> so that next() yields R; returning a plain Object from
        // getNext() below does not type-check.
        final CloseableIterator<R> realIterator = Closeables.iterator(Flowable.fromPublisher(publisherToSubscribeTo).onErrorResumeNext(RxJavaInterop.cacheExceptionWrapper()), this.distributedBatchSize);
        this.onClose(realIterator::close);
        if (segmentTracker != null) {
            return new AbstractIterator<R>(){

                @Override
                protected R getNext() {
                    if (realIterator.hasNext()) {
                        R value = realIterator.next();
                        // Tell the tracker the value is being handed out so it can fire any
                        // segment completions it parked on this value.
                        segmentTracker.returningObject(value);
                        return value;
                    }
                    // Exhausted: flush the segments that completed after the last value.
                    segmentTracker.onComplete();
                    return null;
                }
            };
        }
        return realIterator;
    }

    /**
     * Wraps {@link #iterator()} in a spliterator.
     *
     * @return a spliterator created with a size argument of {@link Long#MAX_VALUE} and the
     *         {@link Spliterator#CONCURRENT} characteristic
     */
    @Override
    public Spliterator<R> spliterator() {
        // Spliterator.CONCURRENT is the named form of the literal 0x1000 (4096).
        return Spliterators.spliterator(this.iterator(), Long.MAX_VALUE, Spliterator.CONCURRENT);
    }

    /**
     * Runs {@code action} for every element by registering it through {@code peek} and then
     * draining the resulting iterator with a no-op consumer.
     *
     * @param action consumer registered via {@code peek}
     */
    @Override
    public void forEach(Consumer<? super R> action) {
        Iterator drained = peek((Consumer)action).iterator();
        drained.forEachRemaining(ignore -> {});
    }

    /**
     * Runs a cache-aware {@code action} for every element: the action is adapted with
     * {@code CacheBiConsumers.objectConsumer}, registered through {@code peek}, and the resulting
     * iterator is drained with a no-op consumer.
     *
     * @param action bi-consumer adapted via {@code CacheBiConsumers.objectConsumer}
     */
    @Override
    public <K, V> void forEach(BiConsumer<Cache<K, V>, ? super R> action) {
        Iterator drained = peek((Consumer)CacheBiConsumers.objectConsumer(action)).iterator();
        drained.forEachRemaining(ignore -> {});
    }

    /**
     * Implemented purely by delegating to {@link #forEach(Consumer)}.
     *
     * @param action consumer forwarded unchanged
     */
    @Override
    public void forEachOrdered(Consumer<? super R> action) {
        forEach(action);
    }

    /**
     * Terminal operation built from {@code PublisherReducers.toArrayReducer()} and
     * {@code PublisherReducers.toArrayFinalizer()}.
     *
     * @return the array produced by {@code performPublisherOperation}
     */
    @Override
    public Object[] toArray() {
        return performPublisherOperation(
                PublisherReducers.toArrayReducer(),
                PublisherReducers.toArrayFinalizer());
    }

    /**
     * Terminal operation building an array through the given generator.
     *
     * @param generator array factory handed to both {@code PublisherReducers.toArrayReducer} and
     *                  {@code PublisherReducers.toArrayFinalizer}
     * @return the array produced by {@code performPublisherOperation}
     */
    @Override
    public <A> A[] toArray(IntFunction<A[]> generator) {
        java.util.function.Function reducer = PublisherReducers.toArrayReducer(generator);
        return performPublisherOperation(reducer, PublisherReducers.toArrayFinalizer(generator));
    }

    /**
     * Clears the {@code parallelDistribution} flag.
     *
     * @return this stream
     */
    @Override
    public CacheStream<R> sequentialDistribution() {
        parallelDistribution = false;
        return this;
    }

    /**
     * Sets the {@code parallelDistribution} flag.
     *
     * @return this stream
     */
    @Override
    public CacheStream<R> parallelDistribution() {
        parallelDistribution = true;
        return this;
    }

    /**
     * Stores the segment filter after converting the set with {@code IntSets.from}.
     *
     * @param segments segments to keep; NOTE(review): a {@code null} argument is handed straight
     *                 to {@code IntSets.from} - confirm how that helper treats it
     * @return this stream
     */
    @Override
    public CacheStream<R> filterKeySegments(Set<Integer> segments) {
        segmentsToFilter = IntSets.from(segments);
        return this;
    }

    /**
     * Stores the given segment filter as-is (no copy is made).
     *
     * @param segments segment set assigned to {@code segmentsToFilter}
     * @return this stream
     */
    @Override
    public CacheStream<R> filterKeySegments(IntSet segments) {
        segmentsToFilter = segments;
        return this;
    }

    /**
     * Stores the given key filter as-is (no copy is made).
     *
     * @param keys key set assigned to {@code keysToFilter}
     * @return this stream
     */
    @Override
    public CacheStream<R> filterKeys(Set<?> keys) {
        keysToFilter = keys;
        return this;
    }

    /**
     * Stores the batch size; the value is not validated here.
     *
     * @param batchSize value assigned to {@code distributedBatchSize}
     * @return this stream
     */
    @Override
    public CacheStream<R> distributedBatchSize(int batchSize) {
        distributedBatchSize = batchSize;
        return this;
    }

    /**
     * Registers a segment completion listener. The first listener is stored directly; each later
     * one is combined with the existing listener through {@code composeWithExceptions}.
     *
     * @param listener listener to store or compose
     * @return this stream
     */
    @Override
    public CacheStream<R> segmentCompletionListener(BaseCacheStream.SegmentCompletionListener listener) {
        if (segmentCompletionListener == null) {
            segmentCompletionListener = listener;
        } else {
            segmentCompletionListener = DistributedCacheStream.composeWithExceptions(segmentCompletionListener, listener);
        }
        return this;
    }

    /**
     * Clears the {@code rehashAware} flag. {@link #iterator()} reads it to choose the delivery
     * guarantee.
     *
     * @return this stream
     */
    @Override
    public CacheStream<R> disableRehashAware() {
        rehashAware = false;
        return this;
    }

    /**
     * Stores the timeout and its unit.
     *
     * @param timeout duration; must be strictly positive
     * @param unit unit of {@code timeout}; stored without validation
     * @return this stream
     * @throws IllegalArgumentException if {@code timeout} is zero or negative
     */
    @Override
    public CacheStream<R> timeout(long timeout, TimeUnit unit) {
        boolean positive = timeout > 0L;
        if (!positive) {
            throw new IllegalArgumentException("Timeout must be greater than 0");
        }
        this.timeoutUnit = unit;
        this.timeout = timeout;
        return this;
    }

    /**
     * @return a new {@link DistributedIntCacheStream} constructed from this stream
     */
    protected DistributedIntCacheStream intCacheStream() {
        AbstractCacheStream self = this;
        return new DistributedIntCacheStream(self);
    }

    /**
     * @return a new {@link DistributedDoubleCacheStream} constructed from this stream
     */
    protected DistributedDoubleCacheStream doubleCacheStream() {
        AbstractCacheStream self = this;
        return new DistributedDoubleCacheStream(self);
    }

    /**
     * @return a new {@link DistributedLongCacheStream} constructed from this stream
     */
    protected DistributedLongCacheStream longCacheStream() {
        AbstractCacheStream self = this;
        return new DistributedLongCacheStream(self);
    }

    /**
     * Delays segment-completion callbacks until the value that followed the completion is handed
     * out by the iterator.
     *
     * <p>{@link #apply} is installed as the {@code mapOptional} function of the publisher, while
     * {@link #returningObject} and {@link #onComplete} are called from the iterator returned by
     * {@code iterator()}. These two sides may run on different threads ({@code completedSegments}
     * is {@code volatile}), so the value-to-segments map is a concurrent map.
     *
     * <p>NOTE(review): pending segments are keyed by value equality; if two equal values are both
     * pending, the later {@code put} replaces the earlier entry - confirm whether that can happen.
     */
    private class CompletionSegmentTracker<R>
    implements Function<SegmentPublisherSupplier.Notification<R>, Optional<? extends R>> {
        private final Consumer<Supplier<PrimitiveIterator.OfInt>> listener;
        // Value -> segments whose completion is reported once that value is returned to the caller.
        private final Map<R, IntSet> awaitingNotification;
        // Segments completed since the last value was seen; replaced (not cleared) when parked.
        volatile IntSet completedSegments;

        /**
         * @param listener callback receiving a supplier of completed segment ids; must not be null
         */
        private CompletionSegmentTracker(Consumer<Supplier<PrimitiveIterator.OfInt>> listener) {
            this.listener = Objects.requireNonNull(listener);
            // Concurrent map: put() happens in apply(), remove() in returningObject().
            this.awaitingNotification = new java.util.concurrent.ConcurrentHashMap<R, IntSet>();
            this.completedSegments = IntSets.mutableEmptySet(DistributedCacheStream.this.maxSegment);
        }

        /**
         * Handles one publisher notification.
         *
         * @param r either a segment-complete marker or a value
         * @return empty for a segment-complete marker (so it is dropped), otherwise the value
         */
        @Override
        public Optional<R> apply(SegmentPublisherSupplier.Notification<R> r) throws Throwable {
            if (r.isSegmentComplete()) {
                this.completedSegments.set(r.completedSegment());
                return Optional.empty();
            }
            R value = r.value();
            if (!this.completedSegments.isEmpty()) {
                // Park the segments collected so far on this value and start a fresh set.
                log.tracef("Going to complete segments %s when %s is iterated upon", (Object)this.completedSegments, (Object)Util.toStr(value));
                this.awaitingNotification.put(value, this.completedSegments);
                this.completedSegments = IntSets.mutableEmptySet(DistributedCacheStream.this.maxSegment);
            }
            return Optional.of(value);
        }

        /**
         * Called when {@code value} is about to be returned by the iterator; notifies the listener
         * of any segments parked on that value.
         *
         * @param value the value being returned
         */
        public void returningObject(Object value) {
            IntSet segments = this.awaitingNotification.remove(value);
            if (segments != null) {
                log.tracef("Notifying listeners of segments %s complete now that %s is returning", (Object)segments, (Object)Util.toStr(value));
                this.listener.accept(segments::iterator);
            }
        }

        /**
         * Called when the iterator is exhausted; reports the segments that completed after the last
         * value and then clears the current set.
         */
        public void onComplete() {
            log.tracef("Completing last segments of: %s", (Object)this.completedSegments);
            this.listener.accept(this.completedSegments::iterator);
            this.completedSegments.clear();
        }
    }
}

