package io.reflectoring.sqs.internal;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.reflectoring.sqs.api.SqsMessageHandler;
import io.reflectoring.sqs.api.SqsMessagePollerProperties;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/reflectoring/sqs/internal/SqsMessagePoller.class */
class SqsMessagePoller<T> {
    private static final Logger logger = LoggerFactory.getLogger(SqsMessagePoller.class);
    private final SqsMessageHandler<T> messageHandler;
    private final SqsMessagePollerProperties pollingProperties;
    private final AmazonSQS sqsClient;
    private final ObjectMapper objectMapper;
    private final ScheduledThreadPoolExecutor pollerThreadPool;
    private final ThreadPoolExecutor handlerThreadPool;

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        logger.info("starting SqsMessagePoller");
        for (int i = 0; i < this.pollerThreadPool.getCorePoolSize(); i++) {
            logger.info("starting SqsMessagePoller - thread {}", Integer.valueOf(i));
            this.pollerThreadPool.scheduleWithFixedDelay(this::poll, 1L, this.pollingProperties.getPollDelay().toSeconds(), TimeUnit.SECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        logger.info("stopping SqsMessagePoller");
        this.pollerThreadPool.shutdownNow();
        this.handlerThreadPool.shutdownNow();
    }

    private void poll() {
        for (Message message : fetchMessages(this.sqsClient, this.pollingProperties)) {
            try {
                Object readValue = this.objectMapper.readValue(message.getBody(), this.messageHandler.messageType());
                this.handlerThreadPool.submit(() -> {
                    this.messageHandler.handle(readValue);
                    acknowledgeMessage(message);
                });
            } catch (JsonProcessingException e) {
                logger.warn("error parsing message: ", e);
            }
        }
    }

    private List<Message> fetchMessages(AmazonSQS amazonSQS, SqsMessagePollerProperties sqsMessagePollerProperties) {
        logger.debug("polling messages from SQS queue {}", sqsMessagePollerProperties.getQueueUrl());
        ReceiveMessageResult receiveMessage = amazonSQS.receiveMessage(new ReceiveMessageRequest().withMaxNumberOfMessages(Integer.valueOf(sqsMessagePollerProperties.getBatchSize())).withQueueUrl(sqsMessagePollerProperties.getQueueUrl()).withWaitTimeSeconds(Integer.valueOf((int) sqsMessagePollerProperties.getWaitTime().toSeconds())));
        if (receiveMessage.getSdkHttpMetadata().getHttpStatusCode() != 200) {
            logger.error("got error response from SQS queue {}: {}", sqsMessagePollerProperties.getQueueUrl(), receiveMessage.getSdkHttpMetadata());
            return Collections.emptyList();
        }
        if (receiveMessage.getMessages().isEmpty()) {
            logger.debug("empty polling result from SQS queue {}", sqsMessagePollerProperties.getQueueUrl());
            return Collections.emptyList();
        }
        logger.debug("polled {} messages from SQS queue {}", Integer.valueOf(receiveMessage.getMessages().size()), sqsMessagePollerProperties.getQueueUrl());
        return receiveMessage.getMessages();
    }

    private void acknowledgeMessage(Message message) {
        this.sqsClient.deleteMessage(this.pollingProperties.getQueueUrl(), message.getReceiptHandle());
    }

    @Generated
    public SqsMessagePoller(SqsMessageHandler<T> sqsMessageHandler, SqsMessagePollerProperties sqsMessagePollerProperties, AmazonSQS amazonSQS, ObjectMapper objectMapper, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, ThreadPoolExecutor threadPoolExecutor) {
        this.messageHandler = sqsMessageHandler;
        this.pollingProperties = sqsMessagePollerProperties;
        this.sqsClient = amazonSQS;
        this.objectMapper = objectMapper;
        this.pollerThreadPool = scheduledThreadPoolExecutor;
        this.handlerThreadPool = threadPoolExecutor;
    }
}
