/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.PartitionStatusChanged;
import org.infinispan.notifications.cachelistener.event.PartitionStatusChangedEvent;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.util.logging.Log;
import org.reactivestreams.Publisher;

@Scope(value=Scopes.NAMED_CACHE)
public class PartitionAwareClusterPublisherManager<K, V>
extends ClusterPublisherManagerImpl<K, V> {
    volatile AvailabilityMode currentMode = AvailabilityMode.AVAILABLE;
    protected final PartitionListener listener = new PartitionListener();
    @Inject
    protected ComponentRef<Cache<?, ?>> cache;
    private final Set<AtomicBoolean> pendingOperations = ConcurrentHashMap.newKeySet();

    @Override
    public void start() {
        super.start();
        this.cache.running().addListener(this.listener);
    }

    @Override
    public <R> CompletionStage<R> keyReduction(boolean parallelPublisher, IntSet segments, Set<K> keysToInclude, InvocationContext ctx, long explicitFlags, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        this.checkPartitionStatus();
        CompletionStage original = super.keyReduction(parallelPublisher, segments, keysToInclude, ctx, explicitFlags, deliveryGuarantee, transformer, finalizer);
        return this.registerStage(original);
    }

    @Override
    public <R> CompletionStage<R> entryReduction(boolean parallelPublisher, IntSet segments, Set<K> keysToInclude, InvocationContext ctx, long explicitFlags, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        this.checkPartitionStatus();
        CompletionStage original = super.entryReduction(parallelPublisher, segments, keysToInclude, ctx, explicitFlags, deliveryGuarantee, transformer, finalizer);
        return this.registerStage(original);
    }

    private <R> CompletionStage<R> registerStage(CompletionStage<R> original) {
        AtomicBoolean ab = this.registerOperation();
        return original.handle((value, t2) -> {
            this.pendingOperations.remove(ab);
            if (ab.get()) {
                throw Log.CLUSTER.partitionDegraded();
            }
            CompletableFutures.rethrowExceptionIfPresent(t2);
            return value;
        });
    }

    private AtomicBoolean registerOperation() {
        AtomicBoolean ab = new AtomicBoolean();
        this.pendingOperations.add(ab);
        if (this.isPartitionDegraded()) {
            ab.set(true);
        }
        return ab;
    }

    @Override
    public <R> SegmentPublisherSupplier<R> keyPublisher(IntSet segments, Set<K> keysToInclude, InvocationContext invocationContext, long explicitFlags, DeliveryGuarantee deliveryGuarantee, int batchSize, Function<? super Publisher<K>, ? extends Publisher<R>> transformer) {
        this.checkPartitionStatus();
        SegmentPublisherSupplier original = super.keyPublisher(segments, keysToInclude, invocationContext, explicitFlags, deliveryGuarantee, batchSize, transformer);
        return this.registerPublisher(original);
    }

    @Override
    public <R> SegmentPublisherSupplier<R> entryPublisher(IntSet segments, Set<K> keysToInclude, InvocationContext invocationContext, long explicitFlags, DeliveryGuarantee deliveryGuarantee, int batchSize, Function<? super Publisher<CacheEntry<K, V>>, ? extends Publisher<R>> transformer) {
        this.checkPartitionStatus();
        SegmentPublisherSupplier original = super.entryPublisher(segments, keysToInclude, invocationContext, explicitFlags, deliveryGuarantee, batchSize, transformer);
        return this.registerPublisher(original);
    }

    private <R> SegmentPublisherSupplier<R> registerPublisher(final SegmentPublisherSupplier<R> original) {
        return new SegmentPublisherSupplier<R>(){

            @Override
            public Publisher<SegmentPublisherSupplier.Notification<R>> publisherWithSegments() {
                return this.handleEarlyTermination(SegmentPublisherSupplier::publisherWithSegments);
            }

            @Override
            public Publisher<R> publisherWithoutSegments() {
                return this.handleEarlyTermination(SegmentPublisherSupplier::publisherWithoutSegments);
            }

            private <S> Flowable<S> handleEarlyTermination(Function<SegmentPublisherSupplier<R>, Publisher<S>> function) {
                AtomicBoolean ab = PartitionAwareClusterPublisherManager.this.registerOperation();
                return Flowable.fromPublisher(function.apply(original)).doOnNext(s2 -> PartitionAwareClusterPublisherManager.this.checkPendingOperation(ab)).doOnComplete(() -> PartitionAwareClusterPublisherManager.this.checkPendingOperation(ab)).doFinally(() -> PartitionAwareClusterPublisherManager.this.pendingOperations.remove(ab));
            }
        };
    }

    private void checkPendingOperation(AtomicBoolean ab) {
        if (ab.get()) {
            throw Log.CLUSTER.partitionDegraded();
        }
    }

    private void checkPartitionStatus() {
        if (this.isPartitionDegraded()) {
            throw Log.CLUSTER.partitionDegraded();
        }
    }

    private boolean isPartitionDegraded() {
        return this.currentMode != AvailabilityMode.AVAILABLE;
    }

    @Listener
    private class PartitionListener {
        private PartitionListener() {
        }

        @PartitionStatusChanged
        public void onPartitionChange(PartitionStatusChangedEvent<K, ?> event) {
            if (!event.isPre()) {
                AvailabilityMode newMode;
                PartitionAwareClusterPublisherManager.this.currentMode = newMode = event.getAvailabilityMode();
                if (newMode == AvailabilityMode.DEGRADED_MODE) {
                    PartitionAwareClusterPublisherManager.this.pendingOperations.forEach(ab -> ab.set(true));
                }
            }
        }
    }
}

