/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import java.util.ArrayDeque;
import java.util.PrimitiveIterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.Util;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.stream.impl.CacheStreamIntermediateReducer;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.reactivestreams.Publisher;

/**
 * Shared state and plumbing for cache-backed {@link BaseStream} implementations.
 *
 * <p>The class keeps the queue of intermediate operations that have been applied so far together
 * with the settings (parallelism, key/segment filters, rehash awareness, batch size, timeout) that
 * are consulted when a terminal operation is handed to the {@link ClusterPublisherManager}.
 *
 * <p>Instances are mutable: the {@link BaseStream} configuration methods update this object and
 * return {@link #unwrap()} rather than creating a new stream.
 *
 * @param <Original> input type accepted by {@link #toKeyFunction}
 * @param <T> element type of this stream
 * @param <S> the {@link BaseStream} type this stream implements
 * @param <S2> the concrete stream type returned by the fluent methods
 */
public abstract class AbstractCacheStream<Original, T, S extends BaseStream<T, S>, S2 extends S>
implements BaseStream<T, S> {
    /** Intermediate operations in the order they were added; shared with streams created by the copy constructor. */
    protected final Queue<IntermediateOperation> intermediateOperations;
    protected final Address localAddress;
    /** Publisher manager that terminal operations are delegated to. */
    protected final ClusterPublisherManager cpm;
    protected final Executor executor;
    /** Registry used to look up components and to inject into intermediate operations. */
    protected final ComponentRegistry registry;
    protected final PartitionHandlingManager partition;
    protected final KeyPartitioner keyPartitioner;
    protected final StateTransferLock stateTransferLock;
    protected final long explicitFlags;
    /** When {@code null} a key reduction is performed, otherwise an entry reduction. */
    protected final Function<? super Original, ?> toKeyFunction;
    protected final InvocationContext invocationContext;
    /** Handler run by {@link #close()}; {@code null} until {@link #onClose(Runnable)} is called. */
    protected Runnable closeRunnable = null;
    protected Boolean parallelDistribution;
    protected boolean parallel;
    /** Selects {@code EXACTLY_ONCE} (true) or {@code AT_MOST_ONCE} (false) delivery for publisher operations. */
    protected boolean rehashAware = true;
    protected Set<?> keysToFilter;
    protected IntSet segmentsToFilter;
    protected int distributedBatchSize;
    protected Consumer<Supplier<PrimitiveIterator.OfInt>> segmentCompletionListener;
    protected IteratorOperation iteratorOperation = IteratorOperation.NO_MAP;
    /** Timeout value, expressed in {@link #timeoutUnit}; defaults to 30 seconds. */
    protected long timeout = 30L;
    protected TimeUnit timeoutUnit = TimeUnit.SECONDS;

    /**
     * Creates a stream with an empty intermediate operation queue.
     *
     * <p>The {@link PartitionHandlingManager}, {@link KeyPartitioner} and {@link StateTransferLock}
     * are looked up from {@code registry}.
     *
     * @param localAddress address compared against primary owners in {@link #isPrimaryOwner}
     * @param parallel initial value reported by {@link #isParallel()}
     * @param ctx invocation context passed to the publisher manager
     * @param explicitFlags flag bitset passed to the publisher manager
     * @param distributedBatchSize initial distributed batch size
     * @param executor executor made available to subclasses
     * @param registry component registry; must not be {@code null}
     * @param toKeyFunction key extraction function, or {@code null} to perform key reductions
     * @param clusterPublisherManager publisher manager used for terminal operations
     */
    protected AbstractCacheStream(Address localAddress, boolean parallel, InvocationContext ctx, long explicitFlags, int distributedBatchSize, Executor executor, ComponentRegistry registry, Function<? super Original, ?> toKeyFunction, ClusterPublisherManager<?, ?> clusterPublisherManager) {
        this.localAddress = localAddress;
        this.parallel = parallel;
        this.invocationContext = ctx;
        this.explicitFlags = explicitFlags;
        this.distributedBatchSize = distributedBatchSize;
        this.executor = executor;
        this.registry = registry;
        this.toKeyFunction = toKeyFunction;
        this.partition = registry.getComponent(PartitionHandlingManager.class);
        this.keyPartitioner = registry.getComponent(KeyPartitioner.class);
        this.stateTransferLock = registry.getComponent(StateTransferLock.class);
        this.cpm = clusterPublisherManager;
        this.intermediateOperations = new ArrayDeque<>();
    }

    /**
     * Creates a stream that takes over every setting of {@code other}.
     *
     * <p>The intermediate operation queue is not copied: both streams reference the same
     * {@link Queue} instance, so operations added through either are visible to both.
     *
     * @param other stream whose state is adopted
     */
    protected AbstractCacheStream(AbstractCacheStream<Original, T, S, S2> other) {
        this.intermediateOperations = other.intermediateOperations;
        this.localAddress = other.localAddress;
        this.invocationContext = other.invocationContext;
        this.explicitFlags = other.explicitFlags;
        this.executor = other.executor;
        this.registry = other.registry;
        this.toKeyFunction = other.toKeyFunction;
        this.partition = other.partition;
        this.keyPartitioner = other.keyPartitioner;
        this.stateTransferLock = other.stateTransferLock;
        this.cpm = other.cpm;
        this.closeRunnable = other.closeRunnable;
        this.parallel = other.parallel;
        this.parallelDistribution = other.parallelDistribution;
        this.rehashAware = other.rehashAware;
        this.keysToFilter = other.keysToFilter;
        this.segmentsToFilter = other.segmentsToFilter;
        this.distributedBatchSize = other.distributedBatchSize;
        this.segmentCompletionListener = other.segmentCompletionListener;
        this.iteratorOperation = other.iteratorOperation;
        this.timeout = other.timeout;
        this.timeoutUnit = other.timeoutUnit;
    }

    /**
     * @return the logger of the concrete stream implementation
     */
    protected abstract Log getLog();

    /**
     * Injects components into a type-preserving intermediate operation and appends it to the queue.
     *
     * @param intermediateOperation operation to append
     * @return this stream, as returned by {@link #unwrap()}
     */
    protected S2 addIntermediateOperation(IntermediateOperation<T, S, T, S> intermediateOperation) {
        intermediateOperation.handleInjection(this.registry);
        this.addIntermediateOperation(this.intermediateOperations, intermediateOperation);
        return this.unwrap();
    }

    /**
     * Injects components into an intermediate operation whose output type may differ from its
     * input type and appends it to the queue. Nothing is returned; the caller decides which stream
     * to hand back.
     *
     * @param intermediateOperation operation to append
     */
    protected void addIntermediateOperationMap(IntermediateOperation<T, S, ?, ?> intermediateOperation) {
        intermediateOperation.handleInjection(this.registry);
        this.addIntermediateOperation(this.intermediateOperations, intermediateOperation);
    }

    /**
     * Appends {@code intermediateOperation} to the given queue. Performs no injection.
     *
     * @param intermediateOperations queue to append to
     * @param intermediateOperation operation to append
     */
    protected void addIntermediateOperation(Queue<IntermediateOperation> intermediateOperations, IntermediateOperation<T, S, ?, ?> intermediateOperation) {
        intermediateOperations.add(intermediateOperation);
    }

    /**
     * @return this stream typed as the concrete stream type
     */
    protected abstract S2 unwrap();

    @Override
    public boolean isParallel() {
        return this.parallel;
    }

    @Override
    public S2 sequential() {
        this.parallel = false;
        return this.unwrap();
    }

    @Override
    public S2 parallel() {
        this.parallel = true;
        return this.unwrap();
    }

    /** Changes no state; simply returns this stream. */
    @Override
    public S2 unordered() {
        return this.unwrap();
    }

    /**
     * Registers a close handler. The first handler is stored directly; each further handler is
     * chained after the existing one through {@link Util#composeWithExceptions(Runnable, Runnable)}.
     */
    @Override
    public S2 onClose(Runnable closeHandler) {
        this.closeRunnable = this.closeRunnable == null ? closeHandler : Util.composeWithExceptions(this.closeRunnable, closeHandler);
        return this.unwrap();
    }

    /** Runs the registered close handler, if any. */
    @Override
    public void close() {
        if (this.closeRunnable != null) {
            this.closeRunnable.run();
        }
    }

    /**
     * Hands a reduction to the {@link ClusterPublisherManager} and returns the value obtained from
     * {@link CompletionStages#join(CompletionStage)} on the resulting stage.
     *
     * <p>When intermediate operations are queued, {@code transformer} is wrapped in a
     * {@link CacheStreamIntermediateReducer} built from that queue. A key reduction is requested
     * when {@link #toKeyFunction} is {@code null}, otherwise an entry reduction.
     *
     * @param transformer reduces a publisher of stream elements to a single result
     * @param finalizer combines the intermediate results into the final result
     * @param <R> result type
     * @return the final result of the reduction
     */
    // The publisher manager field is a raw type, so its arguments and result are necessarily unchecked.
    @SuppressWarnings({"unchecked", "rawtypes"})
    <R> R performPublisherOperation(Function<? super Publisher<T>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        // Raw on purpose: the wildcard-typed transformer and the raw reducer share no narrower
        // common parameterization, and the raw manager call accepts a raw Function anyway.
        Function usedTransformer = this.intermediateOperations.isEmpty()
                ? transformer
                : new CacheStreamIntermediateReducer(this.intermediateOperations, transformer);
        DeliveryGuarantee guarantee = this.rehashAware ? DeliveryGuarantee.EXACTLY_ONCE : DeliveryGuarantee.AT_MOST_ONCE;
        // Declared as CompletionStage<R> so that join(...) is inferred to return R instead of Object.
        CompletionStage<R> stage;
        if (this.toKeyFunction == null) {
            stage = this.cpm.keyReduction(this.parallel, this.segmentsToFilter, this.keysToFilter, this.invocationContext, this.explicitFlags, guarantee, usedTransformer, finalizer);
        } else {
            stage = this.cpm.entryReduction(this.parallel, this.segmentsToFilter, this.keysToFilter, this.invocationContext, this.explicitFlags, guarantee, usedTransformer, finalizer);
        }
        return CompletionStages.join(stage);
    }

    /**
     * Tells whether the local node is the primary owner of {@code key} under {@code ch}.
     *
     * @param ch consistent hash to look the owner up in
     * @param key key whose segment is resolved through {@link #keyPartitioner}
     * @return {@code true} if {@link #localAddress} equals the primary owner of the key's segment
     */
    protected boolean isPrimaryOwner(ConsistentHash ch, Object key) {
        return this.localAddress.equals(ch.locatePrimaryOwnerForSegment(this.keyPartitioner.getSegment(key)));
    }

    /**
     * Combines two segment completion listeners into one that always invokes both.
     *
     * <p>{@code a} runs first. If it completes normally, {@code b} runs and anything it throws
     * propagates. If {@code a} throws, {@code b} is still invoked; a throwable from {@code b} is
     * attached to the one from {@code a} as suppressed, and the throwable from {@code a} is
     * rethrown.
     *
     * @param a listener invoked first
     * @param b listener invoked second
     * @return the composed listener
     */
    protected static Consumer<Supplier<PrimitiveIterator.OfInt>> composeWithExceptions(Consumer<Supplier<PrimitiveIterator.OfInt>> a, Consumer<Supplier<PrimitiveIterator.OfInt>> b) {
        return segments -> {
            try {
                a.accept(segments);
            }
            catch (Throwable e1) {
                try {
                    b.accept(segments);
                }
                catch (Throwable e2) {
                    try {
                        e1.addSuppressed(e2);
                    }
                    catch (Throwable ignored) {
                        // Best effort only: failing to attach e2 must not replace e1, which is rethrown below.
                    }
                }
                throw e1;
            }
            b.accept(segments);
        };
    }

    /**
     * Kind of mapping that has been applied ahead of an iterator, together with the function
     * needed to unwrap the produced elements.
     */
    enum IteratorOperation {
        NO_MAP,
        MAP{

            /**
             * @return a function that casts its argument to {@link KeyValuePair} and returns the
             *         pair's value
             */
            @Override
            @SuppressWarnings("unchecked")
            public <In, Out> Function<In, Out> getFunction() {
                // The cast to Out is unchecked but required: the raw pair only exposes Object.
                return e -> (Out) ((KeyValuePair) e).getValue();
            }
        }
        ,
        FLAT_MAP;


        /**
         * @return the unwrapping function for this operation; {@code null} unless overridden by
         *         the constant
         */
        public <In, Out> Function<In, Out> getFunction() {
            return null;
        }
    }
}

