package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.Schema;
import io.streamthoughts.kafka.connect.filepulse.data.StructSchema;
import io.streamthoughts.kafka.connect.filepulse.data.TypedField;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/filter/DelimitedRowFilter.class */
public class DelimitedRowFilter extends AbstractRecordFilter<DelimitedRowFilter> {
    private static final String DEFAULT_SOURCE_FIELD = "message";
    private static Schema DEFAULT_COLUMN_TYPE = Schema.string();
    private static final String AUTO_GENERATED_COLUMN_NAME_PREFIX = "column";
    private DelimitedRowFilterConfig configs;
    private StructSchema schema;

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public void configure(Map<String, ?> map) {
        super.configure(map);
        this.configs = new DelimitedRowFilterConfig(map);
        if (isMandatoryConfigsMissing()) {
            throw new ConfigException("At least one of those parameters should be configured " + new StringJoiner(",", "[", "]").add(DelimitedRowFilterConfig.READER_AUTO_GENERATE_COLUMN_NAME_CONFIG).add(DelimitedRowFilterConfig.READER_EXTRACT_COLUMN_NAME_CONFIG).add(DelimitedRowFilterConfig.READER_FIELD_COLUMNS_CONFIG).toString());
        }
        this.schema = this.configs.schema();
    }

    private boolean isMandatoryConfigsMissing() {
        return this.configs.schema() == null && this.configs.extractColumnName() == null && !this.configs.isAutoGenerateColumnNames();
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public ConfigDef configDef() {
        return DelimitedRowFilterConfig.configDef();
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public RecordsIterable<TypedStruct> apply(FilterContext filterContext, TypedStruct typedStruct, boolean z) throws FilterException {
        String[] splitFields = splitFields(typedStruct.first("message").getString());
        return RecordsIterable.of(new TypedStruct[]{buildStructForFields(splitFields, getSchema(typedStruct, splitFields.length))});
    }

    private StructSchema getSchema(TypedStruct typedStruct, int i) {
        if (this.schema != null) {
            return this.schema;
        }
        this.schema = Schema.struct();
        if (this.configs.extractColumnName() != null) {
            String extractColumnName = this.configs.extractColumnName();
            String string = typedStruct.first(extractColumnName).getString();
            if (string == null) {
                throw new FilterException("Can't found field for name '" + extractColumnName + "' to determine columns names");
            }
            for (String str : splitFields(string)) {
                this.schema.field(str, DEFAULT_COLUMN_TYPE);
            }
        } else {
            if (!this.configs.isAutoGenerateColumnNames()) {
                throw new FilterException("Can't found valid configuration to determine schema for input value");
            }
            for (int i2 = 0; i2 < i; i2++) {
                this.schema.field(AUTO_GENERATED_COLUMN_NAME_PREFIX + (i2 + 1), DEFAULT_COLUMN_TYPE);
            }
        }
        return this.schema;
    }

    private String[] splitFields(String str) {
        return str.split(this.configs.delimiter());
    }

    private TypedStruct buildStructForFields(String[] strArr, StructSchema structSchema) {
        List fields = structSchema.fields();
        if (strArr.length > fields.size()) {
            throw new FilterException("Error while reading delimited input row. Too large number of fields (" + strArr.length + ")");
        }
        TypedStruct create = TypedStruct.create();
        for (int i = 0; i < strArr.length; i++) {
            String str = strArr[i];
            if (this.configs.isTrimColumn()) {
                str = str.trim();
            }
            create = create.put(((TypedField) fields.get(i)).name(), str);
        }
        return create;
    }
}
