package io.cdap.plugin.kafka.batch.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.format.FormatSpecification;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.format.RecordFormats;
import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.common.ReferencePluginConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:io/cdap/plugin/kafka/batch/source/KafkaBatchConfig.class */
public class KafkaBatchConfig extends ReferencePluginConfig {
    public static final String KEY_FIELD = "keyField";
    public static final String PARTITION_FIELD = "partitionField";
    public static final String OFFSET_FIELD = "offsetField";
    public static final String SCHEMA = "schema";
    public static final String INITIAL_PARTITION_OFFSETS = "initialPartitionOffsets";
    public static final String FORMAT = "format";
    public static final String TOPIC = "topic";
    public static final String KAFKA_BROKERS = "kafkaBrokers";

    @Description("Kafka topic to read from.")
    @Macro
    private String topic;

    @Description("A directory path to store the latest Kafka offsets. A file named with the pipeline name will be created under the given directory.")
    @Macro
    @Nullable
    private String offsetDir;

    @Description("A comma separated list of topic partitions to read from. If not specified, all partitions will be read.")
    @Macro
    @Nullable
    private String partitions;

    @Description("The initial offset for each topic partition in partition1:offset1,partition2:offset2 form. These offsets will only be used for the first run of the pipeline. Any subsequent run will read from the latest offset from previous run.Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. If not specified, the initial run will start reading from the latest message in Kafka.")
    @Macro
    @Nullable
    private String initialPartitionOffsets;

    @Description("The maximum of messages the source will read from each topic partition. If the current topic partition does not have this number of messages, the source will read to the latest offset. Note that this is an estimation, the actual number of messages the source read may be smaller than this number.")
    @Macro
    @Nullable
    private Long maxNumberRecords;

    @Description("Output schema of the source, including the timeField and keyField. The fields excluding keyField are used in conjunction with the format to parse Kafka payloads.")
    private String schema;

    @Description("Optional format of the Kafka event. Any format supported by CDAP is supported. For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values. If no format is given, Kafka message payloads will be treated as bytes.")
    @Nullable
    private String format;

    @Description("Optional name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be bytes.")
    @Nullable
    private String keyField;

    @Description("Optional name of the field containing the kafka partition that was read from. If this is not set, no partition field will be added to output records. If set, this field must be present in the schema property and must be an integer.")
    @Nullable
    private String partitionField;

    @Description("Optional name of the field containing the kafka offset that the message was read from. If this is not set, no offset field will be added to output records. If set, this field must be present in the schema property and must be a long.")
    @Nullable
    private String offsetField;

    public KafkaBatchConfig() {
        super("");
    }

    public KafkaBatchConfig(String str, String str2, String str3) {
        super(String.format("Kafka_%s", str2));
        this.partitions = str;
        this.topic = str2;
        this.initialPartitionOffsets = str3;
    }

    public String getTopic() {
        return this.topic;
    }

    @Nullable
    public String getOffsetDir() {
        return this.offsetDir;
    }

    public Set<Integer> getPartitions(FailureCollector failureCollector) {
        HashSet hashSet = new HashSet();
        if (this.partitions == null) {
            return hashSet;
        }
        for (String str : Splitter.on(',').trimResults().split(this.partitions)) {
            try {
                hashSet.add(Integer.valueOf(Integer.parseInt(str)));
            } catch (NumberFormatException e) {
                failureCollector.addFailure(String.format("Invalid partition '%s'. Partitions must be integers.", str), (String) null).withConfigElement("partitions", str);
            }
        }
        return hashSet;
    }

    @Nullable
    public String getKeyField() {
        if (Strings.isNullOrEmpty(this.keyField)) {
            return null;
        }
        return this.keyField;
    }

    @Nullable
    public String getPartitionField() {
        if (Strings.isNullOrEmpty(this.partitionField)) {
            return null;
        }
        return this.partitionField;
    }

    @Nullable
    public String getOffsetField() {
        if (Strings.isNullOrEmpty(this.offsetField)) {
            return null;
        }
        return this.offsetField;
    }

    public long getMaxNumberRecords() {
        if (this.maxNumberRecords == null) {
            return -1L;
        }
        return this.maxNumberRecords.longValue();
    }

    @Nullable
    public String getFormat() {
        if (Strings.isNullOrEmpty(this.format)) {
            return null;
        }
        return this.format;
    }

    @Nullable
    public Schema getSchema(FailureCollector failureCollector) {
        try {
            if (Strings.isNullOrEmpty(this.schema)) {
                return null;
            }
            return Schema.parseJson(this.schema);
        } catch (IOException e) {
            failureCollector.addFailure("Unable to parse schema: " + e.getMessage(), (String) null).withConfigProperty(SCHEMA);
            return null;
        }
    }

    public Schema getMessageSchema(FailureCollector failureCollector) {
        Schema schema = getSchema(failureCollector);
        if (schema == null) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        boolean z = false;
        boolean z2 = false;
        boolean z3 = false;
        for (Schema.Field field : schema.getFields()) {
            String name = field.getName();
            Schema schema2 = field.getSchema();
            Schema.Type type = schema2.isNullable() ? schema2.getNonNullable().getType() : schema2.getType();
            if (name.equals(this.keyField)) {
                if (type != Schema.Type.BYTES) {
                    failureCollector.addFailure("The key field must be of type bytes or nullable bytes.", (String) null).withConfigProperty(KEY_FIELD).withOutputSchemaField(this.keyField);
                }
                z = true;
            } else if (name.equals(this.partitionField)) {
                if (type != Schema.Type.INT) {
                    failureCollector.addFailure("The partition field must be of type int.", (String) null).withConfigProperty(PARTITION_FIELD).withOutputSchemaField(this.partitionField);
                }
                z2 = true;
            } else if (name.equals(this.offsetField)) {
                if (type != Schema.Type.LONG) {
                    failureCollector.addFailure("The offset field must be of type long.", (String) null).withConfigProperty(OFFSET_FIELD).withOutputSchemaField(this.offsetField);
                }
                z3 = true;
            } else {
                arrayList.add(field);
            }
        }
        if (arrayList.isEmpty()) {
            failureCollector.addFailure("Schema must contain at least one other field besides the time and key fields.", (String) null).withConfigProperty(SCHEMA);
        }
        if (getKeyField() != null && !z) {
            failureCollector.addFailure(String.format("keyField '%s' does not exist in the schema. Please add it to the schema.", this.keyField), (String) null).withConfigProperty(KEY_FIELD);
        }
        if (getPartitionField() != null && !z2) {
            failureCollector.addFailure(String.format("partitionField '%s' does not exist in the schema. Please add it to the schema.", this.partitionField), (String) null).withConfigProperty(PARTITION_FIELD);
        }
        if (getOffsetField() != null && !z3) {
            failureCollector.addFailure(String.format("offsetField '%s' does not exist in the schema. Please add it to the schema.", this.offsetField), (String) null).withConfigProperty(OFFSET_FIELD);
        }
        return Schema.recordOf("kafka.message", arrayList);
    }

    public static Map<String, Integer> parseBrokerMap(String str, @Nullable FailureCollector failureCollector) {
        HashMap hashMap = new HashMap();
        for (KeyValue keyValue : KeyValueListParser.DEFAULT.parse(str)) {
            String str2 = (String) keyValue.getKey();
            String str3 = (String) keyValue.getValue();
            try {
                hashMap.put(str2, Integer.valueOf(Integer.parseInt(str3)));
            } catch (NumberFormatException e) {
                String format = String.format("Invalid port '%s' for host '%s'.", str3, str2);
                if (failureCollector == null) {
                    throw new IllegalArgumentException(format);
                }
                failureCollector.addFailure(format, (String) null).withConfigElement(KAFKA_BROKERS, str2 + ":" + str3);
            }
        }
        if (hashMap.isEmpty()) {
            if (failureCollector == null) {
                throw new IllegalArgumentException("Must specify kafka brokers.");
            }
            failureCollector.addFailure("Must specify kafka brokers.", (String) null).withConfigProperty(KAFKA_BROKERS);
        }
        return hashMap;
    }

    public KafkaPartitionOffsets getInitialPartitionOffsets(FailureCollector failureCollector) {
        KafkaPartitionOffsets kafkaPartitionOffsets = new KafkaPartitionOffsets(Collections.emptyMap());
        if (this.initialPartitionOffsets == null) {
            return kafkaPartitionOffsets;
        }
        for (KeyValue keyValue : KeyValueListParser.DEFAULT.parse(this.initialPartitionOffsets)) {
            String str = (String) keyValue.getKey();
            String str2 = (String) keyValue.getValue();
            try {
                int parseInt = Integer.parseInt(str);
                try {
                    kafkaPartitionOffsets.setPartitionOffset(parseInt, Long.parseLong(str2));
                } catch (NumberFormatException e) {
                    failureCollector.addFailure(String.format("Invalid offset '%s' in initialPartitionOffsets for partition %d.", str, Integer.valueOf(parseInt)), (String) null).withConfigElement(INITIAL_PARTITION_OFFSETS, str + ":" + str2);
                }
            } catch (NumberFormatException e2) {
                failureCollector.addFailure(String.format("Invalid partition '%s' in initialPartitionOffsets.", str), (String) null).withConfigElement(INITIAL_PARTITION_OFFSETS, str + ":" + str2);
            }
        }
        return kafkaPartitionOffsets;
    }

    public void validate(FailureCollector failureCollector) {
        getPartitions(failureCollector);
        getInitialPartitionOffsets(failureCollector);
        Schema messageSchema = getMessageSchema(failureCollector);
        if (messageSchema == null) {
            return;
        }
        if (!Strings.isNullOrEmpty(this.format)) {
            try {
                RecordFormats.createInitializedFormat(new FormatSpecification(this.format, messageSchema, new HashMap()));
                return;
            } catch (Exception e) {
                failureCollector.addFailure(String.format("Unable to instantiate a message parser from format '%s' and message schema '%s': %s", this.format, messageSchema, e.getMessage()), (String) null).withConfigProperty(FORMAT);
                return;
            }
        }
        List fields = messageSchema.getFields();
        if (fields.size() > 1) {
            failureCollector.addFailure(String.format("Without a format, the schema must contain just a single message field of type bytes or nullable bytes. Found %s message fields (%s).", Integer.valueOf(fields.size()), (String) fields.stream().map((v0) -> {
                return v0.getName();
            }).collect(Collectors.joining(","))), (String) null).withConfigProperty(FORMAT);
        }
        Schema.Field field = (Schema.Field) fields.get(0);
        Schema schema = field.getSchema();
        if ((schema.isNullable() ? schema.getNonNullable().getType() : schema.getType()) != Schema.Type.BYTES) {
            failureCollector.addFailure(String.format("Without a format, the message field must be of type bytes or nullable bytes, but field %s is of type %s.", field.getName(), field.getSchema()), (String) null).withOutputSchemaField(field.getName()).withConfigProperty(FORMAT);
        }
    }

    @VisibleForTesting
    public static Path getOffsetFilePath(Path path, String str, String str2) {
        return new Path(path, String.format("%s.%s.offsets.json", str, str2));
    }
}
