package io.pythagoras.common.s3eventqueue;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.google.gson.Gson;
import io.pythagoras.common.s3eventqueue.classes.Record;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

/* loaded from: input_file:io/pythagoras/common/s3eventqueue/S3EventQueue.class */
/**
 * Polls an SQS queue for S3 event notifications and forwards every notification record to an
 * {@link S3EventProcessorInterface}.
 *
 * <p>Messages may arrive either directly from S3 or wrapped in an SNS envelope (SNS topic
 * subscribed by the SQS queue); the {@code isSNStoSQS} constructor flag selects which. A message
 * is deleted from the queue only after it was handled without an exception; otherwise it is left
 * on the queue and the failure is logged.
 */
public class S3EventQueue {
    public static final String S3E_TestEvent = "s3:TestEvent";
    public static final String S3E_ObjectCreated_Put = "s3:ObjectCreated:Put";
    public static final String S3E_ObjectCreated_Post = "s3:ObjectCreated:Post";
    public static final String S3E_ObjectCreated_Copy = "s3:ObjectCreated:Copy";
    public static final String S3E_ObjectCreated_CompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload";
    public static final String S3E_ObjectRemoved_Delete = "s3:ObjectRemoved:Delete";
    public static final String S3E_ObjectRemoved_DeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated";
    public static final String S3E_ReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject";

    /** Largest value accepted by {@link #setParallelReceivers(Integer)}. */
    private static final int MAX_PARALLEL_RECEIVERS = 20;

    /** Pause after a failed receive call so a persistent failure does not spin on the 1 ms schedule. */
    private static final long RECEIVE_FAILURE_BACKOFF_MS = 1000L;

    /** Shared JSON (de)serializer; Gson instances are documented as thread-safe. */
    private static final Gson GSON = new Gson();

    private Logger logger;
    private AmazonSQS sqs;
    private String queueUrl;
    private Integer parallelReceivers = 4;
    private boolean started = false;
    private S3EventProcessorInterface eventProcessor;
    private boolean isSNStoSQS;

    /**
     * JSON shape of an S3 notification. Populated by Gson through reflection, so the field names
     * mirror the JSON keys and must not be renamed.
     */
    public class S3Notification {
        // NOTE(review): callers still null-check Records after deserialization, so this
        // initializer should not be relied on to guarantee a non-null list.
        List<Record> Records = new ArrayList<>();
        /** Set for non-record notifications such as {@link #S3E_TestEvent}. */
        String Event;

        S3Notification() {
        }
    }

    /**
     * JSON shape of the SNS envelope wrapping an S3 notification. Populated by Gson through
     * reflection, so the field names mirror the JSON keys and must not be renamed.
     */
    public class SNSNotification {
        String Type;
        String MessageId;
        String TopicArn;
        String Subject;
        /** The wrapped payload; parsed as an {@link S3Notification} by {@link #handleSQSMessage}. */
        String Message;
        String Timestamp;

        SNSNotification() {
        }
    }

    /**
     * Creates a queue consumer. No polling happens until {@link #start()} is called.
     *
     * @param s3EventProcessorInterface receives every notification record
     * @param logger                    logger used for progress and error output
     * @param str                       AWS access key (first argument of {@code BasicAWSCredentials})
     * @param str2                      AWS secret key (second argument of {@code BasicAWSCredentials})
     * @param str3                      AWS region passed to the SQS client builder
     * @param str4                      URL of the SQS queue to poll
     * @param bool                      {@code true} when queue messages are SNS envelopes around the
     *                                  S3 notification; {@code null} is treated as {@code false}
     */
    public S3EventQueue(S3EventProcessorInterface s3EventProcessorInterface, Logger logger, String str, String str2, String str3, String str4, Boolean bool) {
        // Null-safe unboxing: a missing flag means "plain S3 -> SQS".
        this.isSNStoSQS = Boolean.TRUE.equals(bool);
        this.queueUrl = str4;
        this.logger = logger;
        this.eventProcessor = s3EventProcessorInterface;
        this.sqs = (AmazonSQS) AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(str, str2)))
                .withRegion(str3)
                .build();
    }

    /**
     * Sets how many receiver tasks {@link #start()} will schedule (default 4).
     *
     * @param num number of parallel receivers, from 1 to 20 inclusive
     * @throws S3EventQueueException if the queue was already started, or {@code num} is
     *                               {@code null}, less than 1, or greater than 20
     */
    public void setParallelReceivers(Integer num) throws S3EventQueueException {
        if (this.started) {
            throw new S3EventQueueException("Cannot change number of parallel receivers once SQSConsumer is started.");
        }
        // Zero would start no receivers without any error, and a negative value would only
        // fail later inside start(); reject both here where the caller can react.
        if (num == null || num.intValue() < 1) {
            throw new S3EventQueueException("Number of parallel receivers must be at least 1, got " + num + ".");
        }
        if (num.intValue() > MAX_PARALLEL_RECEIVERS) {
            throw new S3EventQueueException("Lets not be crazy. " + num + " parallel receivers is a lot.  Keep it to 20 or less.");
        }
        this.parallelReceivers = num;
    }

    /**
     * Starts polling the queue with the configured number of parallel receivers. Calling this
     * again after the queue was started has no effect.
     */
    public void start() {
        if (this.started) {
            return;
        }
        this.logger.info("Starting");
        this.started = true;
        int receivers = this.parallelReceivers.intValue();
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(receivers);
        for (int i = 0; i < receivers; i++) {
            // Each task long-polls SQS, so the 1 ms delay just means "poll again right away".
            pool.scheduleWithFixedDelay(this::receiveMessages, 0L, 1L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Performs one long-poll receive and handles the returned messages.
     *
     * <p>This method must never let an exception escape: it runs under
     * {@code scheduleWithFixedDelay}, which suppresses all further executions of a task once one
     * execution throws.
     */
    private void receiveMessages() {
        List<Message> messages;
        try {
            ReceiveMessageRequest request = new ReceiveMessageRequest();
            request.setQueueUrl(this.queueUrl);
            request.setMaxNumberOfMessages(1);
            request.setWaitTimeSeconds(20);
            messages = this.sqs.receiveMessage(request).getMessages();
        } catch (Exception e) {
            this.logger.error("Unable to receive messages from SQS queue " + this.queueUrl + ": " + e.getMessage(), e);
            pauseAfterReceiveFailure();
            return;
        }
        for (Message message : messages) {
            this.logger.info("S3 Change Detected: {}", message);
            try {
                handleSQSMessage(message);
                // Delete only after successful handling so failed messages stay on the queue.
                this.sqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, message.getReceiptHandle()));
            } catch (Exception e) {
                this.logger.error("Unable to fully process notification message from S3: " + e.getMessage() + "\n" + GSON.toJson(message), e);
            }
        }
    }

    /** Sleeps briefly after a failed receive call, preserving the thread's interrupt status. */
    private void pauseAfterReceiveFailure() {
        try {
            Thread.sleep(RECEIVE_FAILURE_BACKOFF_MS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Extracts the S3 notification from an SQS message (unwrapping the SNS envelope first when
     * this queue is configured as SNS-to-SQS) and dispatches it.
     *
     * @param message the SQS message to handle
     * @throws S3EventQueueException if the body cannot be parsed or is not a recognizable
     *                               notification
     */
    void handleSQSMessage(Message message) throws S3EventQueueException {
        String body = message.getBody();
        if (this.isSNStoSQS) {
            body = jsonToSnsMessage(body).Message;
        }
        handleMessage(jsonToS3Notification(body));
    }

    /**
     * Parses a JSON string into an SQS {@link Message}.
     *
     * @param str JSON text
     * @return the parsed message, never {@code null}
     * @throws S3EventQueueException if parsing fails or yields {@code null}
     */
    Message jsonToSqsMessage(String str) throws S3EventQueueException {
        try {
            Message message = GSON.fromJson(str, Message.class);
            if (message == null) {
                throw new RuntimeException("Unable to unwrap events from SNS.");
            }
            return message;
        } catch (Exception e) {
            throw new S3EventQueueException("Error handling S3 Watcher message.", e);
        }
    }

    /**
     * Parses a JSON string into an {@link SNSNotification} envelope.
     *
     * @param str JSON text
     * @return the parsed envelope, never {@code null}
     * @throws S3EventQueueException if parsing fails or yields {@code null}
     */
    SNSNotification jsonToSnsMessage(String str) throws S3EventQueueException {
        try {
            SNSNotification envelope = GSON.fromJson(str, SNSNotification.class);
            if (envelope == null) {
                throw new RuntimeException("Unable to unwrap events from SNS.");
            }
            return envelope;
        } catch (Exception e) {
            throw new S3EventQueueException("Error handling S3 Watcher message.", e);
        }
    }

    /**
     * Parses a JSON string into an {@link S3Notification}.
     *
     * @param str JSON text
     * @return the parsed notification, with at least one of {@code Event} / {@code Records} set
     * @throws S3EventQueueException if parsing fails, yields {@code null}, or yields a
     *                               notification with neither {@code Event} nor {@code Records}
     */
    S3Notification jsonToS3Notification(String str) throws S3EventQueueException {
        this.logger.info("Unwrapping SNS Message.");
        try {
            S3Notification notification = GSON.fromJson(str, S3Notification.class);
            if (notification == null || (notification.Event == null && notification.Records == null)) {
                throw new RuntimeException("Unable to unwrap events from SNS.");
            }
            return notification;
        } catch (Exception e) {
            throw new S3EventQueueException("Error handling S3 Watcher message.", e);
        }
    }

    /**
     * Dispatches a parsed notification: test events are only logged, anything else has each of
     * its records passed to {@link #handleRecord(Record)}.
     *
     * @param s3Notification the parsed notification
     * @throws S3EventQueueException if the notification is not a test event and has no records
     */
    private void handleMessage(S3Notification s3Notification) throws S3EventQueueException {
        if (S3E_TestEvent.equals(s3Notification.Event)) {
            this.logger.info("S3 Test event received.");
            return;
        }
        if (s3Notification.Records == null) {
            throw new S3EventQueueException("Not a test, not a real message.  I'm confused.");
        }
        this.logger.info("Looping notification records.");
        for (Record record : s3Notification.Records) {
            handleRecord(record);
        }
    }

    /**
     * Forwards a single notification record to the configured event processor.
     *
     * @param record the record to process
     */
    void handleRecord(Record record) {
        this.eventProcessor.handleEvent(record);
    }
}
