package org.asyncflows.core.streams;

import org.asyncflows.core.CoreFlows;
import org.asyncflows.core.Promise;
import org.asyncflows.core.data.Maybe;
import org.asyncflows.core.data.Tuple2;
import org.asyncflows.core.function.AFunction;
import org.asyncflows.core.function.AResolver;
import org.asyncflows.core.function.AsyncFunctionUtil;
import org.asyncflows.core.util.RequestQueue;

/* loaded from: input_file:org/asyncflows/core/streams/PushStreamBuilder.class */
/**
 * A {@link StreamBuilder} for streams in which the source pushes elements into an {@link ASink}.
 * <p>
 * The builder wraps a {@link SinkConnector}. Every transformation ({@link #map}, {@link #flatMapMaybe},
 * {@link #flatMapStream}, {@link #window}) returns a new builder whose connector, when finally given a
 * downstream sink, wraps that sink into a transforming sink and hands the wrapper to this builder's
 * connector. Nothing is connected until {@link #connect}, {@link #pull} or {@link #consume} is called.
 *
 * @param <T> the type of the elements pushed by this stream
 */
public class PushStreamBuilder<T> extends StreamBuilder<T> {
    /** Connects the upstream source to whatever sink it is given. */
    private final SinkConnector<T> connector;

    /**
     * Callback that attaches an upstream source to a sink.
     *
     * @param <T> the type of the elements accepted by the sink
     */
    public interface SinkConnector<T> {
        /**
         * Attaches the source to the specified sink.
         *
         * @param aSink the sink that will receive the elements
         */
        void connect(ASink<T> aSink);
    }

    /**
     * Creates a builder over the specified connector.
     *
     * @param sinkConnector the connector used to attach the source to a sink
     */
    public PushStreamBuilder(SinkConnector<T> sinkConnector) {
        this.connector = sinkConnector;
    }

    /**
     * Builds a stream by first switching to pull mode (see {@link #pull()}).
     *
     * @return the result of {@code pull().localStream()}
     */
    @Override
    public AStream<T> localStream() {
        return pull().localStream();
    }

    /**
     * Connects the source to the specified sink.
     *
     * @param aSink the sink to connect to
     */
    @Override
    @SuppressWarnings("unchecked")
    public void connect(ASink<? super T> aSink) {
        // The connector requires ASink<T>. A sink of a supertype of T can accept every T value,
        // so the unchecked narrowing is presumed safe (assumes ASink only consumes values of its
        // type parameter - confirm against the ASink contract).
        this.connector.connect((ASink<T>) aSink);
    }

    /**
     * @return this builder, as it is already in push mode
     */
    @Override
    public PushStreamBuilder<T> push() {
        return this;
    }

    /**
     * Switches to pull mode: obtains a sink/stream pair from {@link RandevuQueue#local()},
     * connects the source to the sink half and builds a pull stream over the stream half.
     *
     * @return a pull-mode builder reading from the queue
     */
    @Override
    public PullStreamBuilder<T> pull() {
        final Tuple2<ASink<T>, AStream<T>> queue = RandevuQueue.local();
        this.connector.connect(queue.getValue1());
        return new PullStreamBuilder<>(queue.getValue2());
    }

    /**
     * Maps every element with the specified asynchronous function.
     * <p>
     * The function is evaluated as soon as the element arrives; forwarding of its outcome to the
     * downstream sink goes through the transforming sink's request queue. A failed evaluation is
     * passed to {@code failNext}.
     *
     * @param aFunction the mapping function
     * @param <N>       the type of the mapped elements
     * @return a builder for the mapped stream
     */
    @Override
    public <N> PushStreamBuilder<N> map(AFunction<T, N> aFunction) {
        return new PushStreamBuilder<>(downstream -> this.connector.connect(new TransformSinkBase<N, T>(downstream) {
            @Override
            public Promise<Void> put(T t) {
                // Evaluation starts outside of the queue; only delivery is queued.
                final Promise<N> mapped = AsyncFunctionUtil.evaluate(aFunction, t);
                return requestQueue().run(() -> mapped.flatMapOutcome(outcome -> {
                    if (outcome.isSuccess()) {
                        return wrapped.put(outcome.value());
                    }
                    return failNext(outcome.failure());
                })).listen(outcomeChecker());
            }
        }));
    }

    /**
     * Maps every element to an optional value and forwards only the non-empty results.
     * <p>
     * If the transforming sink is no longer valid and open when the result is delivered, the
     * invalidation promise is returned instead. A failed evaluation is passed to {@code failNext}.
     *
     * @param aFunction the mapping function producing {@link Maybe} values
     * @param <N>       the type of the resulting elements
     * @return a builder for the resulting stream
     */
    @Override
    public <N> PushStreamBuilder<N> flatMapMaybe(AFunction<T, Maybe<N>> aFunction) {
        return new PushStreamBuilder<>(downstream -> this.connector.connect(new TransformSinkBase<N, T>(downstream) {
            @Override
            public Promise<Void> put(T t) {
                final Promise<Maybe<N>> mapped = AsyncFunctionUtil.evaluate(aFunction, t);
                return requestQueue().run(() -> mapped.flatMapOutcome(outcome -> {
                    if (!isValidAndOpen()) {
                        return invalidationPromise();
                    }
                    if (!outcome.isSuccess()) {
                        return failNext(outcome.failure());
                    }
                    if (outcome.value().isEmpty()) {
                        // Nothing to forward for an empty result.
                        return CoreFlows.aVoid();
                    }
                    return wrapped.put(outcome.value().value());
                }).listen(outcomeChecker()));
            }
        }));
    }

    /**
     * Maps every element to a stream and forwards all elements of that stream downstream.
     * <p>
     * If the transforming sink is no longer valid and open when the result is delivered, the
     * invalidation promise is returned instead. A failed evaluation is passed to {@code failNext}.
     *
     * @param aFunction the mapping function producing streams
     * @param <N>       the type of the resulting elements
     * @return a builder for the flattened stream
     */
    @Override
    public <N> PushStreamBuilder<N> flatMapStream(AFunction<T, AStream<N>> aFunction) {
        return new PushStreamBuilder<>(downstream -> this.connector.connect(new TransformSinkBase<N, T>(downstream) {
            @Override
            public Promise<Void> put(T t) {
                final Promise<AStream<N>> mapped = AsyncFunctionUtil.evaluate(aFunction, t);
                return requestQueue().run(() -> mapped.flatMapOutcome(outcome -> {
                    if (!isValidAndOpen()) {
                        return invalidationPromise();
                    }
                    if (!outcome.isSuccess()) {
                        return failNext(outcome.failure());
                    }
                    // Drain the inner stream into the sink supplied to the connector; the consumer
                    // always answers true, so the inner stream is read to its end.
                    return AsyncStreams.aForStream(outcome.value()).consume(
                            element -> downstream.put(element)
                                    .thenFlatGet(AsyncFunctionUtil.booleanSupplier(true)));
                }).listen(outcomeChecker()));
            }
        }));
    }

    /**
     * Allows up to {@code i} puts to the downstream sink to be in flight at the same time.
     * <p>
     * A put that arrives while the limit is reached suspends on the request queue and is retried
     * when an in-flight put completes or when the sink is invalidated.
     *
     * @param i the maximum number of concurrently active downstream puts
     * @return a builder for the windowed stream
     */
    @Override
    public PushStreamBuilder<T> window(final int i) {
        return new PushStreamBuilder<>(downstream -> this.connector.connect(new TransformSinkBase<T, T>(downstream) {
            /** Number of downstream puts that were started and have not completed yet. */
            private int active;
            /** Releases a window slot and wakes up the request queue when a downstream put completes. */
            private final AResolver<Void> countdownObserver = outcome -> {
                this.active--;
                requestQueue().resume();
            };

            @Override
            protected void onInvalidation(Throwable th) {
                // Make sure the limit check can no longer hold back a suspended put, then wake
                // the queue so that it observes the invalidation.
                this.active = Integer.MIN_VALUE;
                requestQueue().resume();
                super.onInvalidation(th);
            }

            @Override
            public Promise<Void> put(T t) {
                return requestQueue().runSeqWhile(() -> {
                    if (!isValidAndOpen()) {
                        return invalidationPromise();
                    }
                    if (this.active >= i) {
                        // Window is full: wait for resume() and run this check again.
                        return requestQueue().suspendThenTrue();
                    }
                    // Take a window slot before dispatching. Without this increment the counter
                    // was only ever decremented, so the limit above could never be reached.
                    this.active++;
                    CoreFlows.aNow(() -> downstream.put(t))
                            .listen(outcomeChecker())
                            .listen(this.countdownObserver);
                    return CoreFlows.aFalse();
                });
            }
        }));
    }

    /**
     * Connects the source to a sink that feeds every element to the specified function.
     * <p>
     * Elements are processed one at a time through a dedicated request queue. When the function
     * yields {@code false}, the sink starts closing. If the sink is no longer valid and open when
     * an element is processed, the invalidation promise is returned instead.
     *
     * @param aFunction the function invoked for each element; {@code false} stops consumption
     * @return the {@code finished()} promise of the consuming sink
     */
    @Override
    public Promise<Void> consume(final AFunction<T, Boolean> aFunction) {
        final SinkBase<T> sink = new SinkBase<T>() {
            private final RequestQueue requests = new RequestQueue();

            @Override
            public Promise<Void> put(T t) {
                return this.requests.run(() -> {
                    if (!isValidAndOpen()) {
                        return invalidationPromise();
                    }
                    return AsyncFunctionUtil.evaluate(aFunction, t).flatMap(proceed -> {
                        if (!proceed.booleanValue()) {
                            startClosing();
                        }
                        return CoreFlows.aVoid();
                    }).listen(outcomeChecker());
                });
            }
        };
        this.connector.connect(sink);
        return sink.finished();
    }
}
