package org.asyncflows.core.streams;

import org.asyncflows.core.CoreFlows;
import org.asyncflows.core.Promise;
import org.asyncflows.core.data.Maybe;
import org.asyncflows.core.function.ACloseable;
import org.asyncflows.core.function.AResolver;
import org.asyncflows.core.util.ChainedClosable;
import org.asyncflows.core.util.NeedsExport;
import org.asyncflows.core.vats.Vat;

/* loaded from: input_file:org/asyncflows/core/streams/ChainedStreamBase.class */
public abstract class ChainedStreamBase<O, I extends ACloseable> extends ChainedClosable<I> implements AStream<O>, NeedsExport<AStream<O>> {
    private final AResolver<Maybe<O>> streamOutcomeObserver;

    /* JADX INFO: Access modifiers changed from: protected */
    public ChainedStreamBase(I i) {
        super(i);
        this.streamOutcomeObserver = outcome -> {
            if (!outcome.isSuccess()) {
                invalidate(outcome.failure());
                startClosing();
            } else {
                if (outcome.value() == null || !((Maybe) outcome.value()).isEmpty()) {
                    return;
                }
                startClosing();
            }
        };
    }

    @Override // org.asyncflows.core.streams.AStream
    public Promise<Maybe<O>> next() {
        Promise<Maybe<O>> aFailure;
        if (!isValidAndOpen()) {
            return (Promise<Maybe<O>>) invalidationPromise();
        }
        try {
            aFailure = produce();
        } catch (Throwable th) {
            aFailure = CoreFlows.aFailure(th);
        }
        return aFailure.listen(this.streamOutcomeObserver);
    }

    protected abstract Promise<Maybe<O>> produce();

    @Override // org.asyncflows.core.util.NeedsExport
    public AStream<O> export(Vat vat) {
        return AStreamProxyFactory.createProxy(vat, this);
    }
}
