package io.reactivesocket.aeron;

import io.netty.buffer.Unpooled;
import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.aeron.internal.reactivestreams.AeronChannel;
import org.agrona.concurrent.UnsafeBuffer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:io/reactivesocket/aeron/AeronDuplexConnection.class */
public class AeronDuplexConnection implements DuplexConnection {
    private final String name;
    private final AeronChannel channel;
    private final MonoProcessor<Void> emptySubject = MonoProcessor.create();

    public AeronDuplexConnection(String str, AeronChannel aeronChannel) {
        this.name = str;
        this.channel = aeronChannel;
    }

    public Mono<Void> send(Publisher<Frame> publisher) {
        return this.channel.send(Flux.from(publisher).map(frame -> {
            return new UnsafeBuffer(frame.content().nioBuffer());
        }));
    }

    public Flux<Frame> receive() {
        return this.channel.receive().map(directBuffer -> {
            return Frame.from(Unpooled.wrappedBuffer(directBuffer.byteBuffer()));
        }).doOnError(th -> {
            th.printStackTrace();
        });
    }

    public double availability() {
        return this.channel.isActive() ? 1.0d : 0.0d;
    }

    public Mono<Void> close() {
        return Mono.defer(() -> {
            try {
                this.channel.close();
                this.emptySubject.onComplete();
            } catch (Exception e) {
                this.emptySubject.onError(e);
            }
            return this.emptySubject;
        });
    }

    public Mono<Void> onClose() {
        return this.emptySubject;
    }

    public String toString() {
        return "AeronDuplexConnection{name='" + this.name + "', channel=" + this.channel + ", emptySubject=" + this.emptySubject + '}';
    }
}
