/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
import org.apache.kafka.streams.state.internals.WrappedSessionStoreIterator;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

public class RocksDBSessionStore<K, AGG>
extends WrappedStateStore<SegmentedBytesStore>
implements SessionStore<K, AGG> {
    private final Serde<K> keySerde;
    private final Serde<AGG> aggSerde;
    private StateSerdes<K, AGG> serdes;
    private String topic;

    RocksDBSessionStore(SegmentedBytesStore bytesStore, Serde<K> keySerde, Serde<AGG> aggSerde) {
        super(bytesStore);
        this.keySerde = keySerde;
        this.aggSerde = aggSerde;
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        String storeName = this.name();
        this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
        this.serdes = new StateSerdes(this.topic, this.keySerde == null ? context.keySerde() : this.keySerde, this.aggSerde == null ? context.valueSerde() : this.aggSerde);
        super.init(context, root);
    }

    @Override
    public KeyValueIterator<Windowed<K>, AGG> findSessions(K key, long earliestSessionEndTime, long latestSessionStartTime) {
        KeyValueIterator<Bytes, byte[]> bytesIterator = ((SegmentedBytesStore)this.wrapped()).fetch(Bytes.wrap(this.serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime);
        return new WrappedSessionStoreIterator<K, AGG>(bytesIterator, this.serdes);
    }

    @Override
    public KeyValueIterator<Windowed<K>, AGG> findSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
        KeyValueIterator<Bytes, byte[]> bytesIterator = ((SegmentedBytesStore)this.wrapped()).fetch(Bytes.wrap(this.serdes.rawKey(keyFrom)), Bytes.wrap(this.serdes.rawKey(keyTo)), earliestSessionEndTime, latestSessionStartTime);
        return new WrappedSessionStoreIterator<K, AGG>(bytesIterator, this.serdes);
    }

    @Override
    public AGG fetchSession(K key, long startTime, long endTime) {
        return this.serdes.valueFrom(((SegmentedBytesStore)this.wrapped()).get(SessionKeySchema.toBinary(Bytes.wrap(this.serdes.rawKey(key)), startTime, endTime)));
    }

    @Override
    public KeyValueIterator<Windowed<K>, AGG> fetch(K key) {
        return this.findSessions(key, 0L, Long.MAX_VALUE);
    }

    @Override
    public KeyValueIterator<Windowed<K>, AGG> fetch(K from, K to) {
        return this.findSessions(from, to, 0L, Long.MAX_VALUE);
    }

    @Override
    public void remove(Windowed<K> key) {
        ((SegmentedBytesStore)this.wrapped()).remove(Bytes.wrap(SessionKeySchema.toBinary(key, this.serdes.keySerializer(), this.topic)));
    }

    @Override
    public void put(Windowed<K> sessionKey, AGG aggregate) {
        ((SegmentedBytesStore)this.wrapped()).put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, this.serdes.keySerializer(), this.topic)), this.serdes.rawValue(aggregate));
    }
}

