package com.hpe.caf.worker.queue.rabbit;

import com.hpe.caf.util.rabbitmq.ConsumerAckEvent;
import com.hpe.caf.util.rabbitmq.ConsumerRejectEvent;
import com.hpe.caf.util.rabbitmq.Event;
import com.hpe.caf.util.rabbitmq.QueueConsumer;
import com.rabbitmq.client.ConfirmListener;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hpe/caf/worker/queue/rabbit/WorkerConfirmListener.class */
/**
 * Publisher-confirm listener that translates broker confirmations of published
 * messages into consumer events.
 *
 * <p>Callers register each publish sequence number together with the id of the
 * message to acknowledge. When the broker ACKs or NACKs a publish sequence, the
 * registered id is wrapped in a {@link ConsumerAckEvent} or
 * {@link ConsumerRejectEvent} respectively and added to the consumer event queue.
 */
class WorkerConfirmListener implements ConfirmListener {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerConfirmListener.class);

    /** Publish sequence number mapped to the id of the message to ack/reject, sorted by sequence. */
    private final SortedMap<Long, Long> confirmMap = Collections.synchronizedSortedMap(new TreeMap<>());
    private final BlockingQueue<Event<QueueConsumer>> consumerEvents;

    /**
     * Creates a listener that emits its events onto the supplied queue.
     *
     * @param blockingQueue the queue that receives the generated consumer events
     * @throws NullPointerException if {@code blockingQueue} is null
     */
    public WorkerConfirmListener(BlockingQueue<Event<QueueConsumer>> blockingQueue) {
        this.consumerEvents = Objects.requireNonNull(blockingQueue);
    }

    /**
     * Registers a publish sequence number so that its confirmation can later be
     * mapped back to the message that should be acknowledged or rejected.
     *
     * @param publishSequence the sequence number of the published message
     * @param ackId the id handed to the generated event once the publish is confirmed
     * @throws IllegalStateException if {@code publishSequence} is already registered
     */
    public void registerResponseSequence(long publishSequence, long ackId) {
        // putIfAbsent stores the mapping itself when the key is free, so no follow-up put is needed.
        if (confirmMap.putIfAbsent(publishSequence, ackId) != null) {
            throw new IllegalStateException("Sequence id " + publishSequence + " already present in confirmations map");
        }
        LOG.debug("Listening for confirmation of publish sequence {} (ack message: {})", publishSequence, ackId);
    }

    /**
     * Removes every registered sequence without emitting any event.
     */
    public void clearConfirmations() {
        LOG.info("Clearing confirmations map");
        confirmMap.clear();
    }

    /**
     * Emits a {@link ConsumerAckEvent} for the confirmed sequence, or for every
     * registered sequence up to and including it when {@code multiple} is true.
     *
     * @param sequenceNo the publish sequence number reported by the broker
     * @param multiple whether the confirmation covers all sequences up to {@code sequenceNo}
     * @throws IllegalStateException if {@code multiple} is false and the sequence is not registered
     */
    @Override // com.rabbitmq.client.ConfirmListener
    public void handleAck(long sequenceNo, boolean multiple) throws IOException {
        LOG.debug("RabbitMQ broker ACKed published sequence id {} (multiple: {})", sequenceNo, multiple);
        handle(sequenceNo, multiple, ConsumerAckEvent::new);
    }

    /**
     * Emits a {@link ConsumerRejectEvent} for the NACKed sequence, or for every
     * registered sequence up to and including it when {@code multiple} is true.
     *
     * @param sequenceNo the publish sequence number reported by the broker
     * @param multiple whether the confirmation covers all sequences up to {@code sequenceNo}
     * @throws IllegalStateException if {@code multiple} is false and the sequence is not registered
     */
    @Override // com.rabbitmq.client.ConfirmListener
    public void handleNack(long sequenceNo, boolean multiple) throws IOException {
        LOG.warn("RabbitMQ broker NACKed published sequence id {} (multiple: {})", sequenceNo, multiple);
        handle(sequenceNo, multiple, ConsumerRejectEvent::new);
    }

    /**
     * Shared ACK/NACK handling: removes the confirmed sequence(s) from the map and
     * queues one event per removed entry.
     *
     * @param sequenceNo the publish sequence number reported by the broker
     * @param multiple whether all registered sequences up to {@code sequenceNo} are covered
     * @param eventSource builds the event to queue from the registered ack id
     */
    private void handle(long sequenceNo, boolean multiple, Function<Long, Event<QueueConsumer>> eventSource) {
        if (!multiple) {
            Long ackId = confirmMap.remove(sequenceNo);
            if (ackId == null) {
                LOG.error("RabbitMQ broker sent confirm for sequence number {}, which is not registered", sequenceNo);
                throw new IllegalStateException("Sequence number " + sequenceNo + " not found in WorkerConfirmListener");
            }
            consumerEvents.add(eventSource.apply(ackId));
            return;
        }
        // View of every entry whose key is <= sequenceNo (headMap's upper bound is exclusive).
        SortedMap<Long, Long> confirmed = confirmMap.headMap(sequenceNo + 1);
        // Iterating a view of a synchronized map requires holding the map's own lock. The clear
        // happens under the same lock so that an entry appearing between the collect and the
        // clear cannot be dropped without an event having been queued for it.
        synchronized (confirmMap) {
            List<Event<QueueConsumer>> events = confirmed.values().stream()
                .map(eventSource)
                .collect(Collectors.toList());
            consumerEvents.addAll(events);
            confirmed.clear();
        }
    }
}
