package io.ray.streaming.state.strategy;

import io.ray.streaming.state.StateException;
import io.ray.streaming.state.StateStoreManager;
import io.ray.streaming.state.StorageRecord;
import io.ray.streaming.state.serialization.Serializer;
import io.ray.streaming.state.store.KeyValueStore;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:io/ray/streaming/state/strategy/AbstractStateStoreManager.class */
public abstract class AbstractStateStoreManager<V> implements StateStoreManager {
    protected KeyValueStore<String, Map<Long, byte[]>> kvStore;
    protected Map<String, StorageRecord<V>> frontStore = new ConcurrentHashMap();
    protected Map<Long, Map<String, byte[]>> middleStore = new ConcurrentHashMap();
    protected int keyGroupIndex = -1;

    public AbstractStateStoreManager(KeyValueStore<String, Map<Long, byte[]>> keyValueStore) {
        this.kvStore = keyValueStore;
    }

    public byte[] toBytes(StorageRecord storageRecord) {
        return Serializer.object2Bytes(storageRecord);
    }

    public StorageRecord<V> toStorageRecord(byte[] bArr) {
        return (StorageRecord) Serializer.bytes2Object(bArr);
    }

    public abstract V get(long j, String str);

    public void put(long j, String str, V v) {
        this.frontStore.put(str, new StorageRecord<>(j, v));
    }

    @Override // io.ray.streaming.state.StateStoreManager
    public void ackCommit(long j, long j2) {
        ackCommit(j);
    }

    public abstract void ackCommit(long j);

    public void setKeyGroupIndex(int i) {
        this.keyGroupIndex = i;
    }

    public void close() {
        this.frontStore.clear();
        this.middleStore.clear();
        if (this.kvStore != null) {
            this.kvStore.clearCache();
            try {
                this.kvStore.close();
            } catch (IOException e) {
                throw new StateException(e);
            }
        }
    }
}
