package org.finos.tracdap.common.concurrent.flow;

import io.netty.util.concurrent.OrderedEventExecutor;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.finos.tracdap.common.exception.ETracInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/finos/tracdap/common/concurrent/flow/HubProcessor.class */
public class HubProcessor<T> implements Flow.Processor<T, T> {
    private final Logger log;
    private Flow.Subscription sourceSubscription;
    private final Map<Flow.Subscriber<? super T>, HubTargetState> targets;
    private final AtomicBoolean sourceGuard;
    private final ConcurrentMap<Flow.Subscriber<?>, Object> targetGuard;
    private final LinkedList<T> messageBuffer;
    private final Consumer<T> releaseFunc;
    private final OrderedEventExecutor eventLoop;
    private long messageBufferStart;
    private long messageBufferEnd;
    private long sourceRequestIndex;
    private boolean completeFlag;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/finos/tracdap/common/concurrent/flow/HubProcessor$HubSubscription.class */
    public class HubSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super T> target;

        private HubSubscription(Flow.Subscriber<? super T> subscriber) {
            this.target = subscriber;
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void request(long j) {
            HubProcessor.this.eventLoop.submit(() -> {
                HubProcessor.this.requestTargetMessages(this.target, j);
            });
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void cancel() {
            HubProcessor.this.eventLoop.submit(() -> {
                HubProcessor.this.cancelTargetSubscription(this.target);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/finos/tracdap/common/concurrent/flow/HubProcessor$HubTargetState.class */
    public static class HubTargetState {
        Flow.Subscription subscription;
        int requestIndex;
        int receiveIndex;
        boolean completeFlag;
        boolean badError;

        private HubTargetState() {
        }
    }

    public HubProcessor(OrderedEventExecutor orderedEventExecutor, Consumer<T> consumer) {
        this.log = LoggerFactory.getLogger(getClass());
        this.targets = new HashMap();
        this.sourceGuard = new AtomicBoolean(false);
        this.targetGuard = new ConcurrentHashMap();
        this.messageBuffer = new LinkedList<>();
        this.releaseFunc = consumer;
        this.eventLoop = orderedEventExecutor;
        this.messageBufferStart = 0L;
        this.messageBufferEnd = 0L;
        this.sourceRequestIndex = 0L;
    }

    public HubProcessor(OrderedEventExecutor orderedEventExecutor) {
        this(orderedEventExecutor, null);
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (this.targetGuard.putIfAbsent(subscriber, new Object()) == null) {
            doNewSubscription(subscriber);
        } else {
            IllegalStateException illegalStateException = new IllegalStateException("Duplicate subscription in hub processor (this is a bug)");
            this.eventLoop.execute(() -> {
                subscriber.onError(illegalStateException);
            });
        }
    }

    private void doNewSubscription(Flow.Subscriber<? super T> subscriber) {
        HubSubscription hubSubscription = new HubSubscription(subscriber);
        HubTargetState hubTargetState = new HubTargetState();
        hubTargetState.subscription = hubSubscription;
        this.targets.put(subscriber, hubTargetState);
        subscriber.onSubscribe(hubSubscription);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        if (!this.sourceGuard.compareAndSet(false, true)) {
            throw new IllegalStateException("Hub processor subscribed to multiple upstream sources");
        }
        if (this.targetGuard.isEmpty()) {
            throw new IllegalStateException("Hub processor connected to source before any targets");
        }
        this.eventLoop.execute(() -> {
            doSubscribe(subscription);
        });
    }

    private void doSubscribe(Flow.Subscription subscription) {
        this.sourceSubscription = subscription;
        if (this.targets.isEmpty()) {
            this.sourceSubscription.cancel();
            return;
        }
        long asLong = this.targets.values().stream().map(hubTargetState -> {
            return Integer.valueOf(hubTargetState.requestIndex);
        }).mapToLong(num -> {
            return num.intValue();
        }).max().getAsLong();
        this.sourceSubscription.request(asLong);
        this.sourceRequestIndex = asLong;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        this.eventLoop.submit(() -> {
            doNext(t);
        });
    }

    private void doNext(T t) {
        this.messageBuffer.add(t);
        this.messageBufferEnd++;
        this.eventLoop.submit(this::dispatchMessages);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        this.eventLoop.submit(() -> {
            doError(th);
        });
    }

    private void doError(Throwable th) {
        clearBuffer();
        Throwable completionException = th instanceof CompletionException ? th : new CompletionException(th.getMessage(), th);
        for (Map.Entry<Flow.Subscriber<? super T>, HubTargetState> entry : this.targets.entrySet()) {
            sendTargetOnError(entry.getKey(), entry.getValue(), completionException);
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        this.eventLoop.submit(this::doComplete);
    }

    private void doComplete() {
        this.completeFlag = true;
        this.eventLoop.submit(this::dispatchMessages);
    }

    private void dispatchMessages() {
        if (this.targets.isEmpty()) {
            clearBuffer();
            return;
        }
        for (int i = 0; i < this.messageBufferEnd - this.messageBufferStart; i++) {
            long j = i + this.messageBufferStart;
            T t = this.messageBuffer.get(i);
            for (Map.Entry<Flow.Subscriber<? super T>, HubTargetState> entry : this.targets.entrySet()) {
                Flow.Subscriber<? super T> key = entry.getKey();
                HubTargetState value = entry.getValue();
                if (j == value.receiveIndex && j < value.requestIndex && !value.badError) {
                    sendTargetOnNext(key, value, t);
                }
            }
        }
        if (this.completeFlag) {
            for (Map.Entry<Flow.Subscriber<? super T>, HubTargetState> entry2 : this.targets.entrySet()) {
                Flow.Subscriber<? super T> key2 = entry2.getKey();
                HubTargetState value2 = entry2.getValue();
                if (value2.receiveIndex == this.messageBufferEnd && !value2.completeFlag) {
                    sendTargetOnComplete(key2, value2);
                }
            }
        }
        List list = (List) this.targets.entrySet().stream().filter(entry3 -> {
            return ((HubTargetState) entry3.getValue()).badError;
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
        Iterator it = list.iterator();
        while (it.hasNext()) {
            this.targets.remove((Flow.Subscriber) it.next());
        }
        if (this.targets.isEmpty()) {
            clearBuffer();
        } else {
            long asLong = this.targets.values().stream().map(hubTargetState -> {
                return Integer.valueOf(hubTargetState.receiveIndex);
            }).mapToLong(num -> {
                return num.intValue();
            }).min().getAsLong();
            while (asLong > this.messageBufferStart) {
                this.messageBuffer.pop();
                this.messageBufferStart++;
            }
        }
        if (!this.targets.isEmpty() || list.isEmpty()) {
            return;
        }
        this.sourceSubscription.cancel();
    }

    private void requestTargetMessages(Flow.Subscriber<? super T> subscriber, long j) {
        this.targets.get(subscriber).requestIndex = (int) (r0.requestIndex + j);
        if (r0.requestIndex > this.sourceRequestIndex && this.sourceSubscription != null) {
            this.sourceSubscription.request(r0.requestIndex - this.sourceRequestIndex);
            this.sourceRequestIndex = r0.requestIndex;
        }
        this.eventLoop.submit(this::dispatchMessages);
    }

    private void cancelTargetSubscription(Flow.Subscriber<? super T> subscriber) {
        this.targets.remove(subscriber);
        if (this.targets.isEmpty()) {
            clearBuffer();
            if (this.sourceSubscription != null) {
                this.sourceSubscription.cancel();
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void sendTargetOnNext(Flow.Subscriber<? super T> subscriber, HubTargetState hubTargetState, T t) {
        try {
            hubTargetState.receiveIndex++;
            subscriber.onNext(t);
        } catch (Throwable th) {
            hubTargetState.completeFlag = true;
            hubTargetState.badError = true;
            reportUnhandledError(subscriber, th);
        }
    }

    private void sendTargetOnComplete(Flow.Subscriber<? super T> subscriber, HubTargetState hubTargetState) {
        try {
            hubTargetState.completeFlag = true;
            subscriber.onComplete();
        } catch (Throwable th) {
            hubTargetState.badError = true;
            reportUnhandledError(subscriber, th);
        }
    }

    private void sendTargetOnError(Flow.Subscriber<? super T> subscriber, HubTargetState hubTargetState, Throwable th) {
        try {
            hubTargetState.completeFlag = true;
            subscriber.onError(th);
        } catch (Throwable th2) {
            hubTargetState.badError = true;
            this.log.warn("Following a previous error, the stream processing pipeline was not successfully notified");
            this.log.warn("This is a bug, and may cause hanging and/or resource leaks");
            this.log.warn(th2.getMessage(), th2);
        }
    }

    private void reportUnhandledError(Flow.Subscriber<? super T> subscriber, Throwable th) {
        ETracInternal eTracInternal = new ETracInternal("An error occurred in the stream processing pipeline and has not been handled", th);
        this.log.error("An error occurred in the stream processing pipeline and has not been handled", eTracInternal);
        try {
            subscriber.onError(eTracInternal);
        } catch (Throwable th2) {
            this.log.warn("Following a previous unhandled error, the stream processing pipeline was not successfully notified");
            this.log.warn("This is a bug, and may cause hanging and/or resource leaks");
            this.log.warn(th2.getMessage(), th2);
        }
    }

    private void clearBuffer() {
        if (this.releaseFunc != null) {
            this.messageBuffer.forEach(this.releaseFunc);
        }
        this.messageBuffer.clear();
        this.messageBufferStart = this.messageBufferEnd;
    }
}
