package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.ExplodeFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/filter/ExplodeFilter.class */
/**
 * Record filter that "explodes" an array field: for each element of the array found at the
 * configured source field, one new {@link TypedStruct} is produced holding that single element
 * under the same field name.
 *
 * <p>How the produced structs are combined with the incoming record is decided by
 * {@link AbstractMergeRecordFilter}, which is not visible from this class; this filter only
 * supplies the exploded values (see {@link #apply}) and the names of the fields it declares as
 * overwritten (see {@link #overwrite()}).
 */
public class ExplodeFilter extends AbstractMergeRecordFilter<ExplodeFilter> {

    /** Filter configuration; assigned in {@link #configure(Map)}. */
    private ExplodeFilterConfig config;

    /**
     * Configures the parent filter and then builds this filter's own configuration.
     *
     * @param map the raw configuration properties.
     */
    @Override
    public void configure(final Map<String, ?> map) {
        super.configure(map);
        this.config = new ExplodeFilterConfig(map);
    }

    /**
     * @return the configuration definition declared by {@link ExplodeFilterConfig}.
     */
    @Override
    public ConfigDef configDef() {
        return ExplodeFilterConfig.configDef();
    }

    /**
     * Produces one struct per element of the array stored in the configured source field.
     *
     * @param filterContext the filter context (not used by this filter).
     * @param typedStruct   the record to explode.
     * @return an iterable over the produced structs, one per array element.
     * @throws FilterException if the source field is missing, holds a null value,
     *                         or is not of type {@link Type#ARRAY}.
     */
    @Override
    protected RecordsIterable<TypedStruct> apply(final FilterContext filterContext,
                                                 final TypedStruct typedStruct) throws FilterException {
        final String source = this.config.source();
        final TypedValue value = checkIsNotNull(typedStruct.find(source));
        if (value.type() != Type.ARRAY) {
            throw new FilterException(
                "Invalid type for field '" + source + "', expected ARRAY, was " + value.type());
        }

        // Each element is wrapped into its own struct under the same field name as the source.
        final List<TypedStruct> exploded = value.getArray()
            .stream()
            .map(element -> TypedStruct.create().insert(source, TypedValue.any(element)))
            .collect(Collectors.toList());
        return new RecordsIterable<>(exploded);
    }

    /**
     * @return a mutable set containing only the configured source field name.
     */
    @Override
    protected Set<String> overwrite() {
        final Set<String> fields = new HashSet<>();
        fields.add(this.config.source());
        return fields;
    }

    /**
     * Ensures the looked-up value can be exploded.
     *
     * @param typedValue the value found for the source field; a {@code null} reference is
     *                   rejected as well (NOTE(review): assumes {@code TypedStruct#find} may
     *                   return {@code null} for an absent field - confirm against its contract).
     * @return the given value, when it is neither absent nor null-valued.
     * @throws FilterException if the value is absent or null-valued.
     */
    private TypedValue checkIsNotNull(final TypedValue typedValue) {
        // Guard the reference itself first so that a missing field surfaces as a
        // FilterException rather than a bare NullPointerException.
        if (typedValue == null || typedValue.isNull()) {
            throw new FilterException(
                "Invalid field '" + this.config.source() + "', cannot explode empty value");
        }
        return typedValue;
    }
}
