package com.questdb.net.ha;

import com.questdb.Journal;
import com.questdb.JournalKey;
import com.questdb.ex.JournalDisconnectedChannelException;
import com.questdb.ex.JournalException;
import com.questdb.ex.JournalNetworkException;
import com.questdb.factory.configuration.JournalConfiguration;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.net.ha.auth.AuthorizationHandler;
import com.questdb.net.ha.bridge.JournalEventHandler;
import com.questdb.net.ha.bridge.JournalEventProcessor;
import com.questdb.net.ha.comsumer.JournalClientStateConsumer;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.net.ha.model.Command;
import com.questdb.net.ha.model.IndexedJournalKey;
import com.questdb.net.ha.model.JournalClientState;
import com.questdb.net.ha.producer.HugeBufferProducer;
import com.questdb.net.ha.producer.JournalDeltaProducer;
import com.questdb.net.ha.protocol.CommandConsumer;
import com.questdb.net.ha.protocol.CommandProducer;
import com.questdb.net.ha.protocol.commands.ByteArrayResponseConsumer;
import com.questdb.net.ha.protocol.commands.IntResponseConsumer;
import com.questdb.net.ha.protocol.commands.IntResponseProducer;
import com.questdb.net.ha.protocol.commands.SetKeyRequestConsumer;
import com.questdb.net.ha.protocol.commands.StringResponseProducer;
import com.questdb.std.IntIntHashMap;
import com.questdb.std.IntList;
import com.questdb.std.ObjList;
import java.io.File;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.WritableByteChannel;

/* loaded from: input_file:com/questdb/net/ha/JournalServerAgent.class */
public class JournalServerAgent {
    private static final Log LOG = LogFactory.getLog(JournalServerAgent.class);
    private static final byte JOURNAL_INDEX_NOT_FOUND = -1;
    private final JournalServer server;
    private final StatsCollectingWritableByteChannel statsChannel;
    private final JournalEventProcessor eventProcessor;
    private final AuthorizationHandler authorizationHandler;
    private final SocketAddress socketAddress;
    private boolean authorized;
    private final IntIntHashMap writerToReaderMap = new IntIntHashMap();
    private final IntList readerToWriterMap = new IntList();
    private final CommandConsumer commandConsumer = new CommandConsumer();
    private final CommandProducer commandProducer = new CommandProducer();
    private final SetKeyRequestConsumer setKeyRequestConsumer = new SetKeyRequestConsumer();
    private final StringResponseProducer stringResponseProducer = new StringResponseProducer();
    private final JournalClientStateConsumer journalClientStateConsumer = new JournalClientStateConsumer();
    private final IntResponseProducer intResponseProducer = new IntResponseProducer();
    private final IntResponseConsumer intResponseConsumer = new IntResponseConsumer();
    private final ObjList<Journal> readers = new ObjList<>();
    private final ObjList<JournalDeltaProducer> producers = new ObjList<>();
    private final ObjList<JournalClientState> clientStates = new ObjList<>();
    private final EventHandler handler = new EventHandler();
    private final ByteArrayResponseConsumer byteArrayResponseConsumer = new ByteArrayResponseConsumer();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/questdb/net/ha/JournalServerAgent$EventHandler.class */
    public class EventHandler implements JournalEventHandler {
        private WritableByteChannel channel;
        private boolean dataSent;

        private EventHandler() {
            this.dataSent = false;
        }

        @Override // com.questdb.net.ha.bridge.JournalEventHandler
        public void handle(int i) {
            JournalClientState journalClientState;
            int i2 = JournalServerAgent.this.writerToReaderMap.get(i);
            if (i2 == -1 || (journalClientState = (JournalClientState) JournalServerAgent.this.clientStates.getQuick(i2)) == null || !journalClientState.isWaitingOnEvents()) {
                return;
            }
            journalClientState.setWaitingOnEvents(false);
            this.dataSent = JournalServerAgent.this.dispatch0(this.channel, i2) || this.dataSent;
        }

        public boolean isDataSent() {
            return this.dataSent;
        }

        public void setChannel(WritableByteChannel writableByteChannel) {
            this.channel = writableByteChannel;
            this.dataSent = false;
        }
    }

    public JournalServerAgent(JournalServer journalServer, SocketAddress socketAddress, AuthorizationHandler authorizationHandler) {
        this.server = journalServer;
        this.socketAddress = socketAddress;
        this.statsChannel = new StatsCollectingWritableByteChannel(socketAddress);
        this.eventProcessor = new JournalEventProcessor(journalServer.getBridge());
        this.authorizationHandler = authorizationHandler;
        this.authorized = authorizationHandler == null;
        this.readerToWriterMap.zero(-1);
    }

    public void close() {
        this.server.getBridge().removeAgentSequence(this.eventProcessor.getSequence());
        this.journalClientStateConsumer.free();
        this.commandConsumer.free();
        this.setKeyRequestConsumer.free();
        this.intResponseConsumer.free();
        this.byteArrayResponseConsumer.free();
        this.commandProducer.free();
        this.stringResponseProducer.free();
        this.intResponseProducer.free();
        int size = this.producers.size();
        for (int i = 0; i < size; i++) {
            this.producers.getQuick(i).free();
        }
    }

    public void process(ByteChannel byteChannel) throws JournalNetworkException {
        this.commandConsumer.read(byteChannel);
        switch (this.commandConsumer.getCommand()) {
            case 1:
                setClientKey(byteChannel);
                return;
            case 2:
                checkAuthorized(byteChannel);
                LOG.debug().$(this.socketAddress).$((CharSequence) " DeltaRequest command received").$();
                this.journalClientStateConsumer.read(byteChannel);
                storeDeltaRequest(byteChannel, this.journalClientStateConsumer.getValue());
                return;
            case 3:
                checkAuthorized(byteChannel);
                this.statsChannel.setDelegate(byteChannel);
                dispatch(this.statsChannel);
                this.statsChannel.logStats();
                return;
            case 4:
            case 5:
            case 6:
            case 11:
            case 12:
            default:
                throw new JournalNetworkException("Corrupt channel");
            case 7:
                throw new JournalDisconnectedChannelException();
            case 8:
                checkProtocolVersion(byteChannel, this.intResponseConsumer.getValue(byteChannel));
                return;
            case 9:
                if (this.authorized) {
                    ok(byteChannel);
                    return;
                } else {
                    this.stringResponseProducer.write((WritableByteChannel) byteChannel, (ByteChannel) "AUTH");
                    return;
                }
            case 10:
                this.byteArrayResponseConsumer.read(byteChannel);
                authorize(byteChannel, this.byteArrayResponseConsumer.getValue());
                return;
            case Command.ELECTION /* 13 */:
                this.server.handleElectionMessage(byteChannel);
                return;
            case 14:
                this.server.handleElectedMessage(byteChannel);
                return;
        }
    }

    private void authorize(WritableByteChannel writableByteChannel, byte[] bArr) throws JournalNetworkException {
        if (!this.authorized) {
            try {
                int size = this.readers.size();
                ObjList<JournalKey> objList = new ObjList<>(size);
                for (int i = 0; i < size; i++) {
                    objList.add(this.readers.getQuick(i).getKey());
                }
                this.authorized = this.authorizationHandler.isAuthorized(bArr, objList);
            } catch (Throwable th) {
                LOG.error().$(this.socketAddress).$((CharSequence) " Exception in authorization handler:").$(th).$();
                this.authorized = false;
            }
        }
        if (this.authorized) {
            ok(writableByteChannel);
        } else {
            error(writableByteChannel, "Authorization failed");
        }
    }

    private void checkAuthorized(WritableByteChannel writableByteChannel) throws JournalNetworkException {
        if (this.authorized) {
            return;
        }
        error(writableByteChannel, "NOT AUTHORIZED");
        throw new JournalDisconnectedChannelException();
    }

    private void checkProtocolVersion(ByteChannel byteChannel, int i) throws JournalNetworkException {
        if (i == 2) {
            ok(byteChannel);
        } else {
            error(byteChannel, "Unsupported protocol version. Client: " + i + ", Server: 2");
        }
    }

    private <T> void createReader(int i, JournalKey<T> journalKey) throws JournalException {
        Journal<T> quiet = this.readers.getQuiet(i);
        if (quiet == null) {
            ObjList<Journal> objList = this.readers;
            Journal<T> reader = this.server.getFactory().reader(journalKey);
            quiet = reader;
            objList.extendAndSet(i, reader);
        }
        if (this.producers.getQuiet(i) == null) {
            this.producers.extendAndSet(i, new JournalDeltaProducer(quiet));
        }
    }

    private void dispatch(WritableByteChannel writableByteChannel) throws JournalNetworkException {
        if (processJournalEvents(writableByteChannel, false)) {
            return;
        }
        processJournalEvents(writableByteChannel, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean dispatch0(WritableByteChannel writableByteChannel, int i) {
        long currentTimeMillis = System.currentTimeMillis();
        JournalClientState journalClientState = this.clientStates.get(i);
        if (journalClientState == null || journalClientState.isClientStateInvalid()) {
            return false;
        }
        if (journalClientState.isWaitingOnEvents() && currentTimeMillis - journalClientState.getClientStateSyncTime() <= ServerConfig.SYNC_TIMEOUT) {
            return false;
        }
        try {
            boolean dispatchProducer = dispatchProducer(writableByteChannel, journalClientState.getTxn(), journalClientState.getTxPin(), getProducer(i), i);
            if (dispatchProducer) {
                journalClientState.invalidateClientState();
            } else {
                journalClientState.setClientStateSyncTime(currentTimeMillis);
            }
            return dispatchProducer;
        } catch (Exception e) {
            LOG.debug().$(this.socketAddress).$((CharSequence) " Client appears to be refusing new data from server, corrupt client").$((Throwable) e).$();
            return false;
        }
    }

    private boolean dispatchProducer(WritableByteChannel writableByteChannel, long j, long j2, JournalDeltaProducer journalDeltaProducer, int i) throws JournalNetworkException, JournalException {
        journalDeltaProducer.configure(j, j2);
        if (!journalDeltaProducer.hasContent()) {
            return false;
        }
        LOG.debug().$(this.socketAddress).$((CharSequence) " Sending data").$();
        this.commandProducer.write(writableByteChannel, (byte) 4);
        this.intResponseProducer.write(writableByteChannel, i);
        journalDeltaProducer.write(writableByteChannel);
        return true;
    }

    private void error(WritableByteChannel writableByteChannel, String str) throws JournalNetworkException {
        error(writableByteChannel, str, null);
    }

    private void error(WritableByteChannel writableByteChannel, String str, Exception exc) throws JournalNetworkException {
        this.stringResponseProducer.write(writableByteChannel, (WritableByteChannel) str);
        LOG.info().$(this.socketAddress).$(' ').$((CharSequence) str).$((Throwable) exc).$();
    }

    private JournalDeltaProducer getProducer(int i) {
        return this.producers.get(i);
    }

    private void ok(WritableByteChannel writableByteChannel) throws JournalNetworkException {
        this.stringResponseProducer.write(writableByteChannel, (WritableByteChannel) "OK");
    }

    private boolean processJournalEvents(WritableByteChannel writableByteChannel, boolean z) throws JournalNetworkException {
        this.handler.setChannel(writableByteChannel);
        boolean z2 = false;
        if (this.eventProcessor.process(this.handler, z)) {
            z2 = this.handler.isDataSent();
            int size = this.clientStates.size();
            for (int i = 0; i < size; i++) {
                JournalClientState quick = this.clientStates.getQuick(i);
                if (quick.isWaitingOnEvents()) {
                    z2 = dispatch0(writableByteChannel, i) || z2;
                }
                quick.setWaitingOnEvents(true);
            }
            if (z2) {
                this.commandProducer.write(writableByteChannel, (byte) 5);
            } else if (z) {
                this.commandProducer.write(writableByteChannel, (byte) 6);
            }
        } else if (this.server.isRunning()) {
            this.commandProducer.write(writableByteChannel, (byte) 6);
        } else {
            this.commandProducer.write(writableByteChannel, (byte) 12);
        }
        return z2;
    }

    private void sendMetadata(WritableByteChannel writableByteChannel, int i) throws JournalException, JournalNetworkException {
        HugeBufferProducer hugeBufferProducer = new HugeBufferProducer(new File(this.readers.get(i).getLocation(), JournalConfiguration.FILE_NAME));
        Throwable th = null;
        try {
            try {
                hugeBufferProducer.write(writableByteChannel);
                if (hugeBufferProducer != null) {
                    if (0 == 0) {
                        hugeBufferProducer.close();
                        return;
                    }
                    try {
                        hugeBufferProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hugeBufferProducer != null) {
                if (th != null) {
                    try {
                        hugeBufferProducer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hugeBufferProducer.close();
                }
            }
            throw th4;
        }
    }

    private void setClientKey(ByteChannel byteChannel) throws JournalNetworkException {
        LOG.debug().$(this.socketAddress).$((CharSequence) " SetKey command received").$();
        this.setKeyRequestConsumer.read(byteChannel);
        IndexedJournalKey value = this.setKeyRequestConsumer.getValue();
        JournalKey key = value.getKey();
        int index = value.getIndex();
        IndexedJournalKey writerIndex0 = this.server.getWriterIndex0(key);
        if (writerIndex0 == null) {
            error(byteChannel, "Requested key not exported: " + key);
            return;
        }
        this.writerToReaderMap.put(writerIndex0.getIndex(), index);
        this.readerToWriterMap.extendAndSet(index, writerIndex0.getIndex());
        try {
            createReader(index, writerIndex0.getKey());
            ok(byteChannel);
            sendMetadata(byteChannel, index);
        } catch (JournalException e) {
            error(byteChannel, "Could not created reader for key: " + key, e);
        }
    }

    private void storeDeltaRequest(WritableByteChannel writableByteChannel, JournalClientState journalClientState) throws JournalNetworkException {
        int journalIndex = journalClientState.getJournalIndex();
        if (this.readerToWriterMap.getQuiet(journalIndex) == -1) {
            error(writableByteChannel, "Journal index does not match key request");
            return;
        }
        JournalClientState quiet = this.clientStates.getQuiet(journalIndex);
        if (quiet == null) {
            quiet = new JournalClientState();
            this.clientStates.extendAndSet(journalIndex, quiet);
        }
        journalClientState.deepCopy(quiet);
        quiet.invalidateClientState();
        quiet.setClientStateSyncTime(0L);
        quiet.setWaitingOnEvents(true);
        ok(writableByteChannel);
    }
}
