package org.apache.iotdb.cluster.query.fill;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.QueryTimeOutException;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.handlers.caller.PreviousFillHandler;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.class */
public class ClusterPreviousFill extends PreviousFill {
    private static final Logger logger = LoggerFactory.getLogger(ClusterPreviousFill.class);
    private MetaGroupMember metaGroupMember;
    private TimeValuePair fillResult;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClusterPreviousFill(PreviousFill previousFill, MetaGroupMember metaGroupMember) {
        super(previousFill.getDataType(), previousFill.getQueryTime(), previousFill.getBeforeRange());
        this.metaGroupMember = metaGroupMember;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClusterPreviousFill(TSDataType tSDataType, long j, long j2, MetaGroupMember metaGroupMember) {
        super(tSDataType, j, j2);
        this.metaGroupMember = metaGroupMember;
    }

    public void configureFill(PartialPath partialPath, TSDataType tSDataType, long j, Set<String> set, QueryContext queryContext) {
        try {
            this.fillResult = performPreviousFill(partialPath, tSDataType, j, getBeforeRange(), set, queryContext);
        } catch (StorageEngineException e) {
            logger.error("Failed to configure previous fill for Path {}", partialPath, e);
        }
    }

    public TimeValuePair getFillResult() {
        return this.fillResult;
    }

    private TimeValuePair performPreviousFill(PartialPath partialPath, TSDataType tSDataType, long j, long j2, Set<String> set, QueryContext queryContext) throws StorageEngineException {
        try {
            this.metaGroupMember.syncLeaderWithConsistencyCheck(false);
            PartitionUtils.Intervals intervals = new PartitionUtils.Intervals();
            intervals.addInterval(j2 == -1 ? Long.MIN_VALUE : j - j2, j);
            List<PartitionGroup> routeIntervals = this.metaGroupMember.routeIntervals(intervals, partialPath);
            if (logger.isDebugEnabled()) {
                logger.debug("{}: Sending data query of {} to {} groups", new Object[]{this.metaGroupMember.getName(), partialPath, Integer.valueOf(routeIntervals.size())});
            }
            PreviousFillHandler previousFillHandler = new PreviousFillHandler(new CountDownLatch(routeIntervals.size()));
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(routeIntervals.size());
            PreviousFillArguments previousFillArguments = new PreviousFillArguments(partialPath, tSDataType, j, j2, set);
            for (PartitionGroup partitionGroup : routeIntervals) {
                newFixedThreadPool.submit(() -> {
                    performPreviousFill(previousFillArguments, queryContext, partitionGroup, previousFillHandler);
                });
            }
            newFixedThreadPool.shutdown();
            try {
                newFixedThreadPool.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("Unexpected interruption when waiting for fill pool to stop", e);
            }
            return previousFillHandler.getResult();
        } catch (CheckConsistencyException e2) {
            throw new StorageEngineException(e2);
        }
    }

    private void performPreviousFill(PreviousFillArguments previousFillArguments, QueryContext queryContext, PartitionGroup partitionGroup, PreviousFillHandler previousFillHandler) {
        if (partitionGroup.contains(this.metaGroupMember.getThisNode())) {
            localPreviousFill(previousFillArguments, queryContext, partitionGroup, previousFillHandler);
        } else {
            remotePreviousFill(previousFillArguments, queryContext, partitionGroup, previousFillHandler);
        }
    }

    private void localPreviousFill(PreviousFillArguments previousFillArguments, QueryContext queryContext, PartitionGroup partitionGroup, PreviousFillHandler previousFillHandler) {
        try {
            previousFillHandler.onComplete(this.metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).getLocalQueryExecutor().localPreviousFill(previousFillArguments.getPath(), previousFillArguments.getDataType(), previousFillArguments.getQueryTime(), previousFillArguments.getBeforeRange(), previousFillArguments.getDeviceMeasurements(), queryContext));
        } catch (QueryProcessException | StorageEngineException | IOException e) {
            previousFillHandler.onError(e);
        }
    }

    private void remotePreviousFill(PreviousFillArguments previousFillArguments, QueryContext queryContext, PartitionGroup partitionGroup, PreviousFillHandler previousFillHandler) {
        PreviousFillRequest previousFillRequest = new PreviousFillRequest(previousFillArguments.getPath().getFullPath(), previousFillArguments.getQueryTime(), previousFillArguments.getBeforeRange(), queryContext.getQueryId(), this.metaGroupMember.getThisNode(), partitionGroup.getHeader(), previousFillArguments.getDataType().ordinal(), previousFillArguments.getDeviceMeasurements());
        Iterator<Node> it = partitionGroup.iterator();
        while (it.hasNext()) {
            Node next = it.next();
            ByteBuffer remoteAsyncPreviousFill = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer() ? remoteAsyncPreviousFill(next, previousFillRequest, previousFillArguments) : remoteSyncPreviousFill(next, previousFillRequest, previousFillArguments);
            if (remoteAsyncPreviousFill != null) {
                previousFillHandler.onComplete(remoteAsyncPreviousFill);
                return;
            }
        }
        previousFillHandler.onError(new QueryTimeOutException(String.format("PreviousFill %s@%d range: %d", previousFillArguments.getPath().getFullPath(), Long.valueOf(previousFillArguments.getQueryTime()), Long.valueOf(previousFillArguments.getBeforeRange()))));
    }

    private ByteBuffer remoteAsyncPreviousFill(Node node, PreviousFillRequest previousFillRequest, PreviousFillArguments previousFillArguments) {
        ByteBuffer byteBuffer = null;
        try {
            try {
                byteBuffer = SyncClientAdaptor.previousFill(this.metaGroupMember.getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()), previousFillRequest);
            } catch (Exception e) {
                logger.error("{}: Cannot perform previous fill of {} to {}", new Object[]{this.metaGroupMember, previousFillArguments.getPath(), node, e});
            }
            return byteBuffer;
        } catch (IOException e2) {
            logger.warn("{}: Cannot connect to {} during previous fill", this.metaGroupMember, node);
            return null;
        }
    }

    private ByteBuffer remoteSyncPreviousFill(Node node, PreviousFillRequest previousFillRequest, PreviousFillArguments previousFillArguments) {
        ByteBuffer byteBuffer = null;
        SyncDataClient syncDataClient = null;
        try {
            try {
                try {
                    syncDataClient = this.metaGroupMember.getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
                    byteBuffer = syncDataClient.previousFill(previousFillRequest);
                    if (syncDataClient != null) {
                        ClientUtils.putBackSyncClient(syncDataClient);
                    }
                } catch (TException e) {
                    logger.error("{}: Cannot perform previous fill of {} to {}", new Object[]{this.metaGroupMember.getName(), previousFillArguments.getPath(), node, e});
                    syncDataClient.getInputProtocol().getTransport().close();
                    if (syncDataClient != null) {
                        ClientUtils.putBackSyncClient(syncDataClient);
                    }
                }
            } catch (IOException e2) {
                logger.warn("{}: Cannot connect to {} during previous fill", this.metaGroupMember, node);
                if (syncDataClient != null) {
                    ClientUtils.putBackSyncClient(syncDataClient);
                }
            }
            return byteBuffer;
        } catch (Throwable th) {
            if (syncDataClient != null) {
                ClientUtils.putBackSyncClient(syncDataClient);
            }
            throw th;
        }
    }
}
