package org.apache.iotdb.db.pipe.connector.protocol.writeback;

import java.io.IOException;
import java.time.ZoneId;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DatabaseNotSetException;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.class */
public class WriteBackConnector implements PipeConnector {
    private IClientSession session;
    private static final Logger LOGGER = LoggerFactory.getLogger(WriteBackConnector.class);
    private static final Coordinator COORDINATOR = Coordinator.getInstance();
    private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
    private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null;
    private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser();
    private static final Set<String> ALREADY_CREATED_DATABASES = ConcurrentHashMap.newKeySet();

    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
    }

    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        PipeRuntimeEnvironment runtimeEnvironment = pipeConnectorRuntimeConfiguration.getRuntimeEnvironment();
        this.session = new InternalClientSession(String.format("%s_%s_%s_%s", WriteBackConnector.class.getSimpleName(), runtimeEnvironment.getPipeName(), Long.valueOf(runtimeEnvironment.getCreationTime()), Integer.valueOf(runtimeEnvironment.getRegionId())));
        this.session.setUsername(AuthorityChecker.SUPER_USER);
        this.session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
        this.session.setZoneId(ZoneId.systemDefault());
    }

    public void handshake() throws Exception {
    }

    public void heartbeat() throws Exception {
    }

    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("WriteBackConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Ignore {}.", tabletInsertionEvent);
        } else if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
            doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
        } else {
            doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent);
        }
    }

    private void doTransferWrapper(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException, IOException {
        if (pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(WriteBackConnector.class.getName())) {
            try {
                doTransfer(pipeInsertNodeTabletInsertionEvent);
            } finally {
                pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(WriteBackConnector.class.getName(), false);
            }
        }
    }

    private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException, IOException {
        InsertNode insertNodeViaCacheIfPossible = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
        String tableModelDatabaseName = pipeInsertNodeTabletInsertionEvent.isTableModelEvent() ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() : TREE_MODEL_DATABASE_NAME_IDENTIFIER;
        InsertBaseStatement constructStatement = Objects.isNull(insertNodeViaCacheIfPossible) ? PipeTransferTabletBinaryReqV2.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer(), tableModelDatabaseName).constructStatement() : PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNodeViaCacheIfPossible, tableModelDatabaseName).constructStatement();
        TSStatus executeStatementForTableModel = constructStatement.isWriteToTable() ? executeStatementForTableModel(constructStatement, tableModelDatabaseName) : executeStatementForTreeModel(constructStatement);
        if (executeStatementForTableModel.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() && executeStatementForTableModel.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new PipeException(String.format("Write back PipeInsertNodeTabletInsertionEvent %s error, result status %s", pipeInsertNodeTabletInsertionEvent, executeStatementForTableModel));
        }
    }

    private void doTransferWrapper(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException {
        if (pipeRawTabletInsertionEvent.increaseReferenceCount(WriteBackConnector.class.getName())) {
            try {
                doTransfer(pipeRawTabletInsertionEvent);
            } finally {
                pipeRawTabletInsertionEvent.decreaseReferenceCount(WriteBackConnector.class.getName(), false);
            }
        }
    }

    private void doTransfer(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException {
        String tableModelDatabaseName = pipeRawTabletInsertionEvent.isTableModelEvent() ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() : TREE_MODEL_DATABASE_NAME_IDENTIFIER;
        InsertTabletStatement constructStatement = PipeTransferTabletRawReqV2.toTPipeTransferRawReq(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned(), tableModelDatabaseName).constructStatement();
        TSStatus executeStatementForTableModel = constructStatement.isWriteToTable() ? executeStatementForTableModel(constructStatement, tableModelDatabaseName) : executeStatementForTreeModel(constructStatement);
        if (executeStatementForTableModel.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() && executeStatementForTableModel.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new PipeException(String.format("Write back PipeRawTabletInsertionEvent %s error, result status %s", pipeRawTabletInsertionEvent, executeStatementForTableModel));
        }
    }

    public void transfer(Event event) throws Exception {
    }

    public void close() throws Exception {
        if (this.session != null) {
            SessionManager sessionManager = SESSION_MANAGER;
            IClientSession iClientSession = this.session;
            Coordinator coordinator = COORDINATOR;
            Objects.requireNonNull(coordinator);
            sessionManager.closeSession(iClientSession, (v1) -> {
                r2.cleanupQueryExecution(v1);
            });
        }
    }

    private TSStatus executeStatementForTableModel(Statement statement, String str) {
        this.session.setDatabaseName(str);
        this.session.setSqlDialect(IClientSession.SqlDialect.TABLE);
        SESSION_MANAGER.registerSession(this.session);
        try {
            try {
                autoCreateDatabaseIfNecessary(str);
                TSStatus tSStatus = Coordinator.getInstance().executeForTableModel(new PipeEnrichedStatement(statement), RELATIONAL_SQL_PARSER, this.session, SESSION_MANAGER.requestQueryId(), SESSION_MANAGER.getSessionInfoOfPipeReceiver(this.session, str), "", LocalExecutionPlanner.getInstance().metadata, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()).status;
                SESSION_MANAGER.removeCurrSession();
                return tSStatus;
            } catch (Exception e) {
                ALREADY_CREATED_DATABASES.remove(str);
                Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
                if (rootCause.getMessage() == null || !rootCause.getMessage().toLowerCase(Locale.ENGLISH).contains(DatabaseNotSetException.DATABASE_NOT_SET.toLowerCase(Locale.ENGLISH))) {
                    throw e;
                }
                autoCreateDatabaseIfNecessary(str);
                this.session.setDatabaseName(str);
                TSStatus tSStatus2 = Coordinator.getInstance().executeForTableModel(new PipeEnrichedStatement(statement), RELATIONAL_SQL_PARSER, this.session, SESSION_MANAGER.requestQueryId(), SESSION_MANAGER.getSessionInfo(this.session), "", LocalExecutionPlanner.getInstance().metadata, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()).status;
                SESSION_MANAGER.removeCurrSession();
                return tSStatus2;
            }
        } catch (Throwable th) {
            SESSION_MANAGER.removeCurrSession();
            throw th;
        }
    }

    private void autoCreateDatabaseIfNecessary(String str) {
        if (ALREADY_CREATED_DATABASES.contains(str)) {
            return;
        }
        TDatabaseSchema tDatabaseSchema = new TDatabaseSchema(new TDatabaseSchema(str));
        tDatabaseSchema.setIsTableModel(true);
        try {
            ConfigTaskResult configTaskResult = (ConfigTaskResult) new CreateDBTask(tDatabaseSchema, true).execute(ClusterConfigTaskExecutor.getInstance()).get();
            int statusCode = configTaskResult.getStatusCode().getStatusCode();
            if (statusCode != TSStatusCode.SUCCESS_STATUS.getStatusCode() && statusCode != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
                throw new PipeException(String.format("Auto create database failed: %s, status code: %s", str, configTaskResult.getStatusCode()));
            }
            ALREADY_CREATED_DATABASES.add(str);
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new PipeException("Auto create database failed because: " + e.getMessage());
        }
    }

    private TSStatus executeStatementForTreeModel(Statement statement) {
        this.session.setDatabaseName(null);
        this.session.setSqlDialect(IClientSession.SqlDialect.TREE);
        SESSION_MANAGER.registerSession(this.session);
        try {
            TSStatus tSStatus = Coordinator.getInstance().executeForTreeModel(new PipeEnrichedStatement(statement), SESSION_MANAGER.requestQueryId(), SESSION_MANAGER.getSessionInfo(this.session), "", ClusterPartitionFetcher.getInstance(), ClusterSchemaFetcher.getInstance(), IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), false).status;
            SESSION_MANAGER.removeCurrSession();
            return tSStatus;
        } catch (Throwable th) {
            SESSION_MANAGER.removeCurrSession();
            throw th;
        }
    }
}
