package org.apache.iotdb.cluster.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
import org.apache.iotdb.cluster.query.ClusterPlanner;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/cluster/server/ClientServer.class */
/**
 * Thrift entry point through which clients talk to a cluster node.
 *
 * <p>The class extends {@link TSServiceImpl} and exposes it as a {@link TSIService} processor on
 * the cluster RPC port. Non-query plans are handed to the {@link Coordinator}, and every query
 * gets a {@link RemoteQueryContext} that is remembered by query id so that the nodes it touched
 * can be told to end the query when its resources are released.
 */
public class ClientServer extends TSServiceImpl {
    private static final Logger logger = LoggerFactory.getLogger(ClientServer.class);

    /** Executes non-query plans and hands out data clients; must be set before requests arrive. */
    private Coordinator coordinator;

    /** Single thread that runs the blocking {@code serve()} loop; {@code null} until started. */
    private ExecutorService serverService;

    /** The Thrift server that accepts client connections. */
    private TServer poolServer;

    /** The listening socket used by {@link #poolServer}. */
    private TServerTransport serverTransport;

    /** Query id -> context recording which remote nodes the query has touched. */
    private final Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();

    /**
     * Thrift server event hook whose only job is to run the client-exit cleanup when Thrift
     * deletes a connection's server context.
     */
    public class EventHandler implements TServerEventHandler {
        EventHandler() {
        }

        @Override
        public void preServe() {
            // nothing to prepare before serving
        }

        @Override
        public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol2) {
            // no per-connection context is kept
            return null;
        }

        @Override
        public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol2) {
            ClientServer.this.handleClientExit();
        }

        @Override
        public void processContext(ServerContext serverContext, TTransport tTransport, TTransport tTransport2) {
            // nothing to do per request
        }
    }

    /**
     * Sets the coordinator used to execute non-query plans and to obtain data clients.
     *
     * @param coordinator the coordinator to use
     */
    public void setCoordinator(Coordinator coordinator) {
        this.coordinator = coordinator;
    }

    /**
     * Creates a client server that plans with a {@link ClusterPlanner} and executes with a
     * {@link ClusterPlanExecutor} bound to the given meta group member.
     *
     * @param metaGroupMember the member passed to the cluster plan executor
     * @throws QueryProcessException declared by the original constructor; propagated from the
     *     planner/executor construction
     */
    public ClientServer(MetaGroupMember metaGroupMember) throws QueryProcessException {
        this.processor = new ClusterPlanner();
        this.executor = new ClusterPlanExecutor(metaGroupMember);
    }

    /**
     * Opens the client RPC socket and starts serving requests on a background thread. Calling
     * this method again while the server is already started is a no-op.
     *
     * @throws TTransportException if the server socket cannot be created; in that case no
     *     executor is left behind and a later call may retry
     */
    public void start() throws TTransportException {
        if (this.serverService != null) {
            return;
        }

        ClusterConfig config = ClusterDescriptor.getInstance().getConfig();

        // Create the socket before any executor: this is the step declared to fail, and a
        // failure here must not leave serverService set (which would turn every later start()
        // into a silent no-op and make stop() dereference a null poolServer).
        this.serverTransport = new TServerSocket(
            new InetSocketAddress(
                IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
                config.getClusterRpcPort()));

        int cpuCores = CommonUtils.getCpuCores();
        TThreadPoolServer.Args poolArgs = new TThreadPoolServer.Args(this.serverTransport)
            .maxWorkerThreads(Math.max(cpuCores, config.getMaxConcurrentClientNum()))
            .minWorkerThreads(cpuCores);

        // Worker threads get a recognisable, numbered name.
        AtomicLong threadIndex = new AtomicLong(0);
        ThreadFactory workerFactory =
            runnable -> new Thread(runnable, "ClusterClient-" + threadIndex.incrementAndGet());
        poolArgs.executorService(
            new ThreadPoolExecutor(
                poolArgs.minWorkerThreads,
                poolArgs.maxWorkerThreads,
                poolArgs.stopTimeoutVal,
                poolArgs.stopTimeoutUnit,
                new SynchronousQueue<>(),
                workerFactory));

        poolArgs.processor(new TSIService.Processor<>(this));
        // Compact protocol when RPC compression is enabled, binary protocol otherwise.
        if (IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
            poolArgs.protocolFactory(new TCompactProtocol.Factory());
        } else {
            poolArgs.protocolFactory(new TBinaryProtocol.Factory());
        }
        poolArgs.transportFactory(RpcTransportFactory.INSTANCE);

        this.poolServer = new TThreadPoolServer(poolArgs);
        this.poolServer.setServerEventHandler(new EventHandler());

        // Publish the executor only once everything it needs exists.
        this.serverService =
            Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "ClusterClientServer"));
        this.serverService.submit(() -> {
            this.poolServer.serve();
        });
        logger.info("Client service is set up");
    }

    /**
     * Stops the Thrift server, interrupts the serving thread and closes the listening socket.
     * Does nothing if the server was never started.
     */
    public void stop() {
        if (this.serverService == null) {
            return;
        }
        this.poolServer.stop();
        this.serverService.shutdownNow();
        this.serverTransport.close();
    }

    /**
     * Validates a non-query plan and forwards it to the coordinator.
     *
     * <p>While the system is read-only, only {@link SetSystemModePlan} and {@link FlushPlan} are
     * forwarded; any other plan is rejected.
     *
     * @param physicalPlan the plan to execute
     * @return the coordinator's status, or an {@code EXECUTE_STATEMENT_ERROR} status carrying the
     *     exception message when the plan fails its integrity check or the system is read-only
     */
    protected TSStatus executeNonQueryPlan(PhysicalPlan physicalPlan) {
        try {
            physicalPlan.checkIntegrity();
            boolean allowedWhenReadOnly =
                (physicalPlan instanceof SetSystemModePlan) || (physicalPlan instanceof FlushPlan);
            if (!allowedWhenReadOnly && IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
                throw new QueryProcessException("Current system mode is read-only, does not support non-query operation");
            }
            return this.coordinator.executeNonQueryPlan(physicalPlan);
        } catch (QueryProcessException e) {
            logger.warn("Illegal plan detected： {}", physicalPlan);
            return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
        }
    }

    /**
     * Resolves the data types of the given paths through the cluster metadata manager.
     *
     * @param list the paths to resolve
     * @param list2 passed through to {@code CMManager.getSeriesTypesByPath}; presumably one
     *     aggregation name per path (verify against CMManager)
     * @return the {@code left} component of the pair returned by the metadata manager
     * @throws MetadataException if the metadata manager fails
     */
    @SuppressWarnings("unchecked") // the cast mirrors the erased type seen in the compiled class
    protected List<TSDataType> getSeriesTypesByPaths(List<PartialPath> list, List<String> list2) throws MetadataException {
        return (List<TSDataType>) ((CMManager) IoTDB.metaManager).getSeriesTypesByPath(list, list2).left;
    }

    /**
     * Resolves the data types of the given paths through the cluster metadata manager.
     *
     * @param list the paths to resolve
     * @param str passed through to {@code CMManager.getSeriesTypesByPaths}; presumably a single
     *     aggregation name applied to every path (verify against CMManager)
     * @return the {@code left} component of the pair returned by the metadata manager
     * @throws MetadataException if the metadata manager fails
     */
    @SuppressWarnings("unchecked") // the cast mirrors the erased type seen in the compiled class
    protected List<TSDataType> getSeriesTypesByString(List<PartialPath> list, String str) throws MetadataException {
        return (List<TSDataType>) ((CMManager) IoTDB.metaManager).getSeriesTypesByPaths(list, str).left;
    }

    /**
     * Creates a {@link RemoteQueryContext} for a query and remembers it under the query id so
     * that {@link #releaseQueryResource(long)} can find it later.
     *
     * @param j the query id
     * @param z forwarded unchanged as the second constructor argument of
     *     {@link RemoteQueryContext}
     * @return the newly created context
     */
    protected QueryContext genQueryContext(long j, boolean z) {
        RemoteQueryContext context = new RemoteQueryContext(j, z);
        this.queryContextMap.put(j, context);
        return context;
    }

    /**
     * Releases a query's resources through the superclass and then asks every node recorded in
     * the query's {@link RemoteQueryContext} to end the query as well. A failure on one node is
     * logged and does not stop the remaining nodes from being notified.
     *
     * @param j the query id
     * @throws StorageEngineException if the superclass release fails
     */
    @Override
    protected void releaseQueryResource(long j) throws StorageEngineException {
        super.releaseQueryResource(j);

        RemoteQueryContext context = this.queryContextMap.remove(j);
        if (context == null) {
            return;
        }
        for (Map.Entry<Node, Set<Node>> entry : context.getQueriedNodesMap().entrySet()) {
            // The map key is sent as the first argument of endQuery (presumably the group header).
            Node entryKey = entry.getKey();
            for (Node queriedNode : entry.getValue()) {
                endRemoteQuery(entryKey, queriedNode, j);
            }
        }
    }

    /** Sends one endQuery request to {@code queriedNode}, logging instead of propagating failures. */
    private void endRemoteQuery(Node entryKey, Node queriedNode, long queryId) {
        try {
            if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
                GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
                this.coordinator
                    .getAsyncDataClient(queriedNode, RaftServer.getReadOperationTimeoutMS())
                    .endQuery(entryKey, this.coordinator.getThisNode(), queryId, handler);
            } else {
                SyncDataClient syncDataClient =
                    this.coordinator.getSyncDataClient(queriedNode, RaftServer.getReadOperationTimeoutMS());
                try {
                    syncDataClient.endQuery(entryKey, this.coordinator.getThisNode(), queryId);
                } catch (TException e) {
                    // Close the underlying transport after a failed call before rethrowing.
                    syncDataClient.getInputProtocol().getTransport().close();
                    throw e;
                } finally {
                    // Always hand the client back, on success and on failure alike.
                    if (syncDataClient != null) {
                        syncDataClient.close();
                    }
                }
            }
        } catch (IOException | TException e) {
            logger.error("Cannot end query {} in {}", queryId, queriedNode, e);
        }
    }
}
