package io.vlingo.wire.fdx.bidirectional.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import io.vlingo.actors.Logger;
import io.vlingo.wire.channel.ResponseChannelConsumer;
import io.vlingo.wire.fdx.bidirectional.ClientRequestResponseChannel;
import io.vlingo.wire.message.ByteBufferPool;
import io.vlingo.wire.node.Address;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.UnicastProcessor;

/* loaded from: input_file:io/vlingo/wire/fdx/bidirectional/rsocket/RSocketClientChannel.class */
/**
 * Client side of a request/response channel carried over an RSocket TCP connection.
 *
 * <p>Outgoing requests are pushed into a single {@link UnicastProcessor} that feeds
 * {@code RSocket.requestChannel}; every payload received on the resulting stream is copied
 * into a pooled buffer and handed to the configured {@link ResponseChannelConsumer}.
 *
 * <p>The connection is opened lazily by {@link #requestWith(ByteBuffer)} and
 * {@link #probeChannel()}. This class performs no synchronization of its own.
 *
 * <p>NOTE(review): a new connection is only attempted while {@code channelSocket} is
 * {@code null}, i.e. before the first successful connect or after {@link #close()}. A socket
 * that was closed remotely stays non-null, so subsequent requests are dropped. In addition the
 * same {@code UnicastProcessor} instance is reused for every connection; Reactor documents it
 * as single-subscriber, so reconnecting after {@link #close()} presumably fails on
 * subscription - confirm before relying on reconnects.
 */
public class RSocketClientChannel implements ClientRequestResponseChannel {
    /** Connection timeout used when the caller does not supply one. */
    private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofMillis(100L);

    /** Queue-backed source of outgoing payloads for the request channel. */
    private final UnicastProcessor<Payload> publisher;
    private final Logger logger;
    private final ChannelResponseHandler responseHandler;
    private final Address address;
    private final Duration connectionTimeout;
    /** Current connection; {@code null} until connected and again after {@link #close()}. */
    private RSocket channelSocket;

    /**
     * Copies each incoming payload into a pooled buffer and passes it to the consumer.
     * The decompiler notes that this type and {@link #handle(Payload)} were originally private.
     */
    public static class ChannelResponseHandler {
        private final ResponseChannelConsumer consumer;
        private final Logger logger;
        private final ByteBufferPool readBufferPool;

        /**
         * @param responseChannelConsumer receiver of the response buffers
         * @param i first argument forwarded to the {@link ByteBufferPool} constructor
         *          (presumably the pool size - TODO confirm)
         * @param i2 second argument forwarded to the {@link ByteBufferPool} constructor
         *           (presumably the size of each buffer - TODO confirm)
         * @param logger logger used to report consumer failures
         */
        private ChannelResponseHandler(ResponseChannelConsumer responseChannelConsumer, int i, int i2, Logger logger) {
            this.consumer = responseChannelConsumer;
            this.readBufferPool = new ByteBufferPool(i, i2);
            this.logger = logger;
        }

        /**
         * Delivers one response payload to the consumer.
         *
         * <p>Any {@link Throwable} raised while copying or consuming is logged and not rethrown.
         * The payload is always released, and the pooled buffer is released if it is still
         * marked in use afterwards (the consumer may already have released it).
         *
         * @param payload the payload received from the server
         */
        public void handle(Payload payload) {
            // The second argument is passed through to the pool unchanged; presumably a retry
            // limit for obtaining a buffer - TODO confirm against ByteBufferPool.
            ByteBufferPool.PooledByteBuffer pooled = this.readBufferPool.accessFor("client-response", 25);
            try {
                this.consumer.consume(pooled.put(payload.getData()).flip());
            } catch (Throwable failure) {
                this.logger.error("Unexpected error reading incoming payload", failure);
            } finally {
                payload.release();
                if (pooled.isInUse()) {
                    pooled.release();
                }
            }
        }
    }

    /**
     * Creates a channel with the default connection timeout of 100 milliseconds.
     *
     * @param address remote address to connect to
     * @param responseChannelConsumer receiver of the server responses
     * @param i first sizing argument of the read buffer pool (presumably pool size - TODO confirm)
     * @param i2 second sizing argument of the read buffer pool (presumably buffer size - TODO confirm)
     * @param logger logger used by this channel
     */
    public RSocketClientChannel(Address address, ResponseChannelConsumer responseChannelConsumer, int i, int i2, Logger logger) {
        this(address, responseChannelConsumer, i, i2, logger, DEFAULT_CONNECTION_TIMEOUT);
    }

    /**
     * Creates a channel. No connection is attempted until the first request or probe.
     *
     * @param address remote address to connect to
     * @param responseChannelConsumer receiver of the server responses
     * @param i first sizing argument of the read buffer pool (presumably pool size - TODO confirm)
     * @param i2 second sizing argument of the read buffer pool (presumably buffer size - TODO confirm)
     * @param logger logger used by this channel
     * @param duration maximum time to wait for the connection to be established
     */
    public RSocketClientChannel(Address address, ResponseChannelConsumer responseChannelConsumer, int i, int i2, Logger logger, Duration duration) {
        this.publisher = UnicastProcessor.create(new ConcurrentLinkedQueue<Payload>());
        this.logger = logger;
        this.address = address;
        this.connectionTimeout = duration;
        this.responseHandler = new ChannelResponseHandler(responseChannelConsumer, i, i2, logger);
    }

    /**
     * Disposes the current socket, if any, and forgets it. Failures while disposing are
     * logged and not propagated.
     */
    @Override // io.vlingo.wire.channel.RequestSenderChannel
    public void close() {
        if (this.channelSocket != null && !this.channelSocket.isDisposed()) {
            try {
                this.channelSocket.dispose();
            } catch (Throwable failure) {
                this.logger.error("Unexpected error on closing channel socket", failure);
            }
        }
        this.channelSocket = null;
    }

    /**
     * Sends the remaining bytes of {@code byteBuffer} to the server, connecting first if
     * necessary. If no usable socket is available the message is dropped and a debug entry
     * is logged.
     *
     * @param byteBuffer buffer whose remaining bytes form the request; its position is
     *                   advanced to its limit by the copy
     */
    @Override // io.vlingo.wire.channel.RequestSenderChannel
    public void requestWith(ByteBuffer byteBuffer) {
        prepareChannel();
        if (this.channelSocket == null || this.channelSocket.isDisposed()) {
            this.logger.debug("RSocket client channel for {} not ready. Message dropped", new Object[] {this.address});
            return;
        }
        // The payload is queued in the publisher rather than written synchronously, so it gets
        // its own copy of the data. Only the remaining bytes are copied, hence only that many
        // are allocated.
        ByteBuffer copy = ByteBuffer.allocate(byteBuffer.remaining());
        copy.put(byteBuffer);
        copy.flip();
        this.publisher.onNext(DefaultPayload.create(copy));
    }

    /** Attempts to open the connection if there is no current socket. */
    @Override // io.vlingo.wire.channel.ResponseListenerChannel
    public void probeChannel() {
        prepareChannel();
    }

    /**
     * Connects and wires up the response stream when there is no current socket. Any failure
     * is logged as a warning and the channel is closed.
     */
    private void prepareChannel() {
        if (this.channelSocket != null) {
            return;
        }
        try {
            this.channelSocket = connect();
            if (this.channelSocket != null) {
                openResponseStream(this.channelSocket);
            }
        } catch (Throwable failure) {
            this.logger.warn("Failed to create RSocket client channel for {}, because {}", new Object[] {this.address, failure.getMessage()});
            close();
        }
    }

    /** Blocks until the TCP connection is established, fails, or the timeout elapses. */
    private RSocket connect() {
        return RSocketFactory.connect()
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create(this.address.hostName(), this.address.port()))
            .start()
            .timeout(this.connectionTimeout)
            .doOnError(failure -> this.logger.error("Failed to create channel socket for address {}", new Object[] {this.address, failure}))
            .block();
    }

    /** Starts the request channel on the socket and disposes its subscription when the socket closes. */
    private void openResponseStream(RSocket socket) {
        Flux<Payload> responses = socket.requestChannel(this.publisher).onErrorResume(failure -> {
            if (!(failure instanceof ApplicationErrorException)) {
                return Flux.error(failure);
            }
            // Application-level errors from the server are logged and end the stream normally.
            this.logger.error("Server replied with an error: {}", new Object[] {failure.getMessage(), failure});
            return Flux.empty();
        });
        Disposable subscription = responses.subscribe(this.responseHandler::handle, this::onResponseStreamError);
        this.logger.info("RSocket client channel opened for address {}", new Object[] {this.address});
        socket.onClose().doFinally(signalType -> {
            if (!subscription.isDisposed()) {
                subscription.dispose();
            }
            this.logger.info("RSocket client channel for address {} is closed", new Object[] {this.address});
        }).subscribe(ignored -> {
        }, failure -> this.logger.error("Unexpected error on closing channel socket", failure));
    }

    /**
     * Terminal error handler of the response stream: cancels the publisher, and for anything
     * other than a {@link ClosedChannelException} logs the error and rethrows it.
     */
    private void onResponseStreamError(Throwable failure) {
        this.publisher.cancel();
        if (failure instanceof ClosedChannelException) {
            return;
        }
        this.logger.error("Received an unrecoverable error. Channel will be closed", failure);
        throw Exceptions.propagate(failure);
    }
}
