package io.nats.client.impl;

import io.nats.client.support.NatsConstants;
import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/jnats-2.12.0.jar:io/nats/client/impl/MessageQueue.class */
/**
 * A queue of {@link NatsMessage} instances backed by a {@link LinkedBlockingQueue}.
 *
 * <p>The queue tracks its own message count and byte size in atomic counters, and
 * has three lifecycle states (stopped, running, draining). A private "poison pill"
 * message is enqueued on {@link #pause()} and {@link #drain()} so that a reader
 * blocked in {@link #poll(Duration)} wakes up; the pill is never handed to callers.
 *
 * <p>{@link #push(NatsMessage, boolean)} and {@link #filter(Predicate)} are
 * serialized against each other through a shared lock.
 */
public class MessageQueue {
    /** State in which {@link #pop} and {@link #accumulate} return {@code null}. */
    private static final int STOPPED = 0;
    /** Normal operating state; the state a new queue starts in. */
    private static final int RUNNING = 1;
    /** State in which polls never block, so the remaining messages can be emptied out. */
    private static final int DRAINING = 2;
    /** How long a blocking {@link #offer(NatsMessage)} waits for free space. */
    private static final long OFFER_TIMEOUT_SECONDS = 5L;

    /** Number of messages accounted for by push/pop/accumulate/filter. */
    private final AtomicLong length;
    /** Sum of {@code getSizeInBytes()} over the accounted messages. */
    private final AtomicLong sizeInBytes;
    /** Current lifecycle state: one of STOPPED, RUNNING, DRAINING. */
    private final AtomicInteger running;
    /** Whether {@link #accumulate} is permitted on this queue. */
    private final boolean singleThreadedReader;
    private final LinkedBlockingQueue<NatsMessage> queue;
    /** Serializes {@link #push(NatsMessage, boolean)} against {@link #filter(Predicate)}. */
    private final Lock filterLock;
    /** When true, a regular push drops the message instead of waiting for space. */
    private final boolean discardWhenFull;
    /** Sentinel used to wake blocked readers; compared by identity, never returned. */
    private final NatsMessage poisonPill;

    /**
     * Creates a queue in the running state.
     *
     * @param singleReaderMode whether {@link #accumulate} may be used
     * @param capacity maximum number of queued elements; values {@code <= 0} create an
     *                 unbounded queue
     * @param discardWhenFull whether a regular push drops the message when the queue is full
     */
    public MessageQueue(boolean singleReaderMode, int capacity, boolean discardWhenFull) {
        this.queue = capacity > 0 ? new LinkedBlockingQueue<>(capacity) : new LinkedBlockingQueue<>();
        this.discardWhenFull = discardWhenFull;
        this.running = new AtomicInteger(RUNNING);
        this.sizeInBytes = new AtomicLong(0L);
        this.length = new AtomicLong(0L);
        this.poisonPill = new NatsMessage("_poison", null, NatsConstants.EMPTY_BODY);
        this.filterLock = new ReentrantLock();
        this.singleThreadedReader = singleReaderMode;
    }

    /**
     * Creates an unbounded queue that never discards.
     *
     * @param singleReaderMode whether {@link #accumulate} may be used
     */
    public MessageQueue(boolean singleReaderMode) {
        this(singleReaderMode, 0);
    }

    /**
     * Creates a queue that never discards.
     *
     * @param singleReaderMode whether {@link #accumulate} may be used
     * @param capacity maximum number of queued elements; values {@code <= 0} create an
     *                 unbounded queue
     */
    public MessageQueue(boolean singleReaderMode, int capacity) {
        this(singleReaderMode, capacity, false);
    }

    /** @return whether this queue was created in single reader mode */
    boolean isSingleReaderMode() {
        return this.singleThreadedReader;
    }

    /** @return {@code true} unless the queue is stopped; a draining queue counts as running */
    public boolean isRunning() {
        return this.running.get() != STOPPED;
    }

    /** @return {@code true} if the queue is in the draining state */
    boolean isDraining() {
        return this.running.get() == DRAINING;
    }

    /** Moves the queue to the stopped state and wakes any blocked reader. */
    public void pause() {
        this.running.set(STOPPED);
        poisonTheQueue();
    }

    /** Moves the queue back to the running state. */
    public void resume() {
        this.running.set(RUNNING);
    }

    /** Moves the queue to the draining state and wakes any blocked reader. */
    public void drain() {
        this.running.set(DRAINING);
        poisonTheQueue();
    }

    /** @return {@code true} if the queue is draining and its tracked length is zero */
    public boolean isDrained() {
        return isDraining() && length() == 0;
    }

    /**
     * Pushes a message without bypassing discard mode.
     *
     * @param natsMessage the message to enqueue
     * @return {@code true} if the message was enqueued, {@code false} if it was discarded
     * @throws IllegalStateException if no space became available within the offer timeout
     */
    public boolean push(NatsMessage natsMessage) {
        return push(natsMessage, false);
    }

    /**
     * Enqueues a message and updates the length and byte counters.
     *
     * <p>When the queue was created with {@code discardWhenFull} and
     * {@code bypassDiscard} is {@code false}, the message is offered without waiting
     * and dropped if there is no room. Otherwise the call waits for space via
     * {@link #offer(NatsMessage)}.
     *
     * @param natsMessage the message to enqueue
     * @param bypassDiscard when {@code true}, discard mode is ignored and the blocking
     *                      offer is always used
     * @return {@code true} if the message was enqueued, {@code false} if it was discarded
     * @throws IllegalStateException if the blocking offer fails (timed out or interrupted)
     */
    public boolean push(NatsMessage natsMessage, boolean bypassDiscard) {
        this.filterLock.lock();
        try {
            if (!bypassDiscard && this.discardWhenFull) {
                if (!this.queue.offer(natsMessage)) {
                    return false;
                }
                // Fall through to the accounting below: pop/accumulate/filter always
                // decrement the counters, so a successful discard-mode offer must
                // increment them too or they drift negative.
            } else if (!offer(natsMessage)) {
                throw new IllegalStateException("Output queue is full " + this.queue.size());
            }
            this.sizeInBytes.getAndAdd(natsMessage.getSizeInBytes());
            this.length.incrementAndGet();
            return true;
        } finally {
            // Released on every path, including the IllegalStateException above.
            this.filterLock.unlock();
        }
    }

    /**
     * Adds the poison pill so that a reader blocked in {@link #poll(Duration)} wakes
     * up and re-checks the queue state. Best effort: nothing happens if the queue is
     * at capacity.
     */
    void poisonTheQueue() {
        try {
            this.queue.add(this.poisonPill);
        } catch (IllegalStateException ignored) {
            // add() throws when a bounded queue is full. Deliberately ignored:
            // presumably a full queue has no reader blocked waiting for an element,
            // so there is nobody to wake.
        }
    }

    /**
     * Offers a message, waiting up to five seconds for space.
     *
     * @param natsMessage the message to enqueue
     * @return {@code true} if enqueued; {@code false} on timeout or if the thread was
     *         interrupted while waiting (the interrupt status is restored)
     */
    boolean offer(NatsMessage natsMessage) {
        try {
            return this.queue.offer(natsMessage, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Takes the next element from the underlying queue without touching the counters.
     *
     * <ul>
     *   <li>{@code timeout == null} or the queue is draining: return immediately.</li>
     *   <li>non-zero timeout: wait at most that long.</li>
     *   <li>zero timeout: wait until an element (possibly the poison pill) arrives or
     *       the queue stops running.</li>
     * </ul>
     *
     * @param timeout how long to wait, see above
     * @return the next message, or {@code null} if none was obtained or the poison pill
     *         was taken
     * @throws InterruptedException if interrupted while waiting
     */
    NatsMessage poll(Duration timeout) throws InterruptedException {
        NatsMessage msg = null;
        if (timeout == null || isDraining()) {
            msg = this.queue.poll();
        } else {
            long nanos = timeout.toNanos();
            if (nanos != 0) {
                msg = this.queue.poll(nanos, TimeUnit.NANOSECONDS);
            } else {
                // Zero means "no deadline": wait in very long slices. pause()/drain()
                // enqueue the poison pill, which ends the wait.
                while (isRunning()) {
                    msg = this.queue.poll(100L, TimeUnit.DAYS);
                    if (msg != null) {
                        break;
                    }
                }
            }
        }
        return msg == this.poisonPill ? null : msg;
    }

    /**
     * Removes and returns the next message, updating the counters.
     *
     * @param timeout wait policy, as described on {@link #poll(Duration)}
     * @return the message, or {@code null} if the queue is stopped or nothing was obtained
     * @throws InterruptedException if interrupted while waiting
     */
    public NatsMessage pop(Duration timeout) throws InterruptedException {
        if (!isRunning()) {
            return null;
        }
        NatsMessage msg = poll(timeout);
        if (msg == null) {
            return null;
        }
        this.sizeInBytes.getAndAdd(-msg.getSizeInBytes());
        this.length.decrementAndGet();
        return msg;
    }

    /**
     * Removes one message, then chains further immediately-available messages onto it
     * through the {@code next} field, and returns the head of the chain.
     *
     * <p>Only the first message is waited for. Chaining stops when the queue is empty,
     * the poison pill is at the head, adding the next message would bring the total
     * byte size to {@code maxBytes} or more, or {@code maxMessages} have been collected.
     *
     * @param maxBytes byte-size limit for the chain
     * @param maxMessages maximum number of messages in the chain
     * @param timeout wait policy for the first message, see {@link #poll(Duration)}
     * @return the head of the chain, or {@code null} if the queue is stopped or nothing
     *         was obtained
     * @throws IllegalStateException if the queue is not in single reader mode
     * @throws InterruptedException if interrupted while waiting
     */
    public NatsMessage accumulate(long maxBytes, long maxMessages, Duration timeout) throws InterruptedException {
        if (!this.singleThreadedReader) {
            throw new IllegalStateException("Accumulate is only supported in single reader mode.");
        }
        if (!isRunning()) {
            return null;
        }
        NatsMessage head = poll(timeout);
        if (head == null) {
            return null;
        }

        long totalBytes = head.getSizeInBytes();
        // NOTE(review): a negative maxBytes also takes this single-message path
        // (assuming sizes are non-negative), which makes the "maxBytes >= 0" guard in
        // the loop below look redundant -- confirm the intended meaning of negatives.
        if (maxMessages <= 1 || totalBytes >= maxBytes) {
            this.sizeInBytes.addAndGet(-totalBytes);
            this.length.decrementAndGet();
            return head;
        }

        long count = 1;
        NatsMessage tail = head;
        while (tail != null) {
            // Peek before polling so a message that does not fit stays queued.
            NatsMessage candidate = this.queue.peek();
            if (candidate == null || candidate == this.poisonPill) {
                break;
            }
            long candidateBytes = candidate.getSizeInBytes();
            if (maxBytes >= 0 && totalBytes + candidateBytes >= maxBytes) {
                break;
            }
            totalBytes += candidateBytes;
            count++;
            tail.next = this.queue.poll();
            tail = tail.next;
            if (count == maxMessages) {
                break;
            }
        }
        this.sizeInBytes.addAndGet(-totalBytes);
        this.length.addAndGet(-count);
        return head;
    }

    /**
     * Removes and returns the next message without waiting.
     *
     * @return the message, or {@code null} if the queue is stopped or empty
     * @throws InterruptedException declared because it delegates to {@link #pop(Duration)}
     */
    NatsMessage popNow() throws InterruptedException {
        return pop(null);
    }

    /** @return the tracked number of messages */
    public long length() {
        return this.length.get();
    }

    /** @return the tracked total size of the queued messages, in bytes */
    public long sizeInBytes() {
        return this.sizeInBytes.get();
    }

    /**
     * Removes every queued message for which the predicate returns {@code true} and
     * re-queues the rest in their original order, adjusting the counters for each
     * removed message. Holds the push lock for the duration.
     *
     * @param predicate returns {@code true} for messages that should be removed
     * @throws IllegalStateException if the queue is not stopped
     */
    public void filter(Predicate<NatsMessage> predicate) {
        this.filterLock.lock();
        try {
            if (isRunning()) {
                throw new IllegalStateException("Filter is only supported when the queue is paused");
            }
            ArrayList<NatsMessage> kept = new ArrayList<>();
            for (NatsMessage msg = this.queue.poll(); msg != null; msg = this.queue.poll()) {
                if (predicate.test(msg)) {
                    this.sizeInBytes.addAndGet(-msg.getSizeInBytes());
                    this.length.decrementAndGet();
                } else {
                    kept.add(msg);
                }
            }
            this.queue.addAll(kept);
        } finally {
            this.filterLock.unlock();
        }
    }
}
