package io.activej.csp.binary;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.exception.MalformedDataException;
import io.activej.common.exception.TruncatedDataException;
import io.activej.common.exception.UnexpectedDataException;
import io.activej.common.function.FunctionEx;
import io.activej.csp.ChannelSupplier;
import io.activej.promise.Promise;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/csp/binary/BinaryChannelSupplier.class */
public abstract class BinaryChannelSupplier extends AbstractAsyncCloseable {
    protected final ByteBufs bufs;

    /* JADX INFO: Access modifiers changed from: protected */
    public BinaryChannelSupplier(ByteBufs byteBufs) {
        this.bufs = byteBufs;
    }

    protected BinaryChannelSupplier() {
        this.bufs = new ByteBufs();
    }

    public ByteBufs getBufs() {
        return this.bufs;
    }

    public abstract Promise<Void> needMoreData();

    public abstract Promise<Void> endOfStream();

    public static BinaryChannelSupplier ofList(List<ByteBuf> list) {
        return of(ChannelSupplier.ofList(list));
    }

    public static BinaryChannelSupplier ofIterator(Iterator<ByteBuf> it) {
        return of(ChannelSupplier.ofIterator(it));
    }

    public static BinaryChannelSupplier of(final ChannelSupplier<ByteBuf> channelSupplier) {
        return new BinaryChannelSupplier() { // from class: io.activej.csp.binary.BinaryChannelSupplier.1
            @Override // io.activej.csp.binary.BinaryChannelSupplier
            public Promise<Void> needMoreData() {
                return ChannelSupplier.this.get().map(byteBuf -> {
                    if (byteBuf == null) {
                        throw new TruncatedDataException("Unexpected end-of-stream");
                    }
                    this.bufs.add(byteBuf);
                    return null;
                });
            }

            @Override // io.activej.csp.binary.BinaryChannelSupplier
            public Promise<Void> endOfStream() {
                if (this.bufs.isEmpty()) {
                    Promise promise = ChannelSupplier.this.get();
                    ChannelSupplier channelSupplier2 = ChannelSupplier.this;
                    return promise.map(byteBuf -> {
                        if (byteBuf == null) {
                            return null;
                        }
                        byteBuf.recycle();
                        UnexpectedDataException unexpectedDataException = new UnexpectedDataException("Unexpected data after end-of-stream");
                        channelSupplier2.closeEx(unexpectedDataException);
                        throw unexpectedDataException;
                    });
                }
                this.bufs.recycle();
                Exception unexpectedDataException = new UnexpectedDataException("Unexpected data after end-of-stream");
                ChannelSupplier.this.closeEx(unexpectedDataException);
                return Promise.ofException(unexpectedDataException);
            }

            protected void onClosed(@NotNull Exception exc) {
                ChannelSupplier.this.closeEx(exc);
            }
        };
    }

    public static BinaryChannelSupplier ofProvidedBufs(ByteBufs byteBufs, final AsyncRunnable asyncRunnable, final AsyncRunnable asyncRunnable2, final AsyncCloseable asyncCloseable) {
        return new BinaryChannelSupplier(byteBufs) { // from class: io.activej.csp.binary.BinaryChannelSupplier.2
            @Override // io.activej.csp.binary.BinaryChannelSupplier
            public Promise<Void> needMoreData() {
                return asyncRunnable.run();
            }

            @Override // io.activej.csp.binary.BinaryChannelSupplier
            public Promise<Void> endOfStream() {
                return asyncRunnable2.run();
            }

            protected void onClosed(@NotNull Exception exc) {
                asyncCloseable.closeEx(exc);
            }
        };
    }

    public final <T> Promise<T> decode(ByteBufsDecoder<T> byteBufsDecoder) {
        return doDecode(byteBufsDecoder, this);
    }

    @NotNull
    private <T> Promise<T> doDecode(ByteBufsDecoder<T> byteBufsDecoder, AsyncCloseable asyncCloseable) {
        Promise<Void> needMoreData;
        do {
            if (!this.bufs.isEmpty()) {
                try {
                    T tryDecode = byteBufsDecoder.tryDecode(this.bufs);
                    if (tryDecode != null) {
                        return Promise.of(tryDecode);
                    }
                } catch (MalformedDataException e) {
                    closeEx(e);
                    return Promise.ofException(e);
                }
            }
            needMoreData = needMoreData();
        } while (needMoreData.isResult());
        Objects.requireNonNull(asyncCloseable);
        return needMoreData.whenException(asyncCloseable::closeEx).then(() -> {
            return doDecode(byteBufsDecoder, asyncCloseable);
        });
    }

    public final <T> Promise<T> decodeRemaining(ByteBufsDecoder<T> byteBufsDecoder) {
        return decode(byteBufsDecoder).then(obj -> {
            if (this.bufs.isEmpty()) {
                return endOfStream().map(r3 -> {
                    return obj;
                });
            }
            UnexpectedDataException unexpectedDataException = new UnexpectedDataException("Unexpected data after end-of-stream");
            closeEx(unexpectedDataException);
            throw unexpectedDataException;
        });
    }

    public final <T> ChannelSupplier<T> decodeStream(ByteBufsDecoder<T> byteBufsDecoder) {
        return ChannelSupplier.of(() -> {
            return doDecode(byteBufsDecoder, exc -> {
                if ((exc instanceof TruncatedDataException) && this.bufs.isEmpty()) {
                    return;
                }
                closeEx(exc);
            }).map(FunctionEx.identity(), exc2 -> {
                if ((exc2 instanceof TruncatedDataException) && this.bufs.isEmpty()) {
                    return null;
                }
                throw exc2;
            });
        }, this);
    }

    public Promise<Void> bindTo(BinaryChannelInput binaryChannelInput) {
        return binaryChannelInput.set(this);
    }

    protected void onCleanup() {
        this.bufs.recycle();
    }
}
