/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.cluster.access.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.client.BackendInfo;
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.EmptyQueue;
import org.opendaylight.controller.cluster.access.client.NoProgressException;
import org.opendaylight.controller.cluster.access.client.RequestCallback;
import org.opendaylight.controller.cluster.access.client.SequencedQueueEntry;
import org.opendaylight.controller.cluster.access.client.TxDetails;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

@NotThreadSafe
final class SequencedQueue {
    private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
    @VisibleForTesting
    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15L);
    @VisibleForTesting
    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30L);
    private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply((long)REQUEST_TIMEOUT_NANOS, (TimeUnit)TimeUnit.NANOSECONDS);
    private static final int DEFAULT_TX_LIMIT = 1000;
    private final Ticker ticker;
    private final Long cookie;
    private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<SequencedQueueEntry>();
    private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<SequencedQueueEntry>();
    private final Queue<SequencedQueueEntry> pending = new ArrayDeque<SequencedQueueEntry>();
    private CompletionStage<? extends BackendInfo> backendProof;
    private BackendInfo backend;
    private long txSequence;
    private int lastTxLimit = 1000;
    private Object expectingTimer;
    private long lastProgress;
    private volatile boolean notClosed = true;

    SequencedQueue(Long cookie, Ticker ticker) {
        this.cookie = (Long)Preconditions.checkNotNull((Object)cookie);
        this.ticker = (Ticker)Preconditions.checkNotNull((Object)ticker);
        this.lastProgress = ticker.read();
    }

    Long getCookie() {
        return this.cookie;
    }

    private void checkNotClosed() {
        Preconditions.checkState((boolean)this.notClosed, (String)"Queue %s is closed", (Object[])new Object[]{this});
    }

    private long nextTxSequence() {
        return this.txSequence++;
    }

    @Nullable
    Optional<FiniteDuration> enqueueRequest(Request<?, ?> request, RequestCallback callback) {
        this.checkNotClosed();
        long now = this.ticker.read();
        SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
        if (this.backend == null) {
            LOG.debug("No backend available, request resolution");
            this.pending.add(e);
            return Optional.empty();
        }
        if (!this.lastInflight.isEmpty()) {
            LOG.debug("Retransmit not yet complete, delaying request {}", request);
            this.pending.add(e);
            return null;
        }
        if (this.currentInflight.size() >= this.lastTxLimit) {
            LOG.debug("Queue is at capacity, delayed sending of request {}", request);
            this.pending.add(e);
            return null;
        }
        this.currentInflight.offer(e);
        LOG.debug("Enqueued request {} to queue {}", request, (Object)this);
        e.retransmit(this.backend, this.nextTxSequence(), now);
        if (this.expectingTimer == null) {
            this.expectingTimer = now + REQUEST_TIMEOUT_NANOS;
            return Optional.of(INITIAL_REQUEST_TIMEOUT);
        }
        return null;
    }

    private static Optional<SequencedQueueEntry> findMatchingEntry(Queue<SequencedQueueEntry> queue, ResponseEnvelope<?> envelope) {
        Iterator it = queue.iterator();
        while (it.hasNext()) {
            SequencedQueueEntry e = (SequencedQueueEntry)it.next();
            TxDetails txDetails = (TxDetails)Verify.verifyNotNull((Object)e.getTxDetails());
            Request<?, ?> request = e.getRequest();
            Response response = (Response)envelope.getMessage();
            if (!request.getTarget().equals(response.getTarget())) continue;
            if (request.getSequence() != response.getSequence()) {
                LOG.debug("Expecting sequence {}, ignoring response {}", (Object)request.getSequence(), envelope);
                return Optional.empty();
            }
            if (envelope.getSessionId() != txDetails.getSessionId()) {
                LOG.debug("Expecting session {}, ignoring response {}", (Object)txDetails.getSessionId(), envelope);
                return Optional.empty();
            }
            if (envelope.getTxSequence() != txDetails.getTxSequence()) {
                LOG.warn("Expecting txSequence {}, ignoring response {}", (Object)txDetails.getTxSequence(), envelope);
                return Optional.empty();
            }
            LOG.debug("Completing request {} with {}", request, envelope);
            it.remove();
            return Optional.of(e);
        }
        return null;
    }

    ClientActorBehavior complete(ClientActorBehavior current, ResponseEnvelope<?> envelope) {
        int toSend;
        Optional<SequencedQueueEntry> maybeEntry = SequencedQueue.findMatchingEntry(this.currentInflight, envelope);
        if (maybeEntry == null) {
            maybeEntry = SequencedQueue.findMatchingEntry(this.lastInflight, envelope);
        }
        if (maybeEntry == null || !maybeEntry.isPresent()) {
            LOG.warn("No request matching {} found, ignoring response", envelope);
            return current;
        }
        this.lastProgress = this.ticker.read();
        ClientActorBehavior ret = maybeEntry.get().complete((Response)envelope.getMessage());
        if (this.backend != null && (toSend = this.lastTxLimit - this.currentInflight.size()) > 0) {
            this.runTransmit(toSend);
        }
        return ret;
    }

    private int transmitEntries(Queue<SequencedQueueEntry> queue, int count) {
        SequencedQueueEntry e;
        int toSend;
        for (toSend = count; toSend > 0 && (e = queue.poll()) != null; --toSend) {
            LOG.debug("Transmitting entry {}", (Object)e);
            e.retransmit(this.backend, this.nextTxSequence(), this.lastProgress);
        }
        return toSend;
    }

    private void runTransmit(int count) {
        int toSend;
        if (!this.lastInflight.isEmpty()) {
            toSend = this.transmitEntries(this.lastInflight, count);
            if (this.lastInflight.isEmpty()) {
                this.lastInflight = EmptyQueue.getInstance();
            }
        } else {
            toSend = count;
        }
        this.transmitEntries(this.pending, toSend);
    }

    Optional<FiniteDuration> setBackendInfo(CompletionStage<? extends BackendInfo> proof, BackendInfo backend) {
        Preconditions.checkNotNull((Object)backend);
        if (!proof.equals(this.backendProof)) {
            LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
            return Optional.empty();
        }
        LOG.debug("Resolved backend {}", (Object)backend);
        ArrayDeque newLast = new ArrayDeque(this.currentInflight.size() + this.lastInflight.size());
        newLast.addAll(this.currentInflight);
        newLast.addAll(this.lastInflight);
        this.lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
        int txLimit = backend.getMaxMessages();
        if (this.lastTxLimit > txLimit) {
            this.currentInflight = new ArrayDeque<SequencedQueueEntry>();
        } else {
            this.currentInflight.clear();
        }
        this.backend = backend;
        this.backendProof = null;
        this.txSequence = 0L;
        this.lastTxLimit = txLimit;
        this.lastProgress = this.ticker.read();
        if (this.lastInflight.isEmpty() && this.pending.isEmpty()) {
            return Optional.empty();
        }
        LOG.debug("Sending up to {} requests to backend {}", (Object)txLimit, (Object)backend);
        this.runTransmit(this.lastTxLimit);
        if (this.expectingTimer == null) {
            long nextTicks = this.ticker.read() + REQUEST_TIMEOUT_NANOS;
            this.expectingTimer = nextTicks;
            return Optional.of(FiniteDuration.apply((long)(nextTicks - this.lastProgress), (TimeUnit)TimeUnit.NANOSECONDS));
        }
        return Optional.empty();
    }

    boolean expectProof(CompletionStage<? extends BackendInfo> proof) {
        if (!proof.equals(this.backendProof)) {
            LOG.debug("Setting resolution handle to {}", proof);
            this.backendProof = proof;
            return true;
        }
        LOG.trace("Already resolving handle {}", proof);
        return false;
    }

    boolean hasCompleted() {
        return !this.notClosed && this.currentInflight.isEmpty() && this.lastInflight.isEmpty() && this.pending.isEmpty();
    }

    boolean runTimeout() throws NoProgressException {
        long ticksSinceProgress;
        this.expectingTimer = null;
        long now = this.ticker.read();
        if (!(this.currentInflight.isEmpty() && this.lastInflight.isEmpty() && this.pending.isEmpty() || (ticksSinceProgress = now - this.lastProgress) < NO_PROGRESS_TIMEOUT_NANOS)) {
            LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", (Object)this, (Object)TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
            NoProgressException ex = new NoProgressException(ticksSinceProgress);
            this.poison(ex);
            throw ex;
        }
        SequencedQueueEntry head = this.currentInflight.peek();
        if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
            this.backend = null;
            LOG.debug("Queue {} invalidated backend info", (Object)this);
            return true;
        }
        return false;
    }

    private static void poisonQueue(Queue<SequencedQueueEntry> queue, RequestException cause) {
        queue.forEach(e -> e.poison(cause));
        queue.clear();
    }

    void poison(RequestException cause) {
        this.close();
        SequencedQueue.poisonQueue(this.currentInflight, cause);
        SequencedQueue.poisonQueue(this.lastInflight, cause);
        SequencedQueue.poisonQueue(this.pending, cause);
    }

    void close() {
        this.notClosed = false;
    }
}

