package io.pravega.client.stream.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.Transaction;
import io.pravega.common.Exceptions;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.io.grpc.Status;
import io.pravega.shaded.io.grpc.StatusRuntimeException;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/Pinger.class */
public class Pinger implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(Pinger.class);
    private final Stream stream;
    private final Controller controller;
    private final long txnLeaseMillis;
    private final ScheduledExecutorService executor;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final Set<UUID> txnList = new HashSet();

    @GuardedBy("lock")
    @VisibleForTesting
    private final Set<UUID> completedTxns = new HashSet();
    private final AtomicBoolean isStarted = new AtomicBoolean();
    private final AtomicReference<ScheduledFuture<?>> scheduledFuture = new AtomicReference<>();
    private final long pingIntervalMillis = getPingInterval();

    /* JADX INFO: Access modifiers changed from: package-private */
    public Pinger(long j, Stream stream, Controller controller, ScheduledExecutorService scheduledExecutorService) {
        this.txnLeaseMillis = j;
        this.stream = stream;
        this.controller = controller;
        this.executor = scheduledExecutorService;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startPing(UUID uuid) {
        synchronized (this.lock) {
            this.txnList.add(uuid);
        }
        startPeriodicPingTxn();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopPing(UUID uuid) {
        synchronized (this.lock) {
            this.txnList.remove(uuid);
        }
    }

    private long getPingInterval() {
        return Math.round(this.txnLeaseMillis / Math.max(1.0d, Math.sqrt(this.txnLeaseMillis / 1000.0d)));
    }

    private void startPeriodicPingTxn() {
        if (this.isStarted.getAndSet(true)) {
            return;
        }
        log.info("Starting Pinger at an interval of {}ms ", Long.valueOf(this.pingIntervalMillis));
        this.scheduledFuture.set(this.executor.scheduleAtFixedRate(this::pingTransactions, 10L, this.pingIntervalMillis, TimeUnit.MILLISECONDS));
    }

    private void pingTransactions() {
        log.info("Start sending transaction pings.");
        synchronized (this.lock) {
            this.txnList.removeAll(this.completedTxns);
            this.completedTxns.clear();
            this.txnList.forEach(uuid -> {
                try {
                    log.debug("Sending ping request for txn ID: {} with lease: {}", uuid, Long.valueOf(this.txnLeaseMillis));
                    this.controller.pingTransaction(this.stream, uuid, this.txnLeaseMillis).whenComplete((pingStatus, th) -> {
                        if (th == null) {
                            if (Transaction.PingStatus.ABORTED.equals(pingStatus) || Transaction.PingStatus.COMMITTED.equals(pingStatus)) {
                                this.completedTxns.add(uuid);
                                return;
                            }
                            return;
                        }
                        Throwable unwrap = Exceptions.unwrap(th);
                        if ((unwrap instanceof StatusRuntimeException) && ((StatusRuntimeException) unwrap).getStatus().equals(Status.NOT_FOUND)) {
                            log.info("Ping Transaction for txn ID:{} did not find the transaction");
                            this.completedTxns.add(uuid);
                        }
                        log.warn("Ping Transaction for txn ID:{} failed", uuid, Exceptions.unwrap(th));
                    });
                } catch (Exception e) {
                    log.warn("Encountered exception when attempting to ping transactions", e);
                }
            });
        }
        log.trace("Completed sending transaction pings.");
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        log.info("Closing Pinger periodic task");
        ScheduledFuture<?> andSet = this.scheduledFuture.getAndSet(null);
        if (andSet != null) {
            andSet.cancel(false);
        }
    }

    @SuppressFBWarnings(justification = "generated code")
    Set<UUID> getCompletedTxns() {
        return this.completedTxns;
    }
}
