package io.rsocket.fragmentation;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.util.AbstractionLeakingFrameUtils;
import io.rsocket.util.NumberUtils;
import java.util.Objects;
import org.jctools.maps.NonBlockingHashMapLong;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.function.TupleUtils;

/* loaded from: input_file:io/rsocket/fragmentation/FragmentationDuplexConnection.class */
public final class FragmentationDuplexConnection implements DuplexConnection {
    private final ByteBufAllocator byteBufAllocator;
    private final DuplexConnection delegate;
    private final FrameFragmenter frameFragmenter;
    private final NonBlockingHashMapLong<FrameReassembler> frameReassemblers;

    public FragmentationDuplexConnection(DuplexConnection duplexConnection, int i) {
        this(PooledByteBufAllocator.DEFAULT, duplexConnection, i);
    }

    public FragmentationDuplexConnection(ByteBufAllocator byteBufAllocator, DuplexConnection duplexConnection, int i) {
        this.frameReassemblers = new NonBlockingHashMapLong<>();
        this.byteBufAllocator = (ByteBufAllocator) Objects.requireNonNull(byteBufAllocator, "byteBufAllocator must not be null");
        this.delegate = (DuplexConnection) Objects.requireNonNull(duplexConnection, "delegate must not be null");
        NumberUtils.requireNonNegative(i, "maxFragmentSize must be positive");
        this.frameFragmenter = new FrameFragmenter(byteBufAllocator, i);
    }

    @Override // io.rsocket.DuplexConnection, io.rsocket.Availability
    public double availability() {
        return this.delegate.availability();
    }

    public void dispose() {
        this.delegate.dispose();
    }

    public boolean isDisposed() {
        return this.delegate.isDisposed();
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.delegate.onClose().doAfterTerminate(() -> {
            this.frameReassemblers.values().forEach((v0) -> {
                v0.dispose();
            });
        });
    }

    @Override // io.rsocket.DuplexConnection
    public Flux<Frame> receive() {
        return this.delegate.receive().map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame).concatMap(TupleUtils.function((v1, v2) -> {
            return toReassembledFrames(v1, v2);
        }));
    }

    @Override // io.rsocket.DuplexConnection
    public Mono<Void> send(Publisher<Frame> publisher) {
        Objects.requireNonNull(publisher, "frames must not be null");
        return this.delegate.send(Flux.from(publisher).map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame).concatMap(TupleUtils.function((v1, v2) -> {
            return toFragmentedFrames(v1, v2);
        })));
    }

    private Flux<Frame> toFragmentedFrames(int i, io.rsocket.framing.Frame frame) {
        return this.frameFragmenter.fragment(frame).map(frame2 -> {
            return AbstractionLeakingFrameUtils.toAbstractionLeakingFrame(this.byteBufAllocator, i, frame2);
        });
    }

    private Mono<Frame> toReassembledFrames(int i, io.rsocket.framing.Frame frame) {
        return Mono.justOrEmpty(((FrameReassembler) this.frameReassemblers.computeIfAbsent(Long.valueOf(i), l -> {
            return FrameReassembler.createFrameReassembler(this.byteBufAllocator);
        })).reassemble(frame)).map(frame2 -> {
            return AbstractionLeakingFrameUtils.toAbstractionLeakingFrame(this.byteBufAllocator, i, frame2);
        });
    }
}
