package io.activej.csp.binary;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufQueue;
import io.activej.common.exception.parse.ParseException;
import io.activej.csp.ChannelSupplier;
import io.activej.promise.Promise;
import java.util.Iterator;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/csp/binary/BinaryChannelSupplier.class */
/**
 * A closeable source of binary data that accumulates incoming {@link ByteBuf}s in a
 * {@link ByteBufQueue} and lets {@link ByteBufsDecoder}s extract values from that queue.
 *
 * <p>Subclasses define how the queue is refilled ({@link #needMoreData()}) and how the
 * end of the stream is verified ({@link #endOfStream()}). The decoding loop itself lives
 * in {@link #parse(ByteBufsDecoder)} and its derivatives.
 */
public abstract class BinaryChannelSupplier extends AbstractAsyncCloseable {
    /**
     * Shared exception instance reported when data is still present where the end of the
     * stream was expected. It is a singleton and is compared by identity.
     */
    public static final Exception UNEXPECTED_DATA_EXCEPTION = new ParseException(BinaryChannelSupplier.class, "Unexpected data after end-of-stream");

    /**
     * Shared exception instance reported when the stream ends while more data is required.
     * It is a singleton and is compared by identity (see {@link #parseStream(ByteBufsDecoder)}).
     */
    public static final Exception UNEXPECTED_END_OF_STREAM_EXCEPTION = new ParseException(BinaryChannelSupplier.class, "Unexpected end-of-stream");

    /** Queue of buffers that have been received but not yet consumed by a decoder. */
    protected final ByteBufQueue bufs;

    /**
     * Creates a supplier backed by the given, externally owned queue.
     *
     * @param byteBufQueue queue to read from; it is used as is, not copied
     */
    public BinaryChannelSupplier(ByteBufQueue byteBufQueue) {
        this.bufs = byteBufQueue;
    }

    /** Creates a supplier backed by a new, empty queue. */
    protected BinaryChannelSupplier() {
        this.bufs = new ByteBufQueue();
    }

    /**
     * Returns the queue backing this supplier.
     *
     * @return the same queue instance that decoders read from
     */
    public ByteBufQueue getBufs() {
        return bufs;
    }

    /**
     * Asks the implementation for more data.
     *
     * <p>{@link #parse(ByteBufsDecoder)} calls this whenever the queue is empty or the
     * decoder could not produce a value from what is currently queued, and retries
     * decoding once the returned promise completes successfully.
     *
     * @return a promise that completes when the request has been served, or fails if
     *         no more data can be provided
     */
    public abstract Promise<Void> needMoreData();

    /**
     * Asks the implementation to confirm that the stream has ended.
     *
     * @return a promise that completes if the stream has ended, or fails otherwise
     */
    public abstract Promise<Void> endOfStream();

    /**
     * Creates a supplier that reads the buffers produced by the given iterable.
     *
     * @param iterable source of buffers; its iterator is obtained immediately
     * @return a supplier over the iterable's buffers
     */
    public static BinaryChannelSupplier ofIterable(Iterable<ByteBuf> iterable) {
        return of(ChannelSupplier.ofIterator(iterable.iterator()));
    }

    /**
     * Creates a supplier that reads the buffers produced by the given iterator.
     *
     * @param it source of buffers
     * @return a supplier over the iterator's buffers
     */
    public static BinaryChannelSupplier ofIterator(Iterator<ByteBuf> it) {
        return of(ChannelSupplier.ofIterator(it));
    }

    /**
     * Wraps a {@link ChannelSupplier} of buffers into a {@code BinaryChannelSupplier}
     * with its own queue.
     *
     * @param channelSupplier upstream supplier; a {@code null} item from it is treated
     *                        as end-of-stream, and it is closed when the returned
     *                        supplier is closed
     * @return a supplier that pulls buffers from {@code channelSupplier} on demand
     */
    public static BinaryChannelSupplier of(final ChannelSupplier<ByteBuf> channelSupplier) {
        return new BinaryChannelSupplier() {
            /**
             * Pulls one buffer from the upstream supplier and appends it to the queue.
             * Fails with {@link #UNEXPECTED_END_OF_STREAM_EXCEPTION} if upstream has ended.
             */
            @Override
            public Promise<Void> needMoreData() {
                return channelSupplier.get().then(byteBuf -> {
                    if (byteBuf == null) {
                        return Promise.ofException(UNEXPECTED_END_OF_STREAM_EXCEPTION);
                    }
                    bufs.add(byteBuf);
                    return Promise.complete();
                });
            }

            /**
             * Succeeds only if the queue is empty and upstream yields end-of-stream.
             * Otherwise the stray data is recycled, upstream is closed, and the promise
             * fails with {@link #UNEXPECTED_DATA_EXCEPTION}.
             */
            @Override
            public Promise<Void> endOfStream() {
                if (!bufs.isEmpty()) {
                    // Leftover bytes are already queued: no need to ask upstream at all.
                    bufs.recycle();
                    channelSupplier.closeEx(UNEXPECTED_DATA_EXCEPTION);
                    return Promise.ofException(UNEXPECTED_DATA_EXCEPTION);
                }
                return channelSupplier.get().then(byteBuf -> {
                    if (byteBuf == null) {
                        return Promise.complete();
                    }
                    // Upstream produced a buffer where end-of-stream was expected.
                    byteBuf.recycle();
                    channelSupplier.closeEx(UNEXPECTED_DATA_EXCEPTION);
                    return Promise.ofException(UNEXPECTED_DATA_EXCEPTION);
                });
            }

            /** Propagates the close reason to the upstream supplier. */
            @Override
            protected void onClosed(@NotNull Throwable th) {
                channelSupplier.closeEx(th);
            }
        };
    }

    /**
     * Creates a supplier over an externally managed queue, delegating every operation
     * to the given callbacks.
     *
     * @param byteBufQueue   queue that decoders will read from
     * @param asyncSupplier  invoked by {@link #needMoreData()}
     * @param asyncSupplier2 invoked by {@link #endOfStream()}
     * @param asyncCloseable closed with the same cause when the returned supplier is closed
     * @return a supplier that forwards to the provided callbacks
     */
    public static BinaryChannelSupplier ofProvidedQueue(ByteBufQueue byteBufQueue, final AsyncSupplier<Void> asyncSupplier, final AsyncSupplier<Void> asyncSupplier2, final AsyncCloseable asyncCloseable) {
        return new BinaryChannelSupplier(byteBufQueue) {
            @Override
            public Promise<Void> needMoreData() {
                return asyncSupplier.get();
            }

            @Override
            public Promise<Void> endOfStream() {
                return asyncSupplier2.get();
            }

            @Override
            protected void onClosed(@NotNull Throwable th) {
                asyncCloseable.closeEx(th);
            }
        };
    }

    /**
     * Decodes a single value from the queue, requesting more data as needed.
     *
     * <p>If the decoder throws, this supplier is closed with that exception and the
     * returned promise fails with it. If {@link #needMoreData()} fails, this supplier is
     * likewise closed with the failure.
     *
     * @param byteBufsDecoder decoder to apply; a {@code null} result from it means
     *                        "not enough data yet"
     * @param <T>             type of the decoded value
     * @return a promise of the decoded value
     */
    public final <T> Promise<T> parse(ByteBufsDecoder<T> byteBufsDecoder) {
        while (true) {
            if (!bufs.isEmpty()) {
                T decoded;
                try {
                    decoded = byteBufsDecoder.tryDecode(bufs);
                } catch (Exception e) {
                    closeEx(e);
                    return Promise.ofException(e);
                }
                if (decoded != null) {
                    return Promise.of(decoded);
                }
            }
            Promise<Void> moreData = needMoreData();
            if (moreData.isResult()) {
                // Data arrived synchronously: loop instead of recursing through the promise.
                continue;
            }
            return moreData
                    .whenException(this::closeEx)
                    .then(() -> parse(byteBufsDecoder));
        }
    }

    /**
     * Decodes a single value and then requires the stream to be finished.
     *
     * <p>If the queue still holds data after decoding, this supplier is closed with
     * {@link #UNEXPECTED_DATA_EXCEPTION} and the promise fails with it. Otherwise the
     * result is delivered once {@link #endOfStream()} completes.
     *
     * @param byteBufsDecoder decoder to apply
     * @param <T>             type of the decoded value
     * @return a promise of the decoded value
     */
    public final <T> Promise<T> parseRemaining(ByteBufsDecoder<T> byteBufsDecoder) {
        return parse(byteBufsDecoder).then(result -> {
            if (!bufs.isEmpty()) {
                closeEx(UNEXPECTED_DATA_EXCEPTION);
                return Promise.ofException(UNEXPECTED_DATA_EXCEPTION);
            }
            return endOfStream().map($ -> result);
        });
    }

    /**
     * Exposes this supplier as a {@link ChannelSupplier} of decoded values.
     *
     * <p>Each item is produced by {@link #parse(ByteBufsDecoder)}. Running out of input
     * exactly on an item boundary (queue empty and the failure is
     * {@link #UNEXPECTED_END_OF_STREAM_EXCEPTION}) is reported as a {@code null} item
     * rather than as an error; any other failure is propagated.
     *
     * @param byteBufsDecoder decoder applied for every item
     * @param <T>             type of the decoded values
     * @return a channel supplier of decoded values, created with this supplier passed
     *         as the second argument of {@code ChannelSupplier.of}
     */
    public final <T> ChannelSupplier<T> parseStream(ByteBufsDecoder<T> byteBufsDecoder) {
        return ChannelSupplier.of(
                () -> parse(byteBufsDecoder)
                        .thenEx((value, e) -> {
                            if (e == null) {
                                return Promise.of(value);
                            }
                            // Identity comparison is intentional: the exception is a shared singleton.
                            if (e == UNEXPECTED_END_OF_STREAM_EXCEPTION && bufs.isEmpty()) {
                                return Promise.of(null);
                            }
                            return Promise.ofException(e);
                        }),
                this);
    }

    /**
     * Hands this supplier over to the given input.
     *
     * @param binaryChannelInput consumer of this supplier
     * @return the promise returned by {@code binaryChannelInput.set(this)}
     */
    public Promise<Void> bindTo(BinaryChannelInput binaryChannelInput) {
        return binaryChannelInput.set(this);
    }

    /** Recycles every buffer still held in the queue. */
    @Override
    protected void onCleanup() {
        bufs.recycle();
    }
}
