/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.FilteredCacheIterator;
import org.apache.kafka.streams.state.internals.HasNextCondition;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreIterator;
import org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator;
import org.apache.kafka.streams.state.internals.PeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.SegmentedCacheFunction;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/**
 * A {@link WindowStore} over raw bytes that buffers writes in a {@link ThreadCache} in front of
 * a wrapped store.
 *
 * <p>Writes go to the cache only. When the cache flushes or evicts a dirty entry, the entry is
 * written to the wrapped store and, if a {@link CacheFlushListener} has been registered,
 * forwarded to it with deserialized key and values. Range reads merge the cached entries with
 * the entries of the wrapped store.
 *
 * @param <K> deserialized key type handed to the flush listener
 * @param <V> deserialized value type handed to the flush listener
 */
class CachingWindowStore<K, V>
    extends WrappedStateStore<WindowStore<Bytes, byte[]>>
    implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, V> {

    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final long windowSize;
    private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema();
    // Cache namespace; assigned in initInternal as "<taskId>-<store name>".
    private String name;
    private ThreadCache cache;
    private boolean sendOldValues;
    private StateSerdes<K, V> serdes;
    private InternalProcessorContext context;
    private StateSerdes<Bytes, byte[]> bytesSerdes;
    private CacheFlushListener<Windowed<K>, V> flushListener;
    private final SegmentedCacheFunction cacheFunction;

    /**
     * @param underlying      store that receives entries when the cache is flushed
     * @param keySerde        key serde, or {@code null} to use the context's key serde at init
     * @param valueSerde      value serde, or {@code null} to use the context's value serde at init
     * @param windowSize      window size used when decoding windowed keys
     * @param segmentInterval segment interval handed to the {@link SegmentedCacheFunction}
     */
    CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
                       final Serde<K> keySerde,
                       final Serde<V> valueSerde,
                       final long windowSize,
                       final long segmentInterval) {
        super(underlying);
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.windowSize = windowSize;
        this.cacheFunction = new SegmentedCacheFunction(this.keySchema, segmentInterval);
    }

    /**
     * Sets up the cache namespace and serdes, then delegates to the superclass {@code init}.
     *
     * @param context processor context; cast to {@link InternalProcessorContext}
     * @param root    root store passed through to the superclass
     */
    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        initInternal((InternalProcessorContext) context);
        super.init(context, root);
    }

    // The context exposes its default serdes without type information; they are only used when
    // no explicit serde was supplied, in which case the caller is responsible for K/V matching.
    @SuppressWarnings("unchecked")
    private void initInternal(final InternalProcessorContext context) {
        this.context = context;
        final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());

        this.serdes = new StateSerdes<>(
            topic,
            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
        this.bytesSerdes = new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray());

        // name() must be read before the field is assigned; the field then holds the namespace.
        this.name = context.taskId() + "-" + name();
        this.cache = this.context.getCache();
        this.cache.addDirtyEntryFlushListener(this.name, entries -> {
            for (final ThreadCache.DirtyEntry entry : entries) {
                putAndMaybeForward(entry, context);
            }
        });
    }

    /**
     * Writes one dirty cache entry to the wrapped store and, when a flush listener is set,
     * forwards it.
     *
     * <p>With a listener set, the previous value is fetched from the wrapped store when the new
     * value is {@code null} or old values were requested. If both the new and the previous value
     * are {@code null}, nothing is written and nothing is forwarded.
     */
    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
        final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
        final Windowed<Bytes> windowedKeyBytes = WindowKeySchema.fromStoreBytesKey(binaryWindowKey, windowSize);
        final long windowStartTimestamp = windowedKeyBytes.window().start();
        final Bytes key = windowedKeyBytes.key();

        if (flushListener == null) {
            wrapped().put(key, entry.newValue(), windowStartTimestamp);
            return;
        }

        final byte[] newValueBytes = entry.newValue();
        final byte[] oldValueBytes = newValueBytes == null || sendOldValues
            ? wrapped().fetch(key, windowStartTimestamp)
            : null;
        if (newValueBytes == null && oldValueBytes == null) {
            return;
        }

        final Windowed<K> windowedKey =
            WindowKeySchema.fromStoreKey(windowedKeyBytes, serdes.keyDeserializer(), serdes.topic());
        final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null;
        final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null;

        // The old value has to be read before the wrapped store is overwritten.
        wrapped().put(key, newValueBytes, windowStartTimestamp);

        // Forward under the record context captured with the cache entry, then restore.
        final ProcessorRecordContext current = context.recordContext();
        context.setRecordContext(entry.entry().context());
        try {
            flushListener.apply(windowedKey, newValue, oldValue, entry.entry().context().timestamp());
        } finally {
            context.setRecordContext(current);
        }
    }

    /**
     * Registers the listener invoked for each flushed entry.
     *
     * @param flushListener listener to forward flushed entries to; {@code null} disables forwarding
     * @param sendOldValues whether the previous value is looked up and passed to the listener
     */
    @Override
    public void setFlushListener(final CacheFlushListener<Windowed<K>, V> flushListener, final boolean sendOldValues) {
        this.flushListener = flushListener;
        this.sendOldValues = sendOldValues;
    }

    /** Flushes this store's cache namespace, then flushes the wrapped store. */
    @Override
    public synchronized void flush() {
        cache.flush(name);
        wrapped().flush();
    }

    /**
     * Flushes, closes the cache namespace and closes the wrapped store.
     *
     * <p>All three steps are attempted even if an earlier one throws, so that a failing flush
     * does not leave the wrapped store open. The first {@link RuntimeException} is rethrown with
     * any later ones attached as suppressed.
     */
    @Override
    public void close() {
        RuntimeException failure = null;
        try {
            flush();
        } catch (final RuntimeException e) {
            failure = e;
        }
        try {
            cache.close(name);
        } catch (final RuntimeException e) {
            failure = chain(failure, e);
        }
        try {
            wrapped().close();
        } catch (final RuntimeException e) {
            failure = chain(failure, e);
        }
        if (failure != null) {
            throw failure;
        }
    }

    // Returns the exception to rethrow: the first one seen, with later ones attached as suppressed.
    private static RuntimeException chain(final RuntimeException first, final RuntimeException next) {
        if (first == null) {
            return next;
        }
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }

    /** Puts {@code value} for {@code key} using the context's current timestamp as window start. */
    @Override
    public synchronized void put(final Bytes key, final byte[] value) {
        put(key, value, context.timestamp());
    }

    /**
     * Stores the value in the cache as a dirty entry carrying the context's current record
     * metadata (headers, offset, timestamp, partition, topic).
     */
    @Override
    public synchronized void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
        validateStoreOpen();
        final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
        final LRUCacheEntry entry = new LRUCacheEntry(
            value,
            context.headers(),
            true,
            context.offset(),
            context.timestamp(),
            context.partition(),
            context.topic());
        cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
    }

    /**
     * Returns the cached value for the key and window start, falling back to the wrapped store
     * when there is no cache or no cached entry.
     */
    @Override
    public byte[] fetch(final Bytes key, final long timestamp) {
        validateStoreOpen();
        final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
        final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
        if (cache == null) {
            return wrapped().fetch(key, timestamp);
        }
        final LRUCacheEntry entry = cache.get(name, cacheKey);
        if (entry == null) {
            return wrapped().fetch(key, timestamp);
        }
        return entry.value();
    }

    /** Returns the entries for one key in the given time range, merging cache and wrapped store. */
    @Override
    public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
        validateStoreOpen();
        final WindowStoreIterator<byte[]> underlyingIterator = wrapped().fetch(key, timeFrom, timeTo);
        if (cache == null) {
            return underlyingIterator;
        }

        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom));
        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo));
        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);

        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, key, timeFrom, timeTo);
        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);

        return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator);
    }

    /** Returns the entries for a key range in the given time range, merging cache and wrapped store. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
                                                           final Bytes to,
                                                           final long timeFrom,
                                                           final long timeTo) {
        validateStoreOpen();
        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
            wrapped().fetch(from, to, timeFrom, timeTo);
        if (cache == null) {
            return underlyingIterator;
        }

        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(from, timeFrom));
        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(to, timeTo));
        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);

        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(from, to, timeFrom, timeTo);
        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);

        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator, underlyingIterator, bytesSerdes, windowSize, cacheFunction);
    }

    /** Returns every entry, merging this store's cache namespace with the wrapped store. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        validateStoreOpen();
        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().all();
        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);
        return new MergedSortedCacheWindowStoreKeyValueIterator(
            cacheIterator, underlyingIterator, bytesSerdes, windowSize, cacheFunction);
    }

    /**
     * Returns every entry in the given time range. The cache side iterates the whole namespace
     * and is filtered by a time-only condition (no key bounds).
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
        validateStoreOpen();
        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetchAll(timeFrom, timeTo);
        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);

        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(null, null, timeFrom, timeTo);
        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);

        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator, underlyingIterator, bytesSerdes, windowSize, cacheFunction);
    }
}

