package org.opendaylight.controller.cluster.access.client;

import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/opendaylight/controller/cluster/access/client/TransmitQueue.class */
public abstract class TransmitQueue {
    private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
    private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque();
    private final Deque<ConnectionEntry> pending = new ArrayDeque();
    private final AveragingProgressTracker tracker;
    private ReconnectForwarder successor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opendaylight/controller/cluster/access/client/TransmitQueue$Halted.class */
    public static final class Halted extends TransmitQueue {
        /* JADX INFO: Access modifiers changed from: package-private */
        public Halted(int i) {
            super(i);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Halted(TransmitQueue transmitQueue, long j) {
            super(transmitQueue, j);
        }

        @Override // org.opendaylight.controller.cluster.access.client.TransmitQueue
        int canTransmitCount(int i) {
            return 0;
        }

        @Override // org.opendaylight.controller.cluster.access.client.TransmitQueue
        Optional<TransmittedConnectionEntry> transmit(ConnectionEntry connectionEntry, long j) {
            throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
        }

        @Override // org.opendaylight.controller.cluster.access.client.TransmitQueue
        void preComplete(ResponseEnvelope<?> responseEnvelope) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opendaylight/controller/cluster/access/client/TransmitQueue$Transmitting.class */
    public static final class Transmitting extends TransmitQueue {
        private static final long NOT_SLICING = -1;
        private final BackendInfo backend;
        private final MessageSlicer messageSlicer;
        private long nextTxSequence;
        private long currentSlicedEnvSequenceId;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Transmitting(TransmitQueue transmitQueue, int i, BackendInfo backendInfo, long j, MessageSlicer messageSlicer) {
            super(transmitQueue, i, j);
            this.currentSlicedEnvSequenceId = NOT_SLICING;
            this.backend = (BackendInfo) Objects.requireNonNull(backendInfo);
            this.messageSlicer = (MessageSlicer) Objects.requireNonNull(messageSlicer);
        }

        @Override // org.opendaylight.controller.cluster.access.client.TransmitQueue
        int canTransmitCount(int i) {
            return this.backend.getMaxMessages() - i;
        }

        @Override // org.opendaylight.controller.cluster.access.client.TransmitQueue
        Optional<TransmittedConnectionEntry> transmit(ConnectionEntry connectionEntry, long j) {
            if (this.currentSlicedEnvSequenceId >= 0) {
                return Optional.empty();
            }
            Request<?, ?> request = connectionEntry.getRequest();
            Request version = request.toVersion(this.backend.getVersion());
            long sessionId = this.backend.getSessionId();
            long j2 = this.nextTxSequence;
            this.nextTxSequence = j2 + 1;
            RequestEnvelope requestEnvelope = new RequestEnvelope(version, sessionId, j2);
            if (!(request instanceof SliceableMessage)) {
                this.backend.getActor().tell(requestEnvelope, ActorRef.noSender());
            } else if (this.messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget()).message(requestEnvelope).replyTo(request.getReplyTo()).sendTo(this.backend.getActor()).onFailureCallback(th -> {
                requestEnvelope.sendFailure(new RuntimeRequestException("Failed to slice request " + String.valueOf(request), th), 0L);
            }).build())) {
                this.currentSlicedEnvSequenceId = requestEnvelope.getTxSequence();
            }
            return Optional.of(new TransmittedConnectionEntry(connectionEntry, requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), j));
        }

        @Override // org.opendaylight.controller.cluster.access.client.TransmitQueue
        void preComplete(ResponseEnvelope<?> responseEnvelope) {
            if (responseEnvelope.getTxSequence() == this.currentSlicedEnvSequenceId) {
                this.currentSlicedEnvSequenceId = NOT_SLICING;
            }
        }
    }

    TransmitQueue(int i) {
        this.tracker = new AveragingProgressTracker(i);
    }

    TransmitQueue(TransmitQueue transmitQueue, int i, long j) {
        this.tracker = new AveragingProgressTracker(transmitQueue.tracker, i, j);
    }

    TransmitQueue(TransmitQueue transmitQueue, long j) {
        this.tracker = new AveragingProgressTracker(transmitQueue.tracker, j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cancelDebt(long j) {
        this.tracker.cancelDebt(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final Collection<ConnectionEntry> drain() {
        ArrayDeque arrayDeque = new ArrayDeque(this.inflight.size() + this.pending.size());
        arrayDeque.addAll(this.inflight);
        arrayDeque.addAll(this.pending);
        this.inflight.clear();
        this.pending.clear();
        return arrayDeque;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final long ticksStalling(long j) {
        return this.tracker.ticksStalling(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final boolean hasSuccessor() {
        return this.successor != null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final Optional<TransmittedConnectionEntry> complete(ResponseEnvelope<?> responseEnvelope, long j) {
        preComplete(responseEnvelope);
        Optional<TransmittedConnectionEntry> findMatchingEntry = findMatchingEntry(this.inflight, responseEnvelope);
        if (findMatchingEntry == null) {
            LOG.debug("Request for {} not found in inflight queue, checking pending queue", responseEnvelope);
            findMatchingEntry = findMatchingEntry(this.pending, responseEnvelope);
        }
        if (findMatchingEntry == null || !findMatchingEntry.isPresent()) {
            LOG.warn("No request matching {} found, ignoring response", responseEnvelope);
            return Optional.empty();
        }
        TransmittedConnectionEntry orElseThrow = findMatchingEntry.orElseThrow();
        this.tracker.closeTask(j, orElseThrow.getEnqueuedTicks(), orElseThrow.getTxTicks(), responseEnvelope.getExecutionTimeNanos());
        tryTransmit(j);
        return Optional.of(orElseThrow);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void tryTransmit(long j) {
        int canTransmitCount = canTransmitCount(this.inflight.size());
        if (canTransmitCount <= 0 || this.pending.isEmpty()) {
            return;
        }
        transmitEntries(canTransmitCount, j);
    }

    private void transmitEntries(int i, long j) {
        for (int i2 = 0; i2 < i; i2++) {
            ConnectionEntry poll = this.pending.poll();
            if (poll == null || !transmitEntry(poll, j)) {
                LOG.debug("Queue {} transmitted {} requests", this, Integer.valueOf(i2));
                return;
            }
        }
        LOG.debug("Queue {} transmitted {} requests", this, Integer.valueOf(i));
    }

    private boolean transmitEntry(ConnectionEntry connectionEntry, long j) {
        LOG.debug("Queue {} transmitting entry {}", this, connectionEntry);
        Optional<TransmittedConnectionEntry> transmit = transmit(connectionEntry, j);
        if (!transmit.isPresent()) {
            return false;
        }
        this.inflight.addLast(transmit.orElseThrow());
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final long enqueueOrForward(ConnectionEntry connectionEntry, long j) {
        if (this.successor == null) {
            return enqueue(connectionEntry, j);
        }
        this.successor.forwardEntry(connectionEntry, j);
        return 0L;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void enqueueOrReplay(ConnectionEntry connectionEntry, long j) {
        if (this.successor != null) {
            this.successor.replayEntry(connectionEntry, j);
        } else {
            enqueue(connectionEntry, j);
        }
    }

    private long enqueue(ConnectionEntry connectionEntry, long j) {
        long openTask = this.tracker.openTask(j);
        int canTransmitCount = canTransmitCount(this.inflight.size());
        if (canTransmitCount <= 0) {
            LOG.trace("Queue is at capacity, delayed sending of request {}", connectionEntry.getRequest());
            this.pending.addLast(connectionEntry);
            return openTask;
        }
        if (!this.pending.isEmpty()) {
            this.pending.addLast(connectionEntry);
            transmitEntries(canTransmitCount, j);
            return openTask;
        }
        if (!transmitEntry(connectionEntry, j)) {
            LOG.debug("Queue {} cannot transmit request {} - delaying it", this, connectionEntry.getRequest());
            this.pending.addLast(connectionEntry);
        }
        return openTask;
    }

    abstract int canTransmitCount(int i);

    abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry connectionEntry, long j);

    abstract void preComplete(ResponseEnvelope<?> responseEnvelope);

    /* JADX INFO: Access modifiers changed from: package-private */
    public final boolean isEmpty() {
        return this.inflight.isEmpty() && this.pending.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final ConnectionEntry peek() {
        TransmittedConnectionEntry peek = this.inflight.peek();
        return peek != null ? peek : this.pending.peek();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final List<ConnectionEntry> poison() {
        ArrayList arrayList = new ArrayList(this.inflight.size() + this.pending.size());
        arrayList.addAll(this.inflight);
        this.inflight.clear();
        arrayList.addAll(this.pending);
        this.pending.clear();
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void setForwarder(ReconnectForwarder reconnectForwarder, long j) {
        Verify.verify(this.successor == null, "Successor %s already set on connection %s", this.successor, this);
        this.successor = (ReconnectForwarder) Objects.requireNonNull(reconnectForwarder);
        LOG.debug("Connection {} superseded by {}, splicing queue", this, this.successor);
        int i = 0;
        TransmittedConnectionEntry poll = this.inflight.poll();
        while (poll != null) {
            this.successor.replayEntry(poll, j);
            poll = this.inflight.poll();
            i++;
        }
        ConnectionEntry poll2 = this.pending.poll();
        while (poll2 != null) {
            this.successor.replayEntry(poll2, j);
            poll2 = this.pending.poll();
            i++;
        }
        LOG.debug("Connection {} queue spliced {} messages", this, Integer.valueOf(i));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void remove(long j) {
        TransmittedConnectionEntry poll = this.inflight.poll();
        if (poll != null) {
            this.tracker.closeTask(j, poll.getEnqueuedTicks(), poll.getTxTicks(), 0L);
        } else {
            this.tracker.closeTask(j, this.pending.pop().getEnqueuedTicks(), 0L, 0L);
        }
    }

    @VisibleForTesting
    Deque<TransmittedConnectionEntry> getInflight() {
        return this.inflight;
    }

    @VisibleForTesting
    Deque<ConnectionEntry> getPending() {
        return this.pending;
    }

    @SuppressFBWarnings(value = {"NP_OPTIONAL_RETURN_NULL"}, justification = "Returning null Optional is documented in the API contract.")
    private static Optional<TransmittedConnectionEntry> findMatchingEntry(Queue<? extends ConnectionEntry> queue, ResponseEnvelope<?> responseEnvelope) {
        Iterator<? extends ConnectionEntry> it = queue.iterator();
        while (it.hasNext()) {
            ConnectionEntry next = it.next();
            Request<?, ?> request = next.getRequest();
            Response message = responseEnvelope.getMessage();
            if (request.getTarget().equals(message.getTarget())) {
                if (request.getSequence() != message.getSequence()) {
                    LOG.debug("Expecting sequence {}, ignoring response {}", Long.valueOf(request.getSequence()), responseEnvelope);
                    return Optional.empty();
                }
                if (!(next instanceof TransmittedConnectionEntry)) {
                    return Optional.empty();
                }
                TransmittedConnectionEntry transmittedConnectionEntry = (TransmittedConnectionEntry) next;
                if (responseEnvelope.getSessionId() != transmittedConnectionEntry.getSessionId()) {
                    LOG.debug("Expecting session {}, ignoring response {}", Long.valueOf(transmittedConnectionEntry.getSessionId()), responseEnvelope);
                    return Optional.empty();
                }
                if (responseEnvelope.getTxSequence() != transmittedConnectionEntry.getTxSequence()) {
                    LOG.warn("Expecting txSequence {}, ignoring response {}", Long.valueOf(transmittedConnectionEntry.getTxSequence()), responseEnvelope);
                    return Optional.empty();
                }
                LOG.debug("Completing request {} with {}", request, responseEnvelope);
                it.remove();
                return Optional.of(transmittedConnectionEntry);
            }
        }
        return null;
    }
}
