package org.apache.shardingsphere.proxy.backend.handler.cdc;

import com.google.common.base.Strings;
import io.netty.channel.Channel;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtil;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtil;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.class */
/**
 * Backend handler for CDC requests.
 *
 * <p>Creates a streaming job from a stream-data request, starts and stops streaming for an existing job,
 * drops a job, and forwards acknowledgements to the {@link CDCAckHolder}.</p>
 */
public final class CDCBackendHandler {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(CDCBackendHandler.class);

    private final CDCJobAPI jobAPI = new CDCJobAPI();

    /**
     * Create a CDC job for the tables named in the request and start streaming it to the channel.
     *
     * <p>A failed response is returned when the requested database is unknown or when the database has no
     * {@link ShardingRule}. On success the new job ID is stored on the connection context and returned as the
     * streaming ID of the response.</p>
     *
     * @param str identifier handed to {@link CDCResponseGenerator} for every response built here
     *            (presumably the client's request ID; confirm against the caller)
     * @param streamDataRequestBody request naming the database, the source tables and the "full" flag
     * @param cDCConnectionContext connection context that receives the created job ID
     * @param channel channel handed to the socket sink connector when streaming starts
     * @return success response carrying the streaming ID, or a failed response as described above
     * @throws NotFindStreamDataSourceTableException if the table expressions resolve to no table
     */
    public CDCResponse streamData(String str, StreamDataRequestBody streamDataRequestBody, CDCConnectionContext cDCConnectionContext, Channel channel) {
        String databaseName = streamDataRequestBody.getDatabase();
        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName);
        if (null == database) {
            return CDCResponseGenerator.failed(str, CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", databaseName));
        }
        // schemaTableNames: names passed to the job ("schema.table", or just "table" when the schema is empty or unavailable).
        // tableNames: bare table names used to look up actual data nodes in the sharding rule.
        Collection<String> schemaTableNames = new HashSet<>();
        Collection<String> tableNames;
        if (database.getProtocolType().isSchemaAvailable()) {
            Map<String, ? extends Collection<String>> tablesBySchema = CDCSchemaTableUtil.parseTableExpressionWithSchema(database, streamDataRequestBody.getSourceSchemaTableList());
            // NOTE(review): bare names from different schemas are merged into one list here, so equal table
            // names in two schemas end up sharing a single data-node entry below.
            tableNames = tablesBySchema.values().stream().flatMap(tables -> tables.stream()).collect(Collectors.toList());
            tablesBySchema.forEach((schemaName, tables) -> tables.forEach(
                tableName -> schemaTableNames.add(schemaName.isEmpty() ? tableName : String.join(".", schemaName, tableName))));
        } else {
            List<String> tableExpressions = streamDataRequestBody.getSourceSchemaTableList().stream()
                    .map(schemaTable -> schemaTable.getTable())
                    .collect(Collectors.toList());
            schemaTableNames.addAll(CDCSchemaTableUtil.parseTableExpressionWithoutSchema(database, tableExpressions));
            tableNames = schemaTableNames;
        }
        if (tableNames.isEmpty()) {
            throw new NotFindStreamDataSourceTableException();
        }
        Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
        if (!shardingRule.isPresent()) {
            return CDCResponseGenerator.failed(str, CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
        }
        StreamDataParameter parameter = createStreamDataParameter(streamDataRequestBody, database, shardingRule.get(), schemaTableNames, tableNames);
        String jobId = this.jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties());
        // Set before starting so the context already knows the job if startStreaming throws.
        cDCConnectionContext.setJobId(jobId);
        // NOTE(review): the response of startStreaming is not inspected; a failed start still yields a success response here.
        startStreaming(str, jobId, cDCConnectionContext, channel);
        return CDCResponseGenerator.succeedBuilder(str).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
    }

    /**
     * Build the job creation parameter, resolving the actual data nodes of every table through the sharding rule.
     *
     * <p>The element type of the data-node lists is not imported in this file, so the map is kept raw and the
     * resulting warnings are confined to this helper.</p>
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static StreamDataParameter createStreamDataParameter(StreamDataRequestBody requestBody, ShardingSphereDatabase database, ShardingRule shardingRule,
                                                                 Collection<String> schemaTableNames, Collection<String> tableNames) {
        Map actualDataNodes = new HashMap();
        for (String each : tableNames) {
            actualDataNodes.put(each, CDCTableRuleUtil.getActualDataNodes(shardingRule, each));
        }
        // The last constructor argument is enabled only for the openGauss protocol type.
        boolean openGaussProtocol = database.getProtocolType() instanceof OpenGaussDatabaseType;
        return new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodes, openGaussProtocol);
    }

    /**
     * Get the database name recorded in the configuration of a job.
     *
     * @param str job ID
     * @return database name of the job configuration
     */
    public String getDatabaseNameByJobId(String str) {
        return this.jobAPI.getJobConfiguration(str).getDatabaseName();
    }

    /**
     * Enable an existing job and execute it once, streaming through a socket sink bound to the channel.
     *
     * @param str identifier handed to {@link CDCResponseGenerator} for the response
     *            (presumably the client's request ID; confirm against the caller)
     * @param str2 ID of the job to start
     * @param cDCConnectionContext connection context that receives the job ID once the job has been executed
     * @param channel channel handed to the socket sink connector
     * @return success response, or a failed response when the job configuration does not exist
     */
    public CDCResponse startStreaming(String str, String str2, CDCConnectionContext cDCConnectionContext, Channel channel) {
        CDCJobConfiguration cdcJobConfig = this.jobAPI.getJobConfiguration(str2);
        if (null == cdcJobConfig) {
            // Keyed by the same identifier as every other response in this class, not by the job ID.
            return CDCResponseGenerator.failed(str, CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config doesn't exist", str2));
        }
        JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(str2);
        jobConfigPOJO.setDisabled(false);
        PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
        // An incremental record comparator is supplied only when the job decodes with transactions; otherwise null is passed.
        SocketSinkImporterConnector importerConnector = new SocketSinkImporterConnector(channel, database, cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(),
                cdcJobConfig.isDecodeWithTX() ? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType()) : null);
        CDCJob job = new CDCJob(importerConnector);
        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
        OneOffJobBootstrap jobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
        job.setJobBootstrap(jobBootstrap);
        jobBootstrap.execute();
        cDCConnectionContext.setJobId(str2);
        return CDCResponseGenerator.succeedBuilder(str).build();
    }

    /**
     * Stop a running job and mark its job configuration as disabled.
     *
     * <p>A null or empty job ID is logged and ignored. If no job configuration is found after the job has been
     * stopped, that is logged and the disable step is skipped.</p>
     *
     * @param str job ID
     */
    public void stopStreaming(String str) {
        if (Strings.isNullOrEmpty(str)) {
            log.warn("job id is null or empty, ignored");
            return;
        }
        PipelineJobCenter.stop(str);
        JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(str);
        if (null == jobConfigPOJO) {
            // Presumably the job was already dropped; there is nothing left to disable.
            log.warn("job config of {} doesn't exist, skip disabling", str);
            return;
        }
        jobConfigPOJO.setDisabled(true);
        PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
    }

    /**
     * Drop a streaming job by delegating to {@link CDCJobAPI#rollback}.
     *
     * @param str job ID
     * @throws SQLException propagated from the job API
     */
    public void dropStreaming(String str) throws SQLException {
        this.jobAPI.rollback(str);
    }

    /**
     * Acknowledge the batch identified by the request's ack ID.
     *
     * @param ackStreamingRequestBody request carrying the ack ID
     */
    public void processAck(AckStreamingRequestBody ackStreamingRequestBody) {
        CDCAckHolder.getInstance().ack(ackStreamingRequestBody.getAckId());
    }
}
