/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.MergedSortedCacheKeyValueBytesStoreIterator;
import org.apache.kafka.streams.state.internals.PeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/**
 * A byte-oriented {@link KeyValueStore} that places a {@link ThreadCache} namespace in front of a wrapped store.
 *
 * <p>Writes ({@link #put}, {@link #putIfAbsent}, {@link #putAll}, {@link #delete}) go into the cache only; they
 * reach the wrapped store when the cache hands dirty entries to the listener registered in
 * {@code initInternal}, e.g. as a result of {@link #flush()}. If a {@link CacheFlushListener} has been set, each
 * flushed entry is also forwarded to it with deserialized key and values.
 *
 * <p>All point operations, {@link #flush()} and {@link #close()} are guarded by a reentrant read/write lock. The
 * thread that called {@link #init} (the "stream thread") always takes the write lock; other threads take the read
 * lock for {@link #get} and {@link #approximateNumEntries}. {@link #range} and {@link #all} do not take the lock.
 *
 * @param <K> key type handed to the flush listener
 * @param <V> value type handed to the flush listener
 */
class CachingKeyValueStore<K, V>
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>>
implements KeyValueStore<Bytes, byte[]>,
CachedStateStore<K, V> {
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private CacheFlushListener<K, V> flushListener;
    private boolean sendOldValues;
    private String cacheName;
    private ThreadCache cache;
    private InternalProcessorContext context;
    private StateSerdes<K, V> serdes;
    // Thread that called init(); it is the only thread that populates the cache on a read miss.
    private Thread streamThread;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * @param underlying the store that receives entries flushed from the cache
     * @param keySerde   key serde, or {@code null} to use the context's key serde at init time
     * @param valueSerde value serde, or {@code null} to use the context's value serde at init time
     */
    CachingKeyValueStore(KeyValueStore<Bytes, byte[]> underlying, Serde<K> keySerde, Serde<V> valueSerde) {
        super(underlying);
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    /**
     * Sets up the cache namespace and serdes, initializes the wrapped store, and records the calling thread as
     * the stream thread.
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.initInternal(context);
        super.init(context, root);
        this.streamThread = Thread.currentThread();
    }

    /**
     * Resolves the serdes (falling back to the context's serdes when none were supplied), obtains the thread
     * cache from the context, and registers the listener that writes dirty cache entries through to the wrapped
     * store.
     */
    @SuppressWarnings("unchecked") // the context only exposes Serde<?>; K and V are fixed by the caller
    private void initInternal(ProcessorContext context) {
        this.context = (InternalProcessorContext)context;
        String changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), this.name());
        Serde<K> effectiveKeySerde = this.keySerde == null ? (Serde<K>)context.keySerde() : this.keySerde;
        Serde<V> effectiveValueSerde = this.valueSerde == null ? (Serde<V>)context.valueSerde() : this.valueSerde;
        this.serdes = new StateSerdes<>(changelogTopic, effectiveKeySerde, effectiveValueSerde);
        this.cache = this.context.getCache();
        this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), this.name());
        this.cache.addDirtyEntryFlushListener(this.cacheName, entries -> {
            for (ThreadCache.DirtyEntry entry : entries) {
                this.putAndMaybeForward(entry, (InternalProcessorContext)context);
            }
        });
    }

    /**
     * Writes one dirty cache entry to the wrapped store and, if a flush listener is set, forwards the change.
     *
     * <p>With a listener, the previous value is read from the wrapped store when the new value is {@code null}
     * or when old values were requested. If both the new and the previous value are {@code null}, nothing is
     * written or forwarded. While the listener runs, the processor context's record context is switched to the
     * one captured with the cache entry and restored afterwards.
     */
    private void putAndMaybeForward(ThreadCache.DirtyEntry entry, InternalProcessorContext context) {
        if (this.flushListener == null) {
            this.wrapped().put(entry.key(), entry.newValue());
            return;
        }
        byte[] newValueBytes = entry.newValue();
        byte[] oldValueBytes = (newValueBytes == null || this.sendOldValues) ? this.wrapped().get(entry.key()) : null;
        if (newValueBytes == null && oldValueBytes == null) {
            return;
        }
        K key = this.serdes.keyFrom(entry.key().get());
        V newValue = newValueBytes != null ? this.serdes.valueFrom(newValueBytes) : null;
        V oldValue = this.sendOldValues && oldValueBytes != null ? this.serdes.valueFrom(oldValueBytes) : null;
        // Write through before forwarding, so the wrapped store is already updated when the listener runs.
        this.wrapped().put(entry.key(), entry.newValue());
        ProcessorRecordContext current = context.recordContext();
        context.setRecordContext(entry.entry().context());
        try {
            this.flushListener.apply(key, newValue, oldValue, entry.entry().context().timestamp());
        }
        finally {
            context.setRecordContext(current);
        }
    }

    /**
     * Registers the listener that receives flushed entries.
     *
     * @param flushListener listener to notify, or {@code null} to only write through to the wrapped store
     * @param sendOldValues whether the previous value should be looked up and passed to the listener
     */
    @Override
    public void setFlushListener(CacheFlushListener<K, V> flushListener, boolean sendOldValues) {
        this.flushListener = flushListener;
        this.sendOldValues = sendOldValues;
    }

    /**
     * Flushes this store's cache namespace and then the wrapped store, holding the write lock throughout.
     */
    @Override
    public void flush() {
        this.lock.writeLock().lock();
        try {
            this.cache.flush(this.cacheName);
            super.flush();
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Flushes, closes the wrapped store, and closes the cache namespace. Each step runs even if the previous one
     * threw.
     *
     * <p>The write lock is held for the whole sequence so that a thread inside {@link #get} or
     * {@link #approximateNumEntries} cannot overlap with the teardown. The lock is reentrant, so the nested
     * {@link #flush()} acquires it again without blocking.
     */
    @Override
    public void close() {
        this.lock.writeLock().lock();
        try {
            try {
                this.flush();
            }
            finally {
                try {
                    super.close();
                }
                finally {
                    this.cache.close(this.cacheName);
                }
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Returns the value for {@code key}, consulting the cache first and the wrapped store second.
     *
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override
    public byte[] get(Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        // The stream thread may insert into the cache on a miss, so it needs the write lock.
        Lock theLock = Thread.currentThread().equals(this.streamThread) ? this.lock.writeLock() : this.lock.readLock();
        theLock.lock();
        try {
            // The store may have been closed while this thread was waiting for the lock.
            this.validateStoreOpen();
            return this.getInternal(key);
        }
        finally {
            theLock.unlock();
        }
    }

    /**
     * Looks up {@code key} in the cache and falls back to the wrapped store on a miss. Callers hold the lock.
     */
    private byte[] getInternal(Bytes key) {
        LRUCacheEntry entry = null;
        if (this.cache != null) {
            entry = this.cache.get(this.cacheName, key);
        }
        if (entry != null) {
            return entry.value();
        }
        byte[] rawValue = this.wrapped().get(key);
        if (rawValue == null) {
            return null;
        }
        // Only the stream thread populates the cache on a miss; presumably so that other threads never
        // cause cache evictions (and thereby flushes) -- confirm against ThreadCache.
        if (Thread.currentThread().equals(this.streamThread)) {
            this.cache.put(this.cacheName, key, new LRUCacheEntry(rawValue));
        }
        return rawValue;
    }

    /**
     * Returns an iterator over the merged view of cached and stored entries in {@code [from, to]} as defined by
     * the wrapped store's and the cache's own {@code range} semantics. Does not take the lock.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
        this.validateStoreOpen();
        KeyValueIterator<Bytes, byte[]> storeIterator = this.wrapped().range(from, to);
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.cache.range(this.cacheName, from, to);
        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
    }

    /**
     * Returns an iterator over the merged view of all cached and stored entries. Does not take the lock.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        this.validateStoreOpen();
        KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(this.name(), this.wrapped().all());
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.cache.all(this.cacheName);
        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
    }

    /**
     * Returns the wrapped store's approximate entry count. Entries that exist only in the cache are not counted.
     */
    @Override
    public long approximateNumEntries() {
        this.validateStoreOpen();
        this.lock.readLock().lock();
        try {
            // The store may have been closed while this thread was waiting for the lock.
            this.validateStoreOpen();
            return this.wrapped().approximateNumEntries();
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Caches {@code value} for {@code key}; the wrapped store is updated when the entry is flushed.
     *
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override
    public void put(Bytes key, byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            this.putInternal(key, value);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Inserts a cache entry carrying the current record's headers, offset, timestamp, partition and topic.
     * The {@code true} argument presumably marks the entry as dirty -- confirm against LRUCacheEntry.
     */
    private void putInternal(Bytes key, byte[] value) {
        this.cache.put(this.cacheName, key, new LRUCacheEntry(value, this.context.headers(), true, this.context.offset(), this.context.timestamp(), this.context.partition(), this.context.topic()));
    }

    /**
     * Caches {@code value} for {@code key} only if no value is currently visible through the cache or the
     * wrapped store.
     *
     * @return the existing value, or {@code null} if there was none and {@code value} was cached
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override
    public byte[] putIfAbsent(Bytes key, byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            byte[] existing = this.getInternal(key);
            if (existing == null) {
                this.putInternal(key, value);
            }
            return existing;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Caches every entry via {@link #put} while holding the write lock for the whole batch.
     *
     * @throws NullPointerException if any entry has a {@code null} key; entries before it have already been put
     */
    @Override
    public void putAll(List<KeyValue<Bytes, byte[]>> entries) {
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            for (KeyValue<Bytes, byte[]> entry : entries) {
                Objects.requireNonNull(entry.key, "key cannot be null");
                this.put(entry.key, entry.value);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Deletes {@code key} by caching a {@code null} value for it.
     *
     * @return the value visible before the delete, or {@code null} if there was none
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override
    public byte[] delete(Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            return this.deleteInternal(key);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Reads the current value and then caches a {@code null} value for the key. Callers hold the write lock.
     */
    private byte[] deleteInternal(Bytes key) {
        byte[] previous = this.getInternal(key);
        this.putInternal(key, null);
        return previous;
    }
}

