package org.opendaylight.controller.cluster.access.client;

import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.checkerframework.checker.lock.qual.Holding;
import org.opendaylight.controller.cluster.access.client.BackendInfo;
import org.opendaylight.controller.cluster.access.client.TransmitQueue;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/opendaylight/controller/cluster/access/client/AbstractClientConnection.class */
public abstract class AbstractClientConnection<T extends BackendInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class);
    public static final long DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
    public static final long DEFAULT_REQUEST_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2);
    public static final long DEFAULT_NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
    private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
    private static final long MAX_DELAY_SECONDS = 5;
    private static final long MAX_DELAY_NANOS = TimeUnit.SECONDS.toNanos(MAX_DELAY_SECONDS);
    private final Lock lock;
    private final ClientActorContext context;
    private final TransmitQueue queue;
    private final Long cookie;
    private final String backendName;
    private boolean haveTimer;
    private long lastReceivedTicks;
    private volatile RequestException poisoned;

    private AbstractClientConnection(AbstractClientConnection<T> abstractClientConnection, TransmitQueue transmitQueue, String str) {
        this.lock = new ReentrantLock();
        this.context = (ClientActorContext) Objects.requireNonNull(abstractClientConnection.context);
        this.cookie = (Long) Objects.requireNonNull(abstractClientConnection.cookie);
        this.backendName = (String) Objects.requireNonNull(str);
        this.queue = (TransmitQueue) Objects.requireNonNull(transmitQueue);
        this.lastReceivedTicks = abstractClientConnection.lastReceivedTicks;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractClientConnection(ClientActorContext clientActorContext, Long l, String str, int i) {
        this.lock = new ReentrantLock();
        this.context = (ClientActorContext) Objects.requireNonNull(clientActorContext);
        this.cookie = (Long) Objects.requireNonNull(l);
        this.backendName = (String) Objects.requireNonNull(str);
        this.queue = new TransmitQueue.Halted(i);
        this.lastReceivedTicks = currentTime();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractClientConnection(AbstractClientConnection<T> abstractClientConnection) {
        this(abstractClientConnection, new TransmitQueue.Halted(abstractClientConnection.queue, abstractClientConnection.currentTime()), abstractClientConnection.backendName);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractClientConnection(AbstractClientConnection<T> abstractClientConnection, T t, int i) {
        this(abstractClientConnection, new TransmitQueue.Transmitting(abstractClientConnection.queue, i, t, abstractClientConnection.currentTime(), ((ClientActorContext) Objects.requireNonNull(abstractClientConnection.context)).messageSlicer()), t.getName());
    }

    public final ClientActorContext context() {
        return this.context;
    }

    public final Long cookie() {
        return this.cookie;
    }

    public final ActorRef localActor() {
        return this.context.self();
    }

    public final long currentTime() {
        return this.context.ticker().read();
    }

    public final void sendRequest(Request<?, ?> request, Consumer<Response<?, ?>> consumer) {
        long currentTime = currentTime();
        sendEntry(new ConnectionEntry(request, consumer, currentTime), currentTime);
    }

    public final void enqueueRequest(Request<?, ?> request, Consumer<Response<?, ?>> consumer, long j) {
        enqueueEntry(new ConnectionEntry(request, consumer, j), currentTime());
    }

    private long enqueueOrForward(ConnectionEntry connectionEntry, long j) {
        this.lock.lock();
        try {
            commonEnqueue(connectionEntry, j);
            long enqueueOrForward = this.queue.enqueueOrForward(connectionEntry, j);
            this.lock.unlock();
            return enqueueOrForward;
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public final void enqueueEntry(ConnectionEntry connectionEntry, long j) {
        this.lock.lock();
        try {
            commonEnqueue(connectionEntry, j);
            this.queue.enqueueOrReplay(connectionEntry, j);
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Holding({"lock"})
    private void commonEnqueue(ConnectionEntry connectionEntry, long j) {
        RequestException requestException = this.poisoned;
        if (requestException != null) {
            throw new IllegalStateException("Connection " + this + " has been poisoned", requestException);
        }
        if (this.queue.isEmpty()) {
            scheduleTimer((connectionEntry.getEnqueuedTicks() + this.context.config().getRequestTimeout()) - j);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void cancelDebt() {
        this.queue.cancelDebt(currentTime());
    }

    public abstract Optional<T> getBackendInfo();

    /* JADX INFO: Access modifiers changed from: package-private */
    public final Collection<ConnectionEntry> startReplay() {
        this.lock.lock();
        return this.queue.drain();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Holding({"lock"})
    public final void finishReplay(ReconnectForwarder reconnectForwarder) {
        setForwarder(reconnectForwarder);
        this.lastReceivedTicks = currentTime();
        this.lock.unlock();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Holding({"lock"})
    public final void setForwarder(ReconnectForwarder reconnectForwarder) {
        this.queue.setForwarder(reconnectForwarder, currentTime());
    }

    @Holding({"lock"})
    abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> clientActorBehavior, RequestException requestException);

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void sendEntry(ConnectionEntry connectionEntry, long j) {
        long enqueueOrForward = enqueueOrForward(connectionEntry, j);
        try {
            if (enqueueOrForward >= DEBUG_DELAY_NANOS) {
                if (enqueueOrForward > MAX_DELAY_NANOS) {
                    LOG.info("Capping {} throttle delay from {} to {} seconds", new Object[]{this, Long.valueOf(TimeUnit.NANOSECONDS.toSeconds(enqueueOrForward)), Long.valueOf(MAX_DELAY_SECONDS), new Throwable()});
                    enqueueOrForward = MAX_DELAY_NANOS;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}: Sleeping for {}ms on connection {}", new Object[]{this.context.persistenceId(), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(enqueueOrForward)), this});
                }
            }
            TimeUnit.NANOSECONDS.sleep(enqueueOrForward);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.debug("Interrupted after sleeping {}ns", Long.valueOf(currentTime() - j), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final ClientActorBehavior<T> reconnect(ClientActorBehavior<T> clientActorBehavior, RequestException requestException) {
        this.lock.lock();
        try {
            ClientActorBehavior<T> lockedReconnect = lockedReconnect(clientActorBehavior, requestException);
            this.lock.unlock();
            return lockedReconnect;
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Holding({"lock"})
    private void scheduleTimer(long j) {
        if (this.haveTimer) {
            LOG.debug("{}: timer already scheduled on {}", this.context.persistenceId(), this);
            return;
        }
        if (this.queue.hasSuccessor()) {
            LOG.debug("{}: connection {} has a successor, not scheduling timer", this.context.persistenceId(), this);
            return;
        }
        FiniteDuration fromNanos = FiniteDuration.fromNanos(j <= 0 ? 0L : Math.min(j, this.context.config().getBackendAlivenessTimerInterval()));
        LOG.debug("{}: connection {} scheduling timeout in {}", new Object[]{this.context.persistenceId(), this, fromNanos});
        this.context.executeInActor(this::runTimer, fromNanos);
        this.haveTimer = true;
    }

    @VisibleForTesting
    final ClientActorBehavior<T> runTimer(ClientActorBehavior<T> clientActorBehavior) {
        this.lock.lock();
        try {
            this.haveTimer = false;
            long currentTime = currentTime();
            LOG.debug("{}: running timer on {}", this.context.persistenceId(), this);
            long ticksStalling = this.queue.ticksStalling(currentTime);
            if (ticksStalling >= this.context.config().getNoProgressTimeout()) {
                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, Long.valueOf(TimeUnit.NANOSECONDS.toSeconds(ticksStalling)));
                NoProgressException noProgressException = new NoProgressException(ticksStalling);
                List<ConnectionEntry> lockedPoison = lockedPoison(noProgressException);
                clientActorBehavior.removeConnection(this);
                this.lock.unlock();
                poison(lockedPoison, noProgressException);
                return clientActorBehavior;
            }
            OptionalLong lockedCheckTimeout = lockedCheckTimeout(currentTime);
            if (lockedCheckTimeout == null) {
                LOG.debug("{}: connection {} timed out", this.context.persistenceId(), this);
                ClientActorBehavior<T> lockedReconnect = lockedReconnect(clientActorBehavior, new RuntimeRequestException("Backend connection timed out", new TimeoutException()));
                this.lock.unlock();
                return lockedReconnect;
            }
            if (lockedCheckTimeout.isPresent()) {
                scheduleTimer(lockedCheckTimeout.getAsLong());
            } else {
                LOG.debug("{}: not scheduling timeout on {}", this.context.persistenceId(), this);
            }
            return clientActorBehavior;
        } finally {
            this.lock.unlock();
        }
    }

    @VisibleForTesting
    final OptionalLong checkTimeout(long j) {
        this.lock.lock();
        try {
            OptionalLong lockedCheckTimeout = lockedCheckTimeout(j);
            this.lock.unlock();
            return lockedCheckTimeout;
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    long backendSilentTicks(long j) {
        return j - this.lastReceivedTicks;
    }

    @SuppressFBWarnings(value = {"NP_OPTIONAL_RETURN_NULL"}, justification = "Returning null Optional is documented in the API contract.")
    private OptionalLong lockedCheckTimeout(long j) {
        if (this.queue.isEmpty()) {
            LOG.debug("{}: connection {} is empty", this.context.persistenceId(), this);
            return OptionalLong.empty();
        }
        long backendSilentTicks = backendSilentTicks(j);
        if (backendSilentTicks >= this.context.config().getBackendAlivenessTimerInterval()) {
            LOG.debug("{}: Connection {} has not seen activity from backend for {} nanoseconds, timing out", new Object[]{this.context.persistenceId(), this, Long.valueOf(backendSilentTicks)});
            return null;
        }
        int i = 0;
        ConnectionEntry peek = this.queue.peek();
        while (true) {
            ConnectionEntry connectionEntry = peek;
            if (connectionEntry == null) {
                LOG.debug("Connection {} timed out {} tasks", this, Integer.valueOf(i));
                if (i != 0) {
                    this.queue.tryTransmit(j);
                }
                return OptionalLong.empty();
            }
            long enqueuedTicks = j - connectionEntry.getEnqueuedTicks();
            long requestTimeout = this.context.config().getRequestTimeout();
            if (enqueuedTicks < requestTimeout) {
                return OptionalLong.of(requestTimeout - enqueuedTicks);
            }
            i++;
            this.queue.remove(j);
            LOG.debug("{}: Connection {} timed out entry {}", new Object[]{this.context.persistenceId(), this, connectionEntry});
            timeoutEntry(connectionEntry, enqueuedTicks);
            peek = this.queue.peek();
        }
    }

    private void timeoutEntry(ConnectionEntry connectionEntry, long j) {
        this.context.executeInActor(clientActorBehavior -> {
            connectionEntry.complete(connectionEntry.getRequest().toRequestFailure(new RequestTimeoutException(connectionEntry.getRequest() + " timed out after " + ((j * 1.0d) / 1.0E9d) + " seconds. The backend for " + this.backendName + " is not available.")));
            return clientActorBehavior;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void poison(RequestException requestException) {
        this.lock.lock();
        try {
            poison(lockedPoison(requestException), requestException);
        } finally {
            this.lock.unlock();
        }
    }

    private static void poison(Collection<? extends ConnectionEntry> collection, RequestException requestException) {
        for (ConnectionEntry connectionEntry : collection) {
            Request<?, ?> request = connectionEntry.getRequest();
            LOG.trace("Poisoning request {}", request, requestException);
            connectionEntry.complete(request.toRequestFailure(requestException));
        }
    }

    @Holding({"lock"})
    private List<ConnectionEntry> lockedPoison(RequestException requestException) {
        this.poisoned = enrichPoison(requestException);
        return this.queue.poison();
    }

    RequestException enrichPoison(RequestException requestException) {
        return requestException;
    }

    @VisibleForTesting
    final RequestException poisoned() {
        return this.poisoned;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void receiveResponse(ResponseEnvelope<?> responseEnvelope) {
        long currentTime = currentTime();
        this.lastReceivedTicks = currentTime;
        this.lock.lock();
        try {
            Optional<TransmittedConnectionEntry> complete = this.queue.complete(responseEnvelope, currentTime);
            this.lock.unlock();
            if (complete.isPresent()) {
                TransmittedConnectionEntry transmittedConnectionEntry = complete.get();
                LOG.debug("Completing {} with {}", transmittedConnectionEntry, responseEnvelope);
                transmittedConnectionEntry.complete((Response) responseEnvelope.getMessage());
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public final String toString() {
        return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MoreObjects.ToStringHelper addToStringAttributes(MoreObjects.ToStringHelper toStringHelper) {
        return toStringHelper.add("client", this.context.m6getIdentifier()).add("cookie", this.cookie).add("poisoned", this.poisoned);
    }
}
