/*
 * Decompiled with CFR 0.152.
 */
package org.finos.tracdap.common.concurrent.flow;

import io.netty.util.concurrent.OrderedEventExecutor;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.finos.tracdap.common.exception.ETracInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HubProcessor<T>
implements Flow.Processor<T, T> {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private Flow.Subscription sourceSubscription;
    private final Map<Flow.Subscriber<? super T>, HubTargetState> targets = new HashMap<Flow.Subscriber<? super T>, HubTargetState>();
    private final AtomicBoolean sourceGuard = new AtomicBoolean(false);
    private final ConcurrentMap<Flow.Subscriber<?>, Object> targetGuard = new ConcurrentHashMap();
    private final LinkedList<T> messageBuffer = new LinkedList();
    private final Consumer<T> releaseFunc;
    private final OrderedEventExecutor eventLoop;
    private long messageBufferStart;
    private long messageBufferEnd;
    private long sourceRequestIndex;
    private boolean completeFlag;

    public HubProcessor(OrderedEventExecutor eventLoop, Consumer<T> releaseFunc) {
        this.releaseFunc = releaseFunc;
        this.eventLoop = eventLoop;
        this.messageBufferStart = 0L;
        this.messageBufferEnd = 0L;
        this.sourceRequestIndex = 0L;
    }

    public HubProcessor(OrderedEventExecutor eventLoop) {
        this(eventLoop, null);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Object priorTarget = this.targetGuard.putIfAbsent(subscriber, new Object());
        if (priorTarget != null) {
            IllegalStateException err = new IllegalStateException("Duplicate subscription in hub processor (this is a bug)");
            this.eventLoop.execute(() -> subscriber.onError(err));
            return;
        }
        this.eventLoop.execute(() -> this.doNewSubscription(subscriber));
    }

    private void doNewSubscription(Flow.Subscriber<? super T> subscriber) {
        HubSubscription subscription = new HubSubscription(subscriber);
        HubTargetState state = new HubTargetState();
        state.subscription = subscription;
        this.targets.put(subscriber, state);
        subscriber.onSubscribe(subscription);
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        boolean priorSourceOk = this.sourceGuard.compareAndSet(false, true);
        if (!priorSourceOk) {
            throw new IllegalStateException("Hub processor subscribed to multiple upstream sources");
        }
        if (this.targetGuard.isEmpty()) {
            throw new IllegalStateException("Hub processor connected to source before any targets");
        }
        this.eventLoop.execute(() -> this.doSubscribe(subscription));
    }

    private void doSubscribe(Flow.Subscription subscription) {
        this.sourceSubscription = subscription;
        if (this.targets.isEmpty()) {
            this.sourceSubscription.cancel();
            return;
        }
        long maxRequest = this.targets.values().stream().map(state -> state.requestIndex).mapToLong(x -> x.intValue()).max().getAsLong();
        this.sourceSubscription.request(maxRequest);
        this.sourceRequestIndex = maxRequest;
    }

    @Override
    public void onNext(T message) {
        this.eventLoop.submit(() -> this.doNext(message));
    }

    private void doNext(T message) {
        this.messageBuffer.add(message);
        ++this.messageBufferEnd;
        this.eventLoop.submit(this::dispatchMessages);
    }

    @Override
    public void onError(Throwable error) {
        this.eventLoop.submit(() -> this.doError(error));
    }

    private void doError(Throwable error) {
        this.clearBuffer();
        Throwable completionError = error instanceof CompletionException ? error : new CompletionException(error.getMessage(), error);
        for (Map.Entry<Flow.Subscriber<T>, HubTargetState> target : this.targets.entrySet()) {
            this.sendTargetOnError(target.getKey(), target.getValue(), completionError);
        }
    }

    @Override
    public void onComplete() {
        this.eventLoop.submit(this::doComplete);
    }

    private void doComplete() {
        this.completeFlag = true;
        this.eventLoop.submit(this::dispatchMessages);
    }

    private void dispatchMessages() {
        if (this.targets.isEmpty()) {
            this.clearBuffer();
            return;
        }
        int i = 0;
        while ((long)i < this.messageBufferEnd - this.messageBufferStart) {
            long messageIndex = (long)i + this.messageBufferStart;
            T message = this.messageBuffer.get(i);
            for (Map.Entry<Flow.Subscriber<T>, HubTargetState> subscriberState : this.targets.entrySet()) {
                Flow.Subscriber<? super T> targetSubscriber = subscriberState.getKey();
                HubTargetState target2 = subscriberState.getValue();
                if (messageIndex != (long)target2.receiveIndex || messageIndex >= (long)target2.requestIndex || target2.badError) continue;
                this.sendTargetOnNext(targetSubscriber, target2, message);
            }
            ++i;
        }
        if (this.completeFlag) {
            for (Map.Entry<Flow.Subscriber<T>, HubTargetState> subscriberState : this.targets.entrySet()) {
                Flow.Subscriber<? super T> targetSubscriber = subscriberState.getKey();
                HubTargetState target3 = subscriberState.getValue();
                if ((long)target3.receiveIndex != this.messageBufferEnd || target3.completeFlag) continue;
                this.sendTargetOnComplete(targetSubscriber, target3);
            }
        }
        List failedTargets = this.targets.entrySet().stream().filter(x -> ((HubTargetState)x.getValue()).badError).map(Map.Entry::getKey).collect(Collectors.toList());
        for (Flow.Subscriber target4 : failedTargets) {
            this.targets.remove(target4);
        }
        if (this.targets.isEmpty()) {
            this.clearBuffer();
        } else {
            long minReceiveIndex = this.targets.values().stream().map(target -> target.receiveIndex).mapToLong(x -> x.intValue()).min().getAsLong();
            while (minReceiveIndex > this.messageBufferStart) {
                this.messageBuffer.pop();
                ++this.messageBufferStart;
            }
        }
        if (this.targets.isEmpty() && !failedTargets.isEmpty()) {
            this.sourceSubscription.cancel();
        }
    }

    private void requestTargetMessages(Flow.Subscriber<? super T> target, long n) {
        HubTargetState state = this.targets.get(target);
        state.requestIndex = (int)((long)state.requestIndex + n);
        if ((long)state.requestIndex > this.sourceRequestIndex && this.sourceSubscription != null) {
            this.sourceSubscription.request((long)state.requestIndex - this.sourceRequestIndex);
            this.sourceRequestIndex = state.requestIndex;
        }
        this.eventLoop.submit(this::dispatchMessages);
    }

    private void cancelTargetSubscription(Flow.Subscriber<? super T> target) {
        this.targets.remove(target);
        if (this.targets.isEmpty()) {
            this.clearBuffer();
            if (this.sourceSubscription != null) {
                this.sourceSubscription.cancel();
            }
        }
    }

    private void sendTargetOnNext(Flow.Subscriber<? super T> target, HubTargetState state, T message) {
        try {
            ++state.receiveIndex;
            target.onNext(message);
        }
        catch (Throwable e) {
            state.completeFlag = true;
            state.badError = true;
            this.reportUnhandledError(target, e);
        }
    }

    private void sendTargetOnComplete(Flow.Subscriber<? super T> target, HubTargetState state) {
        try {
            state.completeFlag = true;
            target.onComplete();
        }
        catch (Throwable e) {
            state.badError = true;
            this.reportUnhandledError(target, e);
        }
    }

    private void sendTargetOnError(Flow.Subscriber<? super T> target, HubTargetState state, Throwable error) {
        try {
            state.completeFlag = true;
            target.onError(error);
        }
        catch (Throwable secondaryError) {
            state.badError = true;
            this.log.warn("Following a previous error, the stream processing pipeline was not successfully notified");
            this.log.warn("This is a bug, and may cause hanging and/or resource leaks");
            this.log.warn(secondaryError.getMessage(), secondaryError);
        }
    }

    private void reportUnhandledError(Flow.Subscriber<? super T> target, Throwable error) {
        String msg = "An error occurred in the stream processing pipeline and has not been handled";
        ETracInternal err = new ETracInternal(msg, error);
        this.log.error(msg, (Throwable)err);
        try {
            target.onError(err);
        }
        catch (Throwable secondaryError) {
            this.log.warn("Following a previous unhandled error, the stream processing pipeline was not successfully notified");
            this.log.warn("This is a bug, and may cause hanging and/or resource leaks");
            this.log.warn(secondaryError.getMessage(), secondaryError);
        }
    }

    private void clearBuffer() {
        if (this.releaseFunc != null) {
            this.messageBuffer.forEach(this.releaseFunc);
        }
        this.messageBuffer.clear();
        this.messageBufferStart = this.messageBufferEnd;
    }

    private static class HubTargetState {
        Flow.Subscription subscription;
        int requestIndex;
        int receiveIndex;
        boolean completeFlag;
        boolean badError;

        private HubTargetState() {
        }
    }

    private class HubSubscription
    implements Flow.Subscription {
        private final Flow.Subscriber<? super T> target;

        private HubSubscription(Flow.Subscriber<? super T> target) {
            this.target = target;
        }

        @Override
        public void request(long n) {
            HubProcessor.this.eventLoop.submit(() -> HubProcessor.this.requestTargetMessages(this.target, n));
        }

        @Override
        public void cancel() {
            HubProcessor.this.eventLoop.submit(() -> HubProcessor.this.cancelTargetSubscription(this.target));
        }
    }
}

