package org.elasticsoftware.elasticactors.rabbitmq.ack;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.elasticsoftware.elasticactors.rabbitmq.MessageAcker;
import org.elasticsoftware.elasticactors.util.concurrent.DaemonThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/ack/BufferingMessageAcker.class */
public final class BufferingMessageAcker implements Runnable, MessageAcker {
    private static final Logger logger = LoggerFactory.getLogger(BufferingMessageAcker.class);
    private final Channel consumerChannel;
    private final LinkedBlockingQueue<Tag> tagQueue = new LinkedBlockingQueue<>();
    private long lastAckedTag = -1;
    private long highestAckedTag = -1;
    private final TreeSet<Long> pendingTags = new TreeSet<>();
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final ThreadFactory threadFactory = new DaemonThreadFactory("RABBITMQ-MESSAGE-ACKER");

    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/ack/BufferingMessageAcker$Tag.class */
    private static final class Tag {
        private final TagType type;
        private final long value;

        private Tag(TagType tagType, long j) {
            this.type = tagType;
            this.value = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/rabbitmq/ack/BufferingMessageAcker$TagType.class */
    public enum TagType {
        ACK,
        STOP
    }

    public BufferingMessageAcker(Channel channel) {
        this.consumerChannel = channel;
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.MessageAcker
    public void deliver(long j) {
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.MessageAcker
    public void ack(long j) {
        this.tagQueue.offer(new Tag(TagType.ACK, j));
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.MessageAcker
    public void start() {
        logger.info("Using MessageAcker [{}]", getClass().getSimpleName());
        this.threadFactory.newThread(this).start();
    }

    @Override // org.elasticsoftware.elasticactors.rabbitmq.MessageAcker
    public void stop() {
        this.tagQueue.offer(new Tag(TagType.STOP, -1L));
        try {
            this.shutdownLatch.await(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                Tag poll = this.tagQueue.poll(200L, TimeUnit.MICROSECONDS);
                if (poll == null) {
                    flushAck();
                } else if (poll.type == TagType.ACK) {
                    this.pendingTags.add(Long.valueOf(poll.value));
                    this.highestAckedTag = Math.max(poll.value, this.highestAckedTag);
                } else {
                    if (poll.type == TagType.STOP) {
                        flushAck();
                        this.shutdownLatch.countDown();
                        return;
                    }
                    continue;
                }
            } catch (Throwable th) {
                logger.warn("Caught Throwable", th);
            }
        }
    }

    private void flushAck() {
        if (this.lastAckedTag == this.highestAckedTag) {
            return;
        }
        Long first = this.pendingTags.isEmpty() ? null : this.pendingTags.first();
        if (first == null) {
            return;
        }
        if (this.lastAckedTag != -1 || first.longValue() <= 1) {
            if (this.lastAckedTag <= 0 || first.longValue() <= this.lastAckedTag + 1) {
                this.pendingTags.pollFirst();
                while (true) {
                    Long pollFirst = this.pendingTags.pollFirst();
                    if (pollFirst != null) {
                        if (pollFirst.longValue() != first.longValue() + 1) {
                            this.pendingTags.add(pollFirst);
                            break;
                        }
                        first = pollFirst;
                    } else {
                        break;
                    }
                }
                if (first.longValue() <= 0 || first.longValue() <= this.lastAckedTag) {
                    return;
                }
                try {
                    this.consumerChannel.basicAck(first.longValue(), true);
                    logger.debug("Acked all messages from {} up until {}", Long.valueOf(this.lastAckedTag), first);
                    this.lastAckedTag = first.longValue();
                } catch (IOException e) {
                    logger.error("Exception while acking message", e);
                }
            }
        }
    }
}
