/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.cluster.access.client;

import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.client.BackendInfo;
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.client.NoProgressException;
import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
import org.opendaylight.controller.cluster.access.client.TransmitQueue;
import org.opendaylight.controller.cluster.access.client.TransmittedConnectionEntry;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/**
 * Base class for a client-side connection towards a backend. It owns a {@link TransmitQueue} of
 * {@link ConnectionEntry} instances, a timer which periodically checks the queue for timeouts and a
 * "poisoned" state which, once set, makes the connection reject any further entries.
 *
 * <p>All access to the queue and to the timer flag is performed while holding an internal lock.
 *
 * @param <T> type of backend information exposed through {@link #getBackendInfo()}
 */
@NotThreadSafe
public abstract class AbstractClientConnection<T extends BackendInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class);

    /**
     * If no response has been received for at least this long while the queue is non-empty, the timeout
     * check reports a backend timeout. Also the upper bound for any single timer delay.
     */
    @VisibleForTesting
    static final long BACKEND_ALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30L);

    /** Entries which have been enqueued for at least this long are failed with a {@link RequestTimeoutException}. */
    @VisibleForTesting
    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2L);

    /** If the queue reports stalling for at least this long, the connection is poisoned. */
    @VisibleForTesting
    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15L);

    /** Throttling delays of at least this length are reported in the debug log. */
    private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);

    /** Upper bound, in seconds, of the throttling delay applied in {@link #sendEntry(ConnectionEntry, long)}. */
    private static final long MAX_DELAY_SECONDS = 5L;
    private static final long MAX_DELAY_NANOS = TimeUnit.SECONDS.toNanos(MAX_DELAY_SECONDS);

    private final Lock lock = new ReentrantLock();
    private final ClientActorContext context;
    @GuardedBy("lock")
    private final TransmitQueue queue;
    private final Long cookie;

    /** Set when a timer has been handed to the actor context and has not run yet. */
    @GuardedBy("lock")
    private boolean haveTimer;

    /** Ticker reading taken at construction, at the end of a replay and whenever a response is received. */
    private long lastReceivedTicks;

    /** Non-null once the connection has been poisoned. */
    private volatile RequestException poisoned;

    /**
     * Create a fresh connection.
     *
     * @param context client actor context, must not be null
     * @param cookie connection cookie, must not be null
     * @param queue transmit queue to use, must not be null
     * @throws NullPointerException if any argument is null
     */
    AbstractClientConnection(final ClientActorContext context, final Long cookie, final TransmitQueue queue) {
        this.context = Preconditions.checkNotNull(context);
        this.cookie = Preconditions.checkNotNull(cookie);
        this.queue = Preconditions.checkNotNull(queue);
        this.lastReceivedTicks = currentTime();
    }

    /**
     * Create a connection which inherits context, cookie and last-received timestamp from a previous
     * connection, but starts with a new {@link TransmitQueue.Halted} queue.
     *
     * @param oldConnection connection to inherit from
     * @param targetQueueSize size passed to the new halted queue
     */
    AbstractClientConnection(final AbstractClientConnection<T> oldConnection, final int targetQueueSize) {
        this.context = oldConnection.context;
        this.cookie = oldConnection.cookie;
        this.queue = new TransmitQueue.Halted(targetQueueSize);
        this.lastReceivedTicks = oldConnection.lastReceivedTicks;
    }

    /**
     * Return the client actor context this connection was created with.
     *
     * @return client actor context
     */
    public final ClientActorContext context() {
        return context;
    }

    /**
     * Return the cookie this connection was created with.
     *
     * @return connection cookie
     */
    @Nonnull
    public final Long cookie() {
        return cookie;
    }

    /**
     * Return the actor reference reported by the context as {@code self()}.
     *
     * @return local actor reference
     */
    public final ActorRef localActor() {
        return context.self();
    }

    /**
     * Read the current time from the context's ticker.
     *
     * @return current ticker reading
     */
    public final long currentTime() {
        return context.ticker().read();
    }

    /**
     * Wrap a request and its callback into a {@link ConnectionEntry} stamped with the current time and
     * send it via {@link #sendEntry(ConnectionEntry, long)}, which may block the calling thread.
     *
     * @param request request to send
     * @param callback callback to associate with the request
     */
    public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
        final long now = currentTime();
        sendEntry(new ConnectionEntry(request, callback, now), now);
    }

    /**
     * Wrap a request and its callback into a {@link ConnectionEntry} carrying the supplied enqueue time
     * and enqueue it without sleeping. The delay reported by the queue is discarded.
     *
     * @param request request to enqueue
     * @param callback callback to associate with the request
     * @param enqueuedTicks enqueue timestamp to record in the entry
     */
    public final void enqueueRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
            final long enqueuedTicks) {
        enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
    }

    /**
     * Enqueue an entry while holding the lock. If the queue is empty beforehand, a timer is scheduled
     * for the moment the entry would hit {@link #REQUEST_TIMEOUT_NANOS}.
     *
     * @param entry entry to enqueue
     * @param now current ticker reading
     * @return the value returned by {@link TransmitQueue#enqueue(ConnectionEntry, long)}, which
     *         {@link #sendEntry(ConnectionEntry, long)} uses as a delay in nanoseconds
     * @throws IllegalStateException if this connection has been poisoned
     */
    public final long enqueueEntry(final ConnectionEntry entry, final long now) {
        lock.lock();
        try {
            final RequestException maybePoison = poisoned;
            if (maybePoison != null) {
                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
            }

            if (queue.isEmpty()) {
                // The queue is about to become non-empty: make sure a timeout check is pending.
                scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
            }
            return queue.enqueue(entry, now);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Return information about the backend, if available.
     *
     * @return optional backend information
     */
    public abstract Optional<T> getBackendInfo();

    /**
     * Acquire the lock and drain the queue. On successful return the lock is <b>still held</b>; the
     * caller is expected to release it through {@link #finishReplay(ReconnectForwarder)}. If draining
     * fails, the lock is released before the failure is propagated.
     *
     * @return entries drained from the queue
     */
    final Collection<ConnectionEntry> startReplay() {
        lock.lock();
        try {
            return queue.drain();
        } catch (RuntimeException | Error e) {
            // The caller never sees the entries and so cannot be expected to call finishReplay():
            // do not leave the lock dangling.
            lock.unlock();
            throw e;
        }
    }

    /**
     * Install the forwarder, refresh the last-received timestamp and release the lock acquired by
     * {@link #startReplay()}. The lock is released even if installing the forwarder fails.
     *
     * @param forwarder forwarder to install on the queue
     */
    @GuardedBy("lock")
    final void finishReplay(final ReconnectForwarder forwarder) {
        try {
            setForwarder(forwarder);
            lastReceivedTicks = currentTime();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Install a forwarder on the queue, passing the current time along.
     *
     * @param forwarder forwarder to install
     */
    @GuardedBy("lock")
    final void setForwarder(final ReconnectForwarder forwarder) {
        queue.setForwarder(forwarder, currentTime());
    }

    /**
     * Perform a reconnect. Invoked with the lock held.
     *
     * @param current current client actor behavior
     * @param cause reason for the reconnect
     * @return behavior to use from now on
     */
    @GuardedBy("lock")
    abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current, RequestException cause);

    /**
     * Enqueue an entry and then sleep for the delay reported by the queue, capped at
     * {@link #MAX_DELAY_SECONDS} seconds. If the sleep is interrupted, the thread's interrupt flag is
     * restored and the method returns.
     *
     * @param entry entry to enqueue
     * @param now current ticker reading
     * @throws IllegalStateException if this connection has been poisoned
     */
    final void sendEntry(final ConnectionEntry entry, final long now) {
        long delay = enqueueEntry(entry, now);
        try {
            if (delay >= DEBUG_DELAY_NANOS) {
                if (delay > MAX_DELAY_NANOS) {
                    // The trailing Throwable makes the logger record the call site.
                    LOG.info("Capping {} throttle delay from {} to {} seconds", this,
                        TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS, new Throwable());
                    delay = MAX_DELAY_NANOS;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}: Sleeping for {}ms on connection {}", context.persistenceId(),
                        TimeUnit.NANOSECONDS.toMillis(delay), this);
                }
            }
            TimeUnit.NANOSECONDS.sleep(delay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Elapsed time fills the placeholder; the exception goes last so it is logged as the cause.
            LOG.debug("Interrupted after sleeping {}ns", currentTime() - now, e);
        }
    }

    /**
     * Acquire the lock and delegate to {@link #lockedReconnect(ClientActorBehavior, RequestException)}.
     *
     * @param current current client actor behavior
     * @param cause reason for the reconnect
     * @return behavior returned by {@code lockedReconnect}
     */
    final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current, final RequestException cause) {
        lock.lock();
        try {
            return lockedReconnect(current, cause);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Schedule {@link #runTimer(ClientActorBehavior)} to be executed in the actor, unless a timer is
     * already pending or the queue has a successor. Non-positive delays are treated as zero and delays
     * are capped at {@link #BACKEND_ALIVE_TIMEOUT_NANOS}.
     *
     * @param delay requested delay in nanoseconds
     */
    @GuardedBy("lock")
    private void scheduleTimer(final long delay) {
        if (haveTimer) {
            LOG.debug("{}: timer already scheduled on {}", context.persistenceId(), this);
            return;
        }
        if (queue.hasSuccessor()) {
            LOG.debug("{}: connection {} has a successor, not scheduling timer", context.persistenceId(), this);
            return;
        }

        final long normalized = delay <= 0L ? 0L : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS);
        final FiniteDuration dur = FiniteDuration.fromNanos(normalized);
        LOG.debug("{}: connection {} scheduling timeout in {}", context.persistenceId(), this, dur);
        context.executeInActor(this::runTimer, dur);
        haveTimer = true;
    }

    /**
     * Timer callback. Under the lock it clears the pending-timer flag and then, in order:
     * <ol>
     *   <li>poisons the connection with a {@link NoProgressException} and removes it from the behavior
     *       if the queue has stalled for {@link #NO_PROGRESS_TIMEOUT_NANOS} or more;</li>
     *   <li>triggers a reconnect if the timeout check reports a backend timeout;</li>
     *   <li>re-schedules itself if the timeout check reports a delay until the next timeout.</li>
     * </ol>
     *
     * @param current current client actor behavior
     * @return {@code current}, or the behavior produced by the reconnect
     */
    @VisibleForTesting
    final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
        lock.lock();
        try {
            haveTimer = false;
            final long now = currentTime();
            LOG.debug("{}: running timer on {}", context.persistenceId(), this);

            final long ticksSinceProgress = queue.ticksStalling(now);
            if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
                lockedPoison(new NoProgressException(ticksSinceProgress));
                current.removeConnection(this);
                return current;
            }

            // A null Optional (as opposed to an empty one) signals a backend timeout.
            final Optional<Long> delay = lockedCheckTimeout(now);
            if (delay == null) {
                LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
                return lockedReconnect(current,
                    new RuntimeRequestException("Backend connection timed out", new TimeoutException()));
            }

            if (delay.isPresent()) {
                scheduleTimer(delay.get());
            } else {
                LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
            }
            return current;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Acquire the lock and run the timeout check.
     *
     * @param now current ticker reading
     * @return {@code null} if the backend has timed out, an empty Optional if there is nothing to wait
     *         for, otherwise the number of nanoseconds until the head of the queue times out
     */
    @VisibleForTesting
    final Optional<Long> checkTimeout(final long now) {
        lock.lock();
        try {
            return lockedCheckTimeout(now);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Compute how long ago the last-received timestamp was updated.
     *
     * @param now current ticker reading
     * @return difference between {@code now} and the last-received timestamp
     */
    long backendSilentTicks(final long now) {
        return now - lastReceivedTicks;
    }

    /**
     * Timeout check proper, invoked with the lock held. Entries at the head of the queue which have been
     * open for {@link #REQUEST_TIMEOUT_NANOS} or longer are removed and completed with a failure carrying
     * a {@link RequestTimeoutException}. If any entry was removed this way, the queue is asked to transmit.
     *
     * @param now current ticker reading
     * @return {@code null} if the queue is non-empty and the backend has been silent for
     *         {@link #BACKEND_ALIVE_TIMEOUT_NANOS} or more; an empty Optional if the queue is (or has
     *         become) empty; otherwise the number of nanoseconds until the head entry times out
     */
    @SuppressFBWarnings(value={"NP_OPTIONAL_RETURN_NULL"}, justification="Returning null Optional is documented in the API contract.")
    @GuardedBy("lock")
    private Optional<Long> lockedCheckTimeout(final long now) {
        if (queue.isEmpty()) {
            LOG.debug("{}: connection {} is empty", context.persistenceId(), this);
            return Optional.empty();
        }

        final long backendSilentTicks = backendSilentTicks(now);
        if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) {
            LOG.debug("{}: Connection {} has not seen activity from backend for {} nanoseconds, timing out",
                context.persistenceId(), this, backendSilentTicks);
            return null;
        }

        int tasksTimedOut = 0;
        for (ConnectionEntry head = queue.peek(); head != null; head = queue.peek()) {
            final long beenOpen = now - head.getEnqueuedTicks();
            if (beenOpen < REQUEST_TIMEOUT_NANOS) {
                // The head has not expired yet: report how long until it does.
                return Optional.of(REQUEST_TIMEOUT_NANOS - beenOpen);
            }

            tasksTimedOut++;
            queue.remove(now);
            LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);

            final double time = (double) beenOpen * 1.0 / 1.0E9;
            head.complete((Response<?, ?>) head.getRequest().toRequestFailure(
                (RequestException) new RequestTimeoutException("Timed out after " + time + " seconds")));
        }

        LOG.debug("Connection {} timed out {} tasks", this, tasksTimedOut);
        if (tasksTimedOut != 0) {
            queue.tryTransmit(now);
        }
        return Optional.empty();
    }

    /**
     * Acquire the lock and poison this connection.
     *
     * @param cause reason for poisoning
     */
    final void poison(final RequestException cause) {
        lock.lock();
        try {
            lockedPoison(cause);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Record the (possibly enriched) cause as this connection's poison and poison the queue with the
     * original cause.
     *
     * @param cause reason for poisoning
     */
    @GuardedBy("lock")
    private void lockedPoison(final RequestException cause) {
        poisoned = enrichPoison(cause);
        queue.poison(cause);
    }

    /**
     * Hook for transforming the poison cause before it is recorded. This implementation returns the
     * argument unchanged.
     *
     * @param ex original cause
     * @return cause to record
     */
    RequestException enrichPoison(final RequestException ex) {
        return ex;
    }

    /**
     * Return the recorded poison cause.
     *
     * @return poison cause, or {@code null} if this connection has not been poisoned
     */
    @VisibleForTesting
    final RequestException poisoned() {
        return poisoned;
    }

    /**
     * Process a response envelope: update the last-received timestamp, let the queue match the envelope
     * under the lock and, if the queue returns an entry, complete that entry with the envelope's message
     * after the lock has been released.
     *
     * @param envelope received response envelope
     */
    void receiveResponse(final ResponseEnvelope<?> envelope) {
        final long now = currentTime();
        lastReceivedTicks = now;

        final Optional<TransmittedConnectionEntry> maybeEntry;
        lock.lock();
        try {
            maybeEntry = queue.complete(envelope, now);
        } finally {
            lock.unlock();
        }

        if (maybeEntry.isPresent()) {
            final TransmittedConnectionEntry entry = maybeEntry.get();
            LOG.debug("Completing {} with {}", entry, envelope);
            entry.complete((Response<?, ?>) envelope.getMessage());
        }
    }

    @Override
    public final String toString() {
        return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();
    }

    /**
     * Add this connection's attributes (client identifier, cookie and poison cause) to a string helper.
     *
     * @param toStringHelper helper to add to
     * @return the helper returned by the last {@code add} call
     */
    MoreObjects.ToStringHelper addToStringAttributes(final MoreObjects.ToStringHelper toStringHelper) {
        return toStringHelper
            .add("client", context.getIdentifier())
            .add("cookie", cookie)
            .add("poisoned", poisoned);
    }
}

