package io.kipe.streams.kafka.processors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import io.kipe.streams.kafka.factories.TopicNamesFactory;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Topology part builder that turns a stream of values into a stream of aggregated "sequences".
 *
 * <p>Every incoming record is assigned to a group via the configured group key function. The
 * values of each group are collected in a persistent key-value state store. As soon as a group
 * holds {@code size} values, the aggregate function is applied to that list and the result is
 * emitted; afterwards the oldest value of the group is dropped, so subsequent records of the same
 * group produce one result each over a sliding window of the last {@code size} values.
 *
 * <p>Typical usage: {@code builder.groupBy(...).size(n).as(...)}.
 *
 * @param <K> key type of the incoming (and outgoing) records
 * @param <V> value type of the incoming records
 * @param <GK> type of the group key the sequences are collected under
 * @param <VR> value type of the emitted aggregation results
 */
public class SequenceBuilder<K, V, GK, VR> extends AbstractTopologyPartBuilder<K, V> {
    static final Logger LOG = LoggerFactory.getLogger(SequenceBuilder.class);

    /** Derives the group key from a record; must be set via {@code groupBy} before {@code as}. */
    private BiFunction<K, V, GK> groupKeyFunction;

    /** Serde for the group key used by the state store; {@code null} is passed through as is. */
    private Serde<GK> groupKeySerde;

    /** Number of values a group must hold before a result is emitted; initialised to 1. */
    private int sequenceSize;

    /**
     * Transformer that maintains the per-group value lists in a state store and emits an
     * aggregated result whenever a group has collected enough values.
     *
     * @param <K> record key type
     * @param <V> record value type
     * @param <VR> aggregation result type
     * @param <GK> group key type
     */
    static class SequenceTransformer<K, V, VR, GK> implements Transformer<K, V, KeyValue<K, VR>> {
        private final String stateStoreName;
        private final BiFunction<K, V, GK> groupKeyFunction;
        private final int sequenceSize;
        private final BiFunction<GK, List<V>, VR> aggregateFunction;

        /** Store holding the collected values per group key; assigned in {@link #init}. */
        KeyValueStore<GK, List<V>> stateStore;

        /**
         * @param stateStoreName name of the state store to look up in {@link #init}
         * @param groupKeyFunction derives the group key from a record's key and value
         * @param sequenceSize number of values required before a result is emitted
         * @param aggregateFunction maps a group key and its collected values to the result value
         */
        SequenceTransformer(
                String stateStoreName,
                BiFunction<K, V, GK> groupKeyFunction,
                int sequenceSize,
                BiFunction<GK, List<V>, VR> aggregateFunction) {
            this.stateStoreName = stateStoreName;
            this.groupKeyFunction = groupKeyFunction;
            this.sequenceSize = sequenceSize;
            this.aggregateFunction = aggregateFunction;
        }

        /** Resolves the state store by the name given at construction time. */
        @Override
        public void init(ProcessorContext processorContext) {
            this.stateStore = processorContext.getStateStore(this.stateStoreName);
        }

        /**
         * Appends the value to its group's sequence and emits a result once the sequence is full.
         *
         * @param key key of the incoming record; reused as the key of the emitted record
         * @param value value of the incoming record
         * @return the aggregated record, or {@code null} while the group has fewer than
         *     {@code sequenceSize} values
         */
        @Override
        public KeyValue<K, VR> transform(K key, V value) {
            GK groupKey = this.groupKeyFunction.apply(key, value);
            List<V> sequence = this.stateStore.get(groupKey);
            if (sequence == null) {
                sequence = new LinkedList<>();
            }
            sequence.add(value);

            if (sequence.size() < this.sequenceSize) {
                // Not enough values yet: remember what we have and emit nothing.
                this.stateStore.put(groupKey, sequence);
                return null;
            }

            // Aggregate first, then drop the oldest value so the window slides by one record.
            KeyValue<K, VR> result = new KeyValue<>(key, this.aggregateFunction.apply(groupKey, sequence));
            sequence.remove(0);
            this.stateStore.put(groupKey, sequence);
            return result;
        }

        /** Nothing to release. */
        @Override
        public void close() {
        }
    }

    /**
     * JSON based serde for lists of {@code T}, acting as its own serializer and deserializer.
     *
     * @param <T> element type of the lists
     */
    public static class SequencesSerde<T> implements Serializer<List<T>>, Deserializer<List<T>>, Serde<List<T>> {
        private final ObjectMapper mapper = new ObjectMapper();

        /** Jackson type describing {@code List<T>}, needed to deserialize the elements correctly. */
        private final CollectionType valueType;

        /**
         * @param elementClass class of the list elements
         */
        public SequencesSerde(Class<T> elementClass) {
            this.valueType = this.mapper.getTypeFactory().constructCollectionType(List.class, elementClass);
        }

        /**
         * Reads a JSON array into a list.
         *
         * @param topic ignored
         * @param data serialized list, may be {@code null}
         * @return the deserialized list, or {@code null} if {@code data} is {@code null}
         * @throws SerializationException if the bytes cannot be read as a list of {@code T}
         */
        @Override
        public List<T> deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                return this.mapper.readValue(data, this.valueType);
            } catch (IOException e) {
                // Concatenating the array itself would only print its identity hash.
                throw new SerializationException("Unable to deserialize data: " + data.length + " bytes", e);
            }
        }

        /**
         * Writes a list as JSON bytes.
         *
         * @param topic ignored
         * @param data list to serialize, may be {@code null}
         * @return the serialized list, or {@code null} if {@code data} is {@code null}
         * @throws SerializationException if the list cannot be written as JSON
         */
        @Override
        public byte[] serialize(String topic, List<T> data) {
            if (data == null) {
                return null;
            }
            try {
                return this.mapper.writeValueAsBytes(data);
            } catch (JsonProcessingException e) {
                throw new SerializationException("Unable to serialize data: " + data, e);
            }
        }

        /** No configuration is supported; both arguments are ignored. */
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }

        /** @return this instance */
        @Override
        public Serializer<List<T>> serializer() {
            return this;
        }

        /** @return this instance */
        @Override
        public Deserializer<List<T>> deserializer() {
            return this;
        }

        /** Nothing to release. */
        @Override
        public void close() {
        }
    }

    /**
     * Creates a builder with a sequence size of 1. All arguments are forwarded unchanged to the
     * superclass constructor.
     *
     * @param streamsBuilder the streams builder the state store is registered with
     * @param stream the stream to collect sequences from
     * @param keySerde serde for the record keys
     * @param valueSerde serde for the record values
     * @param topicsBaseName presumably the base name returned by {@code getTopicsBaseName()} --
     *     NOTE(review): confirm against {@code AbstractTopologyPartBuilder}
     */
    public SequenceBuilder(
            StreamsBuilder streamsBuilder,
            KStream<K, V> stream,
            Serde<K> keySerde,
            Serde<V> valueSerde,
            String topicsBaseName) {
        super(streamsBuilder, stream, keySerde, valueSerde, topicsBaseName);
        this.sequenceSize = 1;
    }

    /**
     * Sets how records are assigned to groups and which serde the state store uses for group keys.
     *
     * @param groupKeyFunction derives the group key from a record's key and value
     * @param groupKeySerde serde for the group key, or {@code null} to rely on the default serde
     * @return this builder
     */
    public SequenceBuilder<K, V, GK, VR> groupBy(BiFunction<K, V, GK> groupKeyFunction, Serde<GK> groupKeySerde) {
        this.groupKeyFunction = groupKeyFunction;
        this.groupKeySerde = groupKeySerde;
        return this;
    }

    /**
     * Sets how records are assigned to groups without specifying a group key serde.
     *
     * @param groupKeyFunction derives the group key from a record's key and value
     * @return this builder
     */
    public SequenceBuilder<K, V, GK, VR> groupBy(BiFunction<K, V, GK> groupKeyFunction) {
        return groupBy(groupKeyFunction, null);
    }

    /**
     * Sets the number of values a group must have collected before a result is emitted.
     *
     * @param size the sequence size, must be larger than 0
     * @return this builder
     * @throws IllegalArgumentException if {@code size} is not larger than 0
     */
    public SequenceBuilder<K, V, GK, VR> size(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be larger than 0");
        }
        this.sequenceSize = size;
        return this;
    }

    /**
     * Registers the state store, attaches the sequence transformer to the stream and returns a
     * builder for the resulting stream.
     *
     * @param aggregateFunction maps a group key and its collected values to the result value
     * @param valueClass class of the incoming values, needed to (de)serialize the stored lists
     * @param resultValueSerde serde for the result values, or {@code null} to rely on the default
     * @return a builder operating on the stream of aggregated results
     * @throws NullPointerException if the topics base name, the group key function,
     *     {@code aggregateFunction} or {@code valueClass} is {@code null}
     */
    public KipesBuilder<K, VR> as(
            BiFunction<GK, List<V>, VR> aggregateFunction,
            Class<V> valueClass,
            Serde<VR> resultValueSerde) {
        Objects.requireNonNull(getTopicsBaseName(), "topicsBaseName");
        Objects.requireNonNull(this.groupKeyFunction, "groupKeyFunction");
        // Fail at topology build time rather than when the first record is processed.
        Objects.requireNonNull(aggregateFunction, "aggregateFunction");
        Objects.requireNonNull(valueClass, "valueClass");
        if (this.groupKeySerde == null) {
            LOG.warn("The default groupKeySerde is being used. To customize serdes, provide a specific serde to override this behavior.");
        }
        if (resultValueSerde == null) {
            LOG.warn("The default resultValueSerde is being used. To customize serdes, provide a specific serde to override this behavior.");
        }

        final String stateStoreName =
                TopicNamesFactory.getProcessorStoreTopicName(getTopicsBaseName() + "-sequence");
        this.streamsBuilder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(stateStoreName),
                        this.groupKeySerde,
                        new SequencesSerde<V>(valueClass)));

        // The transformer is connected to the store by name; one transformer is created per call
        // of the supplier.
        KStream<K, VR> sequenced =
                this.stream.transform(
                        () -> new SequenceTransformer<K, V, VR, GK>(
                                stateStoreName, this.groupKeyFunction, this.sequenceSize, aggregateFunction),
                        stateStoreName);
        return (KipesBuilder<K, VR>) createKipesBuilder(sequenced, this.keySerde, resultValueSerde);
    }

    /**
     * Same as {@link #as(BiFunction, Class, Serde)} without specifying a result value serde.
     *
     * @param aggregateFunction maps a group key and its collected values to the result value
     * @param valueClass class of the incoming values
     * @return a builder operating on the stream of aggregated results
     */
    public KipesBuilder<K, VR> as(BiFunction<GK, List<V>, VR> aggregateFunction, Class<V> valueClass) {
        return as(aggregateFunction, valueClass, null);
    }
}
