package org.finos.tracdap.common.concurrent.flow;

import io.netty.util.concurrent.OrderedEventExecutor;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/* loaded from: input_file:org/finos/tracdap/common/concurrent/flow/EventLoopProcessor.class */
/**
 * A pass-through {@link Flow.Processor} that re-dispatches every signal onto a
 * single {@link OrderedEventExecutor}.
 *
 * <p>Signals travelling downstream ({@code onSubscribe}, {@code onNext},
 * {@code onError}, {@code onComplete}) are submitted to the executor before being
 * handed to the target subscriber. Signals travelling upstream ({@code request},
 * {@code cancel}) are submitted to the same executor before being handed to the
 * source subscription.
 *
 * <p>The processor may be wired in either order: the downstream subscriber can call
 * {@link #subscribe} before or after the upstream publisher calls
 * {@link #onSubscribe}. Upstream signals issued before the source subscription is
 * attached are queued and replayed, in order, once it arrives.
 *
 * <p>Only one target subscriber is tracked at a time; a later call to
 * {@link #subscribe} replaces the previous target for subsequent item and terminal
 * signals.
 *
 * @param <T> the type of item passed through unchanged
 */
public class EventLoopProcessor<T> implements Flow.Processor<T, T> {

    private final OrderedEventExecutor executor;

    // Written by subscribe() immediately before a task is submitted to the executor,
    // and read only from tasks running on the executor.
    private Flow.Subscriber<? super T> target;

    // The two fields below are only touched from tasks running on the executor.
    private Flow.Subscription sourceSubscription;
    private final Queue<Consumer<Flow.Subscription>> pendingSourceSignals = new ArrayDeque<>();

    /**
     * Subscription handed to the target subscriber. It forwards demand and
     * cancellation to the source subscription via the executor, looking the source
     * up at execution time rather than capturing it at construction time.
     */
    private class EventLoopSubscription implements Flow.Subscription {

        @Override
        public void request(long n) {
            executor.execute(() -> signalSource(source -> source.request(n)));
        }

        @Override
        public void cancel() {
            executor.execute(() -> signalSource(Flow.Subscription::cancel));
        }
    }

    /**
     * Creates a processor that delivers all signals through the given executor.
     *
     * @param orderedEventExecutor executor on which every signal is delivered
     * @throws NullPointerException if {@code orderedEventExecutor} is null
     */
    public EventLoopProcessor(OrderedEventExecutor orderedEventExecutor) {
        this.executor = Objects.requireNonNull(orderedEventExecutor, "executor");
    }

    /**
     * Registers the downstream subscriber and, via the executor, hands it a
     * subscription that relays {@code request}/{@code cancel} to the source.
     *
     * @param subscriber the downstream subscriber
     * @throws NullPointerException if {@code subscriber} is null
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber");
        this.target = subscriber;
        Flow.Subscription targetSubscription = new EventLoopSubscription();
        executor.execute(() -> subscriber.onSubscribe(targetSubscription));
    }

    /**
     * Attaches the upstream subscription. The attachment itself runs on the
     * executor so that it is ordered with respect to relayed {@code request} and
     * {@code cancel} signals; anything queued before this point is replayed.
     *
     * @param subscription the upstream subscription
     * @throws NullPointerException if {@code subscription} is null
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription, "subscription");
        executor.execute(() -> attachSource(subscription));
    }

    /**
     * Forwards an item to the target subscriber via the executor.
     *
     * @param t the item to forward
     */
    @Override
    public void onNext(T t) {
        executor.execute(() -> target.onNext(t));
    }

    /**
     * Forwards a failure to the target subscriber via the executor. Anything that
     * is not already a {@link CompletionException} is wrapped in one, keeping the
     * original message and cause.
     *
     * @param th the failure reported by the upstream publisher
     * @throws NullPointerException if {@code th} is null
     */
    @Override
    public void onError(Throwable th) {
        Objects.requireNonNull(th, "th");
        Throwable wrapped = th instanceof CompletionException
                ? th
                : new CompletionException(th.getMessage(), th);
        executor.execute(() -> target.onError(wrapped));
    }

    /**
     * Forwards completion to the target subscriber via the executor.
     */
    @Override
    public void onComplete() {
        // Read the target inside the task, consistent with onNext/onError.
        executor.execute(() -> target.onComplete());
    }

    /** Delivers an upstream signal now, or queues it if no source is attached yet. */
    private void signalSource(Consumer<Flow.Subscription> signal) {
        Flow.Subscription source = sourceSubscription;
        if (source == null) {
            pendingSourceSignals.add(signal);
        } else {
            signal.accept(source);
        }
    }

    /** Records the source subscription and replays any signals queued before it arrived. */
    private void attachSource(Flow.Subscription subscription) {
        sourceSubscription = subscription;
        Consumer<Flow.Subscription> signal;
        while ((signal = pendingSourceSignals.poll()) != null) {
            signal.accept(subscription);
        }
    }
}
