package tech.ydb.topic.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import tech.ydb.core.Status;

/* loaded from: input_file:tech/ydb/topic/impl/GrpcStreamRetrier.class */
public abstract class GrpcStreamRetrier {
    private static final int MAX_RECONNECT_COUNT = 0;
    private static final int EXP_BACKOFF_BASE_MS = 256;
    private static final int EXP_BACKOFF_CEILING_MS = 40000;
    private static final int EXP_BACKOFF_MAX_POWER = 7;
    private final ScheduledExecutorService scheduler;
    protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
    protected final AtomicBoolean isStopped = new AtomicBoolean(false);
    protected final AtomicInteger reconnectCounter = new AtomicInteger(MAX_RECONNECT_COUNT);

    /* JADX INFO: Access modifiers changed from: protected */
    public GrpcStreamRetrier(ScheduledExecutorService scheduledExecutorService) {
        this.scheduler = scheduledExecutorService;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract Logger getLogger();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract String getStreamName();

    protected abstract void onStreamReconnect();

    protected abstract void onStreamFinished();

    protected abstract void onShutdown(String str);

    private void tryScheduleReconnect() {
        int i = this.reconnectCounter.get() + 1;
        if (!this.isReconnecting.compareAndSet(false, true)) {
            getLogger().info("should reconnect {} stream, but reconnect is already in progress", getStreamName());
            return;
        }
        this.reconnectCounter.set(i);
        int i2 = i <= EXP_BACKOFF_MAX_POWER ? EXP_BACKOFF_BASE_MS * (1 << i) : EXP_BACKOFF_CEILING_MS;
        int nextInt = i2 + ThreadLocalRandom.current().nextInt(i2);
        getLogger().warn("Retry #{}. Scheduling {} reconnect in {}ms...", new Object[]{Integer.valueOf(i), getStreamName(), Integer.valueOf(nextInt)});
        try {
            this.scheduler.schedule(this::reconnect, nextInt, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            String str = "Couldn't schedule reconnect: scheduler is already shut down. Shutting down " + getStreamName();
            getLogger().error(str);
            shutdownImpl(str);
        }
    }

    void reconnect() {
        getLogger().info("{} reconnect #{} started", Integer.valueOf(this.reconnectCounter.get()), getStreamName());
        if (!this.isReconnecting.compareAndSet(true, false)) {
            getLogger().warn("Couldn't reset reconnect flag. Shouldn't happen");
        }
        onStreamReconnect();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> shutdownImpl() {
        return shutdownImpl("");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> shutdownImpl(String str) {
        getLogger().info("Shutting down {}" + ((str == null || str.isEmpty()) ? "" : " with reason: " + str), getStreamName());
        this.isStopped.set(true);
        return CompletableFuture.runAsync(() -> {
            onShutdown(str);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void completeSession(Status status, Throwable th) {
        getLogger().info("CompleteSession called");
        onStreamFinished();
        if (th != null) {
            getLogger().error("Exception in {} stream session: ", getStreamName(), th);
        } else if (!status.isSuccess()) {
            getLogger().warn("Error in {} stream session: {}", getStreamName(), status);
        } else {
            if (this.isStopped.get()) {
                getLogger().info("{} stream session closed successfully", getStreamName());
                return;
            }
            getLogger().warn("{} stream session was closed unexpectedly", getStreamName());
        }
        if (this.isStopped.get()) {
            getLogger().info(" {} is already stopped, no need to schedule reconnect", getStreamName());
        } else {
            tryScheduleReconnect();
        }
    }
}
