package org.asyncflows.core.streams;

import org.asyncflows.core.CoreFlows;
import org.asyncflows.core.Promise;
import org.asyncflows.core.data.Maybe;
import org.asyncflows.core.function.AResolver;
import org.asyncflows.core.util.CloseableInvalidatingBase;
import org.asyncflows.core.util.NeedsExport;
import org.asyncflows.core.vats.Vat;

/* loaded from: input_file:org/asyncflows/core/streams/StreamBase.class */
public abstract class StreamBase<A> extends CloseableInvalidatingBase implements AStream<A>, NeedsExport<AStream<A>> {
    private final AResolver<Maybe<A>> streamOutcomeObserver = outcome -> {
        if (!outcome.isSuccess()) {
            invalidate(outcome.failure());
            startClosing();
        } else {
            if (outcome.value() == null || !((Maybe) outcome.value()).isEmpty()) {
                return;
            }
            startClosing();
        }
    };

    @Override // org.asyncflows.core.streams.AStream
    public final Promise<Maybe<A>> next() {
        Promise<Maybe<A>> aFailure;
        try {
            ensureValidAndOpen();
            aFailure = produce();
        } catch (Throwable th) {
            aFailure = CoreFlows.aFailure(th);
        }
        return aFailure.listen(this.streamOutcomeObserver);
    }

    protected abstract Promise<Maybe<A>> produce() throws Throwable;

    @Override // org.asyncflows.core.util.NeedsExport
    public AStream<A> export(Vat vat) {
        return StreamExportUtil.export(vat, this);
    }
}
