package io.polyglotted.aws.message;

import com.amazonaws.AbortedException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import io.polyglotted.aws.common.AwsClientFactory;
import io.polyglotted.aws.config.AwsConfig;
import io.polyglotted.common.util.CollUtil;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/polyglotted/aws/message/SqsSubscriber.class */
public class SqsSubscriber implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SqsSubscriber.class);
    private final ExecutorService receiverPool;
    private final ExecutorService handlerPool;
    private final AmazonSQS sqsClient;
    private final QueueConfig subscriber;
    private final Object messageHandler;

    public SqsSubscriber(AwsConfig awsConfig, QueueConfig queueConfig, Object obj) {
        this(AwsClientFactory.createSqsClient(awsConfig), queueConfig, obj);
        this.receiverPool.execute(this::receiveMessage);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        SubscriberUtil.stopThreadPool(this.receiverPool, this.subscriber.getShutdownTimeInMillis(), TimeUnit.MILLISECONDS);
        SubscriberUtil.stopThreadPool(this.handlerPool, this.subscriber.getShutdownTimeInMillis(), TimeUnit.MILLISECONDS);
    }

    private void receiveMessage() {
        if (this.subscriber.isEnabled()) {
            String queueUrl = this.sqsClient.getQueueUrl(this.subscriber.getQueue()).getQueueUrl();
            while (!Thread.interrupted()) {
                try {
                    List<Message> messages = this.sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(Integer.valueOf(this.subscriber.getWaitTimeInSeconds())).withMaxNumberOfMessages(Integer.valueOf(this.subscriber.getMaxMessages()))).getMessages();
                    if (messages.size() > 0) {
                        handleMessages(queueUrl, messages);
                    }
                } catch (Exception e) {
                    log.error("error receiving message from sqs", e);
                } catch (AbortedException e2) {
                    return;
                }
            }
        }
    }

    private void handleMessages(String str, List<Message> list) {
        for (Message message : list) {
            this.handlerPool.submit(() -> {
                try {
                    SubscriberUtil.safeInvoke(this.messageHandler, "handleMessage", new Class[]{String.class}, message.getBody());
                    this.sqsClient.deleteMessage(new DeleteMessageRequest(str, message.getReceiptHandle()));
                } catch (Exception e) {
                    log.error("error processing messages: " + CollUtil.transformList(list, (v0) -> {
                        return v0.getMessageId();
                    }), e);
                }
            });
        }
    }

    public SqsSubscriber(AmazonSQS amazonSQS, QueueConfig queueConfig, Object obj) {
        this.receiverPool = Executors.newSingleThreadExecutor();
        this.handlerPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        this.sqsClient = amazonSQS;
        this.subscriber = queueConfig;
        this.messageHandler = obj;
    }
}
