package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.Schema;
import io.streamthoughts.kafka.connect.filepulse.data.StructSchema;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedField;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter;
import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.class */
public abstract class AbstractDelimitedRowFilter<T extends AbstractRecordFilter<T>> extends AbstractRecordFilter<T> {
    private static final String DEFAULT_SOURCE_FIELD = "message";
    private static final Schema DEFAULT_COLUMN_TYPE = Schema.string();
    private static final String AUTO_GENERATED_COLUMN_NAME_PREFIX = "column";
    private DelimitedRowFilterConfig configs;
    private StructSchema schema;
    private final Map<Integer, TypedField> columnsTypesByIndex = new HashMap();

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public void configure(Map<String, ?> map) {
        super.configure(map);
        this.configs = new DelimitedRowFilterConfig(configDef(), map);
        if (isMandatoryConfigsMissing()) {
            throw new ConfigException("At least one of those parameters should be configured " + new StringJoiner(",", "[", "]").add(DelimitedRowFilterConfig.READER_AUTO_GENERATE_COLUMN_NAME_CONFIG).add(DelimitedRowFilterConfig.READER_EXTRACT_COLUMN_NAME_CONFIG).add(DelimitedRowFilterConfig.READER_FIELD_COLUMNS_CONFIG).toString());
        }
        this.schema = this.configs.schema();
        if (this.schema != null) {
            List fields = this.schema.fields();
            IntStream.range(0, fields.size()).forEach(i -> {
                this.columnsTypesByIndex.put(Integer.valueOf(i), (TypedField) fields.get(i));
            });
        }
    }

    private boolean isMandatoryConfigsMissing() {
        return this.configs.schema() == null && this.configs.extractColumnName() == null && !this.configs.isAutoGenerateColumnNames();
    }

    public DelimitedRowFilterConfig filterConfig() {
        return this.configs;
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public ConfigDef configDef() {
        return DelimitedRowFilterConfig.configDef();
    }

    protected abstract String[] parseColumnsValues(String str);

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public RecordsIterable<TypedStruct> apply(FilterContext filterContext, TypedStruct typedStruct, boolean z) throws FilterException {
        String[] parseColumnsValues = parseColumnsValues(typedStruct.first("message").getString());
        if (this.schema == null || isSchemaDynamic()) {
            inferSchemaFromRecord(typedStruct, parseColumnsValues.length);
        }
        return RecordsIterable.of(new TypedStruct[]{buildStructForFields(parseColumnsValues)});
    }

    public boolean isSchemaDynamic() {
        return this.configs.extractColumnName() == null && this.configs.schema() == null && this.configs.isAutoGenerateColumnNames();
    }

    private void inferSchemaFromRecord(TypedStruct typedStruct, int i) {
        this.schema = Schema.struct();
        if (this.configs.extractColumnName() == null) {
            if (!this.configs.isAutoGenerateColumnNames()) {
                throw new FilterException("Can't found valid configuration to determine schema for input value");
            }
            for (int i2 = 0; i2 < i; i2++) {
                String str = "column" + (i2 + 1);
                this.schema.field(str, DEFAULT_COLUMN_TYPE);
                this.columnsTypesByIndex.put(Integer.valueOf(i2), this.schema.field(str));
            }
            return;
        }
        String extractColumnName = this.configs.extractColumnName();
        String string = typedStruct.first(extractColumnName).getString();
        if (string == null) {
            throw new FilterException("Cannot find field for name '" + extractColumnName + "' to determine columns names");
        }
        List list = (List) Arrays.stream(parseColumnsValues(string)).map((v0) -> {
            return v0.trim();
        }).collect(Collectors.toList());
        if (this.configs.isDuplicateColumnsAsArray()) {
            Map map = (Map) ((Map) list.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))).entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return ((Long) entry.getValue()).longValue() > 1 ? Schema.array(DEFAULT_COLUMN_TYPE) : DEFAULT_COLUMN_TYPE;
            }));
            StructSchema structSchema = this.schema;
            Objects.requireNonNull(structSchema);
            map.forEach(structSchema::field);
        } else {
            list.forEach(str2 -> {
                this.schema.field(str2, DEFAULT_COLUMN_TYPE);
            });
        }
        IntStream.range(0, list.size()).forEach(i3 -> {
            this.columnsTypesByIndex.put(Integer.valueOf(i3), this.schema.field((String) list.get(i3)));
        });
    }

    private TypedStruct buildStructForFields(String[] strArr) {
        if (strArr.length > this.columnsTypesByIndex.size()) {
            throw new FilterException("Error while reading delimited input row. Too large number of fields (" + strArr.length + ")");
        }
        TypedStruct create = TypedStruct.create();
        for (int i = 0; i < strArr.length; i++) {
            String str = strArr[i];
            if (this.configs.isTrimColumn()) {
                str = str.trim();
            }
            TypedField typedField = this.columnsTypesByIndex.get(Integer.valueOf(i));
            Type type = typedField.type();
            if (type == Type.ARRAY) {
                if (!create.exists(typedField.name())) {
                    create.put(typedField.name(), new ArrayList());
                }
                create.getArray(typedField.name()).add(str);
            } else {
                create = create.put(typedField.name(), type, StringUtils.isNotBlank(str) ? type.convert(str) : null);
            }
        }
        return create;
    }
}
