package io.kipe.streams.kafka.processors;

import io.kipe.streams.kafka.factories.TopicNamesFactory;
import io.kipe.streams.kafka.processors.recordtypes.TableRecord;
import io.kipe.streams.recordtypes.GenericRecord;
import java.util.Objects;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

/**
 * Topology part that accumulates the records of a {@code KStream<K, GenericRecord>} into a single
 * {@link TableRecord} and emits the updated table for every incoming record.
 *
 * <p>The accumulated table is kept in a persistent key-value state store under one fixed key
 * ({@link #EMPTY}), so every input record, whatever its key, updates the same table entry.
 *
 * @param <K> key type of the incoming stream; used as the key inside the {@link TableRecord}
 */
public class TableBuilder<K> extends AbstractTopologyPartBuilder<K, GenericRecord> {
    /** The single key under which the table is stored in the state store and emitted downstream. */
    private static final String EMPTY = "";

    /**
     * Transformer that merges each incoming record into the stored {@link TableRecord} and forwards
     * the updated table.
     *
     * @param <K> key type of the incoming records
     */
    public static class TableTransformer<K>
            implements Transformer<K, GenericRecord, KeyValue<String, TableRecord<K, GenericRecord>>> {
        private final String stateStoreName;
        KeyValueStore<String, TableRecord<K, GenericRecord>> stateStore;

        /**
         * @param stateStoreName name of the state store that is looked up in {@link #init}
         */
        TableTransformer(String stateStoreName) {
            this.stateStoreName = stateStoreName;
        }

        /**
         * Resolves the state store by the name given at construction time.
         *
         * @param processorContext context used to look up the state store
         */
        @Override
        public void init(ProcessorContext processorContext) {
            this.stateStore = processorContext.getStateStore(this.stateStoreName);
        }

        /**
         * Puts the incoming record into the stored table (creating the table on first use), writes
         * the table back to the state store and emits it.
         *
         * @param k key of the incoming record; becomes the entry key inside the table
         * @param genericRecord value of the incoming record; becomes the entry value inside the table
         * @return the updated table, keyed with the fixed {@code EMPTY} key
         */
        @Override
        public KeyValue<String, TableRecord<K, GenericRecord>> transform(K k, GenericRecord genericRecord) {
            TableRecord<K, GenericRecord> table = this.stateStore.get(TableBuilder.EMPTY);
            if (table == null) {
                // Nothing stored yet: start with a fresh table.
                table = new TableRecord<>();
            }
            table.put(k, genericRecord);
            this.stateStore.put(TableBuilder.EMPTY, table);
            return new KeyValue<>(TableBuilder.EMPTY, table);
        }

        /** Nothing to release; the state store is not closed here. */
        @Override
        public void close() {
        }

        // The decompiled source contained an explicit transform(Object, Object) bridge method.
        // The compiler generates that bridge itself; declaring it by hand clashes with
        // Transformer.transform(K, V) after erasure, so it has been removed.
    }

    /**
     * Creates the builder; all arguments are passed unchanged to the superclass constructor.
     *
     * @param streamsBuilder the {@link StreamsBuilder} the state store is later registered on
     * @param kStream the incoming stream
     * @param serde serde for the incoming keys
     * @param serde2 serde for the incoming values
     * @param str passed to the superclass; presumably the topics base name - TODO confirm
     */
    public TableBuilder(StreamsBuilder streamsBuilder, KStream<K, GenericRecord> kStream, Serde<K> serde, Serde<GenericRecord> serde2, String str) {
        super(streamsBuilder, kStream, serde, serde2, str);
    }

    /**
     * Registers the persistent state store, attaches a {@link TableTransformer} to the incoming
     * stream and wraps the resulting table stream in a new {@link KipesBuilder}.
     *
     * <p>The state store name is obtained from
     * {@link TopicNamesFactory#getProcessorStoreTopicName(String)} for the topics base name suffixed
     * with {@code "-tablebuilder"}.
     *
     * @param serde serde for the result key (the {@code resultKeySerde}); also used by the state store
     * @param serde2 serde for the result table (the {@code resultValueSerde}); also used by the state store
     * @return a builder over the stream of updated tables
     * @throws NullPointerException if either serde is {@code null} or the topics base name is not set
     */
    public KipesBuilder<String, TableRecord<K, GenericRecord>> build(Serde<String> serde, Serde<TableRecord<K, GenericRecord>> serde2) {
        Objects.requireNonNull(serde, "resultKeySerde");
        Objects.requireNonNull(serde2, "resultValueSerde");
        Objects.requireNonNull(getTopicsBaseName(), "topicsBaseName must be set");

        final String storeName = TopicNamesFactory.getProcessorStoreTopicName(getTopicsBaseName() + "-tablebuilder");
        this.streamsBuilder.addStateStore(
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), serde, serde2));

        // The store name is passed to transform() as well so the transformer is connected to the store.
        final KStream<String, TableRecord<K, GenericRecord>> tableStream =
                this.stream.transform(() -> new TableTransformer<K>(storeName), storeName);

        // NOTE(review): the declared return type of createKipesBuilder is not visible here; the cast
        // is kept from the original and can be dropped if the method already returns this type.
        return (KipesBuilder<String, TableRecord<K, GenericRecord>>) createKipesBuilder(tableStream, serde, serde2);
    }
}
