package io.activej.http.stream;

import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufQueue;
import io.activej.common.Checks;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelOutput;
import io.activej.csp.binary.BinaryChannelInput;
import io.activej.csp.binary.BinaryChannelSupplier;
import io.activej.csp.dsl.WithBinaryChannelInput;
import io.activej.csp.dsl.WithChannelTransformer;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.promise.Promise;
import java.util.Objects;

/* loaded from: input_file:io/activej/http/stream/BufsConsumerDelimiter.class */
/**
 * Communicating process that forwards a fixed number of bytes from a binary
 * input to a {@link ByteBuf} output and then finishes both sides.
 *
 * <p>The number of bytes to forward is given to {@link #create(int)}. Each pass of
 * {@link #doProcess()} drains up to that many bytes from the input's buffer queue,
 * hands them to the output, and either asks the input for more data or, once the
 * counter reaches zero, performs the end-of-stream handshake and completes the
 * process.
 *
 * <p>The process is started as soon as both the input and the output are bound.
 */
public final class BufsConsumerDelimiter extends AbstractCommunicatingProcess implements WithChannelTransformer<BufsConsumerDelimiter, ByteBuf, ByteBuf>, WithBinaryChannelInput<BufsConsumerDelimiter> {
    /** Buffer queue taken from the bound input supplier; bytes are drained from it. */
    private ByteBufQueue bufs;
    /** Bound input (wrapped by {@code sanitize}); {@code null} until the input is set. */
    private BinaryChannelSupplier input;
    /** Bound output (wrapped by {@code sanitize}); {@code null} until the output is set. */
    private ChannelConsumer<ByteBuf> output;
    /** Number of bytes that still have to be forwarded to the output. */
    private int remaining;

    private BufsConsumerDelimiter(int i) {
        this.remaining = i;
    }

    /**
     * Creates a delimiter that forwards exactly {@code i} bytes.
     *
     * @param i number of bytes to forward; must not be negative (a negative value
     *          is rejected by {@code Checks.checkState})
     * @return a new, not yet wired delimiter
     */
    public static BufsConsumerDelimiter create(int i) {
        Checks.checkState(i >= 0, "Cannot create delimiter with number of remaining bytes that is less than 0");
        return new BufsConsumerDelimiter(i);
    }

    /**
     * Returns the input endpoint. Binding a supplier to it stores the supplier and
     * its buffer queue, starts the process if the output is already bound, and
     * yields the process completion promise. The input may be bound only once.
     *
     * <p>NOTE(review): the decompiler marked this method as renamed from
     * {@code getInput} (merged with a bridge method); the name is kept as-is so the
     * visible interface of this file does not change.
     */
    public BinaryChannelInput m49getInput() {
        return binaryChannelSupplier -> {
            Checks.checkState(this.input == null, "Input already set");
            this.input = sanitize(binaryChannelSupplier);
            this.bufs = binaryChannelSupplier.getBufs();
            startWhenWired();
            return getProcessCompletion();
        };
    }

    /**
     * Returns the output endpoint. Binding a consumer to it stores the consumer and
     * starts the process if the input is already bound. The output may be bound
     * only once.
     */
    public ChannelOutput<ByteBuf> getOutput() {
        return channelConsumer -> {
            Checks.checkState(this.output == null, "Output already set");
            this.output = sanitize(channelConsumer);
            startWhenWired();
        };
    }

    /** Verifies that both endpoints have been bound. */
    protected void beforeProcess() {
        Checks.checkState(this.input != null, "Input was not set");
        Checks.checkState(this.output != null, "Output was not set");
    }

    /**
     * Forwards up to {@code remaining} bytes from the input buffers to the output.
     * When bytes are still outstanding after the output has accepted the drained
     * buffers, more input data is requested and this method runs again; otherwise
     * the stream is finished.
     */
    protected void doProcess() {
        if (this.remaining == 0) {
            // Nothing (left) to forward: go straight to the end-of-stream handshake.
            finishStream();
            return;
        }
        ByteBufQueue drained = new ByteBufQueue();
        // drainTo reports how many bytes were moved; only those count as forwarded.
        this.remaining -= this.bufs.drainTo(drained, this.remaining);
        this.output.acceptAll(drained.asIterator()).whenResult(() -> {
            if (this.remaining != 0) {
                this.input.needMoreData().whenResult(this::doProcess);
            } else {
                finishStream();
            }
        });
    }

    /**
     * Closes whichever endpoints are bound with the given cause.
     *
     * <p>Either endpoint may still be unset if the process is closed before it was
     * fully wired, so both are guarded against {@code null}.
     *
     * @param th the cause passed on to the endpoints
     */
    protected void doClose(Throwable th) {
        if (this.input != null) {
            this.input.closeEx(th);
        }
        if (this.output != null) {
            this.output.closeEx(th);
        }
    }

    /** Starts the process once both the input and the output are bound. */
    private void startWhenWired() {
        if (this.input != null && this.output != null) {
            startProcess();
        }
    }

    /**
     * Ends the input stream, then signals end of stream to the output, then
     * completes the process.
     */
    private void finishStream() {
        // The bound method reference evaluates (and null-checks) this.output right here.
        this.input.endOfStream()
                .then(this.output::acceptEndOfStream)
                .whenResult(this::completeProcess);
    }
}
