package io.kipe.streams.kafka.processors;

import io.kipe.streams.kafka.factories.TopicNamesFactory;
import java.util.Objects;
import java.util.function.BiFunction;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kipe/streams/kafka/processors/DedupBuilder.class */
/**
 * Builder for a de-duplication step in a Kafka Streams topology.
 *
 * <p>Records are grouped by a key derived via {@link #groupBy(BiFunction, Serde)}. The last value
 * seen for each group key is kept in a persistent key-value state store. A record is forwarded
 * when its group key has no stored value yet, or, if {@link #advanceBy(BiFunction)} was
 * configured, when its dedup value differs from the dedup value of the stored record.
 *
 * @param <K> key type of the incoming stream
 * @param <V> value type of the incoming stream
 * @param <GK> type of the group key the records are de-duplicated by
 * @param <DV> type of the optional dedup value compared within a group
 */
public class DedupBuilder<K, V, GK, DV> extends AbstractTopologyPartBuilder<K, V> {
    static final Logger LOG = LoggerFactory.getLogger(DedupBuilder.class);
    private BiFunction<K, V, GK> groupKeyFunction;
    private Serde<GK> groupKeySerde;
    private BiFunction<K, V, DV> dedupValueFunction;

    /**
     * Transformer that performs the actual de-duplication against the state store.
     *
     * @param <K> record key type
     * @param <V> record value type
     * @param <GK> group key type (state store key)
     * @param <DV> dedup value type
     */
    static class DedupTransformer<K, V, GK, DV> implements Transformer<K, V, KeyValue<K, V>> {
        private static final Logger LOG = LoggerFactory.getLogger(DedupTransformer.class);
        private final String stateStoreName;
        private final BiFunction<K, V, GK> groupKeyFunction;
        private final BiFunction<K, V, DV> dedupValueFunction;
        KeyValueStore<GK, V> stateStore;

        /**
         * @param stateStoreName name of the state store looked up in {@link #init(ProcessorContext)}
         * @param groupKeyFunction derives the group key from a record's key and value
         * @param dedupValueFunction derives the dedup value from a record's key and value; may be
         *     {@code null}, in which case every record of an already known group is dropped
         */
        DedupTransformer(
                String stateStoreName,
                BiFunction<K, V, GK> groupKeyFunction,
                BiFunction<K, V, DV> dedupValueFunction) {
            this.stateStoreName = stateStoreName;
            this.groupKeyFunction = groupKeyFunction;
            this.dedupValueFunction = dedupValueFunction;
        }

        /** Resolves the state store by the name given at construction time. */
        @Override
        public void init(ProcessorContext processorContext) {
            this.stateStore = processorContext.getStateStore(this.stateStoreName);
        }

        /**
         * Decides whether a record is forwarded or dropped as a duplicate.
         *
         * <p>The stored value of the record's group is always replaced with the current value,
         * regardless of the outcome.
         *
         * @param key record key
         * @param value record value
         * @return the unchanged record if it is to be forwarded, or {@code null} if it is dropped
         */
        @Override
        public KeyValue<K, V> transform(K key, V value) {
            GK groupKey = this.groupKeyFunction.apply(key, value);
            // Read the previous value before overwriting it; it is needed for the comparison below.
            V previousValue = this.stateStore.get(groupKey);
            this.stateStore.put(groupKey, value);

            if (previousValue == null) {
                LOG.debug("dedup.emitFirst.newGroupKey groupKey:{} key:{} value:{}", groupKey, key, value);
                return new KeyValue<>(key, value);
            }
            if (this.dedupValueFunction == null) {
                LOG.debug("dedup.duplicateIgnored.groupKey groupKey:{} key:{} value:{}", groupKey, key, value);
                return null;
            }

            // The previous dedup value is computed with the CURRENT record key, as only the
            // value (not the key) of the previous record is held in the store.
            DV previousDedupValue = this.dedupValueFunction.apply(key, previousValue);
            DV currentDedupValue = this.dedupValueFunction.apply(key, value);
            // Objects.equals tolerates a dedup function that yields null for either record.
            if (Objects.equals(previousDedupValue, currentDedupValue)) {
                LOG.debug(
                        "dedup.duplicateIgnored.groupDedupValue groupKey:{} groupDedupValue:{} key:{} value:{}",
                        groupKey, currentDedupValue, key, value);
                return null;
            }
            LOG.debug(
                    "dedup.emitFirst.newGroupDedupValue groupKey:{} groupDedupValue:{} key:{} value:{}",
                    groupKey, currentDedupValue, key, value);
            return new KeyValue<>(key, value);
        }

        /** Nothing to release; this transformer holds no resources of its own. */
        @Override
        public void close() {
        }

        /**
         * Untyped entry point left over from decompilation (renamed bridge method); retained so
         * existing references keep resolving. Delegates to {@link #transform(Object, Object)}.
         *
         * @param obj record key, expected to be of type {@code K}
         * @param obj2 record value, expected to be of type {@code V}
         * @return the result of {@code transform(key, value)}
         */
        @SuppressWarnings("unchecked") // arguments are the erased K and V of the typed transform
        public /* bridge */ /* synthetic */ Object m5transform(Object obj, Object obj2) {
            return transform((K) obj, (V) obj2);
        }
    }

    /**
     * Creates the builder; all arguments are passed through to the superclass constructor.
     *
     * @param streamsBuilder the streams builder the state store is registered on
     * @param kStream the stream to de-duplicate
     * @param serde serde for the stream's keys
     * @param serde2 serde for the stream's values
     * @param str passed to the superclass as its last argument (presumably the topics base
     *     name checked in {@link #emitFirst()} - confirm against the superclass)
     */
    public DedupBuilder(StreamsBuilder streamsBuilder, KStream<K, V> kStream, Serde<K> serde, Serde<V> serde2, String str) {
        super(streamsBuilder, kStream, serde, serde2, str);
    }

    /**
     * Sets the function deriving the group key and the serde used for it in the state store.
     *
     * @param biFunction derives the group key from a record's key and value
     * @param serde serde for the group key; may be {@code null}
     * @return this builder
     */
    public DedupBuilder<K, V, GK, DV> groupBy(BiFunction<K, V, GK> biFunction, Serde<GK> serde) {
        this.groupKeyFunction = biFunction;
        this.groupKeySerde = serde;
        return this;
    }

    /**
     * Sets the function deriving the group key without specifying a serde for it.
     *
     * @param biFunction derives the group key from a record's key and value
     * @return this builder
     */
    public DedupBuilder<K, V, GK, DV> groupBy(BiFunction<K, V, GK> biFunction) {
        return groupBy(biFunction, null);
    }

    /**
     * Sets the optional function deriving the dedup value. When set, a record of a known group is
     * forwarded only if its dedup value differs from that of the previously stored record.
     *
     * @param biFunction derives the dedup value from a record's key and value
     * @return this builder
     */
    public DedupBuilder<K, V, GK, DV> advanceBy(BiFunction<K, V, DV> biFunction) {
        this.dedupValueFunction = biFunction;
        return this;
    }

    /**
     * Registers the persistent state store and appends the de-duplicating transformer to the
     * stream.
     *
     * @return a {@code KipesBuilder} wrapping the de-duplicated stream
     * @throws NullPointerException if the topics base name or the group key function is not set
     */
    public KipesBuilder<K, V> emitFirst() {
        Objects.requireNonNull(getTopicsBaseName(), "topicsBaseName must be set");
        Objects.requireNonNull(this.groupKeyFunction, "groupBy groupKeyFunction must be set");
        if (this.groupKeySerde == null) {
            LOG.warn("The default groupKeySerde is being used. To customize serdes, provide a specific serde to override this behavior.");
        }

        String stateStoreName = TopicNamesFactory.getProcessorStoreTopicName(getTopicsBaseName() + "-dedup");
        this.streamsBuilder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(stateStoreName),
                        this.groupKeySerde,
                        this.valueSerde));

        // The supplier reads the builder fields each time it is invoked, as the original did.
        KStream<K, V> dedupedStream = this.stream.transform(
                () -> new DedupTransformer<K, V, GK, DV>(stateStoreName, this.groupKeyFunction, this.dedupValueFunction),
                stateStoreName);
        return (KipesBuilder<K, V>) createKipesBuilder(dedupedStream, this.keySerde, this.valueSerde);
    }
}
