package com.hpe.caf.worker.queue.rabbit;

import com.hpe.caf.api.worker.InvalidTaskException;
import com.hpe.caf.api.worker.TaskCallback;
import com.hpe.caf.api.worker.TaskRejectedException;
import com.hpe.caf.util.rabbitmq.ConsumerAckEvent;
import com.hpe.caf.util.rabbitmq.ConsumerDropEvent;
import com.hpe.caf.util.rabbitmq.ConsumerRejectEvent;
import com.hpe.caf.util.rabbitmq.Delivery;
import com.hpe.caf.util.rabbitmq.Event;
import com.hpe.caf.util.rabbitmq.QueueConsumer;
import com.hpe.caf.util.rabbitmq.RabbitHeaders;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.CustomBooleanEditor;

/* loaded from: input_file:com/hpe/caf/worker/queue/rabbit/WorkerQueueConsumerImpl.class */
/**
 * {@link QueueConsumer} that hands incoming deliveries to a {@link TaskCallback} and
 * acknowledges, rejects or drops messages on a RabbitMQ {@link Channel}.
 *
 * <p>Deliveries flagged as redelivered are never passed to the callback. They are republished
 * through the publisher event queue using the retry routing key, carrying either an incremented
 * retry-count header or, once the retry limit has been reached, a "rejected" header.
 */
public class WorkerQueueConsumerImpl implements QueueConsumer {
    /** Value of the rejected header when the callback reports the task message as invalid. */
    public static final String REJECTED_REASON_TASKMESSAGE = "TASKMESSAGE_INVALID";
    /** Value of the rejected header when a redelivered message has reached the retry limit. */
    public static final String REJECTED_REASON_RETRIES_EXCEEDED = "RETRIES_EXCEEDED";

    private static final Logger LOG = LoggerFactory.getLogger(WorkerQueueConsumerImpl.class);

    /** Retry count assumed when a delivery carries no retry header. */
    private static final String NO_RETRIES = "0";

    private final TaskCallback callback;
    private final RabbitMetricsReporter metrics;
    private final BlockingQueue<Event<QueueConsumer>> consumerEventQueue;
    private final BlockingQueue<Event<WorkerPublisher>> publisherEventQueue;
    private final Channel channel;
    private final String retryRoutingKey;
    private final int retryLimit;

    /**
     * Creates a consumer.
     *
     * @param callback the callback new (non-redelivered) messages are registered with
     * @param metrics the reporter updated with received/rejected/dropped/error counts
     * @param consumerEventQueue the queue that failed ack/reject/drop operations are re-added to
     * @param channel the channel used to ack and reject deliveries
     * @param publisherEventQueue the queue that republish requests are added to
     * @param retryRoutingKey the routing key used for retried and rejected messages
     * @param retryLimit the retry count at which a redelivered message is marked as rejected
     * @throws NullPointerException if any reference argument is {@code null}
     */
    public WorkerQueueConsumerImpl(TaskCallback callback, RabbitMetricsReporter metrics,
                                   BlockingQueue<Event<QueueConsumer>> consumerEventQueue, Channel channel,
                                   BlockingQueue<Event<WorkerPublisher>> publisherEventQueue,
                                   String retryRoutingKey, int retryLimit) {
        this.callback = Objects.requireNonNull(callback);
        this.metrics = Objects.requireNonNull(metrics);
        this.consumerEventQueue = Objects.requireNonNull(consumerEventQueue);
        this.channel = Objects.requireNonNull(channel);
        this.publisherEventQueue = Objects.requireNonNull(publisherEventQueue);
        this.retryRoutingKey = Objects.requireNonNull(retryRoutingKey);
        this.retryLimit = retryLimit;
    }

    /**
     * Handles one delivery: counts it as received, then either routes it through the redelivery
     * path or registers it with the callback.
     *
     * <p>If the callback throws {@link InvalidTaskException} the message is republished with the
     * retry routing key and a rejected header of {@link #REJECTED_REASON_TASKMESSAGE}. If it throws
     * {@link TaskRejectedException} the message is republished with its original routing key.
     *
     * @param delivery the delivery to process
     */
    @Override // com.hpe.caf.util.rabbitmq.QueueConsumer
    public void processDelivery(Delivery delivery) {
        final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
        metrics.incrementReceived();
        if (delivery.getEnvelope().isRedeliver()) {
            handleRedelivery(delivery);
            return;
        }
        try {
            LOG.debug("Registering new message {}", deliveryTag);
            callback.registerNewTask(String.valueOf(deliveryTag), delivery.getMessageData(), delivery.getHeaders());
        } catch (InvalidTaskException e) {
            LOG.error("Cannot register new message, rejecting {}", deliveryTag, e);
            publisherEventQueue.add(new WorkerPublishQueueEvent(
                delivery.getMessageData(), retryRoutingKey, deliveryTag,
                Collections.singletonMap(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_REJECTED, REJECTED_REASON_TASKMESSAGE)));
        } catch (TaskRejectedException e) {
            LOG.warn("Message {} rejected as a task at this time, returning to queue", deliveryTag, e);
            publisherEventQueue.add(new WorkerPublishQueueEvent(
                delivery.getMessageData(), delivery.getEnvelope().getRoutingKey(), deliveryTag));
        }
    }

    /**
     * Acknowledges a single message. If the channel call fails, the error metric is incremented
     * and a {@link ConsumerAckEvent} is queued so the ack is attempted again.
     *
     * @param tag the delivery tag of the message to acknowledge
     */
    @Override // com.hpe.caf.util.rabbitmq.QueueConsumer
    public void processAck(long tag) {
        try {
            LOG.debug("Acknowledging message {}", tag);
            channel.basicAck(tag, false);
        } catch (IOException e) {
            LOG.warn("Couldn't ack message {}, will retry", tag, e);
            metrics.incremementErrors();
            consumerEventQueue.add(new ConsumerAckEvent(tag));
        }
    }

    /**
     * Rejects a message with the requeue flag set.
     *
     * @param tag the delivery tag of the message to reject
     */
    @Override // com.hpe.caf.util.rabbitmq.QueueConsumer
    public void processReject(long tag) {
        processReject(tag, true);
    }

    /**
     * Rejects a message with the requeue flag cleared.
     *
     * @param tag the delivery tag of the message to drop
     */
    @Override // com.hpe.caf.util.rabbitmq.QueueConsumer
    public void processDrop(long tag) {
        processReject(tag, false);
    }

    /**
     * Calls {@code basicReject} on the channel and updates the matching metric. If the channel
     * call fails, the error metric is incremented and the same operation is queued again as a
     * {@link ConsumerRejectEvent} or {@link ConsumerDropEvent}.
     *
     * @param tag the delivery tag of the message
     * @param requeue value passed as the requeue flag; {@code true} counts as a reject,
     *                {@code false} counts as a drop
     */
    private void processReject(long tag, boolean requeue) {
        try {
            channel.basicReject(tag, requeue);
            if (requeue) {
                LOG.debug("Rejecting message {}", tag);
                metrics.incrementRejected();
            } else {
                LOG.warn("Dropping message {}", tag);
                metrics.incrementDropped();
            }
        } catch (IOException e) {
            LOG.warn("Couldn't reject message {}, will retry", tag, e);
            metrics.incremementErrors();
            consumerEventQueue.add(requeue ? new ConsumerRejectEvent(tag) : new ConsumerDropEvent(tag));
        }
    }

    /**
     * Republishes a redelivered message with the retry routing key.
     *
     * <p>When the retry count read from the headers has reached the retry limit, the message is
     * published with its current retry count and a rejected header of
     * {@link #REJECTED_REASON_RETRIES_EXCEEDED}. Otherwise it is published with the retry count
     * incremented by one and the retry limit attached as a header.
     *
     * @param delivery the redelivered message
     */
    private void handleRedelivery(Delivery delivery) {
        final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
        final int retries = readRetryCount(delivery);
        final Map<String, Object> headers = new HashMap<>();
        if (retries >= retryLimit) {
            LOG.debug("Retry exceeded for message with id {}, republishing to rejected queue", deliveryTag);
            headers.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, String.valueOf(retries));
            headers.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_REJECTED, REJECTED_REASON_RETRIES_EXCEEDED);
        } else {
            final int nextRetry = retries + 1;
            // Argument order follows the placeholders: id, retry count, retry limit.
            LOG.debug("Received redelivered message with id {}, retry count {}, retry limit {}, republishing to retry queue",
                      deliveryTag, nextRetry, retryLimit);
            headers.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, String.valueOf(nextRetry));
            // The limit is sent as an Integer while the count is sent as a String; kept as-is so
            // the published header types do not change.
            headers.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY_LIMIT, Integer.valueOf(retryLimit));
        }
        publisherEventQueue.add(
            new WorkerPublishQueueEvent(delivery.getMessageData(), retryRoutingKey, deliveryTag, headers));
    }

    /**
     * Reads the retry-count header of a delivery.
     *
     * @param delivery the delivery whose headers are inspected
     * @return the parsed retry count, or 0 when the header is absent or is not a valid integer
     */
    private static int readRetryCount(Delivery delivery) {
        final String raw = String.valueOf(
            delivery.getHeaders().getOrDefault(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, NO_RETRIES));
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            // Without this guard a malformed header would propagate a NumberFormatException out
            // of processDelivery; starting the count again keeps the message on the retry path.
            LOG.warn("Message {} has an unparseable retry count '{}', treating it as 0",
                     delivery.getEnvelope().getDeliveryTag(), raw, e);
            return 0;
        }
    }
}
