/*
 * Decompiled with CFR 0.152.
 */
package org.finos.tracdap.common.concurrent.flow;

import io.netty.util.concurrent.OrderedEventExecutor;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;

public class EventLoopProcessor<T>
implements Flow.Processor<T, T> {
    private final OrderedEventExecutor executor;
    private Flow.Subscriber<? super T> target;
    private Flow.Subscription sourceSubscription;

    public EventLoopProcessor(OrderedEventExecutor executor) {
        this.executor = executor;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        this.target = subscriber;
        EventLoopSubscription targetSubscription = new EventLoopSubscription(this.sourceSubscription);
        this.executor.execute(() -> this.target.onSubscribe(targetSubscription));
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.sourceSubscription = subscription;
    }

    @Override
    public void onNext(T message) {
        this.executor.execute(() -> this.target.onNext(message));
    }

    @Override
    public void onError(Throwable error) {
        Throwable completionError = error instanceof CompletionException ? error : new CompletionException(error.getMessage(), error);
        this.executor.execute(() -> this.target.onError(completionError));
    }

    @Override
    public void onComplete() {
        this.executor.execute(this.target::onComplete);
    }

    private class EventLoopSubscription
    implements Flow.Subscription {
        private final Flow.Subscription sourceSubscription;

        EventLoopSubscription(Flow.Subscription sourceSubscription) {
            this.sourceSubscription = sourceSubscription;
        }

        @Override
        public void request(long n) {
            EventLoopProcessor.this.executor.execute(() -> this.sourceSubscription.request(n));
        }

        @Override
        public void cancel() {
            EventLoopProcessor.this.executor.execute(this.sourceSubscription::cancel);
        }
    }
}

