package io.streamnative.oxia.client.session;

import com.google.common.annotations.VisibleForTesting;
import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.SessionMetrics;
import io.streamnative.oxia.proto.CloseSessionRequest;
import io.streamnative.oxia.proto.CreateSessionRequest;
import io.streamnative.oxia.proto.CreateSessionResponse;
import io.streamnative.oxia.proto.SessionHeartbeat;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Function;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/* loaded from: input_file:io/streamnative/oxia/client/session/Session.class */
/**
 * Client-side handle for a session held against a single shard.
 *
 * <p>Once {@link #start()} has been called, a heartbeat message is sent to the shard's stub at a
 * fixed interval. {@link #close()} stops the heartbeats, asks the server to close the session and
 * releases the keep-alive scheduler.
 *
 * <p>NOTE(review): instances carry mutable, unsynchronized state ({@code scheduler},
 * {@code keepAliveSubscription}); this presumably expects {@code start()} and {@code close()} to be
 * called by a single owner — confirm against callers.
 */
public class Session implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(Session.class);

    /** Lower bound applied to the heartbeat interval derived from the session timeout. */
    private static final Duration MIN_HEARTBEAT_INTERVAL = Duration.ofSeconds(2L);

    @NonNull
    private final Function<Long, OxiaStub> stubByShardId;

    @NonNull
    private final Duration sessionTimeout;

    @NonNull
    private final Duration heartbeatInterval;

    @VisibleForTesting
    private final long shardId;
    private final long sessionId;

    @NonNull
    private final SessionHeartbeat heartbeat;

    @NonNull
    private final SessionMetrics metrics;

    // Assigned eagerly by the config-based constructor, otherwise lazily by start().
    private Scheduler scheduler;

    // Null until start() has been called.
    private Disposable keepAliveSubscription;

    /** Creates {@link Session} instances by requesting a new session from the shard's stub. */
    public static class Factory {

        @NonNull
        ClientConfig config;

        @NonNull
        Function<Long, OxiaStub> stubByShardId;

        @NonNull
        SessionMetrics metrics;

        /**
         * Requests a new session for the given shard and wraps the result. This call blocks until
         * the create-session response has been received.
         *
         * @param shardId the shard on which the session is created
         * @return a new, not yet started, session
         * @throws IllegalStateException if the create-session call completes without a response
         */
        @NonNull
        public Session create(long shardId) {
            OxiaStub stub = stubByShardId.apply(shardId);
            // NOTE(review): the (int) narrowing silently wraps for timeouts above
            // Integer.MAX_VALUE milliseconds — confirm that the configuration bounds the value.
            CreateSessionRequest request =
                    CreateSessionRequest.newBuilder()
                            .setSessionTimeoutMs((int) config.sessionTimeout().toMillis())
                            .setShardId(shardId)
                            .setClientIdentity(config.clientIdentifier())
                            .build();
            CreateSessionResponse response = stub.reactor().createSession(request).block();
            if (response == null) {
                throw new IllegalStateException("Empty session returned for shardId: " + shardId);
            }
            return new Session(stubByShardId, config, shardId, response.getSessionId(), metrics);
        }

        /**
         * @param clientConfig supplies the session timeout and client identifier
         * @param function resolves the stub used to reach a given shard id
         * @param sessionMetrics passed on to every session created by this factory
         * @throws NullPointerException if any argument is {@code null}
         */
        public Factory(@NonNull ClientConfig clientConfig, @NonNull Function<Long, OxiaStub> function, @NonNull SessionMetrics sessionMetrics) {
            this.config = Objects.requireNonNull(clientConfig, "config is marked non-null but is null");
            this.stubByShardId = Objects.requireNonNull(function, "stubByShardId is marked non-null but is null");
            this.metrics = Objects.requireNonNull(sessionMetrics, "metrics is marked non-null but is null");
        }
    }

    /**
     * Builds a session from the client configuration. The heartbeat interval is one tenth of the
     * session timeout, but never less than two seconds. The keep-alive scheduler is created here.
     *
     * @param function resolves the stub used to reach a given shard id
     * @param clientConfig supplies the session timeout
     * @param shardId the shard that owns the session
     * @param sessionId the session identifier
     * @param sessionMetrics receives every keep-alive signal
     * @throws NullPointerException if {@code function}, {@code clientConfig} or
     *     {@code sessionMetrics} is {@code null}
     */
    Session(@NonNull Function<Long, OxiaStub> function, @NonNull ClientConfig clientConfig, long shardId, long sessionId, SessionMetrics sessionMetrics) {
        // The null checks live inside the argument list because clientConfig is dereferenced
        // there; checks placed after this(...) could never be reached with a null argument.
        this(
                Objects.requireNonNull(function, "stubByShardId is marked non-null but is null"),
                Objects.requireNonNull(clientConfig, "config is marked non-null but is null").sessionTimeout(),
                heartbeatIntervalFor(clientConfig.sessionTimeout()),
                shardId,
                sessionId,
                SessionHeartbeat.newBuilder().setShardId(shardId).setSessionId(sessionId).build(),
                sessionMetrics);
        this.scheduler = newKeepAliveScheduler();
    }

    /**
     * Builds a session from explicit values. No scheduler is created here; {@link #start()}
     * creates one on demand.
     *
     * @param function resolves the stub used to reach a given shard id
     * @param sessionTimeout timeout applied to the keep-alive stream
     * @param heartbeatInterval delay between two heartbeats
     * @param shardId the shard that owns the session
     * @param sessionId the session identifier
     * @param sessionHeartbeat the message sent on every heartbeat
     * @param sessionMetrics receives every keep-alive signal
     * @throws NullPointerException if any reference argument is {@code null}
     */
    Session(@NonNull Function<Long, OxiaStub> function, @NonNull Duration sessionTimeout, @NonNull Duration heartbeatInterval, long shardId, long sessionId, @NonNull SessionHeartbeat sessionHeartbeat, @NonNull SessionMetrics sessionMetrics) {
        this.stubByShardId = Objects.requireNonNull(function, "stubByShardId is marked non-null but is null");
        this.sessionTimeout = Objects.requireNonNull(sessionTimeout, "sessionTimeout is marked non-null but is null");
        this.heartbeatInterval = Objects.requireNonNull(heartbeatInterval, "heartbeatInterval is marked non-null but is null");
        this.heartbeat = Objects.requireNonNull(sessionHeartbeat, "heartbeat is marked non-null but is null");
        this.metrics = Objects.requireNonNull(sessionMetrics, "metrics is marked non-null but is null");
        this.shardId = shardId;
        this.sessionId = sessionId;
    }

    /** Derives the heartbeat interval: a tenth of the session timeout, at least two seconds. */
    private static Duration heartbeatIntervalFor(Duration sessionTimeout) {
        long tenthOfTimeoutMs = sessionTimeout.toMillis() / 10;
        return Duration.ofMillis(Math.max(tenthOfTimeoutMs, MIN_HEARTBEAT_INTERVAL.toMillis()));
    }

    /** Creates the single-threaded scheduler on which keep-alive signals are published. */
    private Scheduler newKeepAliveScheduler() {
        return Schedulers.newSingle(String.format("session-[id=%s,shard=%s]-keep-alive", sessionId, shardId));
    }

    /**
     * Subscribes the keep-alive stream: the heartbeat is emitted repeatedly, delayed by the
     * heartbeat interval, and sent through the shard's stub. Failures are retried with backoff
     * starting at 100 ms, and the stream has the session timeout applied to it. Every signal is
     * reported to the metrics; errors are logged.
     */
    public void start() {
        // Sessions built through the explicit-values constructor have no scheduler yet.
        if (scheduler == null) {
            scheduler = newKeepAliveScheduler();
        }
        keepAliveSubscription =
                Mono.just(heartbeat)
                        .repeat()
                        .delayElements(heartbeatInterval)
                        .flatMap(hb -> stubByShardId.apply(shardId).reactor().keepAlive(hb))
                        .retryWhen(
                                Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100L))
                                        .doBeforeRetry(
                                                signal ->
                                                        log.warn(
                                                                "Retrying sending keep-alives for session [id={},shard={}] - {}",
                                                                sessionId,
                                                                shardId,
                                                                signal)))
                        .timeout(sessionTimeout)
                        .publishOn(scheduler)
                        .doOnEach(metrics::recordKeepAlive)
                        .doOnError(
                                t ->
                                        log.warn(
                                                "Session keep-alive error: [id={},shard={}]",
                                                sessionId,
                                                shardId,
                                                t))
                        .subscribe();
    }

    /**
     * Stops the heartbeats, asks the server to close the session (blocking until it answers) and
     * disposes the keep-alive scheduler. The scheduler is disposed even when the close request
     * fails. Calling this on a session that was never started is allowed.
     *
     * @throws Exception if the close-session request fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (keepAliveSubscription != null) {
            keepAliveSubscription.dispose();
        }
        try {
            CloseSessionRequest request =
                    CloseSessionRequest.newBuilder().setShardId(shardId).setSessionId(sessionId).build();
            stubByShardId.apply(shardId).reactor().closeSession(request).block();
        } finally {
            // Must run even if the RPC throws, otherwise the scheduler is never released.
            if (scheduler != null) {
                scheduler.dispose();
            }
        }
    }

    /** @return the id of the shard that owns this session */
    long getShardId() {
        return shardId;
    }

    /** @return the session identifier */
    public long getSessionId() {
        return sessionId;
    }
}
