package io.rsocket.fragmentation;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.FrameLengthFlyweight;
import io.rsocket.frame.FrameType;
import java.util.Objects;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/fragmentation/FragmentationDuplexConnection.class */
public final class FragmentationDuplexConnection implements DuplexConnection {
    private static final int MIN_MTU_SIZE = 64;
    private static final Logger logger = LoggerFactory.getLogger(FragmentationDuplexConnection.class);
    private final DuplexConnection delegate;
    private final int mtu;
    private final ByteBufAllocator allocator;
    private final FrameReassembler frameReassembler;
    private final boolean encodeLength;
    private final String type;

    public FragmentationDuplexConnection(DuplexConnection duplexConnection, ByteBufAllocator byteBufAllocator, int i, boolean z, String str) {
        Objects.requireNonNull(duplexConnection, "delegate must not be null");
        Objects.requireNonNull(byteBufAllocator, "byteBufAllocator must not be null");
        this.encodeLength = z;
        this.allocator = byteBufAllocator;
        this.delegate = duplexConnection;
        this.mtu = assertMtu(i);
        this.frameReassembler = new FrameReassembler(byteBufAllocator);
        this.type = str;
        duplexConnection.onClose().doFinally(signalType -> {
            this.frameReassembler.dispose();
        }).subscribe();
    }

    private boolean shouldFragment(FrameType frameType, int i) {
        return frameType.isFragmentable() && i > this.mtu;
    }

    @Nullable
    public static <T> Mono<T> checkMtu(int i) {
        if (isInsufficientMtu(i)) {
            return Mono.error(new IllegalArgumentException(String.format("smallest allowed mtu size is %d bytes, provided: %d", 64, Integer.valueOf(i))));
        }
        return null;
    }

    private static int assertMtu(int i) {
        if (isInsufficientMtu(i)) {
            throw new IllegalArgumentException(String.format("smallest allowed mtu size is %d bytes, provided: %d", 64, Integer.valueOf(i)));
        }
        return i;
    }

    private static boolean isInsufficientMtu(int i) {
        return (i > 0 && i < 64) || i < 0;
    }

    @Override // io.rsocket.DuplexConnection
    public Mono<Void> send(Publisher<ByteBuf> publisher) {
        return Flux.from(publisher).concatMap(this::sendOne).then();
    }

    @Override // io.rsocket.DuplexConnection
    public Mono<Void> sendOne(ByteBuf byteBuf) {
        FrameType frameType = FrameHeaderFlyweight.frameType(byteBuf);
        return shouldFragment(frameType, byteBuf.readableBytes()) ? logger.isDebugEnabled() ? this.delegate.send(Flux.from(FrameFragmenter.fragmentFrame(this.allocator, this.mtu, byteBuf, frameType, this.encodeLength)).doOnNext(byteBuf2 -> {
            ByteBuf frame = this.encodeLength ? FrameLengthFlyweight.frame(byteBuf2) : byteBuf2;
            logger.debug("{} - stream id {} - frame type {} - \n {}", new Object[]{this.type, Integer.valueOf(FrameHeaderFlyweight.streamId(frame)), FrameHeaderFlyweight.frameType(frame), ByteBufUtil.prettyHexDump(frame)});
        })) : this.delegate.send(Flux.from(FrameFragmenter.fragmentFrame(this.allocator, this.mtu, byteBuf, frameType, this.encodeLength))) : this.delegate.sendOne(encode(byteBuf));
    }

    private ByteBuf encode(ByteBuf byteBuf) {
        return this.encodeLength ? FrameLengthFlyweight.encode(this.allocator, byteBuf.readableBytes(), byteBuf) : byteBuf;
    }

    private ByteBuf decode(ByteBuf byteBuf) {
        return this.encodeLength ? FrameLengthFlyweight.frame(byteBuf).retain() : byteBuf;
    }

    @Override // io.rsocket.DuplexConnection
    public Flux<ByteBuf> receive() {
        return this.delegate.receive().handle((byteBuf, synchronousSink) -> {
            this.frameReassembler.reassembleFrame(decode(byteBuf), synchronousSink);
        });
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.delegate.onClose();
    }

    public void dispose() {
        this.delegate.dispose();
    }
}
