package io.vlingo.wire.fdx.outbound.rsocket;

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.DefaultPayload;
import io.vlingo.actors.Logger;
import io.vlingo.common.Completes;
import io.vlingo.common.Scheduler;
import io.vlingo.wire.fdx.outbound.ManagedOutboundChannel;
import io.vlingo.wire.node.Address;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Optional;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/vlingo/wire/fdx/outbound/rsocket/RSocketOutboundChannel.class */
/**
 * A {@link ManagedOutboundChannel} that sends messages to a single {@link Address}
 * over an RSocket connection using fire-and-forget requests.
 *
 * <p>The underlying {@link RSocket} is created lazily on the first write. When the
 * connection cannot be established, the message is dropped and a log entry is written;
 * a later write attempts to connect again.
 *
 * <p>NOTE(review): {@code clientSocket} is a plain mutable field with no visible
 * synchronization; callers presumably confine each instance to one thread — confirm.
 */
public class RSocketOutboundChannel implements ManagedOutboundChannel {
    private final Address address;
    private final Logger logger;
    /** Lazily created connection; {@code null} until the first successful connect and after {@link #close()}. */
    private RSocket clientSocket;
    private final Duration connectionTimeout;
    private final ClientTransport transport;
    // NOTE(review): created per channel and not released by close(); confirm whether it needs an explicit shutdown.
    private final Scheduler scheduler;

    /**
     * Creates a channel that waits at most 100 milliseconds for the connection to be established.
     *
     * @param address the remote node address, used in log messages
     * @param clientTransport the RSocket transport used to connect
     * @param logger the logger receiving connection and write diagnostics
     */
    public RSocketOutboundChannel(Address address, ClientTransport clientTransport, Logger logger) {
        this(address, clientTransport, Duration.ofMillis(100L), logger);
    }

    /**
     * Creates a channel with an explicit connection timeout.
     *
     * @param address the remote node address, used in log messages
     * @param clientTransport the RSocket transport used to connect
     * @param duration the maximum time to wait for the connection to be established
     * @param logger the logger receiving connection and write diagnostics
     */
    public RSocketOutboundChannel(Address address, ClientTransport clientTransport, Duration duration, Logger logger) {
        this.scheduler = new Scheduler();
        this.address = address;
        this.logger = logger;
        this.connectionTimeout = duration;
        this.transport = clientTransport;
    }

    /**
     * Disposes the current socket, if any, and forgets it so that the next write reconnects.
     * Errors raised while disposing are logged and not propagated.
     */
    @Override // io.vlingo.wire.fdx.outbound.ManagedOutboundChannel
    public void close() {
        if (this.clientSocket != null && !this.clientSocket.isDisposed()) {
            try {
                this.clientSocket.dispose();
            } catch (Throwable th) {
                this.logger.error("Unexpected error on closing outbound channel", th);
            }
        }
        this.clientSocket = null;
    }

    /**
     * Sends the buffer without blocking on the write itself.
     *
     * <p>NOTE(review): a {@code Mono<Void>} emits no value, so the {@code with} callback
     * below may never run on success and only the failure path would be signalled — confirm
     * whether a completion callback is needed.
     *
     * @param byteBuffer the message bytes to send
     * @return a {@link Completes} that is failed when the write pipeline signals an error
     */
    @Override // io.vlingo.wire.fdx.outbound.ManagedOutboundChannel
    public Completes<Void> writeAsync(ByteBuffer byteBuffer) {
        final Completes<Void> completes = Completes.using(this.scheduler);
        _writeAsync(byteBuffer).subscribe(completes::with, th -> completes.failed());
        return completes;
    }

    /**
     * Builds the write pipeline: connects if necessary, then sends the buffer or logs that
     * the message was dropped when no socket is available.
     */
    private Mono<Void> _writeAsync(ByteBuffer byteBuffer) {
        return prepareSocket()
            .map(rSocket -> sendOrDrop(rSocket, byteBuffer))
            .orElseGet(() -> Mono.fromRunnable(() ->
                this.logger.debug("RSocket outbound channel for {} not ready. Message dropped", this.address)));
    }

    /**
     * Sends the buffer as a fire-and-forget payload, or only logs a warning when the socket
     * is already disposed.
     *
     * <p>NOTE(review): a disposed socket is not cleared here, and {@link #prepareSocket()}
     * reconnects only when the field is {@code null}; confirm that an explicit
     * {@link #close()} is expected before the channel can recover.
     */
    private Mono<Void> sendOrDrop(RSocket rSocket, ByteBuffer byteBuffer) {
        if (rSocket.isDisposed()) {
            return Mono.fromRunnable(() ->
                this.logger.warn("RSocket outbound channel for {} is closed. Message dropped", this.address))
                .then();
        }
        return rSocket.fireAndForget(DefaultPayload.create(byteBuffer))
            .onErrorResume(th -> handleWriteFailure(rSocket, th));
    }

    /**
     * Maps a write failure to the resulting signal: a closed channel disposes the socket and
     * propagates the error; any other failure is logged and swallowed.
     */
    private Mono<Void> handleWriteFailure(RSocket rSocket, Throwable th) {
        if (!(th instanceof ClosedChannelException)) {
            this.logger.error("Failed write to {}, because: {}", this.address, th.getMessage(), th);
            return Mono.empty();
        }
        rSocket.dispose();
        this.logger.error("Connection with {} closed", this.address, th);
        return Mono.error(th);
    }

    /**
     * Sends the buffer and blocks the calling thread until the write pipeline terminates.
     *
     * @param byteBuffer the message bytes to send
     */
    @Override // io.vlingo.wire.fdx.outbound.ManagedOutboundChannel
    public void write(ByteBuffer byteBuffer) {
        _writeAsync(byteBuffer).block();
    }

    /**
     * Returns the current socket, connecting first when none exists. The connect call blocks
     * for up to the configured connection timeout.
     *
     * @return the socket, or empty when the connection attempt failed
     */
    private Optional<RSocket> prepareSocket() {
        if (this.clientSocket == null) {
            try {
                this.clientSocket = RSocketFactory.connect()
                    .errorConsumer(th -> this.logger.error("Unexpected error in outbound channel", th))
                    .frameDecoder(PayloadDecoder.ZERO_COPY)
                    .transport(this.transport)
                    .start()
                    .timeout(this.connectionTimeout)
                    .block();
                this.logger.info("RSocket outbound channel opened for {}", this.address);
                // Log when the connection terminates, whatever the reason.
                this.clientSocket.onClose()
                    .doFinally(signalType ->
                        this.logger.info("RSocket outbound channel for {} is closed", this.address))
                    .subscribe(
                        ignored -> { },
                        th -> this.logger.error("Unexpected error on closing outbound channel", th));
            } catch (Throwable th) {
                this.logger.warn("Failed to create RSocket outbound channel for {}, because {}", this.address, th.getMessage());
                // Reset any partially initialised socket so the next write retries the connection.
                close();
                return Optional.empty();
            }
        }
        return Optional.ofNullable(this.clientSocket);
    }
}
