package tech.ydb.coordination.impl;

import com.google.protobuf.ByteString;
import java.security.SecureRandom;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.common.retry.RetryPolicy;
import tech.ydb.coordination.CoordinationSession;
import tech.ydb.coordination.SemaphoreLease;
import tech.ydb.coordination.description.SemaphoreDescription;
import tech.ydb.coordination.description.SemaphoreWatcher;
import tech.ydb.coordination.settings.CoordinationSessionSettings;
import tech.ydb.coordination.settings.DescribeSemaphoreMode;
import tech.ydb.coordination.settings.WatchSemaphoreMode;
import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:tech/ydb/coordination/impl/SessionImpl.class */
public class SessionImpl implements CoordinationSession {
    /** Logger shared by all sessions; registered under the {@link CoordinationSession} interface. */
    private static final Logger logger = LoggerFactory.getLogger(CoordinationSession.class);

    /** Transport used to create streams and to obtain the scheduler for delayed retries. */
    private final Rpc rpc;

    /** Time source used to measure how long a session recovery has been running. */
    private final Clock clock;

    /** Executor on which stream callbacks and user-visible future completions are dispatched. */
    private final Executor executor;

    /** Policy deciding whether, and after what delay, a broken stream is retried. */
    private final RetryPolicy retryPolicy;

    /** Coordination node path sent with every session start request. */
    private final String nodePath;

    /** Connect timeout sent with every session start request. */
    private final Duration connectTimeout;

    /** Random key generated once per instance and sent with every session start request. */
    private final ByteString protectionKey;

    /** Registered state listeners; each listener is stored as both key and value. */
    private final Map<Consumer<CoordinationSession.State>, Consumer<CoordinationSession.State>> listeners =
            new ConcurrentHashMap<>();

    /** Current immutable-by-replacement session state; every transition goes through compare-and-set. */
    private final AtomicReference<SessionState> state = new AtomicReference<>(SessionState.unstarted());

    /* loaded from: input_file:tech/ydb/coordination/impl/SessionImpl$LeaseCreator.class */
    /**
     * Converts the boolean outcome of an acquire request into a {@link SemaphoreLease} result
     * bound to the enclosing session and the semaphore name given at construction.
     */
    private class LeaseCreator implements Function<Result<Boolean>, Result<SemaphoreLease>> {
        private final String name;

        /**
         * @param str name of the semaphore the created lease will refer to
         */
        LeaseCreator(String str) {
            this.name = str;
        }

        /**
         * Maps the acquire result to a lease result.
         *
         * @param result outcome of the acquire request
         * @return the failed result re-typed when the request failed, a {@code TIMEOUT} failure when
         *         the request succeeded with {@code false}, otherwise a success holding a new lease
         */
        @Override // java.util.function.Function
        public Result<SemaphoreLease> apply(Result<Boolean> result) {
            if (!result.isSuccess()) {
                // The mapper is deliberately null: only the type of the failed result changes here.
                // NOTE(review): assumes Result.map never invokes the mapper on a failure — confirm.
                return result.map((Function<Boolean, SemaphoreLease>) null);
            }
            if (!result.getValue()) {
                return Result.fail(Status.of(StatusCode.TIMEOUT));
            }
            return Result.success(new LeaseImpl(SessionImpl.this, this.name));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a session bound to the given coordination node path.
     *
     * @param rpc transport used to open streams
     * @param clock time source used for retry timing
     * @param str coordination node path
     * @param coordinationSessionSettings source of the executor, retry policy and connect timeout;
     *        when it supplies no executor, {@link ForkJoinPool#commonPool()} is used
     */
    public SessionImpl(Rpc rpc, Clock clock, String str, CoordinationSessionSettings coordinationSessionSettings) {
        Executor configuredExecutor = coordinationSessionSettings.getExecutor();

        this.rpc = rpc;
        this.clock = clock;
        if (configuredExecutor == null) {
            this.executor = ForkJoinPool.commonPool();
        } else {
            this.executor = configuredExecutor;
        }
        this.retryPolicy = coordinationSessionSettings.getRetryPolicy();
        this.nodePath = str;
        this.connectTimeout = coordinationSessionSettings.getConnectTimeout();
        this.protectionKey = createRandomKey();
    }

    /**
     * @return the session id reported by the state that is current at the time of the call
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public long getId() {
        SessionState snapshot = this.state.get();
        return snapshot.getSessionId();
    }

    /**
     * @return the public lifecycle state reported by the state that is current at the time of the call
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CoordinationSession.State getState() {
        SessionState snapshot = this.state.get();
        return snapshot.getState();
    }

    /**
     * Delegates to the current session state, so the text reflects the state at the time of the call.
     * This is the value substituted for {@code this} in the log messages of this class.
     */
    @Override
    public String toString() {
        return this.state.get().toString();
    }

    /**
     * Registers a listener that is notified on session state changes.
     *
     * @param consumer listener to register; {@code null} is silently ignored
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public void addStateListener(Consumer<CoordinationSession.State> consumer) {
        if (consumer == null) {
            return;
        }
        this.listeners.put(consumer, consumer);
    }

    /**
     * Unregisters a previously added state listener.
     *
     * @param consumer listener to remove; {@code null} is silently ignored, mirroring
     *        {@link #addStateListener(Consumer)} (the backing concurrent map rejects null keys)
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public void removeStateListener(Consumer<CoordinationSession.State> consumer) {
        if (consumer == null) {
            return;
        }
        this.listeners.remove(consumer);
    }

    /**
     * Switches the session to the closed state and stops the state that was replaced.
     * The swap is retried until the compare-and-set against the latest state succeeds.
     *
     * @return the future returned by {@code stop()} of the replaced state
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CompletableFuture<Status> stop() {
        logger.debug("{} stopped", this);
        SessionState replaced;
        do {
            replaced = this.state.get();
        } while (!updateState(replaced, SessionState.closed()));
        return replaced.stop();
    }

    /**
     * Starts a connection attempt on a fresh stream.
     *
     * @return the status of establishing the session, or an already completed
     *         {@code BAD_REQUEST} status when the current state does not allow connecting
     *         or the state changed concurrently
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CompletableFuture<Status> connect() {
        SessionState current = this.state.get();
        Stream stream = new Stream(this.rpc);
        SessionState connecting = makeConnectionState(current, stream);

        if (!updateState(current, connecting)) {
            logger.warn("{} cannot be connected with state {}", this, current.getState());
            return CompletableFuture.completedFuture(Status.of(StatusCode.BAD_REQUEST));
        }

        // Session id 0 is sent with the start request of this first connection attempt.
        CompletableFuture<Result<Long>> started = connectToSession(stream, 0L);
        return started.thenApplyAsync(
                result -> establishNewSession(result, stream, Collections.emptyList()),
                this.executor
        );
    }

    /**
     * Starts the given stream, installs the handler that runs when the stream finishes,
     * and sends the session start request.
     *
     * <p>When the stream finishes, non-idempotent pending messages are failed with
     * {@code BAD_SESSION}. Idempotent ones are handed to {@link #restoreSession} if the session
     * is still connected or reconnecting on this very stream; otherwise they are failed too and
     * the session is moved to its lost state.
     *
     * @param stream stream to start
     * @param j session id sent with the start request
     * @return the future returned by the stream for the session start request
     */
    private CompletableFuture<Result<Long>> connectToSession(Stream stream, long j) {
        stream.startStream().whenCompleteAsync((status, th) -> {
            if (th != null) {
                logger.warn("{} stream finished with exception", this, th);
            }
            if (status != null) {
                if (status.isSuccess()) {
                    logger.debug("{} stream finished with status {}", this, status);
                } else {
                    logger.warn("{} stream finished with status {}", this, status);
                }
            }

            // Split pending messages: idempotent ones may be retried, the rest fail right away.
            List<StreamMsg<?>> retryable = new ArrayList<>();
            for (StreamMsg<?> msg : stream.getMessages()) {
                if (msg.isIdempotent()) {
                    retryable.add(msg);
                } else {
                    completeMessageWithBadSession(msg);
                }
            }

            SessionState current = this.state.get();
            boolean recoverable = current.getState() == CoordinationSession.State.CONNECTED
                    || current.getState() == CoordinationSession.State.RECONNECTING;
            if (recoverable && current.hasStream(stream)) {
                logger.debug("stream {} starts to recover", this);
                restoreSession(this.clock.millis(), 0, retryable);
            } else {
                completeMessagesWithBadSession(retryable);
                updateState(current, makeLostState(current));
            }
        }, this.executor);
        return stream.sendSessionStart(j, this.nodePath, this.connectTimeout, this.protectionKey);
    }

    /**
     * Performs one reconnect attempt on the given stream.
     *
     * <p>The attempt runs only while the session is reconnecting on exactly this stream; otherwise
     * the pending messages are failed with {@code BAD_SESSION}. A successful start is passed to
     * {@link #establishNewSession}; a failed one schedules the next attempt via
     * {@link #restoreSession} with the retry counter incremented.
     *
     * @param stream stream created for this attempt
     * @param j clock timestamp in milliseconds at which the recovery started
     * @param i number of this retry attempt
     * @param list idempotent messages to resend once the session is re-established
     */
    private void reconnect(Stream stream, long j, int i, List<StreamMsg<?>> list) {
        SessionState snapshot = this.state.get();
        boolean ownsAttempt = snapshot.getState() == CoordinationSession.State.RECONNECTING
                && snapshot.hasStream(stream);
        if (!ownsAttempt) {
            completeMessagesWithBadSession(list);
            return;
        }

        connectToSession(stream, snapshot.getSessionId()).whenCompleteAsync((result, th) -> {
            if (result != null && result.isSuccess()) {
                establishNewSession(result, stream, list);
                return;
            }

            if (th != null) {
                logger.warn("{} stream retry {} finished with exception", this, i, th);
            }
            if (result != null) {
                logger.debug("{} stream retry {} finished with status {}", this, i, result.getStatus());
            }

            // NOTE(review): the state is re-read here, but hasStream is evaluated on the snapshot
            // taken before the attempt started — confirm whether the fresh state was intended.
            boolean stillReconnecting = this.state.get().getState() == CoordinationSession.State.RECONNECTING;
            if (stillReconnecting && snapshot.hasStream(stream)) {
                restoreSession(j, i + 1, list);
            } else {
                completeMessagesWithBadSession(list);
            }
        }, this.executor);
    }

    /**
     * Plans the next reconnect attempt according to the retry policy.
     *
     * <p>If the session is neither connected nor reconnecting, the pending messages are failed
     * with {@code BAD_SESSION}. If the retry policy returns a negative delay, the session is moved
     * to its lost state and the messages are failed. Otherwise a new stream is installed in the
     * state and {@link #reconnect} is invoked either immediately or after the returned delay.
     *
     * @param j clock timestamp in milliseconds at which the recovery started
     * @param i number of the retry attempt being planned
     * @param list idempotent messages to resend once the session is re-established
     */
    private void restoreSession(long j, int i, List<StreamMsg<?>> list) {
        SessionState current = this.state.get();
        if (current.getState() != CoordinationSession.State.CONNECTED
                && current.getState() != CoordinationSession.State.RECONNECTING) {
            completeMessagesWithBadSession(list);
            return;
        }

        long nextRetryMs = this.retryPolicy.nextRetryMs(i, this.clock.millis() - j);
        if (nextRetryMs < 0) {
            logger.debug("stream {} lost connection by retry policy", this);
            // NOTE(review): makeLostState yields null for CONNECTED, so in that case the state
            // is left unchanged here — confirm this is intended.
            updateState(current, makeLostState(current));
            completeMessagesWithBadSession(list);
            return;
        }

        Stream stream = new Stream(this.rpc);
        if (!updateState(current, makeConnectionState(current, stream))) {
            logger.warn("{} cannot be reconnected with state {}", this, this.state.get().getState());
            completeMessagesWithBadSession(list);
            return;
        }

        if (nextRetryMs > 0) {
            logger.debug("stream {} shedule next retry {} in {} ms", this, i, nextRetryMs);
            this.rpc.getScheduler().schedule(() -> {
                reconnect(stream, j, i, list);
            }, nextRetryMs, TimeUnit.MILLISECONDS);
        } else {
            logger.debug("stream {} immediatelly retry {}", this, i);
            reconnect(stream, j, i, list);
        }
    }

    /**
     * Finishes a connection attempt after the session start request has completed.
     *
     * @param result outcome of the session start request; on success it carries the session id
     * @param stream stream on which the session was started
     * @param list messages to send through the new connected state
     * @return the failure status of {@code result}; {@code CANCELLED} (after stopping the stream)
     *         when the state no longer matches this attempt; {@code SUCCESS} otherwise
     */
    private Status establishNewSession(Result<Long> result, Stream stream, List<StreamMsg<?>> list) {
        if (!result.isSuccess()) {
            return result.getStatus();
        }

        SessionState current = this.state.get();
        SessionState connected = makeConnectedState(current, result.getValue(), stream);
        if (!updateState(current, connected)) {
            // The session moved on while this attempt was in flight; the stream is no longer needed.
            stream.stop();
            Issue issue = Issue.of(this + " cannot handle successful session", Issue.Severity.ERROR);
            return Status.of(StatusCode.CANCELLED, new Issue[]{issue});
        }

        for (StreamMsg<?> msg : list) {
            connected.sendMessage(msg);
        }
        return Status.SUCCESS;
    }

    /**
     * Builds the state that represents a connection attempt on the given stream.
     *
     * @param sessionState state the attempt starts from
     * @param stream stream used by the attempt
     * @return a connecting state from {@code INITIAL}, a reconnecting state from {@code LOST},
     *         a disconnected state from {@code CONNECTED} or {@code RECONNECTING},
     *         or {@code null} when no attempt is allowed from the given state
     */
    private SessionState makeConnectionState(SessionState sessionState, Stream stream) {
        CoordinationSession.State current = sessionState.getState();
        if (current == CoordinationSession.State.INITIAL) {
            return SessionState.connecting(stream);
        }
        if (current == CoordinationSession.State.LOST) {
            return SessionState.reconnecting(stream);
        }
        boolean hadSession = current == CoordinationSession.State.CONNECTED
                || current == CoordinationSession.State.RECONNECTING;
        return hadSession ? SessionState.disconnected(sessionState, stream) : null;
    }

    /**
     * Builds the state that follows a successful session start on the given stream.
     *
     * @param sessionState state the attempt was made from
     * @param j session id used when completing a first connection
     * @param stream stream on which the session was started
     * @return a connected state from {@code CONNECTING}, a reconnected state from
     *         {@code RECONNECTING}, or {@code null} when the state does not own the stream
     *         or is in any other phase
     */
    private SessionState makeConnectedState(SessionState sessionState, long j, Stream stream) {
        CoordinationSession.State current = sessionState.getState();
        if (current == CoordinationSession.State.CONNECTING) {
            return sessionState.hasStream(stream) ? SessionState.connected(sessionState, j) : null;
        }
        if (current == CoordinationSession.State.RECONNECTING) {
            return sessionState.hasStream(stream) ? SessionState.reconnected(sessionState) : null;
        }
        return null;
    }

    /**
     * Builds the state that follows an abandoned connection attempt.
     *
     * @param sessionState state in which the attempt was abandoned
     * @return an unstarted state from {@code CONNECTING}, a lost state from {@code RECONNECTING},
     *         or {@code null} for any other phase
     */
    private SessionState makeLostState(SessionState sessionState) {
        CoordinationSession.State current = sessionState.getState();
        if (current == CoordinationSession.State.CONNECTING) {
            return SessionState.unstarted();
        }
        return current == CoordinationSession.State.RECONNECTING ? SessionState.lost() : null;
    }

    /**
     * Atomically replaces the expected state with a new one and notifies listeners when the
     * public lifecycle state actually changed.
     *
     * @param sessionState state expected to be current
     * @param sessionState2 replacement state; {@code null} means "no valid transition"
     * @return {@code true} if the replacement was installed, {@code false} if
     *         {@code sessionState2} is {@code null} or the compare-and-set lost a race
     */
    private boolean updateState(SessionState sessionState, SessionState sessionState2) {
        if (sessionState2 == null) {
            return false;
        }
        if (!this.state.compareAndSet(sessionState, sessionState2)) {
            return false;
        }

        boolean lifecycleChanged = sessionState2.getState() != sessionState.getState();
        if (lifecycleChanged) {
            this.listeners.values().forEach(listener -> listener.accept(sessionState2.getState()));
        }
        return true;
    }

    /**
     * Sends a create-semaphore request through the current session state.
     *
     * @param str forwarded to {@code StreamMsg.createSemaphore} (presumably the semaphore name)
     * @param j forwarded to {@code StreamMsg.createSemaphore} (presumably the semaphore limit)
     * @param bArr forwarded to {@code StreamMsg.createSemaphore} (presumably user data)
     * @return the request status, completed on the session executor
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CompletableFuture<Status> createSemaphore(String str, long j, byte[] bArr) {
        StreamMsg<Status> msg = StreamMsg.createSemaphore(str, j, bArr);
        this.state.get().sendMessage(msg);
        // The identity stage hands completion over to the session executor.
        return msg.getResult().thenApplyAsync(Function.identity(), this.executor);
    }

    /**
     * Sends an update-semaphore request through the current session state.
     *
     * @param str forwarded to {@code StreamMsg.updateSemaphore} (presumably the semaphore name)
     * @param bArr forwarded to {@code StreamMsg.updateSemaphore} (presumably the new user data)
     * @return the request status, completed on the session executor
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CompletableFuture<Status> updateSemaphore(String str, byte[] bArr) {
        StreamMsg<Status> msg = StreamMsg.updateSemaphore(str, bArr);
        this.state.get().sendMessage(msg);
        // The identity stage hands completion over to the session executor.
        return msg.getResult().thenApplyAsync(Function.identity(), this.executor);
    }

    /**
     * Sends a delete-semaphore request through the current session state.
     *
     * @param str forwarded to {@code StreamMsg.deleteSemaphore} (presumably the semaphore name)
     * @param z forwarded to {@code StreamMsg.deleteSemaphore} (presumably a force flag — confirm)
     * @return the request status, completed on the session executor
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CompletableFuture<Status> deleteSemaphore(String str, boolean z) {
        StreamMsg<Status> msg = StreamMsg.deleteSemaphore(str, z);
        this.state.get().sendMessage(msg);
        // The identity stage hands completion over to the session executor.
        return msg.getResult().thenApplyAsync(Function.identity(), this.executor);
    }

    /**
     * Sends a describe-semaphore request through the current session state.
     *
     * @param str forwarded to {@code StreamMsg.describeSemaphore} (presumably the semaphore name)
     * @param describeSemaphoreMode describe mode forwarded with the request
     * @return the semaphore description result, completed on the session executor
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CompletableFuture<Result<SemaphoreDescription>> describeSemaphore(String str, DescribeSemaphoreMode describeSemaphoreMode) {
        StreamMsg<Result<SemaphoreDescription>> msg = StreamMsg.describeSemaphore(str, describeSemaphoreMode);
        this.state.get().sendMessage(msg);
        // The identity stage hands completion over to the session executor.
        return msg.getResult().thenApplyAsync(Function.identity(), this.executor);
    }

    /**
     * Sends a watch-semaphore request through the current session state.
     *
     * @param str forwarded to {@code StreamMsg.watchSemaphore} (presumably the semaphore name)
     * @param describeSemaphoreMode describe mode forwarded with the request
     * @param watchSemaphoreMode watch mode forwarded with the request
     * @return the watcher result, completed on the session executor
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CompletableFuture<Result<SemaphoreWatcher>> watchSemaphore(String str, DescribeSemaphoreMode describeSemaphoreMode, WatchSemaphoreMode watchSemaphoreMode) {
        StreamMsg<Result<SemaphoreWatcher>> msg = StreamMsg.watchSemaphore(str, describeSemaphoreMode, watchSemaphoreMode);
        this.state.get().sendMessage(msg);
        // The identity stage hands completion over to the session executor.
        return msg.getResult().thenApplyAsync(Function.identity(), this.executor);
    }

    /**
     * Sends an acquire request for a semaphore (with the ephemeral flag set to {@code false})
     * and converts the outcome into a lease.
     *
     * @param str semaphore name; also used as the name of the resulting lease
     * @param j forwarded to {@code StreamMsg.acquireSemaphore} (presumably the count to acquire)
     * @param bArr forwarded to {@code StreamMsg.acquireSemaphore} (presumably user data)
     * @param duration timeout, sent in milliseconds; must not be {@code null}
     * @return a lease on success, a {@code TIMEOUT} failure when the request succeeded with
     *         {@code false}, or the request failure; completed on the session executor
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CompletableFuture<Result<SemaphoreLease>> acquireSemaphore(String str, long j, byte[] bArr, Duration duration) {
        StreamMsg<Result<Boolean>> msg = StreamMsg.acquireSemaphore(str, j, bArr, false, duration.toMillis());
        this.state.get().sendMessage(msg);
        return msg.getResult().thenApplyAsync(new LeaseCreator(str), this.executor);
    }

    /**
     * Sends an acquire request with the ephemeral flag set to {@code true} and converts the
     * outcome into a lease.
     *
     * @param str semaphore name; also used as the name of the resulting lease
     * @param z selects the count sent with the request: {@code -1} when {@code true},
     *        {@code 1} when {@code false} (presumably exclusive vs. shared — confirm)
     * @param bArr forwarded to {@code StreamMsg.acquireSemaphore} (presumably user data)
     * @param duration timeout, sent in milliseconds; must not be {@code null}
     * @return a lease on success, a {@code TIMEOUT} failure when the request succeeded with
     *         {@code false}, or the request failure; completed on the session executor
     */
    @Override // tech.ydb.coordination.CoordinationSession
    public CompletableFuture<Result<SemaphoreLease>> acquireEphemeralSemaphore(String str, boolean z, byte[] bArr, Duration duration) {
        long count = z ? -1L : 1L;
        StreamMsg<Result<Boolean>> msg = StreamMsg.acquireSemaphore(str, count, bArr, true, duration.toMillis());
        this.state.get().sendMessage(msg);
        return msg.getResult().thenApplyAsync(new LeaseCreator(str), this.executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends a release request for the named semaphore through the current session state.
     *
     * @param str semaphore name forwarded to {@code StreamMsg.releaseSemaphore}
     * @return {@code true} only when the request succeeded and its value is {@code true};
     *         completed on the session executor
     */
    public CompletableFuture<Boolean> releaseSemaphore(String str) {
        StreamMsg<Result<Boolean>> msg = StreamMsg.releaseSemaphore(str);
        this.state.get().sendMessage(msg);
        return msg.getResult().thenApplyAsync(
                result -> result.isSuccess() && result.getValue(),
                this.executor
        );
    }

    /**
     * Fails the given message and every message chained after it with {@code BAD_SESSION}.
     *
     * @param streamMsg head of the message chain; {@code null} is a no-op
     */
    private static void completeMessageWithBadSession(StreamMsg<?> streamMsg) {
        // Follow the nextMsg() links until the chain ends.
        for (StreamMsg<?> msg = streamMsg; msg != null; msg = msg.nextMsg()) {
            msg.handleError(Status.of(StatusCode.BAD_SESSION));
        }
    }

    /**
     * Fails every message in the collection, together with its chained messages,
     * with {@code BAD_SESSION}.
     *
     * @param collection messages to fail
     */
    private static void completeMessagesWithBadSession(Collection<StreamMsg<?>> collection) {
        for (StreamMsg<?> msg : collection) {
            completeMessageWithBadSession(msg);
        }
    }

    /**
     * Generates the 16-byte random key used as the session protection key.
     *
     * <p>The bytes come from {@link SecureRandom} rather than {@code ThreadLocalRandom}, whose
     * output is documented as not cryptographically secure.
     * NOTE(review): assumes the protection key must be unpredictable to other clients — confirm.
     *
     * @return a new key of 16 random bytes
     */
    private static ByteString createRandomKey() {
        byte[] key = new byte[16];
        new SecureRandom().nextBytes(key);
        return ByteString.copyFrom(key);
    }
}
