package com.questdb.net.ha;

import com.questdb.JournalKey;
import com.questdb.JournalWriter;
import com.questdb.ex.IncompatibleJournalException;
import com.questdb.ex.JournalException;
import com.questdb.ex.JournalNetworkException;
import com.questdb.factory.JournalWriterFactory;
import com.questdb.factory.configuration.JournalMetadata;
import com.questdb.factory.configuration.JournalStructure;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.misc.Chars;
import com.questdb.misc.Files;
import com.questdb.misc.NamedDaemonThreadFactory;
import com.questdb.net.SecureSocketChannel;
import com.questdb.net.SslConfig;
import com.questdb.net.StatsCollectingReadableByteChannel;
import com.questdb.net.ha.auth.AuthConfigurationException;
import com.questdb.net.ha.auth.AuthFailureException;
import com.questdb.net.ha.auth.CredentialProvider;
import com.questdb.net.ha.comsumer.HugeBufferConsumer;
import com.questdb.net.ha.comsumer.JournalDeltaConsumer;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerNode;
import com.questdb.net.ha.model.IndexedJournal;
import com.questdb.net.ha.model.IndexedJournalKey;
import com.questdb.net.ha.producer.JournalClientStateProducer;
import com.questdb.net.ha.protocol.CommandConsumer;
import com.questdb.net.ha.protocol.CommandProducer;
import com.questdb.net.ha.protocol.commands.ByteArrayResponseProducer;
import com.questdb.net.ha.protocol.commands.CharSequenceResponseConsumer;
import com.questdb.net.ha.protocol.commands.IntResponseConsumer;
import com.questdb.net.ha.protocol.commands.IntResponseProducer;
import com.questdb.net.ha.protocol.commands.SetKeyRequestProducer;
import com.questdb.std.IntList;
import com.questdb.std.ObjList;
import com.questdb.store.TxListener;
import java.io.File;
import java.io.IOException;
import java.nio.channels.ByteChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/* loaded from: input_file:com/questdb/net/ha/JournalClient.class */
public class JournalClient {
    // Disconnect reason codes passed to DisconnectCallback#onDisconnect(int).

    /** Reason not determined; DisconnectCallbackImpl attempts a reconnect for this code. */
    public static final int DISCONNECT_UNKNOWN = 1;
    /** Reported by the handler when it notices the client is no longer running. */
    public static final int DISCONNECT_CLIENT_HALT = 2;
    /** Reported by the handler after an unhandled, non-Error throwable. */
    public static final int DISCONNECT_CLIENT_EXCEPTION = 3;
    /** Reported by the handler after a network failure; DisconnectCallbackImpl attempts a reconnect for this code. */
    public static final int DISCONNECT_BROKEN_CHANNEL = 4;
    /** Not referenced inside this class. NOTE(review): presumably used by callers - confirm. */
    public static final int DISCONNECT_CLIENT_ERROR = 5;
    /** Reported by the handler when an IncompatibleJournalException is caught. */
    public static final int DISCONNECT_INCOMPATIBLE_JOURNAL = 6;
    // Incremented after each successful handshake, decremented on final disconnect.
    private static final AtomicInteger counter = new AtomicInteger(0);
    private static final Log LOG = LogFactory.getLog(JournalClient.class);
    // Thread factory backing the executor that runs Handler tasks.
    private static final ThreadFactory CLIENT_THREAD_FACTORY = new NamedDaemonThreadFactory("journal-client", false);
    // remoteKeys, localKeys and listeners are parallel lists: one entry per subscription, same index.
    private final ObjList<JournalKey> remoteKeys;
    private final ObjList<JournalKey> localKeys;
    private final ObjList<TxListener> listeners;
    // writers, deltaConsumers and statusSentList are indexed by subscription index (see set0).
    private final ObjList<JournalWriter> writers;
    private final ObjList<JournalDeltaConsumer> deltaConsumers;
    // 0 = state must be (re)sent for that writer, 1 = state already sent (see sendState).
    private final IntList statusSentList;
    // Creates local writers for subscriptions that were registered by key only.
    private final JournalWriterFactory factory;
    // Reusable protocol readers/writers for the wire exchange on the channel.
    private final CommandProducer commandProducer;
    private final CommandConsumer commandConsumer;
    private final SetKeyRequestProducer setKeyRequestProducer;
    private final CharSequenceResponseConsumer charSequenceResponseConsumer;
    private final JournalClientStateProducer journalClientStateProducer;
    private final IntResponseConsumer intResponseConsumer;
    private final IntResponseProducer intResponseProducer;
    private final ByteArrayResponseProducer byteArrayResponseProducer;
    private final ClientConfig config;
    // Executor that runs the Handler; shut down on final disconnect.
    private final ExecutorService service;
    // True between a successful start() and halt()/final disconnect.
    private final AtomicBoolean running;
    // May be null; required only when the server asks for authentication.
    private final CredentialProvider credentialProvider;
    // Internal callback that handles reconnects and then delegates to the user-supplied callback.
    private final DisconnectCallbackImpl disconnectCallback;
    // Current connection to the server; null when not connected.
    private ByteChannel channel;
    // Wrapper the handler reads journal deltas through; created in openChannel.
    private StatsCollectingReadableByteChannel statsChannel;
    // Future of the currently submitted Handler task, or null.
    private Future handlerFuture;

    /**
     * Listener notified when the client disconnects for good.
     * Register one via {@code setDisconnectCallback}.
     */
    public interface DisconnectCallback {
        /**
         * Called once the client has given up on the connection.
         *
         * @param i one of the {@code DISCONNECT_*} reason codes declared on {@code JournalClient}
         */
        void onDisconnect(int i);
    }

    /**
     * Internal disconnect handler. For {@link #DISCONNECT_UNKNOWN} and
     * {@link #DISCONNECT_BROKEN_CHANNEL} it first tries to reconnect according to the
     * configured reconnect policy; for every other reason, or when reconnecting fails,
     * it shuts the client down and forwards the reason to the user-supplied callback.
     */
    private final class DisconnectCallbackImpl implements DisconnectCallback {
        // User-supplied callback; may be null.
        private DisconnectCallback next;

        private DisconnectCallbackImpl() {
        }

        /**
         * Handles a disconnect reported by the handler.
         *
         * @param i one of the {@code DISCONNECT_*} reason codes
         */
        @Override // com.questdb.net.ha.JournalClient.DisconnectCallback
        public void onDisconnect(int i) {
            boolean recoverable = i == DISCONNECT_UNKNOWN || i == DISCONNECT_BROKEN_CHANNEL;
            if (recoverable && reconnect()) {
                // Connection re-established: resume processing with a fresh handler task.
                JournalClient.this.handlerFuture = JournalClient.this.service.submit(new Handler());
                return;
            }
            disconnect(i);
        }

        /**
         * Attempts to re-establish the connection, sleeping between attempts.
         * Stops as soon as the client is halted, the retry budget is used up, or the
         * login retry budget is used up. The budgets are part of the loop condition so
         * that exhausting them ends the loop instead of spinning.
         *
         * @return true when a handshake succeeded
         */
        private boolean reconnect() {
            int retryCount = JournalClient.this.config.getReconnectPolicy().getRetryCount();
            int loginRetryCount = JournalClient.this.config.getReconnectPolicy().getLoginRetryCount();
            while (JournalClient.this.running.get() && retryCount-- > 0 && loginRetryCount > 0) {
                try {
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(JournalClient.this.config.getReconnectPolicy().getSleepBetweenRetriesMillis()));
                    JournalClient.LOG.info().$((CharSequence) "Retrying reconnect ... [").$(retryCount + 1).$(']').$();
                    JournalClient.this.close0();
                    JournalClient.this.handshake();
                    return true;
                } catch (AuthConfigurationException | AuthFailureException e) {
                    // Authentication problems consume the separate login retry budget.
                    loginRetryCount--;
                } catch (JournalNetworkException e2) {
                    JournalClient.LOG.info().$((CharSequence) "Error during disconnect").$((Throwable) e2).$();
                }
            }
            return false;
        }

        /**
         * Marks the client as stopped, shuts the executor down and notifies the
         * user-supplied callback, if any.
         */
        private void disconnect(int i) {
            JournalClient.LOG.info().$((CharSequence) "Client disconnecting").$();
            JournalClient.counter.decrementAndGet();
            JournalClient.this.running.set(false);
            JournalClient.this.handlerFuture = null;
            JournalClient.this.service.shutdown();
            if (this.next != null) {
                this.next.onDisconnect(i);
            }
        }
    }

    /**
     * Task that reads server commands from the channel until the connection ends,
     * then reports the reason to the internal disconnect callback exactly once.
     */
    public final class Handler implements Runnable {

        private Handler() {
        }

        /**
         * Command loop. Each command is handled on its own (no fall-through between
         * cases); only a client halt or command 12 leaves the loop normally.
         * NOTE(review): the numeric command codes presumably map to protocol constants
         * declared elsewhere (4 delta, 5 server ready, 6 heartbeat, 12 shutdown) - confirm.
         */
        @Override // java.lang.Runnable
        public void run() {
            // Stays DISCONNECT_UNKNOWN when an Error propagates out of the loop.
            int reason = DISCONNECT_UNKNOWN;
            try {
                loop:
                while (true) {
                    assert JournalClient.this.channel != null;
                    JournalClient.this.commandConsumer.read(JournalClient.this.channel);
                    switch (JournalClient.this.commandConsumer.getCommand()) {
                        case 4:
                            // Journal delta: read the journal index, then the delta itself.
                            JournalClient.this.statsChannel.setDelegate(JournalClient.this.channel);
                            int value = JournalClient.this.intResponseConsumer.getValue(JournalClient.this.statsChannel);
                            ((JournalDeltaConsumer) JournalClient.this.deltaConsumers.getQuick(value)).read(JournalClient.this.statsChannel);
                            // State changed, so it has to be sent again on the next sendState().
                            JournalClient.this.statusSentList.set(value, 0);
                            JournalClient.this.statsChannel.logStats();
                            break;
                        case 5:
                            if (!JournalClient.this.isRunning()) {
                                JournalClient.this.sendDisconnect();
                                reason = DISCONNECT_CLIENT_HALT;
                                break loop;
                            }
                            JournalClient.this.sendState();
                            break;
                        case 6:
                            if (!JournalClient.this.isRunning()) {
                                JournalClient.this.sendDisconnect();
                                reason = DISCONNECT_CLIENT_HALT;
                                break loop;
                            }
                            JournalClient.this.sendReady();
                            break;
                        case 12:
                            reason = DISCONNECT_BROKEN_CHANNEL;
                            break loop;
                        default:
                            JournalClient.LOG.info().$((CharSequence) "Unknown command: ").$((int) JournalClient.this.commandConsumer.getCommand()).$();
                            break;
                    }
                }
            } catch (IncompatibleJournalException e) {
                JournalClient.LOG.error().$((CharSequence) e.getMessage()).$();
                reason = DISCONNECT_INCOMPATIBLE_JOURNAL;
            } catch (JournalNetworkException e2) {
                JournalClient.LOG.error().$((CharSequence) "Network error. Server died?").$();
                JournalClient.LOG.debug().$((CharSequence) "Network error details: ").$((Throwable) e2).$();
                reason = DISCONNECT_BROKEN_CHANNEL;
            } catch (Error e3) {
                JournalClient.LOG.error().$((CharSequence) "Unhandled exception in client").$((Throwable) e3).$();
                throw e3;
            } catch (Throwable th) {
                JournalClient.LOG.error().$((CharSequence) "Unhandled exception in client").$(th).$();
                reason = DISCONNECT_CLIENT_EXCEPTION;
            } finally {
                JournalClient.this.disconnectCallback.onDisconnect(reason);
            }
        }
    }

    /**
     * Creates a client with a default {@code ClientConfig} and no credential provider.
     *
     * @param journalWriterFactory factory used to create local writers
     */
    public JournalClient(JournalWriterFactory journalWriterFactory) {
        this(new ClientConfig(), journalWriterFactory, null);
    }

    /**
     * Creates a client with a default {@code ClientConfig}.
     *
     * @param journalWriterFactory factory used to create local writers
     * @param credentialProvider   source of the authentication token; may be null
     */
    public JournalClient(JournalWriterFactory journalWriterFactory, CredentialProvider credentialProvider) {
        this(
                new ClientConfig(),
                journalWriterFactory,
                credentialProvider
        );
    }

    /**
     * Creates a client without a credential provider.
     *
     * @param clientConfig         connection and reconnect settings
     * @param journalWriterFactory factory used to create local writers
     */
    public JournalClient(ClientConfig clientConfig, JournalWriterFactory journalWriterFactory) {
        this(
                clientConfig,
                journalWriterFactory,
                null
        );
    }

    /**
     * Creates a client. No connection is opened until {@code start()} is called.
     *
     * @param clientConfig         connection and reconnect settings
     * @param journalWriterFactory factory used to create local writers
     * @param credentialProvider   source of the authentication token; may be null
     */
    public JournalClient(ClientConfig clientConfig, JournalWriterFactory journalWriterFactory, CredentialProvider credentialProvider) {
        // Collaborators supplied by the caller.
        this.config = clientConfig;
        this.factory = journalWriterFactory;
        this.credentialProvider = credentialProvider;

        // Per-subscription bookkeeping.
        this.remoteKeys = new ObjList<>();
        this.localKeys = new ObjList<>();
        this.listeners = new ObjList<>();
        this.writers = new ObjList<>();
        this.deltaConsumers = new ObjList<>();
        this.statusSentList = new IntList();

        // Reusable protocol readers and writers.
        this.commandProducer = new CommandProducer();
        this.commandConsumer = new CommandConsumer();
        this.setKeyRequestProducer = new SetKeyRequestProducer();
        this.charSequenceResponseConsumer = new CharSequenceResponseConsumer();
        this.journalClientStateProducer = new JournalClientStateProducer();
        this.intResponseConsumer = new IntResponseConsumer();
        this.intResponseProducer = new IntResponseProducer();
        this.byteArrayResponseProducer = new ByteArrayResponseProducer();

        // Lifecycle state.
        this.running = new AtomicBoolean(false);
        this.disconnectCallback = new DisconnectCallbackImpl();
        this.service = Executors.newCachedThreadPool(CLIENT_THREAD_FACTORY);
    }

    /**
     * Stops the client. When the client was running, waits for the handler task to
     * finish, then closes the channel and writers and frees the protocol buffers.
     * When the client was not running, only the channel is closed.
     * <p>
     * NOTE(review): this blocks on the handler task, so calling it from the handler
     * thread itself would presumably never return - confirm against callers.
     */
    public void halt() {
        if (!this.running.compareAndSet(true, false)) {
            closeChannel();
            return;
        }
        if (this.handlerFuture != null) {
            try {
                // Wait for the handler to notice the halt and exit gracefully.
                this.handlerFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for the caller.
                    Thread.currentThread().interrupt();
                }
                LOG.error().$((CharSequence) "Exception while waiting for client to shutdown gracefully").$(e).$();
            } finally {
                this.handlerFuture = null;
            }
        }
        close0();
        free();
    }

    /**
     * @return true while the client is started and has not been halted or disconnected
     */
    public boolean isRunning() {
        final boolean active = running.get();
        return active;
    }

    /**
     * Registers the callback invoked after the client has disconnected for good.
     *
     * @param disconnectCallback callback to notify; may be null to clear it
     */
    public void setDisconnectCallback(DisconnectCallback disconnectCallback) {
        final DisconnectCallbackImpl internal = this.disconnectCallback;
        internal.next = disconnectCallback;
    }

    /**
     * Starts the client: performs the handshake and submits the handler task.
     * Does nothing when the client is already marked as running.
     *
     * @throws JournalNetworkException when the handshake fails
     */
    public void start() throws JournalNetworkException {
        boolean alreadyRunning = !running.compareAndSet(false, true);
        if (alreadyRunning) {
            return;
        }
        handshake();
        handlerFuture = service.submit(new Handler());
    }

    /**
     * Subscribes to the journal of the given class, without a listener.
     *
     * @param cls journal model class
     */
    public <T> void subscribe(Class<T> cls) {
        final TxListener noListener = null;
        subscribe(cls, noListener);
    }

    /**
     * Subscribes to a journal using the same location for the remote and local side,
     * without a listener.
     *
     * @param cls journal model class
     * @param str journal location used for both keys
     */
    public <T> void subscribe(Class<T> cls, String str) {
        final TxListener noListener = null;
        subscribe(cls, str, noListener);
    }

    /**
     * Subscribes to a remote journal and maps it to a local one, without a listener.
     *
     * @param cls  journal model class
     * @param str  location used for the remote key
     * @param str2 location used for the local key
     */
    public <T> void subscribe(Class<T> cls, String str, String str2) {
        final TxListener noListener = null;
        subscribe(cls, str, str2, noListener);
    }

    /**
     * Subscribes to a remote journal and maps it to a local one.
     *
     * @param cls        journal model class
     * @param str        location used for the remote key
     * @param str2       location used for the local key
     * @param txListener listener attached to the local writer; may be null
     */
    public <T> void subscribe(Class<T> cls, String str, String str2, TxListener txListener) {
        JournalKey remoteKey = new JournalKey(cls, str);
        JournalKey localKey = new JournalKey(cls, str2);
        subscribe(remoteKey, localKey, txListener);
    }

    /**
     * Subscribes to a remote journal and maps it to a local one, without a listener.
     *
     * @param cls  journal model class
     * @param str  location used for the remote key
     * @param str2 location used for the local key
     * @param i    value passed as the last argument of both journal keys
     */
    public <T> void subscribe(Class<T> cls, String str, String str2, int i) {
        final TxListener noListener = null;
        subscribe(cls, str, str2, i, noListener);
    }

    /**
     * Subscribes to a remote journal and maps it to a local one.
     *
     * @param cls        journal model class
     * @param str        location used for the remote key
     * @param str2       location used for the local key
     * @param i          value passed as the last argument of both journal keys
     * @param txListener listener attached to the local writer; may be null
     */
    public <T> void subscribe(Class<T> cls, String str, String str2, int i, TxListener txListener) {
        // NOTE(review): the literal 4 is passed as the third key argument; its meaning is not visible here.
        JournalKey remoteKey = new JournalKey(cls, str, 4, i);
        JournalKey localKey = new JournalKey(cls, str2, 4, i);
        subscribe(remoteKey, localKey, txListener);
    }

    /**
     * Subscribes to a remote journal using an already opened local writer.
     *
     * @param journalKey    key of the remote journal
     * @param journalWriter local writer that receives the data
     * @param txListener    listener attached to the writer; may be null
     */
    public <T> void subscribe(JournalKey<T> journalKey, JournalWriter<T> journalWriter, TxListener txListener) {
        // Index the new subscription will occupy in the parallel lists.
        final int index = remoteKeys.size();
        remoteKeys.add(journalKey);
        localKeys.add(journalWriter.getKey());
        listeners.add(txListener);
        set0(index, journalWriter, txListener);
    }

    /**
     * Registers a subscription by key only; the local writer is created later,
     * when the keys are sent to the server.
     *
     * @param journalKey  key of the remote journal
     * @param journalKey2 key of the local journal
     * @param txListener  listener for the local writer; may be null
     */
    public void subscribe(JournalKey journalKey, JournalKey journalKey2, TxListener txListener) {
        // All three lists share the same index for a given subscription.
        listeners.add(txListener);
        localKeys.add(journalKey2);
        remoteKeys.add(journalKey);
    }

    /**
     * Reads a string response from the channel and fails unless it equals "OK".
     *
     * @throws JournalNetworkException when reading fails or the response is not "OK";
     *                                 in the latter case the response is the message
     */
    private void checkAck() throws JournalNetworkException {
        charSequenceResponseConsumer.read(channel);
        CharSequence response = charSequenceResponseConsumer.getValue();
        boolean acknowledged = Chars.equals("OK", response);
        fail(acknowledged, response.toString());
    }

    /**
     * Sends command 9 and reacts to the server's answer: "OK" means no authentication
     * is needed, "AUTH" means a credential token must be sent, and anything else is
     * rejected.
     *
     * @throws AuthConfigurationException when the server requires authentication but
     *                                    no credential provider was configured
     * @throws AuthFailureException       when the server rejects the token
     * @throws JournalNetworkException    on I/O failure or an unrecognised response
     */
    private void checkAuthAndSendCredential() throws JournalNetworkException {
        this.commandProducer.write(this.channel, (byte) 9);
        CharSequence response = readString();
        if (Chars.equals("OK", response)) {
            return;
        }
        if (!Chars.equals("AUTH", response)) {
            // Throw directly: fail(true, ...) never throws, so it let unknown responses through.
            throw new JournalNetworkException("Unknown server response");
        }
        if (this.credentialProvider == null) {
            throw new AuthConfigurationException();
        }
        this.commandProducer.write(this.channel, (byte) 10);
        // The token is passed as-is; a byte[] cannot be cast to a channel type.
        this.byteArrayResponseProducer.write(this.channel, getToken());
        CharSequence authResponse = readString();
        if (!Chars.equals("OK", authResponse)) {
            throw new AuthFailureException(authResponse.toString());
        }
    }

    /**
     * Closes the channel and every writer, then clears the per-subscription state
     * so that a later handshake can rebuild it.
     */
    public void close0() {
        closeChannel();
        for (int index = 0, count = writers.size(); index < count; index++) {
            JournalWriter writer = writers.getQuick(index);
            writer.close();
        }
        writers.clear();
        statusSentList.clear();
        deltaConsumers.clear();
    }

    /**
     * Closes the current channel, if any, and clears the reference.
     * A failure to close is logged and otherwise ignored.
     */
    private void closeChannel() {
        if (this.channel == null) {
            return;
        }
        try {
            if (this.channel.isOpen()) {
                this.channel.close();
            }
        } catch (IOException e) {
            LOG.error().$((CharSequence) "Error closing channel").$((Throwable) e).$();
        } finally {
            // Drop the reference only after the close attempt.
            this.channel = null;
        }
    }

    /**
     * Assertion-style helper: throws when the condition does not hold.
     *
     * @param z   condition that must be true
     * @param str exception message used when the condition is false
     * @throws JournalNetworkException when {@code z} is false
     */
    private void fail(boolean z, String str) throws JournalNetworkException {
        if (z) {
            return;
        }
        throw new JournalNetworkException(str);
    }

    /**
     * Frees every registered delta consumer and the reusable response consumers.
     */
    private void free() {
        for (int index = 0, count = deltaConsumers.size(); index < count; index++) {
            JournalDeltaConsumer consumer = deltaConsumers.getQuick(index);
            consumer.free();
        }
        commandConsumer.free();
        charSequenceResponseConsumer.free();
        intResponseConsumer.free();
    }

    /**
     * Asks the credential provider for an authentication token.
     * When the provider throws, the client is halted before the failure is reported.
     *
     * @return the token bytes
     * @throws JournalNetworkException wrapping whatever the provider threw
     */
    private byte[] getToken() throws JournalNetworkException {
        final byte[] token;
        try {
            token = credentialProvider.createToken();
        } catch (Exception e) {
            halt();
            throw new JournalNetworkException(e);
        }
        return token;
    }

    /**
     * Runs the connection handshake: opens the channel if needed, sends the protocol
     * version and subscription keys, authenticates when asked to, and sends the
     * initial state. The client counter is incremented once all steps have succeeded.
     *
     * @throws JournalNetworkException when any step fails
     */
    public void handshake() throws JournalNetworkException {
        final ServerNode defaultNode = null;
        openChannel(defaultNode);

        sendProtocolVersion();
        sendKeys();
        checkAuthAndSendCredential();
        sendState();

        counter.incrementAndGet();
    }

    /**
     * Opens a channel to the server unless one is already open. Passing a node forces
     * a new connection to that node, closing any existing channel first.
     *
     * @param serverNode node to connect to, or null to let the configuration choose
     * @throws JournalNetworkException when the connection cannot be set up
     */
    private void openChannel(ServerNode serverNode) throws JournalNetworkException {
        if (this.channel != null && serverNode == null) {
            // Already connected and no specific node requested.
            return;
        }
        if (this.channel != null) {
            closeChannel();
        }
        SocketChannel openSocketChannel = serverNode == null ? this.config.openSocketChannel() : this.config.openSocketChannel(serverNode);
        try {
            this.statsChannel = new StatsCollectingReadableByteChannel(openSocketChannel.getRemoteAddress());
            SslConfig sslConfig = this.config.getSslConfig();
            if (sslConfig.isSecure()) {
                this.channel = new SecureSocketChannel(openSocketChannel, sslConfig);
            } else {
                this.channel = openSocketChannel;
            }
        } catch (IOException e) {
            // Do not leak the freshly opened socket when setup fails.
            try {
                openSocketChannel.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw new JournalNetworkException("Cannot get remote address", e);
        }
    }

    /**
     * Reads one string response from the channel.
     *
     * @return the value held by the shared response consumer after the read
     * @throws JournalNetworkException when the read fails
     */
    private CharSequence readString() throws JournalNetworkException {
        charSequenceResponseConsumer.read(channel);
        final CharSequence value = charSequenceResponseConsumer.getValue();
        return value;
    }

    /**
     * Writes command 7 to the channel; the handler sends it when the client has
     * been halted.
     *
     * @throws JournalNetworkException when the write fails
     */
    public void sendDisconnect() throws JournalNetworkException {
        final byte disconnectCommand = (byte) 7;
        commandProducer.write(channel, disconnectCommand);
    }

    /**
     * Announces every subscription to the server. For each remote key it sends the
     * key, waits for the acknowledgement, receives the journal metadata into a
     * temporary file and, when no writer is registered for that index yet, creates a
     * local writer from the metadata.
     *
     * @throws JournalNetworkException on I/O failure, a negative acknowledgement, or
     *                                 when metadata or the writer cannot be created
     */
    private void sendKeys() throws JournalNetworkException {
        int size = this.remoteKeys.size();
        for (int i = 0; i < size; i++) {
            this.commandProducer.write(this.channel, (byte) 1);
            // The key object is passed as-is; it is not a channel and must not be cast to one.
            this.setKeyRequestProducer.write(this.channel, new IndexedJournalKey(i, this.remoteKeys.getQuick(i)));
            checkAck();

            File makeTempFile = Files.makeTempFile();
            try {
                JournalMetadata journalMetadata;
                // try-with-resources closes the consumer on the failure path as well.
                try (HugeBufferConsumer hugeBufferConsumer = new HugeBufferConsumer(makeTempFile)) {
                    hugeBufferConsumer.read(this.channel);
                    journalMetadata = new JournalMetadata(hugeBufferConsumer.getHb());
                }
                try {
                    // Writers supplied through subscribe(key, writer, listener) are kept.
                    if (this.writers.getQuiet(i) == null) {
                        set0(i, this.factory.writer(new JournalStructure(journalMetadata).location2(this.localKeys.getQuick(i).derivedLocation())), this.listeners.getQuick(i));
                    }
                } catch (JournalException e) {
                    throw new JournalNetworkException(e);
                }
            } catch (JournalException e2) {
                throw new JournalNetworkException(e2);
            } finally {
                // The temporary file only carries the metadata transfer.
                Files.delete(makeTempFile);
            }
        }
    }

    /**
     * Sends command 8 followed by protocol version 2 and waits for the server's
     * acknowledgement.
     *
     * @throws JournalNetworkException on I/O failure or a negative acknowledgement
     */
    private void sendProtocolVersion() throws JournalNetworkException {
        final byte versionCommand = (byte) 8;
        final int protocolVersion = 2;
        commandProducer.write(channel, versionCommand);
        intResponseProducer.write(channel, protocolVersion);
        checkAck();
    }

    /**
     * Writes command 3 to the channel and logs the channel at debug level.
     *
     * @throws JournalNetworkException when the write fails
     */
    public void sendReady() throws JournalNetworkException {
        final byte readyCommand = (byte) 3;
        commandProducer.write(channel, readyCommand);
        final String channelName = channel.toString();
        LOG.debug().$((CharSequence) "Client ready: ").$((CharSequence) channelName).$();
    }

    /**
     * Sends the state of every writer whose state has not been sent since it last
     * changed, waits for each acknowledgement, and finishes with the ready command.
     *
     * @throws JournalNetworkException on I/O failure or a negative acknowledgement
     */
    public void sendState() throws JournalNetworkException {
        int size = this.writers.size();
        for (int i = 0; i < size; i++) {
            if (this.statusSentList.get(i) != 0) {
                // State for this writer is already known to the server.
                continue;
            }
            this.commandProducer.write(this.channel, (byte) 2);
            // The journal wrapper is passed as-is; it is not a channel and must not be cast to one.
            this.journalClientStateProducer.write(this.channel, new IndexedJournal(i, this.writers.getQuick(i)));
            checkAck();
            this.statusSentList.setQuick(i, 1);
        }
        sendReady();
    }

    /**
     * Registers a writer at the given subscription index: marks its state as not
     * sent, creates its delta consumer with commit-on-close disabled, stores the
     * writer and attaches the listener when one is given.
     *
     * @param i             subscription index
     * @param journalWriter local writer for that subscription
     * @param txListener    listener to attach; may be null
     */
    private <T> void set0(int i, JournalWriter<T> journalWriter, TxListener txListener) {
        statusSentList.extendAndSet(i, 0);
        JournalDeltaConsumer consumer = new JournalDeltaConsumer(journalWriter.setCommitOnClose(false));
        deltaConsumers.extendAndSet(i, consumer);
        writers.extendAndSet(i, journalWriter);
        if (txListener == null) {
            return;
        }
        journalWriter.setTxListener(txListener);
    }

    /**
     * Subscribes using keys built from the class alone for both the remote and
     * local side.
     *
     * @param cls        journal model class
     * @param txListener listener for the local writer; may be null
     */
    private <T> void subscribe(Class<T> cls, TxListener txListener) {
        JournalKey remoteKey = new JournalKey(cls);
        JournalKey localKey = new JournalKey(cls);
        subscribe(remoteKey, localKey, txListener);
    }

    /**
     * Subscribes using the same location for the remote and the local key.
     *
     * @param cls        journal model class
     * @param str        location used for both keys
     * @param txListener listener for the local writer; may be null
     */
    private <T> void subscribe(Class<T> cls, String str, TxListener txListener) {
        JournalKey remoteKey = new JournalKey(cls, str);
        JournalKey localKey = new JournalKey(cls, str);
        subscribe(remoteKey, localKey, txListener);
    }
}
