package io.reactivesocket.transport.netty;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;

/* loaded from: input_file:io/reactivesocket/transport/netty/NettyDuplexConnection.class */
/**
 * {@link DuplexConnection} implementation that delegates to a Reactor Netty
 * inbound/outbound pair and the {@link NettyContext} they belong to.
 *
 * <p>Frames are written through the {@link NettyOutbound}, read from the
 * {@link NettyInbound}, and the connection life-cycle (close, on-close signal,
 * availability) is derived from the {@link NettyContext}.
 */
public class NettyDuplexConnection implements DuplexConnection {
    private final NettyInbound in;
    private final NettyOutbound out;
    private final NettyContext context;

    /**
     * Creates a connection over the given Reactor Netty handles.
     *
     * @param nettyInbound  source of incoming buffers
     * @param nettyOutbound sink for outgoing frame content
     * @param nettyContext  context used to close the channel and observe its disposal
     */
    public NettyDuplexConnection(NettyInbound nettyInbound, NettyOutbound nettyOutbound, NettyContext nettyContext) {
        this.in = nettyInbound;
        this.out = nettyOutbound;
        this.context = nettyContext;
    }

    /**
     * Sends every frame emitted by {@code publisher} by delegating to
     * {@link #sendOne(Frame)} for each element.
     *
     * @param publisher the frames to send
     * @return a {@code Mono} that completes once the whole sequence has been processed
     */
    @Override
    public Mono<Void> send(Publisher<Frame> publisher) {
        // concatMap subscribes to one inner send at a time, so frames go out in emission order.
        return Flux.from(publisher).concatMap(this::sendOne).then();
    }

    /**
     * Sends a single frame by handing its content to the outbound.
     *
     * @param frame the frame whose {@code content()} is written
     * @return a {@code Mono} obtained from the outbound send operation's {@code then()}
     */
    @Override
    public Mono<Void> sendOne(Frame frame) {
        return this.out.sendObject(frame.content()).then();
    }

    /**
     * Exposes the inbound buffers as a stream of frames.
     *
     * @return a {@code Flux} with one {@link Frame} per inbound buffer
     */
    @Override
    public Flux<Frame> receive() {
        // The previous call target, receive2(), looks like a decompiler disambiguation name for
        // NettyInbound's covariant receive() override; receive() is the library's method name.
        //
        // Each buffer is retained before being wrapped.
        // NOTE(review): presumably this lets the Frame outlive the inbound pipeline's own
        // release, making the consumer responsible for releasing it - confirm against Frame.
        return this.in.receive().map(byteBuf -> Frame.from(byteBuf.retain()));
    }

    /**
     * Closes the underlying channel unless the context is already disposed.
     *
     * <p>The work is wrapped in {@link Mono#fromRunnable(Runnable)}, so nothing happens
     * until the returned {@code Mono} is subscribed to.
     *
     * @return a {@code Mono} that runs the close action on subscription
     */
    @Override
    public Mono<Void> close() {
        return Mono.fromRunnable(() -> {
            if (!this.context.isDisposed()) {
                this.context.channel().close();
            }
        });
    }

    /**
     * Returns the close signal of the underlying context.
     *
     * @return the {@code Mono} provided by {@link NettyContext#onClose()}
     */
    @Override
    public Mono<Void> onClose() {
        return this.context.onClose();
    }

    /**
     * Reports whether this connection is usable.
     *
     * @return {@code 0.0} when the context is disposed, {@code 1.0} otherwise
     */
    @Override
    public double availability() {
        return this.context.isDisposed() ? 0.0d : 1.0d;
    }
}
