package org.apache.samza.sql.client.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.serializers.StringSerdeFactory;
import org.apache.samza.sql.client.exceptions.ExecutorException;
import org.apache.samza.sql.client.interfaces.EnvironmentVariableHandler;
import org.apache.samza.sql.client.interfaces.EnvironmentVariableHandlerImpl;
import org.apache.samza.sql.client.interfaces.EnvironmentVariableSpecs;
import org.apache.samza.sql.client.interfaces.ExecutionContext;
import org.apache.samza.sql.client.interfaces.ExecutionStatus;
import org.apache.samza.sql.client.interfaces.NonQueryResult;
import org.apache.samza.sql.client.interfaces.QueryResult;
import org.apache.samza.sql.client.interfaces.SqlExecutor;
import org.apache.samza.sql.client.interfaces.SqlFunction;
import org.apache.samza.sql.client.util.Pair;
import org.apache.samza.sql.client.util.RandomAccessQueue;
import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.schema.SqlFieldSchema;
import org.apache.samza.sql.schema.SqlSchema;
import org.apache.samza.sql.util.JsonUtil;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.kafka.KafkaSystemFactory;
import org.apache.samza.tools.avro.AvroSchemaGenRelConverterFactory;
import org.apache.samza.tools.avro.AvroSerDeFactory;
import org.apache.samza.tools.json.JsonRelConverterFactory;
import org.apache.samza.tools.schemas.ProfileChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/sql/client/impl/SamzaExecutor.class */
public class SamzaExecutor implements SqlExecutor {
    /** System name used for the {@code systems.log.*} settings built in {@code fetchSamzaSqlConfig}. */
    private static final String SAMZA_SYSTEM_LOG = "log";

    /** System name used for the {@code systems.kafka.*} settings built in {@code fetchSamzaSqlConfig}. */
    private static final String SAMZA_SYSTEM_KAFKA = "kafka";

    /** Environment variable that selects how result rows are rendered ("pretty" or "compact"). */
    private static final String SAMZA_SQL_OUTPUT = "samza.sql.output";

    /** Environment variable read by {@code listTables} as the ZooKeeper connect string. */
    private static final String SAMZA_SQL_SYSTEM_KAFKA_ADDRESS = "samza.sql.system.kafka.address";

    /** Fallback ZooKeeper address, also written into the generated Kafka consumer configuration. */
    private static final String DEFAULT_SERVER_ADDRESS = "localhost:2181";

    /** Second argument passed to the {@code ZkClient} constructor; presumably milliseconds - TODO confirm. */
    private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000;

    /** ZooKeeper path whose children are listed as tables by {@code listTables}. */
    private static final String ZOOKEEPER_BROKERS_TOPICS_PATH = "/brokers/topics";

    /** Runners of the executions started by this executor, keyed by execution id. */
    private final Map<Integer, SamzaSqlApplicationRunner> executions = new HashMap<>();

    /** Handler that stores the environment variables consulted by this executor. */
    private final SamzaExecutorEnvironmentVariableHandler environmentVariableHandler = new SamzaExecutorEnvironmentVariableHandler();

    private static final Logger LOG = LoggerFactory.getLogger(SamzaExecutor.class);

    /** Capacity argument passed to the {@link RandomAccessQueue} that buffers output messages. */
    private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000;

    /** Buffer of output messages, filled by {@code saveOutputMessage} and shared by all instances. */
    private static final RandomAccessQueue<OutgoingMessageEnvelope> outputData = new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY);

    /** Source of execution ids; incremented for every query, non-query and schema lookup. */
    private static final AtomicInteger execIdSeq = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.samza.sql.client.impl.SamzaExecutor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/samza/sql/client/impl/SamzaExecutor$1.class */
    /**
     * Lookup table that maps {@link ApplicationStatus.StatusCode} ordinals to the small integers
     * (1..4) used as {@code case} labels by {@code queryExecutionStatus}. Ordinals that are not
     * assigned here keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$samza$job$ApplicationStatus$StatusCode = new int[ApplicationStatus.StatusCode.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot unset instead of aborting initialization.
            try {
                $SwitchMap$org$apache$samza$job$ApplicationStatus$StatusCode[ApplicationStatus.StatusCode.New.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$samza$job$ApplicationStatus$StatusCode[ApplicationStatus.StatusCode.Running.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$samza$job$ApplicationStatus$StatusCode[ApplicationStatus.StatusCode.SuccessfulFinish.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$samza$job$ApplicationStatus$StatusCode[ApplicationStatus.StatusCode.UnsuccessfulFinish.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/sql/client/impl/SamzaExecutor$MessageFormat.class */
    /**
     * Rendering modes for a result row. {@code getFormattedRow} compares the name of
     * {@link #PRETTY} (ignoring case) with the "samza.sql.output" environment variable.
     */
    public enum MessageFormat {
        /** Message is re-serialized with a pretty printer. */
        PRETTY,

        /** Message is shown as the raw string it was received as. */
        COMPACT
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/samza/sql/client/impl/SamzaExecutor$SamzaExecutorEnvironmentVariableHandler.class */
    /**
     * Environment variable handler of the enclosing {@link SamzaExecutor}. It declares the
     * "samza.sql.output" variable, accepts variables with unknown names, and forwards the
     * processing of each variable to the enclosing executor.
     */
    public class SamzaExecutorEnvironmentVariableHandler extends EnvironmentVariableHandlerImpl {
        SamzaExecutorEnvironmentVariableHandler() {
        }

        /**
         * Builds the specifications of the variables known to this executor.
         *
         * @return specs containing only "samza.sql.output", with possible values
         *         "pretty"/"compact" and default "compact"
         */
        @Override
        protected EnvironmentVariableSpecs initializeEnvironmentVariableSpecs() {
            // Typed map instead of a raw HashMap so the compiler checks what is stored in it.
            HashMap<String, EnvironmentVariableSpecs.Spec> specs = new HashMap<>();
            specs.put(SamzaExecutor.SAMZA_SQL_OUTPUT,
                new EnvironmentVariableSpecs.Spec(new String[] {"pretty", "compact"}, "compact"));
            return new EnvironmentVariableSpecs(specs);
        }

        /**
         * Delegates to {@link SamzaExecutor#processEnvironmentVariable(String, String)}.
         *
         * @param str  variable name
         * @param str2 variable value
         * @return whatever the enclosing executor returns for this variable
         */
        @Override
        protected boolean processEnvironmentVariable(String str, String str2) {
            return SamzaExecutor.this.processEnvironmentVariable(str, str2);
        }

        /**
         * @return always {@code true}: variables with names not declared in the specs are accepted
         */
        @Override
        protected boolean isAcceptUnknowName() {
            return true;
        }
    }

    /**
     * Start hook of the executor. This implementation performs no work.
     *
     * @param executionContext unused
     */
    @Override // org.apache.samza.sql.client.interfaces.SqlExecutor
    public void start(ExecutionContext executionContext) {
    }

    /**
     * Stops every execution tracked by this executor and clears the buffered output.
     *
     * <p>All executions are attempted even if stopping one of them fails. Executions that were
     * stopped are forgotten; those that failed to stop stay registered so they can be retried.
     *
     * @param executionContext context forwarded to {@link #stopExecution(ExecutionContext, int)}
     * @throws ExecutorException the first failure encountered, with any later failures attached
     *                           as suppressed exceptions
     */
    @Override
    public void stop(ExecutionContext executionContext) throws ExecutorException {
        ExecutorException firstFailure = null;
        Iterator<Integer> ids = this.executions.keySet().iterator();
        while (ids.hasNext()) {
            try {
                stopExecution(executionContext, ids.next());
                ids.remove();
            } catch (ExecutorException e) {
                // Keep going: one runner that refuses to stop must not leave the others running.
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        outputData.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * @return the handler instance owned by this executor (the same object on every call)
     */
    @Override // org.apache.samza.sql.client.interfaces.SqlExecutor
    public EnvironmentVariableHandler getEnvironmentVariableHandler() {
        return this.environmentVariableHandler;
    }

    /**
     * Lists the children of the ZooKeeper path "/brokers/topics", each prefixed with "kafka.".
     *
     * <p>The ZooKeeper address is taken from the "samza.sql.system.kafka.address" environment
     * variable, falling back to {@code DEFAULT_SERVER_ADDRESS} when it is unset or empty.
     *
     * @param executionContext unused
     * @return table names of the form {@code kafka.<child>}
     * @throws ExecutorException if the ZooKeeper client times out
     */
    @Override
    public List<String> listTables(ExecutionContext executionContext) throws ExecutorException {
        String zkAddress = this.environmentVariableHandler.getEnvironmentVariable(SAMZA_SQL_SYSTEM_KAFKA_ADDRESS);
        if (zkAddress == null || zkAddress.isEmpty()) {
            zkAddress = DEFAULT_SERVER_ADDRESS;
        }
        ZkClient zkClient = null;
        try {
            zkClient = new ZkClient(zkAddress, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT);
            List<String> topics = zkClient.getChildren(ZOOKEEPER_BROKERS_TOPICS_PATH);
            return topics.stream()
                .map(topic -> "kafka." + topic)
                .collect(Collectors.toList());
        } catch (ZkTimeoutException e) {
            throw new ExecutorException((Throwable) e);
        } finally {
            // The client holds a live ZooKeeper connection; release it on every path.
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }

    /**
     * Resolves the SQL schema of a source by name.
     *
     * <p>A fresh configuration is generated (consuming one id from the execution id sequence),
     * the source is looked up through the configured IO resolver, and the schema is obtained
     * from the {@link RelSchemaProvider} plugin named by that source.
     *
     * @param executionContext unused
     * @param str              name of the source whose schema is requested
     * @return the schema reported by the schema provider
     * @throws ExecutorException if a {@link SamzaException} is raised during the lookup
     */
    @Override
    public SqlSchema getTableSchema(ExecutionContext executionContext, String str) throws ExecutorException {
        int configId = execIdSeq.incrementAndGet();
        MapConfig sqlConfig = new MapConfig(fetchSamzaSqlConfig(configId));
        try {
            SqlIOConfig sourceInfo = SamzaSqlApplicationConfig.createIOResolver(sqlConfig).fetchSourceInfo(str);
            RelSchemaProvider schemaProvider = (RelSchemaProvider) SamzaSqlApplicationConfig.initializePlugin(
                "RelSchemaProvider",
                sourceInfo.getRelSchemaProviderName(),
                sqlConfig,
                "samza.sql.relSchemaProvider.%s.",
                (factory, pluginConfig) ->
                    ((RelSchemaProviderFactory) factory).create(sourceInfo.getSystemStream(), pluginConfig));
            return schemaProvider.getSqlSchema();
        } catch (SamzaException e) {
            throw new ExecutorException((Throwable) e);
        }
    }

    /**
     * Starts a single SQL statement as a new execution.
     *
     * <p>The buffered output is cleared first. The statement is passed through
     * {@code formatSqlStmts} and stored under "samza.sql.stmts.json" in a freshly generated
     * configuration, which is then used to create and run a {@link SamzaSqlApplicationRunner}.
     * The runner is registered under the new execution id only after {@code run} returns.
     *
     * @param executionContext unused
     * @param str              the SQL statement
     * @return a result carrying the new execution id and a {@code null} second argument
     * @throws ExecutorException if a {@link SamzaException} is raised while creating or running
     *                           the runner
     */
    @Override
    public QueryResult executeQuery(ExecutionContext executionContext, String str) throws ExecutorException {
        outputData.clear();
        int execId = execIdSeq.incrementAndGet();
        Map<String, String> sqlConfig = fetchSamzaSqlConfig(execId);
        sqlConfig.put("samza.sql.stmts.json", JsonUtil.toJson(formatSqlStmts(Collections.singletonList(str))));
        try {
            SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(sqlConfig));
            runner.run((ExternalContext) null);
            this.executions.put(execId, runner);
            // The "{}" placeholder is required; without it SLF4J silently drops the id argument.
            LOG.debug("Executing sql. Id {}", execId);
            return new QueryResult(execId, null);
        } catch (SamzaException e) {
            throw new ExecutorException((Throwable) e);
        }
    }

    /**
     * @return the current size reported by the shared output buffer
     */
    @Override // org.apache.samza.sql.client.interfaces.SqlExecutor
    public int getRowCount() {
        return outputData.getSize();
    }

    /**
     * Reads buffered output messages via {@code outputData.get} and formats each as a row.
     *
     * @param executionContext unused
     * @param i                first argument forwarded to the buffer's {@code get}
     * @param i2               second argument forwarded to the buffer's {@code get}
     * @return one single-element row per message, in buffer order
     */
    @Override
    public List<String[]> retrieveQueryResult(ExecutionContext executionContext, int i, int i2) {
        List<String[]> rows = new ArrayList<>();
        for (OutgoingMessageEnvelope envelope : outputData.get(i, i2)) {
            rows.add(getFormattedRow(envelope));
        }
        return rows;
    }

    /**
     * Takes buffered output messages via {@code outputData.consume} and formats each as a row.
     *
     * @param executionContext unused
     * @param i                first argument forwarded to the buffer's {@code consume}
     * @param i2               second argument forwarded to the buffer's {@code consume}
     * @return one single-element row per message, in buffer order
     */
    @Override
    public List<String[]> consumeQueryResult(ExecutionContext executionContext, int i, int i2) {
        List<String[]> rows = new ArrayList<>();
        for (OutgoingMessageEnvelope envelope : outputData.consume(i, i2)) {
            rows.add(getFormattedRow(envelope));
        }
        return rows;
    }

    /**
     * Executes the INSERT statements found in a SQL file, one statement per line.
     *
     * <p>Lines are classified by {@code validateExecutedStmts}: statements starting with
     * "insert" are submitted together as one execution, other non-empty lines are reported back
     * as not submitted.
     *
     * @param executionContext context forwarded to {@link #executeNonQuery(ExecutionContext, List)}
     * @param file             file containing the statements
     * @return result holding the execution id, the submitted statements and the ignored ones
     * @throws ExecutorException if the file cannot be read, if it contains no statement to
     *                           execute, or if the execution cannot be started
     */
    @Override
    public NonQueryResult executeNonQuery(ExecutionContext executionContext, File file) throws ExecutorException {
        LOG.info("Sql file path: {}", file.getPath());
        List<String> fileLines;
        try {
            // readAllLines closes the file itself; the previous Files.lines stream was never closed.
            fileLines = Files.readAllLines(file.toPath());
        } catch (IOException e) {
            throw new ExecutorException(e);
        }
        LOG.info("Sql statements in Sql file: {}", fileLines);

        List<String> submittedStmts = new ArrayList<>();
        List<String> ignoredStmts = new ArrayList<>();
        validateExecutedStmts(fileLines, submittedStmts, ignoredStmts);
        if (submittedStmts.isEmpty()) {
            throw new ExecutorException("Nothing to execute. Note: SELECT statements are ignored.");
        }
        int executionId = executeNonQuery(executionContext, submittedStmts).getExecutionId();
        return new NonQueryResult(executionId, submittedStmts, ignoredStmts);
    }

    /**
     * Starts the given SQL statements together as one new execution.
     *
     * <p>The statements are passed through {@code formatSqlStmts} and stored under
     * "samza.sql.stmts.json" in a freshly generated configuration, which is then used to create
     * and run a {@link SamzaSqlApplicationRunner}. The runner is registered under the new
     * execution id only after {@code run} returns.
     *
     * @param executionContext unused
     * @param list             the SQL statements
     * @return a result carrying the new execution id
     * @throws ExecutorException if a {@link SamzaException} is raised while creating or running
     *                           the runner
     */
    @Override
    public NonQueryResult executeNonQuery(ExecutionContext executionContext, List<String> list) throws ExecutorException {
        int execId = execIdSeq.incrementAndGet();
        Map<String, String> sqlConfig = fetchSamzaSqlConfig(execId);
        sqlConfig.put("samza.sql.stmts.json", JsonUtil.toJson(formatSqlStmts(list)));
        try {
            SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(sqlConfig));
            runner.run((ExternalContext) null);
            this.executions.put(execId, runner);
            // The "{}" placeholder is required; without it SLF4J silently drops the id argument.
            LOG.debug("Executing sql. Id {}", execId);
            return new NonQueryResult(execId);
        } catch (SamzaException e) {
            throw new ExecutorException((Throwable) e);
        }
    }

    /**
     * Kills the runner of the given execution. The execution stays registered; it is not
     * removed from the set of tracked executions by this method.
     *
     * @param executionContext unused
     * @param i                id of the execution to stop
     * @throws ExecutorException if no execution with that id is tracked, or if killing the
     *                           runner raises a {@link SamzaException}
     */
    @Override
    public void stopExecution(ExecutionContext executionContext, int i) throws ExecutorException {
        SamzaSqlApplicationRunner runner = this.executions.get(i);
        if (runner == null) {
            throw new ExecutorException("Trying to stop a non-existing SQL execution " + i);
        }
        // The "{}" placeholder is required; without it SLF4J silently drops the id argument.
        LOG.debug("Stopping execution {}", i);
        try {
            runner.kill();
        } catch (SamzaException e) {
            throw new ExecutorException((Throwable) e);
        }
    }

    /**
     * Forgets a tracked execution that is not currently running.
     *
     * @param executionContext unused
     * @param i                id of the execution to remove
     * @throws ExecutorException if no execution with that id is tracked, or if its status code
     *                           is {@code Running}
     */
    @Override
    public void removeExecution(ExecutionContext executionContext, int i) throws ExecutorException {
        Integer executionId = i;
        SamzaSqlApplicationRunner runner = this.executions.get(executionId);
        if (runner == null) {
            throw new ExecutorException("Trying to remove a non-existing SQL execution " + i);
        }

        ApplicationStatus.StatusCode currentStatus = runner.status().getStatusCode();
        if (currentStatus.equals(ApplicationStatus.StatusCode.Running)) {
            throw new ExecutorException("Trying to remove an ongoing execution " + i);
        }

        this.executions.remove(executionId);
    }

    /**
     * Reports the status of a tracked execution.
     *
     * @param i id of the execution
     * @return the status translated from the runner's status code
     * @throws ExecutorException if no execution with that id is tracked, or if the runner
     *                           reports a status code that has no translation
     */
    @Override
    public ExecutionStatus queryExecutionStatus(int i) throws ExecutorException {
        SamzaSqlApplicationRunner runner = this.executions.get(i);
        if (runner != null) {
            return queryExecutionStatus(runner);
        }
        throw new ExecutorException("Execution " + i + " does not exist.");
    }

    /**
     * Lists the functions advertised by this executor. The list is hard-coded and currently
     * contains a single entry, "RegexMatch", taking two STRING arguments and returning BOOLEAN.
     *
     * @param executionContext unused
     * @return a new mutable list with the advertised functions
     */
    @Override
    public List<SqlFunction> listFunctions(ExecutionContext executionContext) {
        List<SqlFieldSchema> argumentTypes = Arrays.asList(
            SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, false, false),
            SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, false, false));
        SqlFieldSchema returnType = SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN, false, false);

        List<SqlFunction> functions = new ArrayList<>();
        functions.add(new SamzaSqlUdfDisplayInfo("RegexMatch", "Matches the string to the regex", argumentTypes, returnType));
        return functions;
    }

    /**
     * @return the implementation version recorded for the package of this object's runtime
     *         class, as reported by {@link Package#getImplementationVersion()}
     */
    @Override
    public String getVersion() {
        Package owningPackage = getClass().getPackage();
        return owningPackage.getImplementationVersion();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Appends a message to the shared output buffer that the result-retrieval methods read from.
     *
     * @param outgoingMessageEnvelope the message to buffer
     */
    public static void saveOutputMessage(OutgoingMessageEnvelope outgoingMessageEnvelope) {
        outputData.add(outgoingMessageEnvelope);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Callback invoked by the nested environment variable handler for a variable.
     * This implementation ignores both arguments.
     *
     * @param str  variable name (unused)
     * @param str2 variable value (unused)
     * @return always {@code true}
     */
    public boolean processEnvironmentVariable(String str, String str2) {
        return true;
    }

    /**
     * Builds the configuration map for one execution.
     *
     * <p>The map contains hard-coded settings for the job, the "kafka" and "log" systems, the
     * serdes, and the SQL resolver / converter / schema-provider plugins. All environment
     * variables held by the handler are copied in last, so they override any hard-coded entry
     * with the same key.
     *
     * @param i execution id, used in "job.name" and "processor.id"
     * @return a new mutable map; callers add further entries to it
     */
    Map<String, String> fetchSamzaSqlConfig(int i) {
        Map<String, String> config = new HashMap<>();

        // Job identity plus coordinator and task-grouper factories.
        config.put("job.name", "sql-job-" + i);
        config.put("processor.id", String.valueOf(i));
        config.put("job.coordinator.factory", PassthroughJobCoordinatorFactory.class.getName());
        config.put("task.name.grouper.factory", SingleContainerGrouperFactory.class.getName());

        // IO resolver and UDF resolver selection.
        String ioResolverDomain = String.format("samza.sql.ioResolver.%s.", "config");
        config.put("samza.sql.ioResolver", "config");
        config.put(ioResolverDomain + "factory", ConfigBasedIOResolverFactory.class.getName());
        config.put("samza.sql.udfResolver", "config");

        // Serde registry.
        config.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
        config.put("serializers.registry.avro.class", AvroSerDeFactory.class.getName());
        config.put("serializers.avro.schema", ProfileChangeEvent.SCHEMA$.toString());

        // "kafka" system and its IO-resolver entries.
        String kafkaSystemPrefix = String.format("systems.%s.", SAMZA_SYSTEM_KAFKA);
        String kafkaResolverPrefix = ioResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA);
        config.put(kafkaSystemPrefix + "samza.factory", KafkaSystemFactory.class.getName());
        config.put(kafkaSystemPrefix + "samza.key.serde", "string");
        config.put(kafkaSystemPrefix + "samza.msg.serde", "avro");
        config.put(kafkaSystemPrefix + "consumer.zookeeper.connect", DEFAULT_SERVER_ADDRESS);
        config.put(kafkaSystemPrefix + "producer.bootstrap.servers", "localhost:9092");
        config.put(kafkaSystemPrefix + "samza.offset.reset", "true");
        config.put(kafkaSystemPrefix + "samza.offset.default", "oldest");
        config.put(kafkaResolverPrefix + "samzaRelConverterName", "avro");
        config.put(kafkaResolverPrefix + "relSchemaProviderName", "config");

        // "log" system and its IO-resolver entries.
        String logSystemPrefix = String.format("systems.%s.", SAMZA_SYSTEM_LOG);
        String logResolverPrefix = ioResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG);
        config.put(logSystemPrefix + "samza.factory", CliLoggingSystemFactory.class.getName());
        config.put(logResolverPrefix + "samzaRelConverterName", "json");
        config.put(logResolverPrefix + "relSchemaProviderName", "config");

        // Rel converters and the schema provider.
        String avroConverterDomain = String.format("samza.sql.relConverter.%s.", "avro");
        String jsonConverterDomain = String.format("samza.sql.relConverter.%s.", "json");
        String schemaProviderDomain = String.format("samza.sql.relSchemaProvider.%s.", "config");
        config.put(avroConverterDomain + "factory", AvroSchemaGenRelConverterFactory.class.getName());
        config.put(jsonConverterDomain + "factory", JsonRelConverterFactory.class.getName());
        config.put(schemaProviderDomain + "factory", FileSystemAvroRelSchemaProviderFactory.class.getName());
        config.put(schemaProviderDomain + FileSystemAvroRelSchemaProviderFactory.CFG_SCHEMA_DIR, "/tmp/schemas/");

        // Copied last so that user-set environment variables win over the defaults above.
        for (Pair<String, String> variable : this.environmentVariableHandler.getAllEnvironmentVariables()) {
            config.put(variable.getL(), variable.getR());
        }
        return config;
    }

    /**
     * Prepares statements for submission: a statement that starts with "insert" (ignoring case)
     * is kept as is, any other statement is prefixed with "insert into log.outputStream ".
     *
     * @param list the statements to format
     * @return a new list with one formatted statement per input statement, in the same order
     */
    private List<String> formatSqlStmts(List<String> list) {
        return list.stream().map(stmt -> {
            // Locale.ROOT keeps the check stable under default locales with special case
            // mappings (e.g. Turkish, where "INSERT".toLowerCase() does not yield "insert").
            if (stmt.toLowerCase(Locale.ROOT).startsWith("insert")) {
                return stmt;
            }
            String wrapped = String.format("insert into log.outputStream %s", stmt);
            // Placeholders are required; without them SLF4J drops both arguments.
            LOG.debug("Sql formatted. Original: {}, formatted: {}", stmt, wrapped);
            return wrapped;
        }).collect(Collectors.toList());
    }

    /**
     * Splits statements into those that will be submitted and those that will not.
     * Empty strings are skipped; statements starting with "insert" (ignoring case) are appended
     * to {@code list2}, all others to {@code list3}.
     *
     * @param list  the statements to classify
     * @param list2 receives the statements to submit
     * @param list3 receives the statements that are not submitted
     */
    private void validateExecutedStmts(List<String> list, List<String> list2, List<String> list3) {
        for (String stmt : list) {
            if (stmt.isEmpty()) {
                continue;
            }
            // Locale.ROOT keeps the check stable under default locales with special case
            // mappings (e.g. Turkish, where "INSERT".toLowerCase() does not yield "insert").
            boolean isInsert = stmt.toLowerCase(Locale.ROOT).startsWith("insert");
            (isInsert ? list2 : list3).add(stmt);
        }
    }

    /**
     * Derives a schema from the first plan produced by the DSL converter for the given config.
     *
     * <p>NOTE(review): the field type names are collected but never used, and the schema is
     * built with the field names and an empty type list - looks unfinished; confirm the
     * intended behavior before relying on this method.
     *
     * @param config configuration handed to the DSL converter factory
     * @return a schema built from the field names of the validated row type and an empty type list
     */
    SqlSchema generateResultSchema(Config config) {
        RelRoot plan = (RelRoot) new SamzaSqlDslConverterFactory().create(config).convertDsl("").iterator().next();

        List<String> fieldNames = new ArrayList<>();
        List<String> fieldTypeNames = new ArrayList<>();
        for (RelDataTypeField field : plan.validatedRowType.getFieldList()) {
            fieldNames.add(field.getName());
            fieldTypeNames.add(field.getType().toString());
        }
        return new SqlSchema(fieldNames, Collections.emptyList());
    }

    /**
     * Renders one output message as a single-column row.
     *
     * <p>The pretty format is used only when the "samza.sql.output" environment variable equals
     * "PRETTY" ignoring case; otherwise (including when the variable is unset) the message is
     * rendered in the compressed format.
     *
     * @param outgoingMessageEnvelope the message to render
     * @return a one-element array holding the rendered message
     */
    private String[] getFormattedRow(OutgoingMessageEnvelope outgoingMessageEnvelope) {
        String outputMode = this.environmentVariableHandler.getEnvironmentVariable(SAMZA_SQL_OUTPUT);
        boolean pretty = outputMode != null && outputMode.equalsIgnoreCase(MessageFormat.PRETTY.toString());
        String rendered = pretty
            ? getPrettyFormat(outgoingMessageEnvelope)
            : getCompressedFormat(outgoingMessageEnvelope);
        return new String[] {rendered};
    }

    /**
     * Translates the runner's {@link ApplicationStatus.StatusCode} into an {@link ExecutionStatus}.
     *
     * @param samzaSqlApplicationRunner the runner whose status is queried
     * @return the execution status with the same name as the runner's status code
     * @throws ExecutorException if the status code is none of New, Running, SuccessfulFinish
     *                           or UnsuccessfulFinish
     */
    private ExecutionStatus queryExecutionStatus(SamzaSqlApplicationRunner samzaSqlApplicationRunner) throws ExecutorException {
        ApplicationStatus.StatusCode statusCode = samzaSqlApplicationRunner.status().getStatusCode();
        // Switch on the enum itself instead of indexing a hand-maintained ordinal lookup table.
        switch (statusCode) {
            case New:
                return ExecutionStatus.New;
            case Running:
                return ExecutionStatus.Running;
            case SuccessfulFinish:
                return ExecutionStatus.SuccessfulFinish;
            case UnsuccessfulFinish:
                return ExecutionStatus.UnsuccessfulFinish;
            default:
                throw new ExecutorException("Unsupported status code: " + statusCode);
        }
    }

    /**
     * Mapper shared by all {@code getPrettyFormat} calls. It is never reconfigured after
     * construction, so one instance can be reused instead of building a new one per row.
     */
    private static final ObjectMapper PRETTY_JSON_MAPPER = new ObjectMapper();

    /**
     * Renders a message as pretty-printed JSON.
     *
     * <p>The message payload is cast to {@code byte[]} and decoded with the platform default
     * charset, then parsed and re-serialized with the default pretty printer. If parsing or
     * serialization fails, the error is logged and the decoded text is returned unchanged.
     *
     * @param outgoingMessageEnvelope the message to render
     * @return the pretty-printed JSON, or the raw decoded text on failure
     */
    private String getPrettyFormat(OutgoingMessageEnvelope outgoingMessageEnvelope) {
        String rawJson = new String((byte[]) outgoingMessageEnvelope.getMessage());
        try {
            Object parsed = PRETTY_JSON_MAPPER.readValue(rawJson, Object.class);
            return PRETTY_JSON_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(parsed);
        } catch (IOException e) {
            LOG.error("getPrettyFormat failed with exception while formatting json ", e);
            return rawJson;
        }
    }

    /**
     * Renders a message as the plain text of its payload.
     *
     * @param outgoingMessageEnvelope the message to render; its payload is cast to {@code byte[]}
     * @return the payload decoded with the platform default charset
     */
    private String getCompressedFormat(OutgoingMessageEnvelope outgoingMessageEnvelope) {
        byte[] payload = (byte[]) outgoingMessageEnvelope.getMessage();
        return new String(payload);
    }
}
