package com.elarian;

import com.elarian.model.ClientConfig;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.multipart.HttpPostBodyUtil;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.Resume;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.ByteBufPayload;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.netty.tcp.TcpClient;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/elarian/Client.class */
/**
 * Base class for a client that talks to a server over an RSocket TCP connection.
 *
 * <p>The class owns the connection lifecycle ({@link #connect}, {@link #disconnect},
 * {@link #isConnected}), dispatches incoming request/response payloads to a registered
 * notification handler, and offers {@link #buildCommandReply} for outgoing requests.
 * Subclasses provide the wire (de)serialization through the abstract methods.
 *
 * @param <B> type produced by {@link #deserializeNotification(byte[])} and passed to the handler
 * @param <C> type emitted by the handler and passed to {@link #serializeNotificationReply(Object)}
 */
abstract class Client<B, C> {
    /** Debug logging is on when an environment variable named {@code DEBUG} exists (any value). */
    private static final boolean DEBUG = System.getenv().containsKey("DEBUG");

    private final ClientConfig clientOpts;

    /**
     * Currently active socket, or {@code null} when not connected. Volatile because it is
     * assigned from the connection callbacks and read from caller threads.
     */
    protected volatile RSocket socket;

    private TcpClientTransport transport;
    private Resume resume;
    private Function<B, Mono<C>> globalNotificationHandler;

    /**
     * Responder for incoming request/response interactions. All work is deferred until
     * subscription, so the payload is decoded once per subscriber.
     */
    protected Function<Payload, Mono<Payload>> requestHandler =
            payload -> Mono.defer(() -> handleNotification(payload));

    /**
     * Prints {@code str} to standard output when debug logging is enabled.
     *
     * @param str message to print
     */
    public static void log(String str) {
        if (DEBUG) {
            System.out.println(str);
        }
    }

    /**
     * Stores the configuration and registers a JVM shutdown hook that disconnects the client
     * if it is still connected at shutdown.
     *
     * @param clientConfig configuration used later by {@link #connect}
     */
    public Client(ClientConfig clientConfig) {
        this.clientOpts = clientConfig;
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (isConnected()) {
                disconnect("Application shutting down!");
            }
        }));
    }

    /**
     * Connects with automatic reconnection enabled.
     *
     * @param connectionListener receiver of connection lifecycle callbacks; must not be null
     */
    public void connect(ConnectionListener connectionListener) {
        connect(connectionListener, true);
    }

    /**
     * Opens the RSocket connection and reports progress to {@code connectionListener}.
     *
     * @param connectionListener receiver of connection lifecycle callbacks; must not be null
     * @param z whether to schedule a new connection attempt when the TCP channel disconnects;
     *          when {@code false} the client is disconnected and {@code onClosed()} is invoked
     * @throws RuntimeException if the client is already connected
     *         ({@link IllegalStateException}) or the listener is null
     *         ({@link IllegalArgumentException})
     */
    public void connect(ConnectionListener connectionListener, boolean z) throws RuntimeException {
        if (isConnected()) {
            throw new IllegalStateException("Client is already connected");
        }
        if (connectionListener == null) {
            throw new IllegalArgumentException("listener is required");
        }
        final boolean autoReconnect = z;
        byte[] setupBytes = serializeSetupPayload(this.clientOpts);

        TcpClient tcpClient = TcpClient.create()
                .secure()
                .host(this.clientOpts.connectionConfig.host)
                .port(this.clientOpts.connectionConfig.port)
                .doOnConnect(config -> {
                    log("Connecting");
                    connectionListener.onConnecting();
                })
                .doOnDisconnected(connection -> {
                    log("Disconnected");
                    if (autoReconnect) {
                        scheduleReconnect(connectionListener);
                    } else {
                        disconnect();
                        connectionListener.onClosed();
                    }
                });
        this.transport = TcpClientTransport.create(tcpClient);

        this.resume = new Resume()
                .sessionDuration(Duration.ofMillis(this.clientOpts.connectionConfig.lifetime))
                .cleanupStoreOnKeepAlive()
                .retry(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(2L))
                        .doBeforeRetry(signal -> log("Disconnected. Retrying...")));

        RSocketConnector connector = RSocketConnector.create()
                .metadataMimeType(HttpPostBodyUtil.DEFAULT_BINARY_CONTENT_TYPE)
                .dataMimeType(HttpPostBodyUtil.DEFAULT_BINARY_CONTENT_TYPE)
                .payloadDecoder(PayloadDecoder.ZERO_COPY)
                .setupPayload(ByteBufPayload.create(setupBytes))
                .acceptor(SocketAcceptor.forRequestResponse(this.requestHandler))
                .keepAlive(
                        Duration.ofMillis(this.clientOpts.connectionConfig.keepAlive),
                        Duration.ofMillis(this.clientOpts.connectionConfig.lifetime));
        if (this.clientOpts.connectionConfig.isResumable) {
            connector.resume(this.resume);
        }

        connectionListener.onPending();
        connector.connect(this.transport).subscribe(
                rSocket -> onSocketReady(rSocket, connectionListener),
                th -> handleConnectionError(th, connectionListener));
    }

    /** Disconnects without giving a reason. */
    public void disconnect() {
        disconnect("");
    }

    /**
     * Disposes the active socket, if any, and marks the client as disconnected. Safe to call
     * when no socket was ever established (for example after a failed connection attempt).
     *
     * @param str reason included in the debug log; may be null or empty
     */
    public void disconnect(String str) {
        if (str == null || str.isEmpty()) {
            log("Disconnecting from server... ");
        } else {
            log("Disconnecting from server, REASON: " + str);
        }
        // Clear the field first so callbacks triggered by dispose() already see "disconnected".
        RSocket current = this.socket;
        this.socket = null;
        if (current != null) {
            current.dispose();
        }
    }

    /**
     * @return {@code true} when a socket is present and has not been disposed
     */
    public boolean isConnected() {
        RSocket current = this.socket;
        return current != null && !current.isDisposed();
    }

    /**
     * Sets the handler invoked for every incoming notification.
     *
     * @param function maps a deserialized notification to the reply to send back
     */
    public void registerGlobalNotificationHandler(Function<B, Mono<C>> function) {
        this.globalNotificationHandler = function;
    }

    /** Converts a handler reply into the bytes sent back as the response payload. */
    protected abstract byte[] serializeNotificationReply(C c);

    /** Produces the bytes used as the RSocket setup payload for the given configuration. */
    protected abstract byte[] serializeSetupPayload(ClientConfig clientConfig);

    /** Converts the data bytes of an incoming request payload into a notification object. */
    protected abstract B deserializeNotification(byte[] bArr) throws RuntimeException;

    /**
     * Sends {@code bArr} as a request/response interaction and decodes the reply.
     *
     * @param bArr request bytes
     * @param function decoder applied to the response data bytes; a {@code null} result is
     *        reported as an error
     * @param <D> decoded reply type
     * @return a Mono that emits the decoded reply, or errors if the client is not connected,
     *         the request fails, or decoding fails
     */
    public <D> Mono<D> buildCommandReply(final byte[] bArr, final Function<byte[], D> function) {
        if (!isConnected()) {
            return Mono.error(new RuntimeException("client is not connected!"));
        }
        return Mono.defer(() -> {
            // The client may have been disconnected between assembly and subscription.
            RSocket current = this.socket;
            if (current == null) {
                return Mono.<D>error(new RuntimeException("client is not connected!"));
            }
            Mono<Payload> response = current.requestResponse(ByteBufPayload.create(bArr));
            return response.<D>flatMap(payload -> decodeReply(payload, function));
        });
    }

    /**
     * Copies the readable data bytes out of {@code payload} and releases the data buffer.
     *
     * @param payload payload to read
     * @return a fresh array holding exactly the readable bytes of the payload data
     */
    public byte[] getBytesFromPayload(Payload payload) {
        ByteBuf data = payload.sliceData();
        try {
            // Always copy: the backing array of a slice can be larger than, and offset from,
            // the readable region, and must not be used once the buffer has been released.
            byte[] bytes = new byte[data.readableBytes()];
            data.getBytes(data.readerIndex(), bytes);
            return bytes;
        } finally {
            // NOTE(review): this releases the data slice, as the previous code did; confirm
            // whether payload.release() is the intended call for ZERO_COPY payloads.
            data.release();
        }
    }

    /** Decodes an incoming notification, runs the registered handler and encodes its reply. */
    private Mono<Payload> handleNotification(Payload payload) {
        B notification = deserializeNotification(getBytesFromPayload(payload));
        Function<B, Mono<C>> handler = this.globalNotificationHandler;
        Mono<C> reply;
        if (handler == null) {
            reply = Mono.error(new Error("notification handler is not setup;"));
        } else {
            reply = handler.apply(notification);
        }
        return reply
                .<Payload>map(result -> ByteBufPayload.create(serializeNotificationReply(result)))
                .doOnError(Throwable::printStackTrace);
    }

    /** Applies {@code decoder} to the response bytes, signalling an error for a null result. */
    private <D> Mono<D> decodeReply(Payload payload, Function<byte[], D> decoder) {
        D decoded = decoder.apply(getBytesFromPayload(payload));
        if (decoded == null) {
            return Mono.error(new Error("Failed to deserialize command response!"));
        }
        return Mono.just(decoded);
    }

    /** Stores the new socket, watches its close signal and announces the connection. */
    private void onSocketReady(RSocket rSocket, ConnectionListener listener) {
        this.socket = rSocket;
        rSocket.onClose().subscribe(
                ignored -> log("Connection SIGNAL"),
                th -> handleConnectionError(th, listener),
                () -> log("Connection CLOSED"));
        // onConnected() is reported one second later, and only if the socket is still alive.
        Mono.delay(Duration.ofSeconds(1L)).subscribe(
                tick -> {
                    if (isConnected()) {
                        log("Connected");
                        listener.onConnected();
                    }
                },
                th -> log("Connected-notification timer failed: " + th.getMessage()));
    }

    /** Logs a connection failure, tears down any socket and notifies the listener. */
    private void handleConnectionError(Throwable th, ConnectionListener listener) {
        log("Connection ERROR: " + th.getMessage());
        disconnect(th.getMessage());
        listener.onError(th);
    }

    /** Schedules a new {@link #connect(ConnectionListener)} attempt after the configured delay. */
    private void scheduleReconnect(ConnectionListener listener) {
        long delay = this.clientOpts.connectionConfig.reconnectTimeout;
        // NOTE(review): the message says "ms" but the delay is applied in seconds; the
        // intended unit of reconnectTimeout is not visible here - confirm and align.
        log("Will attempt to reconnect in " + delay + "ms");
        Mono.delay(Duration.ofSeconds(delay)).subscribe(
                tick -> {
                    if (isConnected()) {
                        // The existing socket is still live, so connect() would only throw.
                        log("Still connected; skipping reconnect");
                        return;
                    }
                    log("Attempting to reconnect...");
                    connect(listener);
                },
                th -> {
                    log("Reconnect ERROR: " + th.getMessage());
                    listener.onError(th);
                });
    }
}
