/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.ContextualRecord;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.metrics.Sensors;

/**
 * An in-memory {@link TimeOrderedKeyValueBuffer} that keeps serialized records ordered by
 * (buffer time, key) and, when logging is enabled, mirrors changes to a changelog topic on
 * {@link #flush()}.
 *
 * <p>Two structures are kept in step: {@code index} maps a serialized key to its
 * {@link BufferKey}, and {@code sortedMap} maps the {@link BufferKey} to the buffered record.
 * {@code minTimestamp} caches the time of the first entry of {@code sortedMap}
 * ({@link Long#MAX_VALUE} when the buffer is empty).
 *
 * @param <K> key type, (de)serialized with the configured key serde
 * @param <V> value type, (de)serialized with the configured value serde
 */
public final class InMemoryTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyValueBuffer<K, V> {
    private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
    private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
    /** Headers attached to every value written to the changelog: a single header "v" = 1. */
    private static final RecordHeaders V_1_CHANGELOG_HEADERS = new RecordHeaders(new Header[]{new RecordHeader("v", new byte[]{1})});
    private final Map<Bytes, BufferKey> index = new HashMap<>();
    private final TreeMap<BufferKey, ContextualRecord> sortedMap = new TreeMap<>();
    /** Keys put or evicted since the last flush; each is written to the changelog on flush. */
    private final Set<Bytes> dirtyKeys = new HashSet<>();
    private final String storeName;
    private final boolean loggingEnabled;
    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    private long memBufferSize = 0L;
    private long minTimestamp = Long.MAX_VALUE;
    private RecordCollector collector;
    private String changelogTopic;
    private Sensor bufferSizeSensor;
    private Sensor bufferCountSensor;
    private volatile boolean open;
    private int partition;

    private InMemoryTimeOrderedKeyValueBuffer(String storeName, boolean loggingEnabled, Serde<K> keySerde, Serde<V> valueSerde) {
        this.storeName = storeName;
        this.loggingEnabled = loggingEnabled;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    /** @return the store name supplied at construction */
    @Override
    public String name() {
        return this.storeName;
    }

    /** @return always {@code false} */
    @Override
    public boolean persistent() {
        return false;
    }

    /**
     * Installs the given serdes for whichever of the key/value serdes is still {@code null};
     * serdes that are already set are left untouched.
     */
    @Override
    public void setSerdesIfNull(Serde<K> keySerde, Serde<V> valueSerde) {
        if (this.keySerde == null) {
            this.keySerde = keySerde;
        }
        if (this.valueSerde == null) {
            this.valueSerde = valueSerde;
        }
    }

    /**
     * Creates the metric sensors, registers the restore callback, resolves the record collector
     * and changelog topic (only when logging is enabled) and marks the store open.
     *
     * @param context cast to {@link InternalProcessorContext}; when logging is enabled it is
     *                also cast to {@code RecordCollector.Supplier}
     * @param root    the store passed to {@code context.register}
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        InternalProcessorContext internalProcessorContext = (InternalProcessorContext)context;
        this.bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext);
        this.bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext);
        // Assigned before the callback is registered so restoreBatch can never compare
        // incoming records against the default partition value.
        this.partition = context.taskId().partition;
        context.register(root, this::restoreBatch);
        if (this.loggingEnabled) {
            this.collector = ((RecordCollector.Supplier)((Object)context)).recordCollector();
            this.changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), this.storeName);
        }
        this.updateBufferMetrics();
        this.open = true;
    }

    /** @return {@code true} between {@link #init} and {@link #close()} */
    @Override
    public boolean isOpen() {
        return this.open;
    }

    /** Marks the store closed, drops all buffered state and records the emptied metrics. */
    @Override
    public void close() {
        this.open = false;
        this.index.clear();
        this.sortedMap.clear();
        this.dirtyKeys.clear();
        this.memBufferSize = 0L;
        this.minTimestamp = Long.MAX_VALUE;
        this.updateBufferMetrics();
    }

    /**
     * Writes every dirty key to the changelog when logging is enabled: a tombstone if the key is
     * no longer indexed, otherwise its current buffered record. The dirty set is cleared on every
     * flush, including when logging is disabled, so that it cannot grow without bound.
     */
    @Override
    public void flush() {
        if (this.loggingEnabled) {
            for (Bytes key : this.dirtyKeys) {
                BufferKey bufferKey = this.index.get(key);
                if (bufferKey == null) {
                    this.logTombstone(key);
                } else {
                    this.logValue(key, bufferKey, this.sortedMap.get(bufferKey));
                }
            }
        }
        this.dirtyKeys.clear();
    }

    /**
     * Sends one changelog record whose value is the 8-byte buffer time followed by the
     * serialized {@link ContextualRecord}, tagged with the version-1 headers.
     */
    private void logValue(Bytes key, BufferKey bufferKey, ContextualRecord value) {
        byte[] serializedContextualRecord = value.serialize();
        byte[] timeAndContextualRecord = ByteBuffer.wrap(new byte[Long.BYTES + serializedContextualRecord.length])
            .putLong(bufferKey.time)
            .put(serializedContextualRecord)
            .array();
        this.collector.send(this.changelogTopic, key, timeAndContextualRecord, (Headers)V_1_CHANGELOG_HEADERS, this.partition, null, KEY_SERIALIZER, VALUE_SERIALIZER);
    }

    /** Sends a changelog record with a {@code null} value and no headers for the given key. */
    private void logTombstone(Bytes key) {
        this.collector.send(this.changelogTopic, key, null, null, this.partition, null, KEY_SERIALIZER, VALUE_SERIALIZER);
    }

    /**
     * Restore callback: replays a batch of changelog records into the buffer and then records
     * the buffer metrics.
     *
     * @throws IllegalStateException    if a record belongs to a different partition than this
     *                                  store; checked before the record is applied
     * @throws IllegalArgumentException if a non-tombstone record is too short to hold the buffer
     *                                  time or carries an unrecognized "v" header
     */
    private void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> batch) {
        for (ConsumerRecord<byte[], byte[]> record : batch) {
            // Validate first so a misrouted record never mutates the buffer.
            if (record.partition() != this.partition) {
                throw new IllegalStateException(String.format("record partition [%d] is being restored by the wrong suppress partition [%d]", record.partition(), this.partition));
            }
            Bytes key = Bytes.wrap(record.key());
            if (record.value() == null) {
                this.restoreTombstone(key);
            } else {
                this.restoreValue(key, record);
            }
        }
        this.updateBufferMetrics();
    }

    /** Removes the key (if buffered), adjusting the tracked size and the cached minimum time. */
    private void restoreTombstone(Bytes key) {
        BufferKey bufferKey = this.index.remove(key);
        if (bufferKey == null) {
            return;
        }
        ContextualRecord removed = this.sortedMap.remove(bufferKey);
        if (removed != null) {
            this.memBufferSize -= InMemoryTimeOrderedKeyValueBuffer.computeRecordSize(bufferKey.key, removed);
        }
        if (bufferKey.time == this.minTimestamp) {
            this.minTimestamp = this.sortedMap.isEmpty() ? Long.MAX_VALUE : this.sortedMap.firstKey().time;
        }
    }

    /**
     * Decodes a non-tombstone changelog record (8-byte buffer time followed by a payload) and
     * inserts it. Without a "v" header the payload is used as the raw value and the record
     * context is built from the changelog record itself; with the version-1 header the payload
     * is deserialized as a {@link ContextualRecord}.
     */
    private void restoreValue(Bytes key, ConsumerRecord<byte[], byte[]> record) {
        byte[] rawValue = record.value();
        if (rawValue.length < Long.BYTES) {
            // Too short to even hold the buffer time; fail with the offending record attached.
            throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
        }
        ByteBuffer timeAndValue = ByteBuffer.wrap(rawValue);
        long time = timeAndValue.getLong();
        byte[] value = new byte[rawValue.length - Long.BYTES];
        timeAndValue.get(value);
        Header versionHeader = record.headers().lastHeader("v");
        if (versionHeader == null) {
            this.cleanPut(time, key, new ContextualRecord(value, new ProcessorRecordContext(record.timestamp(), record.offset(), record.partition(), record.topic(), record.headers())));
        } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(versionHeader)) {
            this.cleanPut(time, key, ContextualRecord.deserialize(ByteBuffer.wrap(value)));
        } else {
            throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
        }
    }

    /**
     * Evicts records in (time, key) order for as long as {@code predicate} returns {@code true},
     * handing each deserialized record to {@code callback} before it is removed. The predicate
     * is consulted once up front and again before every eviction. Metrics are recorded only if
     * at least one record was evicted.
     *
     * @throws IllegalStateException if the cached minimum timestamp disagrees with the time of
     *                               the record about to be evicted
     */
    @Override
    public void evictWhile(Supplier<Boolean> predicate, Consumer<TimeOrderedKeyValueBuffer.Eviction<K, V>> callback) {
        Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = this.sortedMap.entrySet().iterator();
        int evictions = 0;
        if (predicate.get().booleanValue()) {
            Map.Entry<BufferKey, ContextualRecord> next = delegate.hasNext() ? delegate.next() : null;
            while (next != null && predicate.get().booleanValue()) {
                // Capture key and value before removing: a map entry's contents are not
                // guaranteed to stay valid once the backing map has been modified.
                BufferKey bufferKey = next.getKey();
                ContextualRecord bufferValue = next.getValue();
                if (bufferKey.time != this.minTimestamp) {
                    throw new IllegalStateException("minTimestamp [" + this.minTimestamp + "] did not match the actual min timestamp [" + bufferKey.time + "]");
                }
                K key = this.keySerde.deserializer().deserialize(this.changelogTopic, bufferKey.key.get());
                V value = this.valueSerde.deserializer().deserialize(this.changelogTopic, bufferValue.value());
                callback.accept(new TimeOrderedKeyValueBuffer.Eviction<K, V>(key, value, bufferValue.recordContext()));
                delegate.remove();
                this.index.remove(bufferKey.key);
                this.dirtyKeys.add(bufferKey.key);
                this.memBufferSize -= InMemoryTimeOrderedKeyValueBuffer.computeRecordSize(bufferKey.key, bufferValue);
                if (delegate.hasNext()) {
                    next = delegate.next();
                    this.minTimestamp = next.getKey().time;
                } else {
                    next = null;
                    this.minTimestamp = Long.MAX_VALUE;
                }
                ++evictions;
            }
        }
        if (evictions > 0) {
            this.updateBufferMetrics();
        }
    }

    /**
     * Serializes and buffers a record, marks its key dirty and records the buffer metrics.
     *
     * @throws NullPointerException if {@code value} or {@code recordContext} is {@code null}
     */
    @Override
    public void put(long time, K key, V value, ProcessorRecordContext recordContext) {
        Objects.requireNonNull(value, "value cannot be null");
        Objects.requireNonNull(recordContext, "recordContext cannot be null");
        Bytes serializedKey = Bytes.wrap(this.keySerde.serializer().serialize(this.changelogTopic, key));
        byte[] serializedValue = this.valueSerde.serializer().serialize(this.changelogTopic, value);
        this.cleanPut(time, serializedKey, new ContextualRecord(serializedValue, recordContext));
        this.dirtyKeys.add(serializedKey);
        this.updateBufferMetrics();
    }

    /**
     * Inserts or replaces a record without marking the key dirty. A new key is indexed under
     * {@code time}; an already-buffered key keeps its existing {@link BufferKey} (and therefore
     * its original buffer time) and only the value is replaced.
     */
    private void cleanPut(long time, Bytes key, ContextualRecord value) {
        BufferKey previousKey = this.index.get(key);
        if (previousKey == null) {
            BufferKey nextKey = new BufferKey(time, key);
            this.index.put(key, nextKey);
            this.sortedMap.put(nextKey, value);
            this.minTimestamp = Math.min(this.minTimestamp, time);
            this.memBufferSize += InMemoryTimeOrderedKeyValueBuffer.computeRecordSize(key, value);
        } else {
            ContextualRecord removedValue = this.sortedMap.put(previousKey, value);
            long removedSize = removedValue == null ? 0L : InMemoryTimeOrderedKeyValueBuffer.computeRecordSize(key, removedValue);
            this.memBufferSize = this.memBufferSize + InMemoryTimeOrderedKeyValueBuffer.computeRecordSize(key, value) - removedSize;
        }
    }

    /** @return the number of buffered keys */
    @Override
    public int numRecords() {
        return this.index.size();
    }

    /** @return the tracked size of the buffered records, as summed by {@code computeRecordSize} */
    @Override
    public long bufferSize() {
        return this.memBufferSize;
    }

    /** @return the cached minimum buffer time, or {@link Long#MAX_VALUE} when nothing is buffered */
    @Override
    public long minTimestamp() {
        return this.minTimestamp;
    }

    /**
     * Size accounted for one buffered record: 8 bytes for the buffer time, the key's byte
     * length, and the value's {@code sizeBytes()} when a value is present.
     */
    private static long computeRecordSize(Bytes key, ContextualRecord value) {
        long size = Long.BYTES;
        size += (long)key.get().length;
        if (value != null) {
            size += value.sizeBytes();
        }
        return size;
    }

    /** Records the current tracked size and key count on the two buffer sensors. */
    private void updateBufferMetrics() {
        this.bufferSizeSensor.record(this.memBufferSize);
        this.bufferCountSensor.record(this.index.size());
    }

    @Override
    public String toString() {
        return "InMemoryTimeOrderedKeyValueBuffer{storeName='" + this.storeName + '\'' + ", changelogTopic='" + this.changelogTopic + '\'' + ", open=" + this.open + ", loggingEnabled=" + this.loggingEnabled + ", minTimestamp=" + this.minTimestamp + ", memBufferSize=" + this.memBufferSize + ", \n\tdirtyKeys=" + this.dirtyKeys + ", \n\tindex=" + this.index + ", \n\tsortedMap=" + this.sortedMap + '}';
    }

    /** Sort key of the buffer: orders by buffer time first, then by serialized key. */
    private static final class BufferKey
    implements Comparable<BufferKey> {
        private final long time;
        private final Bytes key;

        private BufferKey(long time, Bytes key) {
            this.time = time;
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            BufferKey bufferKey = (BufferKey)o;
            return this.time == bufferKey.time && Objects.equals(this.key, bufferKey.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.time, this.key);
        }

        @Override
        public int compareTo(BufferKey o) {
            int timeComparison = Long.compare(this.time, o.time);
            return timeComparison == 0 ? this.key.compareTo(o.key) : timeComparison;
        }

        @Override
        public String toString() {
            return "BufferKey{key=" + this.key + ", time=" + this.time + '}';
        }
    }

    /**
     * {@link StoreBuilder} for {@link InMemoryTimeOrderedKeyValueBuffer}. Logging is enabled by
     * default; the caching switches are no-ops that return this builder.
     */
    public static class Builder<K, V>
    implements StoreBuilder<StateStore> {
        private final String storeName;
        private final Serde<K> keySerde;
        private final Serde<V> valSerde;
        private boolean loggingEnabled = true;

        public Builder(String storeName, Serde<K> keySerde, Serde<V> valSerde) {
            this.storeName = storeName;
            this.keySerde = keySerde;
            this.valSerde = valSerde;
        }

        /** No-op; returns this builder unchanged. */
        @Override
        public StoreBuilder<StateStore> withCachingEnabled() {
            return this;
        }

        /** No-op; returns this builder unchanged. */
        @Override
        public StoreBuilder<StateStore> withCachingDisabled() {
            return this;
        }

        /**
         * Not supported by this builder.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public StoreBuilder<StateStore> withLoggingEnabled(Map<String, String> config) {
            throw new UnsupportedOperationException();
        }

        /** Turns changelogging off for stores built afterwards. */
        @Override
        public StoreBuilder<StateStore> withLoggingDisabled() {
            this.loggingEnabled = false;
            return this;
        }

        /** @return a new buffer configured with this builder's name, logging flag and serdes */
        @Override
        public InMemoryTimeOrderedKeyValueBuffer<K, V> build() {
            return new InMemoryTimeOrderedKeyValueBuffer<>(this.storeName, this.loggingEnabled, this.keySerde, this.valSerde);
        }

        /** @return always an empty map */
        @Override
        public Map<String, String> logConfig() {
            return Collections.emptyMap();
        }

        @Override
        public boolean loggingEnabled() {
            return this.loggingEnabled;
        }

        @Override
        public String name() {
            return this.storeName;
        }
    }
}

