package io.rsocket.aeron;

import io.netty.buffer.Unpooled;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.aeron.internal.reactivestreams.AeronChannel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.concurrent.UnsafeBuffer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:io/rsocket/aeron/AeronDuplexConnection.class */
public class AeronDuplexConnection implements DuplexConnection {
    private final String name;
    private final AeronChannel channel;
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    public AeronDuplexConnection(String str, AeronChannel aeronChannel) {
        this.name = str;
        this.channel = aeronChannel;
    }

    public Mono<Void> send(Publisher<Frame> publisher) {
        return this.channel.send(Flux.from(publisher).map(frame -> {
            return new UnsafeBuffer(frame.content().nioBuffer());
        }));
    }

    public Flux<Frame> receive() {
        return this.channel.receive().map(directBuffer -> {
            return Frame.from(Unpooled.wrappedBuffer(directBuffer.byteBuffer()));
        }).doOnError((v0) -> {
            v0.printStackTrace();
        });
    }

    public void dispose() {
        try {
            this.channel.dispose();
            this.onClose.onComplete();
        } catch (Exception e) {
            this.onClose.onError(e);
        }
    }

    public boolean isDisposed() {
        return this.channel.isDisposed();
    }

    public Mono<Void> onClose() {
        return this.onClose;
    }

    public String toString() {
        return "AeronDuplexConnection{name='" + this.name + "', channel=" + this.channel + ", onClose=" + this.onClose + '}';
    }
}
