package org.bboxdb.network.server.connection;

import com.google.common.io.ByteStreams;
import io.prometheus.client.Gauge;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.bboxdb.commons.CloseableHelper;
import org.bboxdb.commons.concurrent.ExceptionSafeRunnable;
import org.bboxdb.commons.concurrent.ExecutorUtil;
import org.bboxdb.commons.service.ServiceState;
import org.bboxdb.network.NetworkPackageDecoder;
import org.bboxdb.network.capabilities.PeerCapabilities;
import org.bboxdb.network.packages.NetworkResponsePackage;
import org.bboxdb.network.packages.PackageEncodeException;
import org.bboxdb.network.packages.response.CompressionEnvelopeResponse;
import org.bboxdb.network.packages.response.ErrorResponse;
import org.bboxdb.network.packages.response.JoinedTupleResponse;
import org.bboxdb.network.packages.response.TupleResponse;
import org.bboxdb.network.routing.PackageRouter;
import org.bboxdb.network.routing.RoutingHeaderParser;
import org.bboxdb.network.server.ClientQuery;
import org.bboxdb.network.server.ErrorMessages;
import org.bboxdb.network.server.connection.handler.query.HandleBoundingBoxQuery;
import org.bboxdb.network.server.connection.handler.query.HandleBoundingBoxTimeQuery;
import org.bboxdb.network.server.connection.handler.query.HandleContinuousQuery;
import org.bboxdb.network.server.connection.handler.query.HandleInsertTimeQuery;
import org.bboxdb.network.server.connection.handler.query.HandleJoinQuery;
import org.bboxdb.network.server.connection.handler.query.HandleKeyQuery;
import org.bboxdb.network.server.connection.handler.query.HandleVersionTimeQuery;
import org.bboxdb.network.server.connection.handler.query.QueryHandler;
import org.bboxdb.network.server.connection.handler.request.CancelRequestHandler;
import org.bboxdb.network.server.connection.handler.request.CompressionHandler;
import org.bboxdb.network.server.connection.handler.request.CreateDistributionGroupHandler;
import org.bboxdb.network.server.connection.handler.request.CreateTableHandler;
import org.bboxdb.network.server.connection.handler.request.DeleteDistributionGroupHandler;
import org.bboxdb.network.server.connection.handler.request.DeleteTableHandler;
import org.bboxdb.network.server.connection.handler.request.DisconnectHandler;
import org.bboxdb.network.server.connection.handler.request.HandshakeHandler;
import org.bboxdb.network.server.connection.handler.request.InsertTupleHandler;
import org.bboxdb.network.server.connection.handler.request.KeepAliveHandler;
import org.bboxdb.network.server.connection.handler.request.LockTupleHandler;
import org.bboxdb.network.server.connection.handler.request.NextPageHandler;
import org.bboxdb.network.server.connection.handler.request.RequestHandler;
import org.bboxdb.network.server.connection.lock.LockHelper;
import org.bboxdb.network.server.connection.lock.LockManager;
import org.bboxdb.storage.entity.JoinedTuple;
import org.bboxdb.storage.tuplestore.manager.TupleStoreManagerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/bboxdb/network/server/connection/ClientConnectionHandler.class */
/**
 * Serves one client socket: reads request packages from the socket, dispatches
 * them to the registered request/query handlers and writes response packages
 * back to the client.
 */
public class ClientConnectionHandler extends ExceptionSafeRunnable {
    /** The socket of the connected client; closed at the end of {@code runThread()}. */
    public final Socket clientSocket;
    /** Buffered view of the socket output; {@code null} when stream creation failed in the constructor. */
    private BufferedOutputStream outputStream;
    /** Buffered view of the socket input; {@code null} when stream creation failed in the constructor. */
    private InputStream inputStream;
    /** Queries registered on this connection, keyed by a short id (see {@code sendNextResultsForQuery}). */
    private final Map<Short, ClientQuery> activeQueries;
    /** Executor used to run result-fetching tasks; shut down when the connection thread ends. */
    private final ExecutorService threadPool;
    /** Router created for this connection; shares {@link #threadPool}. */
    private final PackageRouter packageRouter;
    /** Responses waiting to be sent inside one compression envelope; access is synchronized on the list itself. */
    private final List<NetworkResponsePackage> pendingCompressionPackages;
    // NOTE(review): not referenced by name in the visible code; the constructor passes the literal 25
    // to the bound thread pool factory - presumably this constant was inlined by the compiler.
    private static final int MAX_PENDING_REQUESTS = 25;
    /** Limit for parallel queries per connection (checked in {@code handleQuery}). */
    private static final int MAX_RUNNING_QUERIES = 25;
    /** Package type -> handler for all non-query request packages. */
    private Map<Short, RequestHandler> requestHandlers;
    /** Query type -> handler for query packages. */
    private Map<Byte, QueryHandler> queryHandlerList;
    /** Background task that periodically flushes {@link #pendingCompressionPackages}. */
    private ConnectionMaintenanceRunnable maintenanceThread;
    /** Storage registry handed in by the creator of this connection; exposed via its getter. */
    private final TupleStoreManagerRegistry storageRegistry;
    /** Lock manager handed in by the creator of this connection; exposed via its getter. */
    private final LockManager lockManager;
    // Prometheus metrics. Note that the two byte "counters" are registered as gauges.
    private static final Gauge readBytesCounter = Gauge.build().name("bboxdb_network_read_bytes").help("Total read bytes from network").register();
    private static final Gauge writtenBytesCounter = Gauge.build().name("bboxdb_network_write_bytes").help("Total written bytes to network").register();
    private static final Gauge activeConnectionsTotal = Gauge.build().name("bboxdb_network_connections_total").help("Total amount of active network connections").register();
    private static final Logger logger = LoggerFactory.getLogger(ClientConnectionHandler.class);
    /** Capabilities of the peer; decides whether responses are batched for compression. */
    private PeerCapabilities connectionCapabilities = new PeerCapabilities();
    /** Lifecycle state of this connection; drives the read loop and the maintenance loop. */
    private final ServiceState serviceState = new ServiceState();

    /* loaded from: input_file:org/bboxdb/network/server/connection/ClientConnectionHandler$ConnectionMaintenanceRunnable.class */
    /**
     * Background task of a connection: while the connection is in the starting
     * or running state it flushes the pending compression packages roughly
     * every 100 ms.
     */
    class ConnectionMaintenanceRunnable extends ExceptionSafeRunnable {
        ConnectionMaintenanceRunnable() {
        }

        /** Logs that the maintenance task for this connection has started. */
        protected void beginHook() {
            ClientConnectionHandler.logger.debug("Starting connection mainteinance thread for: {}", ClientConnectionHandler.this.getConnectionName());
        }

        /** Logs that the maintenance task for this connection has ended. */
        protected void endHook() {
            ClientConnectionHandler.logger.debug("Mainteinance thread for {} has terminated", ClientConnectionHandler.this.getConnectionName());
        }

        /**
         * Flushes pending compression packages until the connection leaves the
         * starting/running states or the thread is interrupted.
         */
        protected void runThread() throws Exception {
            final ServiceState state = ClientConnectionHandler.this.serviceState;
            while (state.isInStartingState() || state.isInRunningState()) {
                ClientConnectionHandler.this.flushPendingCompressionPackages();
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code running after this
                    // method can still observe the interruption.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    public ClientConnectionHandler(TupleStoreManagerRegistry tupleStoreManagerRegistry, Socket socket, LockManager lockManager) {
        this.clientSocket = socket;
        this.storageRegistry = tupleStoreManagerRegistry;
        this.lockManager = lockManager;
        this.serviceState.registerCallback(serviceState -> {
            if (serviceState.isInStartingState()) {
                activeConnectionsTotal.inc();
            }
        });
        this.serviceState.registerCallback(serviceState2 -> {
            if (serviceState2.isInFinishedState()) {
                activeConnectionsTotal.dec();
            }
        });
        this.serviceState.registerCallback(serviceState3 -> {
            if (serviceState3.isInFinishedState()) {
                LockHelper.handleLockRemove(this);
            }
        });
        this.serviceState.dipatchToStarting();
        try {
            this.outputStream = new BufferedOutputStream(socket.getOutputStream());
            this.inputStream = new BufferedInputStream(socket.getInputStream());
        } catch (IOException e) {
            this.inputStream = null;
            this.outputStream = null;
            this.serviceState.dispatchToFailed(e);
            logger.error("Exception while creating IO stream", e);
        }
        this.activeQueries = new HashMap();
        this.threadPool = ExecutorUtil.getBoundThreadPoolExecutor(25, 25);
        this.packageRouter = new PackageRouter(this.threadPool, this);
        this.pendingCompressionPackages = new ArrayList();
        this.maintenanceThread = new ConnectionMaintenanceRunnable();
        new Thread((Runnable) this.maintenanceThread).start();
        initRequestHandlerMap();
        initQueryHandlerMap();
    }

    /**
     * Reads the header of the next package from the stream: 12 fixed bytes
     * followed by the routing header, which is decoded and re-encoded.
     *
     * @param inputStream the stream to read from
     * @return a buffer holding the 12 fixed bytes followed by the encoded
     *         routing header; its position is at the end of the written data
     * @throws IOException when the stream ends or fails while reading
     * @throws PackageEncodeException declared for routing header handling
     */
    private ByteBuffer readNextPackageHeader(InputStream inputStream) throws IOException, PackageEncodeException {
        final byte[] fixedHeader = new byte[12];
        ByteStreams.readFully(inputStream, fixedHeader, 0, fixedHeader.length);

        final byte[] routingHeader = RoutingHeaderParser.encodeHeader(RoutingHeaderParser.decodeRoutingHeader(inputStream));

        final ByteBuffer completeHeader = ByteBuffer.allocate(fixedHeader.length + routingHeader.length);
        completeHeader.put(fixedHeader);
        completeHeader.put(routingHeader);
        return completeHeader;
    }

    /**
     * Sends all currently pending response packages to the client inside one
     * compression envelope. Does nothing when no package is pending.
     *
     * <p>The whole operation, including the socket write, runs while holding
     * the lock on the pending list. Write failures are logged, not rethrown.
     */
    public void flushPendingCompressionPackages() {
        synchronized (this.pendingCompressionPackages) {
            if (this.pendingCompressionPackages.isEmpty()) {
                return;
            }

            // Work on a snapshot so the shared list can be emptied right away.
            final List<NetworkResponsePackage> chunk = new ArrayList<>(this.pendingCompressionPackages);
            this.pendingCompressionPackages.clear();

            if (logger.isDebugEnabled()) {
                logger.debug("Chunk size is: {}", chunk.size());
            }

            try {
                final CompressionEnvelopeResponse envelope = new CompressionEnvelopeResponse((byte) 0, chunk);
                writePackageToSocket(envelope);
            } catch (IOException | PackageEncodeException writeError) {
                logger.error("Got an exception while write pending compression packages to client", writeError);
            }
        }
    }

    /**
     * Sends a response package to the client.
     *
     * <p>Without gzip capability on the connection the package is written
     * directly. Otherwise it is queued for the next compression envelope, and
     * the queue is flushed immediately once it holds 200 or more packages.
     *
     * @param networkResponsePackage the response to send
     * @throws IOException when the direct write fails
     * @throws PackageEncodeException when the package cannot be encoded
     */
    public synchronized void writeResultPackage(NetworkResponsePackage networkResponsePackage) throws IOException, PackageEncodeException {
        if (this.connectionCapabilities.hasGZipCompression()) {
            final boolean queueIsFull;
            synchronized (this.pendingCompressionPackages) {
                this.pendingCompressionPackages.add(networkResponsePackage);
                queueIsFull = this.pendingCompressionPackages.size() >= 200;
            }
            if (queueIsFull) {
                flushPendingCompressionPackages();
            }
            return;
        }

        writePackageToSocket(networkResponsePackage);
    }

    /**
     * Same as {@code writeResultPackage}, but never throws: any exception is
     * logged and swallowed.
     *
     * @param networkResponsePackage the response to send
     */
    public synchronized void writeResultPackageNE(NetworkResponsePackage networkResponsePackage) {
        try {
            this.writeResultPackage(networkResponsePackage);
        } catch (Exception sendFailure) {
            logger.error("Unable to send package", sendFailure);
        }
    }

    /**
     * Encodes the package onto the socket output stream and flushes it, adding
     * the value reported by the package to the written-bytes gauge. Writers are
     * serialized by locking the output stream.
     *
     * @param networkResponsePackage the package to write
     * @throws IOException when writing or flushing fails
     * @throws PackageEncodeException when the package cannot be encoded
     */
    private void writePackageToSocket(NetworkResponsePackage networkResponsePackage) throws IOException, PackageEncodeException {
        synchronized (this.outputStream) {
            final double bytesWritten = networkResponsePackage.writeToOutputStream(this.outputStream);
            writtenBytesCounter.inc(bytesWritten);
            this.outputStream.flush();
        }
    }

    /**
     * Main loop of the connection: handles request packages while the
     * connection is in the starting or running state, then flushes pending
     * responses and moves the state to terminated.
     *
     * <p>When reading or handling a package fails with an
     * {@link IOException} or {@link PackageEncodeException}, the connection
     * state is moved to failed. Without that transition the state would stay
     * in running, the maintenance task would keep polling a dead connection,
     * and the callbacks waiting for a finished state would never fire.
     *
     * <p>The thread pool, the active queries and the socket are released in a
     * {@code finally} block, so they are also cleaned up when an unexpected
     * runtime exception escapes the loop.
     */
    public void runThread() {
        try {
            logger.debug("Handling new connection from: {}", this.clientSocket.getInetAddress());

            while (this.serviceState.isInRunningState() || this.serviceState.isInStartingState()) {
                handleNextPackage(this.inputStream);
            }

            // Deliver everything that is still queued for compression.
            flushPendingCompressionPackages();

            if (!this.serviceState.isInStoppingState()) {
                this.serviceState.dispatchToStopping();
            }
            this.serviceState.dispatchToTerminated();
            logger.info("Closing connection to: {}", this.clientSocket.getInetAddress());
        } catch (IOException | PackageEncodeException e) {
            // Exceptions during a regular shutdown are expected; only report
            // the ones that hit a connection that was still running.
            if (this.serviceState.isInRunningState()) {
                logger.error("Socket to {} closed unexpectly (state: {}), closing connection", this.clientSocket.getInetAddress(), getConnectionState());
                logger.debug("Socket exception", e);
            }

            // NOTE(review): assumes ServiceState treats the failed state as a
            // finished state, so the finished callbacks run - confirm.
            if (!this.serviceState.isInFinishedState()) {
                this.serviceState.dispatchToFailed(e);
            }
        } finally {
            getThreadPool().shutdown();
            getActiveQueries().values().forEach(clientQuery -> {
                clientQuery.close();
            });
            getActiveQueries().clear();
            CloseableHelper.closeWithoutException(this.clientSocket);
        }
    }

    /**
     * Reads the body of a package and returns header and body in one buffer.
     *
     * <p>The body length comes from the header the client sent, so it is
     * validated before use. A plain narrowing cast would wrap negative values
     * and values of 2^31 or more, which would either desynchronize the
     * protocol or fail inside {@link ByteBuffer#allocate(int)}.
     *
     * @param byteBuffer the already read package header
     * @param inputStream the stream to read the body from
     * @return a buffer containing the header followed by the body
     * @throws IOException when the announced body length is invalid or the
     *         body cannot be read; the connection state is moved to stopping
     *         before the exception is rethrown
     */
    private ByteBuffer readFullPackage(ByteBuffer byteBuffer, InputStream inputStream) throws IOException {
        final long announcedBodyLength = NetworkPackageDecoder.getBodyLengthFromRequestPackage(byteBuffer);
        final int headerLength = byteBuffer.limit();
        try {
            if (announcedBodyLength < 0 || announcedBodyLength > Integer.MAX_VALUE - headerLength) {
                throw new IOException("Invalid package body length: " + announcedBodyLength);
            }

            final int bodyLength = (int) announcedBodyLength;
            final int packageLength = headerLength + bodyLength;
            final ByteBuffer fullPackage = ByteBuffer.allocate(packageLength);

            // Copy the header, then read the body right behind it.
            fullPackage.put(byteBuffer.array());
            ByteStreams.readFully(inputStream, fullPackage.array(), fullPackage.position(), bodyLength);

            readBytesCounter.inc(packageLength);
            return fullPackage;
        } catch (IOException e) {
            this.serviceState.dispatchToStopping();
            throw e;
        }
    }

    /**
     * Reads the next package from the stream and dispatches it.
     *
     * <p>While the connection is still in the starting state only package
     * type 0 is accepted; any other type moves the connection to failed.
     * When the package handler returns {@code false} the connection is moved
     * to stopping.
     *
     * @param inputStream the stream to read the package from
     * @throws IOException when reading the package fails
     * @throws PackageEncodeException when the package cannot be processed
     */
    public void handleNextPackage(InputStream inputStream) throws IOException, PackageEncodeException {
        final ByteBuffer header = readNextPackageHeader(inputStream);
        final short requestId = NetworkPackageDecoder.getRequestIDFromRequestPackage(header);
        final short packageType = NetworkPackageDecoder.getPackageTypeFromRequest(header);

        if (this.serviceState.isInStartingState() && packageType != 0) {
            final String message = "Connection is in handshake state but got package: " + ((int) packageType);
            logger.error(message);
            this.serviceState.dispatchToFailed(new IllegalStateException(message));
            return;
        }

        final ByteBuffer fullPackage = readFullPackage(header, inputStream);
        final boolean keepConnection = handleBufferedPackage(fullPackage, requestId, packageType);
        if (!keepConnection) {
            this.serviceState.dispatchToStopping();
        }
    }

    /**
     * Sends one result tuple to the client. A joined-tuple response is used
     * when the joined tuple holds more than one tuple or when {@code z} is
     * set; otherwise a plain tuple response with the first tuple is sent.
     *
     * @param s the request id the response belongs to
     * @param joinedTuple the result to send
     * @param z forces the joined-tuple response format
     * @throws IOException when writing fails
     * @throws PackageEncodeException when the response cannot be encoded
     */
    public void writeResultTuple(short s, JoinedTuple joinedTuple, boolean z) throws IOException, PackageEncodeException {
        final NetworkResponsePackage response;
        if (joinedTuple.getNumberOfTuples() > 1 || z) {
            response = new JoinedTupleResponse(s, joinedTuple);
        } else {
            response = new TupleResponse(s, joinedTuple.getTupleStoreName(0), joinedTuple.getTuple(0));
        }
        writeResultPackage(response);
    }

    /**
     * Dispatches a query package to the handler registered for its query type.
     *
     * <p>A new query is rejected once {@code MAX_RUNNING_QUERIES} queries are
     * already active. The previous {@code size() <= 25} check accepted one
     * query more than the limit.
     *
     * @param byteBuffer the complete query package
     * @param s the request id of the package
     * @return {@code false} when the query type is unknown, {@code true}
     *         otherwise (including the case that the query was rejected
     *         because too many queries are active)
     * @throws IOException when an error response cannot be written
     * @throws PackageEncodeException when a response cannot be encoded
     */
    private boolean handleQuery(ByteBuffer byteBuffer, short s) throws IOException, PackageEncodeException {
        final byte queryType = NetworkPackageDecoder.getQueryTypeFromRequest(byteBuffer);

        final QueryHandler handler = this.queryHandlerList.get(queryType);
        if (handler == null) {
            logger.warn("Unsupported query type: {}", queryType);
            writeResultPackage(new ErrorResponse(s, ErrorMessages.ERROR_UNSUPPORTED_PACKAGE_TYPE));
            return false;
        }

        if (this.activeQueries.size() >= MAX_RUNNING_QUERIES) {
            logger.warn("Client requested more than {} parallel queries", MAX_RUNNING_QUERIES);
            writeResultPackage(new ErrorResponse(s, ErrorMessages.ERROR_QUERY_TO_MUCH));
            return true;
        }

        handler.handleQuery(byteBuffer, s, this);
        return true;
    }

    /**
     * Dispatches a completely read package. Package type 7 is treated as a
     * query; every other type is looked up in the request handler map. An
     * unknown type moves the connection to stopping.
     *
     * @param byteBuffer the complete package
     * @param s the request id of the package
     * @param s2 the package type
     * @return the result of the responsible handler, or {@code false} for an
     *         unknown package type
     * @throws PackageEncodeException when the handler fails to encode a response
     * @throws IOException when the handler fails to write a response
     */
    private boolean handleBufferedPackage(ByteBuffer byteBuffer, short s, short s2) throws PackageEncodeException, IOException {
        if (s2 == 7) {
            if (logger.isDebugEnabled()) {
                logger.debug("Got query package");
            }
            return handleQuery(byteBuffer, s);
        }

        final RequestHandler handler = this.requestHandlers.get(Short.valueOf(s2));
        if (handler == null) {
            logger.error("Got unknown package type, closing connection: " + ((int) s2));
            this.serviceState.dispatchToStopping();
            return false;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Dispatching package to handler: {}", handler);
        }
        return handler.handleRequest(byteBuffer, s, this);
    }

    /**
     * Builds the lookup table from package type to request handler and
     * installs it as the handler map of this connection.
     */
    private void initRequestHandlerMap() {
        final Map<Short, RequestHandler> handlers = new HashMap<>();
        handlers.put((short) 0, new HandshakeHandler());
        handlers.put((short) 1, new InsertTupleHandler());
        handlers.put((short) 3, new CreateTableHandler());
        handlers.put((short) 4, new DeleteTableHandler());
        handlers.put((short) 5, new LockTupleHandler());
        handlers.put((short) 6, new DisconnectHandler());
        handlers.put((short) 8, new CreateDistributionGroupHandler());
        handlers.put((short) 9, new DeleteDistributionGroupHandler());
        handlers.put((short) 16, new CompressionHandler());
        handlers.put((short) 17, new KeepAliveHandler());
        handlers.put((short) 18, new NextPageHandler());
        handlers.put((short) 19, new CancelRequestHandler());
        this.requestHandlers = handlers;
    }

    /**
     * Builds the lookup table from query type to query handler and installs
     * it as the query handler map of this connection.
     */
    private void initQueryHandlerMap() {
        final Map<Byte, QueryHandler> handlers = new HashMap<>();
        handlers.put((byte) 1, new HandleKeyQuery());
        handlers.put((byte) 2, new HandleBoundingBoxQuery());
        handlers.put((byte) 3, new HandleVersionTimeQuery());
        handlers.put((byte) 4, new HandleInsertTimeQuery());
        handlers.put((byte) 5, new HandleBoundingBoxTimeQuery());
        handlers.put((byte) 6, new HandleContinuousQuery());
        handlers.put((byte) 7, new HandleJoinQuery());
        this.queryHandlerList = handlers;
    }

    /**
     * Schedules the delivery of the next result tuples of an active query on
     * the connection thread pool.
     *
     * <p>An error response is sent when the query is unknown or when the
     * thread pool is already shut down. A query that reports being done after
     * the delivery is closed and removed from the active queries.
     *
     * @param s the request id used for the responses
     * @param s2 the id of the query to continue
     * @throws IOException when an error response cannot be written
     * @throws PackageEncodeException when an error response cannot be encoded
     */
    public void sendNextResultsForQuery(final short s, final short s2) throws IOException, PackageEncodeException {
        if (!getActiveQueries().containsKey(Short.valueOf(s2))) {
            logger.error("Unable to resume query {} - package {} - not found", Short.valueOf(s2), Short.valueOf(s));
            writeResultPackage(new ErrorResponse(s, ErrorMessages.ERROR_QUERY_NOT_FOUND));
            return;
        }

        final ExceptionSafeRunnable fetchTask = new ExceptionSafeRunnable() {
            protected void runThread() throws IOException, PackageEncodeException {
                final Map<Short, ClientQuery> queries = ClientConnectionHandler.this.getActiveQueries();
                final ClientQuery query = queries.get(Short.valueOf(s2));

                // The query may have been removed since the task was scheduled.
                if (query == null) {
                    ClientConnectionHandler.logger.error("Unable to resume query {}, not found", Short.valueOf(s2));
                    return;
                }

                query.fetchAndSendNextTuples(s);

                if (!query.isQueryDone()) {
                    return;
                }
                ClientConnectionHandler.logger.info("Query {} is done with {} tuples, removing iterator ", Short.valueOf(s2), Long.valueOf(query.getTotalSendTuples()));
                query.close();
                ClientConnectionHandler.this.getActiveQueries().remove(Short.valueOf(s2));
            }

            protected void afterExceptionHook() {
                try {
                    ClientConnectionHandler.this.writeResultPackage(new ErrorResponse(s, ErrorMessages.ERROR_EXCEPTION));
                } catch (IOException | PackageEncodeException sendFailure) {
                    ClientConnectionHandler.logger.error("Unable to send result package", sendFailure);
                }
            }
        };

        if (this.threadPool.isShutdown()) {
            logger.warn("Thread pool is shutting down, don't execute query: {}", Short.valueOf(s2));
            writeResultPackage(new ErrorResponse(s, ErrorMessages.ERROR_EXCEPTION));
            return;
        }

        getThreadPool().submit((Runnable) fetchTask);
    }

    /**
     * @return the capabilities currently assigned to this connection
     */
    public PeerCapabilities getConnectionCapabilities() {
        return connectionCapabilities;
    }

    /**
     * Replaces the capabilities of this connection.
     *
     * @param peerCapabilities the new capabilities
     */
    public void setConnectionCapabilities(PeerCapabilities peerCapabilities) {
        connectionCapabilities = peerCapabilities;
    }

    /**
     * @return the lifecycle state object of this connection
     */
    public ServiceState getConnectionState() {
        return serviceState;
    }

    /**
     * Moves the connection state to running.
     */
    public void setConnectionStateToOpen() {
        serviceState.dispatchToRunning();
    }

    /**
     * @return the live (not copied) map of active queries of this connection
     */
    public Map<Short, ClientQuery> getActiveQueries() {
        return activeQueries;
    }

    /**
     * @return the executor used by this connection
     */
    public ExecutorService getThreadPool() {
        return threadPool;
    }

    /**
     * @return the package router created for this connection
     */
    public PackageRouter getPackageRouter() {
        return packageRouter;
    }

    /**
     * Builds a human readable name for this connection of the form
     * {@code "Connection: Client: <remote> to: <local>"}. A missing address is
     * rendered as {@code "-"}; without a socket only the prefix is returned.
     *
     * @return the connection name
     */
    public String getConnectionName() {
        final StringBuilder name = new StringBuilder("Connection: ");
        if (this.clientSocket == null) {
            return name.toString();
        }

        final Object remoteAddress = this.clientSocket.getRemoteSocketAddress();
        name.append("Client: ");
        name.append(remoteAddress != null ? remoteAddress.toString() : "-");

        final Object localAddress = this.clientSocket.getLocalAddress();
        name.append(" to: ");
        name.append(localAddress != null ? localAddress.toString() : "-");

        return name.toString();
    }

    /**
     * @return the storage registry passed to the constructor
     */
    public TupleStoreManagerRegistry getStorageRegistry() {
        return storageRegistry;
    }

    /**
     * @return the lock manager passed to the constructor
     */
    public LockManager getLockManager() {
        return lockManager;
    }

    /**
     * Registers a handler that is invoked with this connection whenever the
     * state callback fires while the connection is in a finished state.
     *
     * @param consumer the handler to notify
     */
    public void addConnectionClosedHandler(Consumer<ClientConnectionHandler> consumer) {
        final ClientConnectionHandler connection = this;
        this.serviceState.registerCallback(state -> {
            if (!state.isInFinishedState()) {
                return;
            }
            consumer.accept(connection);
        });
    }
}
