package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.config.GrokFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.transform.pattern.GrokMatcher;
import io.streamthoughts.kafka.connect.transform.pattern.GrokPatternCompiler;
import io.streamthoughts.kafka.connect.transform.pattern.GrokPatternResolver;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/filter/GrokFilter.class */
/**
 * Record filter that matches the configured source field of a record against one or more
 * compiled Grok patterns and emits a single struct built from the captured values.
 *
 * <p>The patterns are compiled once in {@link #configure(Map)}. For each record, every pattern
 * is tried in configuration order (stopping at the first match when
 * {@code breakOnFirstPattern} is enabled) and the captures of all matching patterns are merged
 * into one {@link TypedStruct}.
 */
public class GrokFilter extends AbstractMergeRecordFilter<GrokFilter> {
    private GrokFilterConfig config;
    private GrokPatternCompiler compiler;
    private List<GrokMatcher> matchPatterns;

    /**
     * Pairs the schema exposed by a matching pattern with the values that pattern captured.
     */
    public static final class SchemaAndNamedCaptured {
        private final Schema schema;
        private final Map<String, Object> namedCaptured;

        /**
         * @param schema the schema reported by the matcher that produced the captures.
         * @param map    the captured values, keyed by capture name.
         */
        public SchemaAndNamedCaptured(Schema schema, Map<String, Object> map) {
            this.schema = schema;
            this.namedCaptured = map;
        }

        /**
         * @return the schema of the matcher that produced the captures.
         */
        public Schema schema() {
            return this.schema;
        }

        /**
         * @return the captured values, keyed by capture name.
         */
        public Map<String, Object> namedCaptured() {
            return this.namedCaptured;
        }
    }

    /**
     * Reads the filter configuration, builds the pattern compiler and pre-compiles every
     * configured Grok pattern so that {@link #apply} only has to run the matchers.
     *
     * @param map the raw filter configuration.
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public void configure(Map<String, ?> map) {
        super.configure(map);
        this.config = new GrokFilterConfig(map);
        final GrokPatternResolver resolver = new GrokPatternResolver(
                this.config.grok().patternDefinitions(),
                this.config.grok().patternsDir());
        this.compiler = new GrokPatternCompiler(resolver, this.config.grok().namedCapturesOnly());
        this.matchPatterns = this.config.grok().patterns().stream()
                .map(pattern -> this.compiler.compile(pattern))
                .collect(Collectors.toList());
    }

    /**
     * @return the configuration definition exposed by this filter.
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public ConfigDef configDef() {
        // NOTE(review): configure() reads Grok-specific settings through GrokFilterConfig, yet only
        // the common definition is returned here - confirm this is intentional.
        return CommonFilterConfig.configDef();
    }

    /**
     * Matches the source field of the record against the compiled patterns and merges the
     * captures of every matching pattern into one struct.
     *
     * @param filterContext the filter context (not used by this implementation).
     * @param typedStruct   the record to match.
     * @return an iterable holding the single merged struct, or {@code null} when the source
     *         field has no string value.
     * @throws FilterException if none of the configured patterns matches the source value.
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter
    protected RecordsIterable<TypedStruct> apply(FilterContext filterContext, TypedStruct typedStruct) throws FilterException {
        final String value = typedStruct.getString(this.config.source());
        if (value == null) {
            // Kept as null (rather than an empty iterable) to preserve the existing contract.
            return null;
        }
        final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        final List<SchemaAndNamedCaptured> matches = new ArrayList<>(this.matchPatterns.size());
        for (GrokMatcher matcher : this.matchPatterns) {
            final Map<String, Object> captures = matcher.captures(bytes);
            if (captures != null) {
                matches.add(new SchemaAndNamedCaptured(matcher.schema(), captures));
                if (this.config.grok().breakOnFirstPattern()) {
                    break;
                }
            }
        }
        if (matches.isEmpty()) {
            throw new FilterException("Supplied Grok patterns does not match input data: " + value);
        }
        final Schema schema = mergeToSchema(matches);
        return RecordsIterable.of(new TypedStruct[]{mergeToStruct(matches, schema)});
    }

    /**
     * @return the names of the fields this filter is configured to overwrite.
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter
    protected Set<String> overwrite() {
        return this.config.overwrite();
    }

    /**
     * Builds the schema describing the merged captures.
     *
     * <p>With a single match the matcher's schema is returned as-is. Otherwise a struct schema
     * is built in which a field seen in more than one match becomes an array of that field's
     * schema, and a field seen once keeps its own schema.
     *
     * @param list the matches to merge.
     * @return the merged schema.
     */
    private Schema mergeToSchema(List<SchemaAndNamedCaptured> list) {
        if (list.size() == 1) {
            return list.get(0).schema();
        }
        final Map<String, Schema> fields = new HashMap<>();
        for (SchemaAndNamedCaptured match : list) {
            for (Field field : match.schema().fields()) {
                final String name = field.name();
                // A name already registered by an earlier match is promoted to an array.
                final Schema merged = fields.containsKey(name)
                        ? SchemaBuilder.array(field.schema())
                        : field.schema();
                fields.put(name, merged);
            }
        }
        final SchemaBuilder struct = SchemaBuilder.struct();
        fields.forEach(struct::field);
        return struct.build();
    }

    /**
     * Builds the output struct from the captures of every match, using the merged schema to
     * decide whether a capture is stored as a single value or appended to an array.
     *
     * @param list   the matches whose captures are merged.
     * @param schema the schema produced by {@link #mergeToSchema(List)} for the same matches.
     * @return a new struct holding one entry per captured name.
     */
    private TypedStruct mergeToStruct(List<SchemaAndNamedCaptured> list, Schema schema) {
        final Map<String, TypedValue> values = new HashMap<>();
        for (SchemaAndNamedCaptured match : list) {
            match.namedCaptured().forEach((name, captured) -> {
                final Field field = schema.field(name);
                if (field.schema().type() == Schema.Type.ARRAY) {
                    // Create the backing array on first sight of the name, then append to it.
                    final TypedValue array = values.computeIfAbsent(name, key ->
                            TypedValue.array(
                                    new ArrayList<>(),
                                    Type.forConnectSchemaType(field.schema().valueSchema().type())));
                    array.getArray().add(captured);
                } else {
                    values.put(name, TypedValue.of(captured, Type.forConnectSchemaType(field.schema().type())));
                }
            });
        }
        final TypedStruct struct = TypedStruct.create();
        values.forEach(struct::put);
        return struct;
    }
}
