package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.AppendFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
import io.streamthoughts.kafka.connect.filepulse.expression.PropertyExpression;
import io.streamthoughts.kafka.connect.filepulse.expression.StandardEvaluationContext;
import io.streamthoughts.kafka.connect.filepulse.expression.parser.ExpressionParsers;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.ConfigDef;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/filter/AppendFilter.class */
public class AppendFilter extends AbstractMergeRecordFilter<AppendFilter> {
    private AppendFilterConfig config;
    private List<Expression> values;
    private Expression fieldExpression;
    protected String target;

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public void configure(Map<String, ?> map) {
        super.configure(map);
        this.config = new AppendFilterConfig(map);
        this.values = Collections.singletonList(ExpressionParsers.parseExpression(this.config.value()));
        this.fieldExpression = this.config.field();
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter
    public ConfigDef configDef() {
        return AppendFilterConfig.configDef();
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter
    protected RecordsIterable<TypedStruct> apply(FilterContext filterContext, TypedStruct typedStruct) throws FilterException {
        InternalFilterContext internalFilterContext = (InternalFilterContext) filterContext;
        internalFilterContext.setValue(typedStruct);
        StandardEvaluationContext standardEvaluationContext = new StandardEvaluationContext(internalFilterContext, internalFilterContext.variables());
        Expression mayEvaluateWriteExpression = mayEvaluateWriteExpression(standardEvaluationContext);
        TypedStruct create = TypedStruct.create();
        Iterator<Expression> it = this.values.iterator();
        while (it.hasNext() && create != null) {
            Expression next = it.next();
            internalFilterContext.setValue(typedStruct);
            Object readValue = next.readValue(standardEvaluationContext);
            StandardEvaluationContext standardEvaluationContext2 = new StandardEvaluationContext(internalFilterContext, internalFilterContext.variables());
            internalFilterContext.setValue(create);
            mayEvaluateWriteExpression.writeValue(readValue, standardEvaluationContext2);
            create = internalFilterContext.value();
        }
        return RecordsIterable.of(new TypedStruct[]{create});
    }

    private Expression mayEvaluateWriteExpression(StandardEvaluationContext standardEvaluationContext) {
        Expression expression = this.fieldExpression;
        if (!this.fieldExpression.canWrite()) {
            String str = (String) this.fieldExpression.readValue(standardEvaluationContext, String.class);
            if (str == null) {
                throw new FilterException("Invalid value for property 'field'. Evaluation of expression '" + this.fieldExpression.originalExpression() + " 'returns 'null'.");
            }
            expression = ExpressionParsers.parseExpression(str);
        }
        this.target = ((PropertyExpression) expression).getAttribute();
        return expression;
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter
    protected Set<String> overwrite() {
        return (!this.config.isOverwritten() || this.target == null) ? Collections.emptySet() : Collections.singleton(this.target);
    }
}
