package io.kipe.streams.kafka.processors;

import io.kipe.streams.kafka.factories.TopicNamesFactory;
import io.kipe.streams.recordtypes.TransactionRecord;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kipe/streams/kafka/processors/TransactionBuilder.class */
public class TransactionBuilder<K, V, GK> extends AbstractTopologyPartBuilder<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionBuilder.class);
    private BiFunction<K, V, GK> groupKeyFunction;
    private Serde<GK> groupKeySerde;
    private BiPredicate<K, V> startsWithPredicate;
    private BiPredicate<K, V> endsWithPredicate;
    private EmitType emitType;

    /* loaded from: input_file:io/kipe/streams/kafka/processors/TransactionBuilder$EmitType.class */
    /**
     * Phases of a transaction, plus two composite constants that stand for
     * several phases at once.
     *
     * <p>{@link #ALL} covers {@link #START}, {@link #ONGOING} and {@link #END};
     * {@link #START_AND_END} covers {@link #START} and {@link #END}. The three
     * plain phases cover nothing besides themselves.
     */
    public enum EmitType {
        START,
        ONGOING,
        END,
        ALL(START, ONGOING, END),
        START_AND_END(START, END);

        /** The other constants this constant stands for; empty for the plain phases. */
        private final List<EmitType> covered;

        /**
         * @param emitTypeArr the constants covered by the new constant; empty for
         *                    constants declared without arguments
         */
        EmitType(EmitType... emitTypeArr) {
            this.covered = emitTypeArr.length == 0
                    ? Collections.<EmitType>emptyList()
                    : Arrays.asList(emitTypeArr);
        }

        /**
         * Tells whether this constant stands for the given one.
         *
         * <p>The relation is neither symmetric nor transitive: {@code ALL} covers
         * {@code START}, but {@code START} does not cover {@code ALL}, and
         * {@code ALL} does not cover {@code START_AND_END}.
         *
         * @param emitType the constant to look up; {@code null} is never covered
         * @return {@code true} if {@code emitType} is this very constant or is one
         *         of the constants listed as covered by it
         */
        public boolean isCovered(EmitType emitType) {
            if (this == emitType) {
                return true;
            }
            return this.covered.contains(emitType);
        }
    }

    /* loaded from: input_file:io/kipe/streams/kafka/processors/TransactionBuilder$TransactionTransformer.class */
    /**
     * Transformer that tracks one open transaction per group key in a key-value
     * state store and returns the collected {@link TransactionRecord} once the
     * end predicate matches.
     *
     * <p>For every incoming record the group key is derived with the group-key
     * function. A transaction is opened when no record is stored for that key and
     * the start predicate matches; it is closed (removed from the store and
     * returned) when the end predicate matches. The configured {@link EmitType}
     * decides which incoming values are added to the transaction record.
     *
     * @param <K>  key type of the incoming records
     * @param <V>  value type of the incoming records
     * @param <GK> type of the key transactions are grouped by
     */
    static class TransactionTransformer<K, V, GK> implements Transformer<K, V, KeyValue<K, TransactionRecord<GK, V>>> {
        private static final Logger LOG = LoggerFactory.getLogger(TransactionTransformer.class);
        private final String stateStoreName;
        private final BiFunction<K, V, GK> groupKeyFunction;
        private final BiPredicate<K, V> startsWithPredicate;
        private final BiPredicate<K, V> endsWithPredicate;
        private final EmitType emitType;
        /** Store of the currently open transactions; assigned in {@link #init(ProcessorContext)}. */
        KeyValueStore<GK, TransactionRecord<GK, V>> stateStore;

        /**
         * @param str          name of the state store looked up in {@link #init(ProcessorContext)}
         * @param biFunction   derives the group key from a record's key and value
         * @param biPredicate  tells whether a record starts a transaction
         * @param biPredicate2 tells whether a record ends a transaction
         * @param emitType     selects which values are added to the transaction record
         */
        TransactionTransformer(String str, BiFunction<K, V, GK> biFunction, BiPredicate<K, V> biPredicate, BiPredicate<K, V> biPredicate2, EmitType emitType) {
            this.stateStoreName = str;
            this.groupKeyFunction = biFunction;
            this.startsWithPredicate = biPredicate;
            this.endsWithPredicate = biPredicate2;
            this.emitType = emitType;
        }

        /**
         * Fetches the state store registered under the configured store name.
         *
         * @param processorContext context supplied by Kafka Streams
         */
        public void init(ProcessorContext processorContext) {
            this.stateStore = processorContext.getStateStore(this.stateStoreName);
        }

        /**
         * Feeds one record into the transaction of its group key.
         *
         * @param k key of the incoming record
         * @param v value of the incoming record
         * @return the original key paired with the finished transaction record when
         *         this record ends the transaction; {@code null} when no transaction
         *         is open and this record does not start one, or when the
         *         transaction stays open
         */
        public KeyValue<K, TransactionRecord<GK, V>> transform(K k, V v) {
            final GK groupKey = this.groupKeyFunction.apply(k, v);
            TransactionRecord<GK, V> transaction = this.stateStore.get(groupKey);

            // The start predicate is only consulted while no transaction is open.
            boolean started = false;
            if (transaction == null) {
                if (!startsWith(k, v)) {
                    return null;
                }
                transaction = new TransactionRecord<>();
                transaction.setGroupKey(groupKey);
                started = true;
                LOG.trace("transaction.startsWith groupKey:{} value:{}", groupKey, v);
            }

            // Classify what this record means for the transaction.
            final EmitType phase;
            if (endsWith(k, v)) {
                phase = started ? EmitType.START_AND_END : EmitType.END;
                LOG.trace("transaction.endsWith groupKey:{} value:{}", groupKey, v);
            } else if (started) {
                phase = EmitType.START;
            } else {
                phase = EmitType.ONGOING;
                LOG.trace("transaction.continued groupKey:{} value:{}", groupKey, v);
            }

            if (collects(phase)) {
                transaction.addUnique(v);
                LOG.trace("transaction.emit.{}.covers groupKey:{} value:{} currentTXNType:{}", this.emitType, groupKey, v, phase);
            }

            // Only END and START_AND_END cover END; every other phase keeps the transaction open.
            if (!phase.isCovered(EmitType.END)) {
                this.stateStore.put(groupKey, transaction);
                return null;
            }
            this.stateStore.delete(groupKey);
            LOG.debug("transaction.emit.{} groupKey:{} key:{} transaction:{} ", this.emitType, groupKey, k, transaction);
            return new KeyValue<>(k, transaction);
        }

        /**
         * Decides whether the value of a record in the given phase is added to the
         * transaction record: either the configured emit type covers the phase, or
         * the record both starts and ends the transaction and the configured emit
         * type covers at least one of START and END.
         */
        private boolean collects(EmitType phase) {
            if (this.emitType.isCovered(phase)) {
                return true;
            }
            return phase == EmitType.START_AND_END
                    && (this.emitType.isCovered(EmitType.START) || this.emitType.isCovered(EmitType.END));
        }

        /** Evaluates the start predicate for the given record. */
        boolean startsWith(K k, V v) {
            return this.startsWithPredicate.test(k, v);
        }

        /** Evaluates the end predicate for the given record. */
        boolean endsWith(K k, V v) {
            return this.endsWithPredicate.test(k, v);
        }

        /** Nothing to release. */
        public void close() {
        }

        /**
         * Untyped forwarder to {@link #transform(Object, Object)}, kept from the
         * decompiled bridge method.
         */
        /* renamed from: transform, reason: collision with other method in class */
        @SuppressWarnings("unchecked") // the arguments are the erased forms of K and V
        public /* bridge */ /* synthetic */ Object m15transform(Object obj, Object obj2) {
            return transform((K) obj, (V) obj2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a builder that emits for all transaction phases
     * ({@link EmitType#ALL}) unless {@link #emit(EmitType)} is called.
     *
     * <p>All arguments are handed to the superclass constructor unchanged.
     *
     * @param streamsBuilder the streams builder the topology is added to
     * @param kStream        the incoming stream
     * @param serde          serde for the stream's keys
     * @param serde2         serde for the stream's values
     * @param str            passed through to the superclass; presumably the topics
     *                       base name later read via {@code getTopicsBaseName()} (to be confirmed)
     */
    public TransactionBuilder(StreamsBuilder streamsBuilder, KStream<K, V> kStream, Serde<K> serde, Serde<V> serde2, String str) {
        super(streamsBuilder, kStream, serde, serde2, str);
        // Default: values of every phase are collected.
        this.emitType = EmitType.ALL;
    }

    /**
     * Sets how records are grouped into transactions.
     *
     * @param biFunction derives the group key from a record's key and value
     * @param serde      serde for the group key; may be {@code null}, in which case
     *                   {@code as(...)} logs a warning that the default serde is used
     * @return this builder
     */
    public TransactionBuilder<K, V, GK> groupBy(BiFunction<K, V, GK> biFunction, Serde<GK> serde) {
        groupKeySerde = serde;
        groupKeyFunction = biFunction;
        return this;
    }

    /**
     * Sets how records are grouped into transactions without naming a serde for
     * the group key; equivalent to calling the two-argument overload with a
     * {@code null} serde.
     *
     * @param biFunction derives the group key from a record's key and value
     * @return this builder
     */
    public TransactionBuilder<K, V, GK> groupBy(BiFunction<K, V, GK> biFunction) {
        // No explicit group-key serde.
        return groupBy(biFunction, null);
    }

    /**
     * Sets the predicate that tells whether a record starts a transaction.
     *
     * @param biPredicate tested with a record's key and value
     * @return this builder
     */
    public TransactionBuilder<K, V, GK> startsWith(BiPredicate<K, V> biPredicate) {
        startsWithPredicate = biPredicate;
        return this;
    }

    /**
     * Sets the predicate that tells whether a record ends a transaction.
     *
     * @param biPredicate tested with a record's key and value
     * @return this builder
     */
    public TransactionBuilder<K, V, GK> endsWith(BiPredicate<K, V> biPredicate) {
        endsWithPredicate = biPredicate;
        return this;
    }

    /**
     * Replaces the emit type, which the constructor initialises to
     * {@link EmitType#ALL}.
     *
     * @param emitType the emit type handed to the transformer; {@code as(...)}
     *                 rejects {@code null}
     * @return this builder
     */
    public TransactionBuilder<K, V, GK> emit(EmitType emitType) {
        // 'this.' is required: the parameter shadows the field.
        this.emitType = emitType;
        return this;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Registers the transaction state store and appends the transaction
     * transformer to the stream.
     *
     * <p>The store name is derived from the topics base name with the suffix
     * {@code "-transaction"}. The group-key function, both predicates and the
     * emit type are captured at the time of this call, so later changes to this
     * builder do not affect transformers created for the returned topology.
     *
     * @param serde serde for the resulting transaction records; it is used for the
     *              state store values and handed on to the returned builder
     * @return a builder for the stream of transaction records
     * @throws NullPointerException if the topics base name, the group-key function,
     *                              the start predicate, the end predicate or the
     *                              emit type is {@code null}
     */
    @SuppressWarnings("unchecked") // for the cast on the createKipesBuilder result, kept from the original
    public KipesBuilder<K, TransactionRecord<GK, V>> as(Serde<TransactionRecord<GK, V>> serde) {
        // Validate in the original order and snapshot the validated values.
        Objects.requireNonNull(getTopicsBaseName(), "topicsBaseName");
        final BiFunction<K, V, GK> groupKeyFn = Objects.requireNonNull(this.groupKeyFunction, "groupKeyFunction");
        if (this.groupKeySerde == null) {
            LOG.warn("The default groupKeySerde is being used. To customize serdes, provide a specific serde to override this behavior.");
        }
        final BiPredicate<K, V> startPredicate = Objects.requireNonNull(this.startsWithPredicate, "startsWithPredicate");
        final BiPredicate<K, V> endPredicate = Objects.requireNonNull(this.endsWithPredicate, "endsWithPredicate");
        final EmitType configuredEmitType = Objects.requireNonNull(this.emitType, "emitType");

        final String storeName = TopicNamesFactory.getProcessorStoreTopicName(getTopicsBaseName() + "-transaction");
        this.streamsBuilder.addStateStore(
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), this.groupKeySerde, serde));

        // The supplier may be invoked several times; it must only see the snapshot taken above.
        final KStream<K, TransactionRecord<GK, V>> transactions = this.stream.transform(
                () -> new TransactionTransformer<K, V, GK>(storeName, groupKeyFn, startPredicate, endPredicate, configuredEmitType),
                storeName);

        // createKipesBuilder's declared return type is not visible here, so the cast stays.
        return (KipesBuilder<K, TransactionRecord<GK, V>>) createKipesBuilder(transactions, this.keySerde, serde);
    }

    /**
     * Builds the transaction topology without naming a serde for the transaction
     * records; equivalent to calling the one-argument overload with {@code null}.
     *
     * @return a builder for the stream of transaction records
     */
    public KipesBuilder<K, TransactionRecord<GK, V>> as() {
        // No explicit value serde.
        return as(null);
    }
}
