package io.vlingo.wire.fdx.outbound.rsocket;

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.DefaultPayload;
import io.vlingo.common.Completes;
import io.vlingo.common.Scheduler;
import io.vlingo.wire.fdx.outbound.ManagedOutboundChannel;
import io.vlingo.wire.node.Address;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/vlingo/wire/fdx/outbound/rsocket/RSocketOutboundChannel.class */
public class RSocketOutboundChannel implements ManagedOutboundChannel {
    private static final Logger logger = LoggerFactory.getLogger(RSocketOutboundChannel.class);
    private final Scheduler scheduler;
    private final Address address;
    private final Duration connectionTimeout;
    private final ClientTransport transport;
    private RSocket clientSocket;

    public RSocketOutboundChannel(Address address, ClientTransport clientTransport, io.vlingo.actors.Logger logger2) {
        this(address, clientTransport, Duration.ofMillis(100L), logger2);
    }

    public RSocketOutboundChannel(Address address, ClientTransport clientTransport, Duration duration, io.vlingo.actors.Logger logger2) {
        this.scheduler = new Scheduler();
        this.address = address;
        this.connectionTimeout = duration;
        this.transport = clientTransport;
    }

    @Override // io.vlingo.wire.fdx.outbound.ManagedOutboundChannel
    public void close() {
        if (this.clientSocket != null && !this.clientSocket.isDisposed()) {
            try {
                this.clientSocket.dispose();
            } catch (Throwable th) {
                logger.error("Unexpected error when closing outbound channel", th);
            }
        }
        this.clientSocket = null;
    }

    @Override // io.vlingo.wire.fdx.outbound.ManagedOutboundChannel
    public Completes<Void> writeAsync(ByteBuffer byteBuffer) {
        Completes<Void> using = Completes.using(this.scheduler);
        Mono<Void> writeAsyncInternal = writeAsyncInternal(byteBuffer);
        using.getClass();
        writeAsyncInternal.subscribe((v1) -> {
            r1.with(v1);
        }, th -> {
            using.failed();
        });
        return using;
    }

    @Override // io.vlingo.wire.fdx.outbound.ManagedOutboundChannel
    public void write(ByteBuffer byteBuffer) {
        writeAsyncInternal(byteBuffer).block();
    }

    private Mono<Void> writeAsyncInternal(ByteBuffer byteBuffer) {
        return (Mono) prepareSocket().map(rSocket -> {
            if (!rSocket.isDisposed()) {
                return rSocket.fireAndForget(DefaultPayload.create(byteBuffer)).doFinally(signalType -> {
                    logger.trace("Message sent to {}", this.address);
                }).doOnError(th -> {
                    logger.error("Failed write to {}, because: {}", new Object[]{this.address, th.getMessage(), th});
                });
            }
            logger.warn("RSocket outbound channel for {} is closed. Message dropped", this.address);
            return Mono.empty();
        }).orElseGet(() -> {
            logger.debug("RSocket outbound channel for {} not ready. Message dropped", this.address);
            return Mono.empty();
        });
    }

    private Optional<RSocket> prepareSocket() {
        if (this.clientSocket == null || this.clientSocket.isDisposed()) {
            try {
                this.clientSocket = (RSocket) RSocketFactory.connect().errorConsumer(th -> {
                    if (th instanceof ClosedChannelException) {
                        return;
                    }
                    logger.error("Unexpected error in RSocket outbound channel", th);
                }).frameDecoder(PayloadDecoder.ZERO_COPY).transport(this.transport).start().timeout(this.connectionTimeout, Mono.error(new TimeoutException("Timeout establishing connection for " + this.address))).block();
                logger.info("RSocket outbound channel opened for {}", this.address);
                this.clientSocket.onClose().doFinally(signalType -> {
                    logger.info("RSocket outbound channel for {} is closed", this.address);
                    close();
                }).subscribe(r1 -> {
                }, th2 -> {
                    logger.error("Unexpected error on closing outbound channel", th2);
                });
            } catch (Throwable th3) {
                logger.warn("Failed to create RSocket outbound channel for {}, because {}", this.address, th3.getMessage());
                close();
                return Optional.empty();
            }
        }
        return Optional.ofNullable(this.clientSocket);
    }
}
