package org.apache.iotdb.db.schemaengine.schemaregion.attribute.update;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.client.dn.DnToDnInternalServiceAsyncRequestManager;
import org.apache.iotdb.db.protocol.client.dn.DnToDnRequestType;
import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceAttributeCommitUpdateNode;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.schemaregion.impl.SchemaRegionMemoryImpl;
import org.apache.iotdb.mpp.rpc.thrift.TAttributeUpdateReq;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaRegionAttributeInfo;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.class */
/**
 * Background service that periodically collects pending device-attribute updates from the schema
 * regions registered through {@link #startBroadcast(ISchemaRegion)}, sends them to the target
 * data nodes, and then writes a commit node back to each region.
 *
 * <p>All broadcasting work runs on a single-thread executor; each round re-submits itself while
 * the service is started. The round holds {@link #lock} for its whole duration and waits on
 * {@link #condition} between rounds, so {@link #notifyBroadCast()} can cut the wait short.
 */
public class GeneralRegionAttributeSecurityService implements IService {
    private static final Logger LOGGER = LoggerFactory.getLogger(GeneralRegionAttributeSecurityService.class);
    private static final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();

    /** Executor on which {@link #execute()} is (re-)submitted. */
    private final ExecutorService securityServiceExecutor;

    /**
     * data node id -&gt; (timestamp in ms of the first recorded failure, number of failures).
     * Only touched from {@link #sendUpdateRequestAndMayShrink(Map)} and
     * {@link #detectNodeShrinkage(Map)}.
     */
    private final Map<Integer, Pair<Long, Integer>> dataNodeId2FailureDurationAndTimesMap;

    /** Regions whose attribute updates are currently broadcast by this service. */
    private final Set<ISchemaRegion> regionLeaders;

    /** schema region id -&gt; database full path; entries are never removed by this class. */
    private final Map<SchemaRegionId, String> regionId2DatabaseMap;

    private final ReentrantLock lock;
    private final Condition condition;

    /** When {@code true}, the current round does not wait before the next one starts. */
    private volatile boolean skipNextSleep;

    /** When {@code false}, a finished round does not re-submit itself. */
    private volatile boolean allowSubmitListen;

    /**
     * Initialization-on-demand holder for the singleton. The decompiler output marks this class
     * as originally private; it is kept public here so the visible interface does not change.
     */
    public static final class GeneralRegionAttributeSecurityServiceHolder {
        private static final GeneralRegionAttributeSecurityService INSTANCE = new GeneralRegionAttributeSecurityService();

        private GeneralRegionAttributeSecurityServiceHolder() {
        }
    }

    private GeneralRegionAttributeSecurityService() {
        this.securityServiceExecutor = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.GENERAL_REGION_ATTRIBUTE_SECURITY_SERVICE.getName());
        this.dataNodeId2FailureDurationAndTimesMap = new HashMap<>();
        this.regionLeaders = ConcurrentHashMap.newKeySet();
        this.regionId2DatabaseMap = new ConcurrentHashMap<>();
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.skipNextSleep = false;
        this.allowSubmitListen = false;
    }

    /** Returns the process-wide singleton instance. */
    public static GeneralRegionAttributeSecurityService getInstance() {
        return GeneralRegionAttributeSecurityServiceHolder.INSTANCE;
    }

    /**
     * Registers a region for broadcasting. Regions that are not {@link SchemaRegionMemoryImpl}
     * instances, or whose database is not a table-model database, are ignored.
     *
     * @param iSchemaRegion the region to register
     */
    public void startBroadcast(ISchemaRegion iSchemaRegion) {
        if (!(iSchemaRegion instanceof SchemaRegionMemoryImpl)) {
            return;
        }
        final String database = iSchemaRegion.getDatabaseFullPath();
        if (!PathUtils.isTableModelDatabase(database)) {
            return;
        }
        this.regionId2DatabaseMap.put(iSchemaRegion.getSchemaRegionId(), database);
        this.regionLeaders.add(iSchemaRegion);
    }

    /**
     * Unregisters a region from broadcasting. The region's entry in the id-to-database map is
     * left in place.
     *
     * @param iSchemaRegion the region to unregister
     */
    public void stopBroadcast(ISchemaRegion iSchemaRegion) {
        // NOTE(review): regionId2DatabaseMap is intentionally(?) not cleaned here — confirm that
        // retaining the database name after the region stops broadcasting is desired.
        this.regionLeaders.remove(iSchemaRegion);
    }

    /**
     * Requests that the next broadcast round happen promptly. If the lock is free, waiters on the
     * condition are signalled; otherwise a round is in progress, so it is told to skip its wait.
     */
    public void notifyBroadCast() {
        if (!this.lock.tryLock()) {
            this.skipNextSleep = true;
            return;
        }
        try {
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Runs one broadcast round: collect pending updates, send them, commit the outcome to each
     * region, then wait for the configured interval (unless told to skip) and re-submit itself.
     *
     * <p>The lock is released and the next round is scheduled in a single {@code finally} block,
     * so the lock is unlocked exactly once on every exit path.
     */
    private void execute() {
        this.lock.lock();
        try {
            final AtomicBoolean hasRemaining = new AtomicBoolean(false);
            final Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> commitMap = collectPendingUpdates(hasRemaining);
            if (hasRemaining.get()) {
                this.skipNextSleep = true;
            }
            if (!commitMap.isEmpty()) {
                commitUpdates(commitMap, sendUpdateRequestAndMayShrink(commitMap));
            }
            if (!this.skipNextSleep) {
                this.condition.await(iotdbConfig.getGeneralRegionAttributeSecurityServiceIntervalSeconds(), TimeUnit.SECONDS);
            }
            this.skipNextSleep = false;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.warn("Interrupted when waiting for the next attribute broadcasting: {}", e.getMessage());
        } finally {
            this.lock.unlock();
            if (this.allowSubmitListen) {
                this.securityServiceExecutor.submit(this::execute);
            }
        }
    }

    /**
     * Asks each registered region for its pending attribute updates, sharing one size budget
     * across all regions.
     *
     * @param hasRemaining flag handed to every region; read by the caller afterwards
     * @return schema region id -&gt; (left value reported by the region, per-data-node payloads)
     */
    private Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> collectPendingUpdates(final AtomicBoolean hasRemaining) {
        final AtomicInteger limit = new AtomicInteger(CommonDescriptor.getInstance().getConfig().getPipeConnectorRequestSliceThresholdBytes());
        final Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> commitMap = new HashMap<>();
        for (final ISchemaRegion regionLeader : this.regionLeaders) {
            final Pair<Long, Map<TDataNodeLocation, byte[]>> updateInfo = regionLeader.getAttributeUpdateInfo(limit, hasRemaining);
            if (updateInfo.getRight().isEmpty()) {
                // NOTE(review): this stops at the first region that returns nothing, so later
                // regions are not queried in this round. Presumably an empty result means the
                // shared budget is exhausted — confirm it cannot also mean "no updates here".
                break;
            }
            commitMap.put(regionLeader.getSchemaRegionId(), updateInfo);
        }
        return commitMap;
    }

    /**
     * Writes a commit node to every region that still has an entry in {@code commitMap}. A
     * rejected write is logged and clears {@link #skipNextSleep}.
     *
     * @param commitMap region id -&gt; (version, remaining per-data-node payloads)
     * @param shrinkMap region id -&gt; locations reported by {@link #detectNodeShrinkage(Map)}
     */
    private void commitUpdates(final Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> commitMap, final Map<SchemaRegionId, Set<TDataNodeLocation>> shrinkMap) {
        commitMap.forEach((schemaRegionId, versionAndUpdates) -> {
            // Local location carries only the data node id and the internal end point.
            final TDataNodeLocation localLocation = new TDataNodeLocation(iotdbConfig.getDataNodeId(), null, new TEndPoint(iotdbConfig.getInternalAddress(), iotdbConfig.getInternalPort()), null, null, null);
            final TableDeviceAttributeCommitUpdateNode commitNode = new TableDeviceAttributeCommitUpdateNode(new PlanNodeId(""), versionAndUpdates.getLeft(), versionAndUpdates.getRight(), shrinkMap.getOrDefault(schemaRegionId, Collections.emptySet()), localLocation);
            if (!new RegionWriteExecutor().execute(schemaRegionId, commitNode).isAccepted()) {
                this.skipNextSleep = false;
                LOGGER.warn("Failed to write attribute commit message to region {}.", schemaRegionId);
            }
        });
    }

    /**
     * Fetches the current data node list from the config node and, for every region, strips the
     * locations that are still part of the cluster from that region's pending update map.
     *
     * <p>Side effects: clears the failure-tracking map after a successful fetch, nulls every end
     * point except the internal one on the fetched locations, and mutates the maps inside
     * {@code map}.
     *
     * @param map region id -&gt; (version, per-data-node payloads); mutated in place
     * @return region id -&gt; locations left after removing the cluster's current members, only
     *     for regions where at least one location is left; empty if the fetch failed
     */
    @Nonnull
    private Map<SchemaRegionId, Set<TDataNodeLocation>> detectNodeShrinkage(Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> map) {
        final TShowClusterResp showClusterResp;
        // try-with-resources so the borrowed client is closed even when showCluster() throws.
        try (ConfigNodeClient configNodeClient = ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
            showClusterResp = configNodeClient.showCluster();
        } catch (ClientManagerException | TException e) {
            LOGGER.warn("Failed to fetch dataNodeLocations, will retry.", e);
            return Collections.emptyMap();
        }
        this.dataNodeId2FailureDurationAndTimesMap.clear();

        final Set<TDataNodeLocation> currentLocations = new HashSet<>();
        for (final TDataNodeLocation location : showClusterResp.getDataNodeList()) {
            location.setDataRegionConsensusEndPoint(null);
            location.setMPPDataExchangeEndPoint(null);
            location.setSchemaRegionConsensusEndPoint(null);
            location.setClientRpcEndPoint(null);
            currentLocations.add(location);
        }

        final Map<SchemaRegionId, Set<TDataNodeLocation>> shrinkMap = new HashMap<>();
        for (final Map.Entry<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> entry : map.entrySet()) {
            final Set<TDataNodeLocation> targets = entry.getValue().getRight().keySet();
            targets.removeIf(currentLocations::contains);
            if (!targets.isEmpty()) {
                // NOTE(review): this stores the live keySet() view, so later removals from the
                // underlying map also shrink this set — confirm that aliasing is intended.
                shrinkMap.put(entry.getKey(), targets);
            }
        }
        return shrinkMap;
    }

    /**
     * Sends the collected updates to their data nodes (one request per data node, covering all
     * regions), records per-node failures, optionally runs {@link #detectNodeShrinkage(Map)},
     * and removes the payloads of failed data nodes from {@code map}.
     *
     * @param map region id -&gt; (version, per-data-node payloads); mutated in place — regions
     *     with no payload left and no shrink entry are removed
     * @return the result of {@link #detectNodeShrinkage(Map)}, or an empty map if it was not run
     */
    @Nonnull
    private Map<SchemaRegionId, Set<TDataNodeLocation>> sendUpdateRequestAndMayShrink(Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> map) {
        final AsyncRequestContext<TAttributeUpdateReq, TSStatus, DnToDnRequestType, TDataNodeLocation> requestContext = new AsyncRequestContext<>(DnToDnRequestType.UPDATE_ATTRIBUTE);
        map.forEach((schemaRegionId, versionAndUpdates) ->
            versionAndUpdates.getRight().forEach((location, updateBytes) -> {
                final int dataNodeId = location.getDataNodeId();
                requestContext.putNodeLocation(dataNodeId, location);
                requestContext.putRequestIfAbsent(dataNodeId, new TAttributeUpdateReq(new HashMap<>()));
                requestContext.getRequest(dataNodeId).getAttributeUpdateMap().put(schemaRegionId.getId(), new TSchemaRegionAttributeInfo(versionAndUpdates.getLeft(), this.regionId2DatabaseMap.get(schemaRegionId), ByteBuffer.wrap(updateBytes)));
            }));

        // The configured timeout is in seconds while the request manager expects milliseconds.
        DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithTimeoutInMs(requestContext, TimeUnit.SECONDS.toMillis(iotdbConfig.getGeneralRegionAttributeSecurityServiceTimeoutSeconds()));

        boolean needFetch = false;
        final Set<Integer> failedDataNodeIds = new HashSet<>();
        for (final Map.Entry<Integer, TSStatus> response : requestContext.getResponseMap().entrySet()) {
            final Integer dataNodeId = response.getKey();
            if (response.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                this.dataNodeId2FailureDurationAndTimesMap.remove(dataNodeId);
                continue;
            }
            failedDataNodeIds.add(dataNodeId);
            final Pair<Long, Integer> failureRecord = this.dataNodeId2FailureDurationAndTimesMap.get(dataNodeId);
            if (failureRecord == null) {
                // First failure: only start tracking, never triggers a fetch by itself.
                this.dataNodeId2FailureDurationAndTimesMap.put(dataNodeId, new Pair<>(System.currentTimeMillis(), 1));
                continue;
            }
            failureRecord.setRight(failureRecord.getRight() + 1);
            if (exceedsFailureThreshold(failureRecord)) {
                needFetch = true;
            }
        }

        final Map<SchemaRegionId, Set<TDataNodeLocation>> shrinkMap = needFetch ? detectNodeShrinkage(map) : Collections.emptyMap();

        final Iterator<Map.Entry<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>>> iterator = map.entrySet().iterator();
        while (iterator.hasNext()) {
            final Map.Entry<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> entry = iterator.next();
            final Map<TDataNodeLocation, byte[]> updates = entry.getValue().getRight();
            updates.keySet().removeIf(location -> failedDataNodeIds.contains(location.getDataNodeId()));
            if (updates.isEmpty() && !shrinkMap.containsKey(entry.getKey())) {
                iterator.remove();
            }
        }
        return shrinkMap;
    }

    /**
     * Tells whether a failure record has reached either configured threshold.
     *
     * @param failureRecord (timestamp in ms of the first failure, failure count)
     * @return {@code true} if the elapsed time or the failure count reaches its threshold
     */
    private static boolean exceedsFailureThreshold(final Pair<Long, Integer> failureRecord) {
        // Elapsed time is in milliseconds, the configured duration is in seconds.
        final long elapsedMs = System.currentTimeMillis() - failureRecord.getLeft();
        final long thresholdMs = TimeUnit.SECONDS.toMillis(iotdbConfig.getGeneralRegionAttributeSecurityServiceFailureDurationSecondsToFetch());
        return elapsedMs >= thresholdMs || failureRecord.getRight() >= iotdbConfig.getGeneralRegionAttributeSecurityServiceFailureTimesToFetch();
    }

    /** Enables self re-submission and schedules the first broadcast round. */
    @Override
    public void start() throws StartupException {
        this.allowSubmitListen = true;
        this.securityServiceExecutor.submit(this::execute);
        LOGGER.info("General region attribute security service is started successfully.");
    }

    /** Disables self re-submission and shuts the executor down. */
    @Override
    public void stop() {
        this.allowSubmitListen = false;
        this.securityServiceExecutor.shutdown();
        LOGGER.info("General region attribute security service is stopped successfully.");
    }

    @Override
    public ServiceType getID() {
        return ServiceType.GENERAL_REGION_ATTRIBUTE_SECURITY_SERVICE;
    }
}
