/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.cluster.access.client;

import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.client.AveragingProgressTracker;
import org.opendaylight.controller.cluster.access.client.BackendInfo;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.client.ProgressTracker;
import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.client.TransmittedConnectionEntry;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
abstract class TransmitQueue {
    private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
    private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<TransmittedConnectionEntry>();
    private final Deque<ConnectionEntry> pending = new ArrayDeque<ConnectionEntry>();
    private final ProgressTracker tracker;
    private ReconnectForwarder successor;

    TransmitQueue(int targetDepth) {
        this.tracker = new AveragingProgressTracker(targetDepth);
    }

    final Collection<ConnectionEntry> drain() {
        ArrayDeque<ConnectionEntry> ret = new ArrayDeque<ConnectionEntry>(this.inflight.size() + this.pending.size());
        ret.addAll(this.inflight);
        ret.addAll(this.pending);
        this.inflight.clear();
        this.pending.clear();
        return ret;
    }

    final long ticksStalling(long now) {
        return this.tracker.ticksStalling(now);
    }

    final boolean hasSuccessor() {
        return this.successor != null;
    }

    final Optional<TransmittedConnectionEntry> complete(ResponseEnvelope<?> envelope, long now) {
        Optional<TransmittedConnectionEntry> maybeEntry = TransmitQueue.findMatchingEntry(this.inflight, envelope);
        if (maybeEntry == null) {
            LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
            maybeEntry = TransmitQueue.findMatchingEntry(this.pending, envelope);
        }
        if (maybeEntry == null || !maybeEntry.isPresent()) {
            LOG.warn("No request matching {} found, ignoring response", envelope);
            return Optional.empty();
        }
        TransmittedConnectionEntry entry = maybeEntry.get();
        this.tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
        this.tryTransmit(now);
        return Optional.of(entry);
    }

    final void tryTransmit(long now) {
        int toSend = this.canTransmitCount(this.inflight.size());
        if (toSend > 0 && !this.pending.isEmpty()) {
            this.transmitEntries(toSend, now);
        }
    }

    private void transmitEntries(int maxTransmit, long now) {
        for (int i = 0; i < maxTransmit; ++i) {
            ConnectionEntry e = this.pending.poll();
            if (e == null) {
                LOG.debug("Queue {} transmitted {} requests", (Object)this, (Object)i);
                return;
            }
            this.transmitEntry(e, now);
        }
        LOG.debug("Queue {} transmitted {} requests", (Object)this, (Object)maxTransmit);
    }

    private void transmitEntry(ConnectionEntry entry, long now) {
        LOG.debug("Queue {} transmitting entry {}", (Object)this, (Object)entry);
        this.inflight.addLast(this.transmit(entry, now));
    }

    final long enqueue(ConnectionEntry entry, long now) {
        if (this.successor != null) {
            this.successor.forwardEntry(entry, now);
            return 0L;
        }
        long delay = this.tracker.openTask(now);
        int toSend = this.canTransmitCount(this.inflight.size());
        if (toSend <= 0) {
            LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
            this.pending.addLast(entry);
            return delay;
        }
        if (this.pending.isEmpty()) {
            this.transmitEntry(entry, now);
            return delay;
        }
        this.pending.addLast(entry);
        this.transmitEntries(toSend, now);
        return delay;
    }

    abstract int canTransmitCount(int var1);

    abstract TransmittedConnectionEntry transmit(ConnectionEntry var1, long var2);

    final boolean isEmpty() {
        return this.inflight.isEmpty() && this.pending.isEmpty();
    }

    final ConnectionEntry peek() {
        ConnectionEntry ret = this.inflight.peek();
        if (ret != null) {
            return ret;
        }
        return this.pending.peek();
    }

    final void poison(RequestException cause) {
        TransmitQueue.poisonQueue(this.inflight, cause);
        TransmitQueue.poisonQueue(this.pending, cause);
    }

    final void setForwarder(ReconnectForwarder forwarder, long now) {
        Verify.verify((this.successor == null ? 1 : 0) != 0, (String)"Successor {} already set on connection {}", (Object[])new Object[]{this.successor, this});
        this.successor = (ReconnectForwarder)Preconditions.checkNotNull((Object)forwarder);
        LOG.debug("Connection {} superseded by {}, splicing queue", (Object)this, (Object)this.successor);
        int count = 0;
        ConnectionEntry entry = this.inflight.poll();
        while (entry != null) {
            this.successor.replayEntry(entry, now);
            entry = this.inflight.poll();
            ++count;
        }
        entry = this.pending.poll();
        while (entry != null) {
            this.successor.replayEntry(entry, now);
            entry = this.pending.poll();
            ++count;
        }
        LOG.debug("Connection {} queue spliced {} messages", (Object)this, (Object)count);
    }

    final void remove(long now) {
        TransmittedConnectionEntry txe = this.inflight.poll();
        if (txe == null) {
            ConnectionEntry entry = this.pending.pop();
            this.tracker.closeTask(now, entry.getEnqueuedTicks(), 0L, 0L);
        } else {
            this.tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0L);
        }
    }

    @VisibleForTesting
    Deque<TransmittedConnectionEntry> getInflight() {
        return this.inflight;
    }

    @VisibleForTesting
    Deque<ConnectionEntry> getPending() {
        return this.pending;
    }

    @SuppressFBWarnings(value={"NP_OPTIONAL_RETURN_NULL"}, justification="Returning null Optional is documented in the API contract.")
    private static Optional<TransmittedConnectionEntry> findMatchingEntry(Queue<? extends ConnectionEntry> queue, ResponseEnvelope<?> envelope) {
        Iterator it = queue.iterator();
        while (it.hasNext()) {
            ConnectionEntry e = (ConnectionEntry)it.next();
            Request<?, ?> request = e.getRequest();
            Response response = (Response)envelope.getMessage();
            if (!request.getTarget().equals(response.getTarget())) continue;
            if (request.getSequence() != response.getSequence()) {
                LOG.debug("Expecting sequence {}, ignoring response {}", (Object)request.getSequence(), envelope);
                return Optional.empty();
            }
            if (!(e instanceof TransmittedConnectionEntry)) {
                return Optional.empty();
            }
            TransmittedConnectionEntry te = (TransmittedConnectionEntry)e;
            if (envelope.getSessionId() != te.getSessionId()) {
                LOG.debug("Expecting session {}, ignoring response {}", (Object)te.getSessionId(), envelope);
                return Optional.empty();
            }
            if (envelope.getTxSequence() != te.getTxSequence()) {
                LOG.warn("Expecting txSequence {}, ignoring response {}", (Object)te.getTxSequence(), envelope);
                return Optional.empty();
            }
            LOG.debug("Completing request {} with {}", request, envelope);
            it.remove();
            return Optional.of(te);
        }
        return null;
    }

    private static void poisonQueue(Queue<? extends ConnectionEntry> queue, RequestException cause) {
        for (ConnectionEntry connectionEntry : queue) {
            Request<?, ?> request = connectionEntry.getRequest();
            LOG.trace("Poisoning request {}", request, (Object)cause);
            connectionEntry.complete((Response<?, ?>)request.toRequestFailure(cause));
        }
        queue.clear();
    }

    static final class Transmitting
    extends TransmitQueue {
        private final BackendInfo backend;
        private long nextTxSequence;

        Transmitting(int targetDepth, BackendInfo backend) {
            super(targetDepth);
            this.backend = (BackendInfo)Preconditions.checkNotNull((Object)backend);
        }

        @Override
        int canTransmitCount(int inflightSize) {
            return this.backend.getMaxMessages() - inflightSize;
        }

        @Override
        TransmittedConnectionEntry transmit(ConnectionEntry entry, long now) {
            RequestEnvelope env = new RequestEnvelope((Request)entry.getRequest().toVersion(this.backend.getVersion()), this.backend.getSessionId(), this.nextTxSequence++);
            TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(), env.getTxSequence(), now);
            this.backend.getActor().tell((Object)env, ActorRef.noSender());
            return ret;
        }
    }

    static final class Halted
    extends TransmitQueue {
        Halted(int targetDepth) {
            super(targetDepth);
        }

        @Override
        int canTransmitCount(int inflightSize) {
            return 0;
        }

        @Override
        TransmittedConnectionEntry transmit(ConnectionEntry entry, long now) {
            throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
        }
    }
}

