package dev.responsive.kafka.internal.stores;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveSessionParams;
import dev.responsive.kafka.internal.utils.Iterators;
import dev.responsive.kafka.internal.utils.SessionKey;
import dev.responsive.kafka.internal.utils.TableName;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/stores/ResponsiveSessionStore.class */
public class ResponsiveSessionStore implements SessionStore<Bytes, byte[]> {
    private final Logger log;
    private final ResponsiveSessionParams params;
    private final TableName name;
    private SessionOperations sessionOperations;
    private boolean open;
    private StateStoreContext context;
    private long observedStreamTime = -1;
    private Position position = Position.emptyPosition();

    public ResponsiveSessionStore(ResponsiveSessionParams responsiveSessionParams) {
        this.name = responsiveSessionParams.name();
        this.params = responsiveSessionParams;
        this.log = new LogContext(String.format("session-store [%s] ", this.name.kafkaName())).logger(ResponsiveSessionStore.class);
    }

    @Deprecated
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        if (!(processorContext instanceof StateStoreContext)) {
            throw new UnsupportedOperationException("Use ResponsiveSessionStore#init(StateStoreContext, StateStore) instead.");
        }
        init((StateStoreContext) processorContext, stateStore);
    }

    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        this.log.info("Initializing state store");
        Map appConfigs = stateStoreContext.appConfigs();
        ResponsiveConfig responsiveConfig = ResponsiveConfig.responsiveConfig(appConfigs);
        this.context = stateStoreContext;
        try {
            this.sessionOperations = SessionOperationsImpl.create(this.name, this.context, this.params, appConfigs, responsiveConfig, sessionKey -> {
                return sessionKey.sessionEndMs >= minValidEndTimestamp();
            });
            this.log.info("Completed initializing state store");
            stateStoreContext.register(stateStore, this.sessionOperations);
            this.open = true;
        } catch (InterruptedException | TimeoutException e) {
            throw new ProcessorStateException("Failed to initialize store.", e);
        }
    }

    public String name() {
        return this.name.kafkaName();
    }

    public boolean persistent() {
        return false;
    }

    public boolean isOpen() {
        return this.open;
    }

    public Position getPosition() {
        return this.position;
    }

    public void flush() {
    }

    public void close() {
        this.sessionOperations.close();
    }

    public void put(Windowed<Bytes> windowed, byte[] bArr) {
        if (windowed.window().end() < minValidEndTimestamp()) {
            return;
        }
        SessionKey sessionKey = new SessionKey((Bytes) windowed.key(), windowed.window().start(), windowed.window().end());
        if (bArr == null) {
            this.sessionOperations.delete(sessionKey);
            return;
        }
        this.sessionOperations.put(sessionKey, bArr);
        this.observedStreamTime = Math.max(this.observedStreamTime, sessionKey.sessionEndMs);
        StoreQueryUtils.updatePosition(this.position, this.context);
    }

    public void remove(Windowed<Bytes> windowed) {
        if (windowed.window().end() < minValidEndTimestamp()) {
            return;
        }
        this.sessionOperations.delete(new SessionKey((Bytes) windowed.key(), windowed.window().start(), windowed.window().end()));
    }

    public byte[] fetchSession(Bytes bytes, long j, long j2) {
        if (j2 < minValidEndTimestamp()) {
            return null;
        }
        return this.sessionOperations.fetch(new SessionKey(bytes, j, j2));
    }

    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes bytes, long j, long j2) {
        long max = Long.max(0L, Long.max(j, minValidEndTimestamp()));
        return Iterators.filterKv(this.sessionOperations.fetchAll(bytes, max, Long.max(this.observedStreamTime, 0L)), windowed -> {
            return windowed.window().start() <= j2 && windowed.window().end() >= max;
        });
    }

    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes, Bytes bytes2) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    private long minValidEndTimestamp() {
        return (this.observedStreamTime - this.params.retentionPeriod()) + 1;
    }

    public /* bridge */ /* synthetic */ void put(Windowed windowed, Object obj) {
        put((Windowed<Bytes>) windowed, (byte[]) obj);
    }
}
