package com.netflix.conductor.sqs.eventqueue;

import com.amazonaws.auth.policy.Policy;
import com.amazonaws.auth.policy.Principal;
import com.amazonaws.auth.policy.Resource;
import com.amazonaws.auth.policy.Statement;
import com.amazonaws.auth.policy.actions.SQSActions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.ListQueuesRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SetQueueAttributesResult;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.metrics.Monitors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action1;

/* loaded from: input_file:com/netflix/conductor/sqs/eventqueue/SQSObservableQueue.class */
public class SQSObservableQueue implements ObservableQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(SQSObservableQueue.class);
    private static final String QUEUE_TYPE = "sqs";
    private final String queueName;
    private final int visibilityTimeoutInSeconds;
    private final int batchSize;
    private final AmazonSQS client;
    private final long pollTimeInMS;
    private final String queueURL = getOrCreateQueue();
    private final Scheduler scheduler;
    private volatile boolean running;

    /* loaded from: input_file:com/netflix/conductor/sqs/eventqueue/SQSObservableQueue$Builder.class */
    public static class Builder {
        private String queueName;
        private AmazonSQS client;
        private Scheduler scheduler;
        private int visibilityTimeout = 30;
        private int batchSize = 5;
        private long pollTimeInMS = 100;
        private List<String> accountsToAuthorize = new LinkedList();

        public Builder withQueueName(String str) {
            this.queueName = str;
            return this;
        }

        public Builder withVisibilityTimeout(int i) {
            this.visibilityTimeout = i;
            return this;
        }

        public Builder withBatchSize(int i) {
            this.batchSize = i;
            return this;
        }

        public Builder withClient(AmazonSQS amazonSQS) {
            this.client = amazonSQS;
            return this;
        }

        public Builder withPollTimeInMS(long j) {
            this.pollTimeInMS = j;
            return this;
        }

        public Builder withAccountsToAuthorize(List<String> list) {
            this.accountsToAuthorize = list;
            return this;
        }

        public Builder addAccountToAuthorize(String str) {
            this.accountsToAuthorize.add(str);
            return this;
        }

        public Builder withScheduler(Scheduler scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        public SQSObservableQueue build() {
            return new SQSObservableQueue(this.queueName, this.client, this.visibilityTimeout, this.batchSize, this.pollTimeInMS, this.accountsToAuthorize, this.scheduler);
        }
    }

    private SQSObservableQueue(String str, AmazonSQS amazonSQS, int i, int i2, long j, List<String> list, Scheduler scheduler) {
        this.queueName = str;
        this.client = amazonSQS;
        this.visibilityTimeoutInSeconds = i;
        this.batchSize = i2;
        this.pollTimeInMS = j;
        this.scheduler = scheduler;
        addPolicy(list);
    }

    public Observable<Message> observe() {
        return Observable.create(getOnSubscribe());
    }

    public List<String> ack(List<Message> list) {
        return delete(list);
    }

    public void publish(List<Message> list) {
        publishMessages(list);
    }

    public long size() {
        try {
            return Long.parseLong((String) this.client.getQueueAttributes(this.queueURL, Collections.singletonList("ApproximateNumberOfMessages")).getAttributes().get("ApproximateNumberOfMessages"));
        } catch (Exception e) {
            return -1L;
        }
    }

    public void setUnackTimeout(Message message, long j) {
        this.client.changeMessageVisibility(new ChangeMessageVisibilityRequest(this.queueURL, message.getReceipt(), Integer.valueOf((int) (j / 1000))));
    }

    public String getType() {
        return QUEUE_TYPE;
    }

    public String getName() {
        return this.queueName;
    }

    public String getURI() {
        return this.queueURL;
    }

    public long getPollTimeInMS() {
        return this.pollTimeInMS;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public int getVisibilityTimeoutInSeconds() {
        return this.visibilityTimeoutInSeconds;
    }

    public void start() {
        LOGGER.info("Started listening to {}:{}", getClass().getSimpleName(), this.queueName);
        this.running = true;
    }

    public void stop() {
        LOGGER.info("Stopped listening to {}:{}", getClass().getSimpleName(), this.queueName);
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    String getOrCreateQueue() {
        List<String> listQueues = listQueues(this.queueName);
        if (listQueues != null && !listQueues.isEmpty()) {
            return listQueues.get(0);
        }
        return this.client.createQueue(new CreateQueueRequest().withQueueName(this.queueName)).getQueueUrl();
    }

    private String getQueueARN() {
        return (String) this.client.getQueueAttributes(this.queueURL, Collections.singletonList("QueueArn")).getAttributes().get("QueueArn");
    }

    private void addPolicy(List<String> list) {
        if (list == null || list.isEmpty()) {
            LOGGER.info("No additional security policies attached for the queue " + this.queueName);
            return;
        }
        LOGGER.info("Authorizing " + list + " to the queue " + this.queueName);
        HashMap hashMap = new HashMap();
        hashMap.put("Policy", getPolicy(list));
        SetQueueAttributesResult queueAttributes = this.client.setQueueAttributes(this.queueURL, hashMap);
        LOGGER.info("policy attachment result: " + queueAttributes);
        LOGGER.info("policy attachment result: status=" + queueAttributes.getSdkHttpMetadata().getHttpStatusCode());
    }

    private String getPolicy(List<String> list) {
        Policy policy = new Policy("AuthorizedWorkerAccessPolicy");
        Statement statement = new Statement(Statement.Effect.Allow);
        statement.getActions().add(SQSActions.SendMessage);
        statement.setResources(new LinkedList());
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            statement.getPrincipals().add(new Principal(it.next()));
        }
        statement.getResources().add(new Resource(getQueueARN()));
        policy.getStatements().add(statement);
        return policy.toJson();
    }

    private List<String> listQueues(String str) {
        return (List) this.client.listQueues(new ListQueuesRequest().withQueueNamePrefix(str)).getQueueUrls().stream().filter(str2 -> {
            return str2.contains(str);
        }).collect(Collectors.toList());
    }

    private void publishMessages(List<Message> list) {
        LOGGER.debug("Sending {} messages to the SQS queue: {}", Integer.valueOf(list.size()), this.queueName);
        SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest(this.queueURL);
        list.forEach(message -> {
            sendMessageBatchRequest.getEntries().add(new SendMessageBatchRequestEntry(message.getId(), message.getPayload()));
        });
        LOGGER.debug("sending {} messages in batch", Integer.valueOf(sendMessageBatchRequest.getEntries().size()));
        LOGGER.debug("send result: {} for SQS queue: {}", this.client.sendMessageBatch(sendMessageBatchRequest).getFailed().toString(), this.queueName);
    }

    List<Message> receiveMessages() {
        try {
            List<Message> list = (List) this.client.receiveMessage(new ReceiveMessageRequest().withQueueUrl(this.queueURL).withVisibilityTimeout(Integer.valueOf(this.visibilityTimeoutInSeconds)).withMaxNumberOfMessages(Integer.valueOf(this.batchSize))).getMessages().stream().map(message -> {
                return new Message(message.getMessageId(), message.getBody(), message.getReceiptHandle());
            }).collect(Collectors.toList());
            Monitors.recordEventQueueMessagesProcessed(QUEUE_TYPE, this.queueName, list.size());
            return list;
        } catch (Exception e) {
            LOGGER.error("Exception while getting messages from SQS", e);
            Monitors.recordObservableQMessageReceivedErrors(QUEUE_TYPE);
            return new ArrayList();
        }
    }

    Observable.OnSubscribe<Message> getOnSubscribe() {
        return subscriber -> {
            Observable flatMap = Observable.interval(this.pollTimeInMS, TimeUnit.MILLISECONDS).flatMap(l -> {
                if (isRunning()) {
                    return Observable.from(receiveMessages());
                }
                LOGGER.debug("Component stopped, skip listening for messages from SQS");
                return Observable.from(Collections.emptyList());
            });
            Objects.requireNonNull(subscriber);
            Action1 action1 = (v1) -> {
                r1.onNext(v1);
            };
            Objects.requireNonNull(subscriber);
            flatMap.subscribe(action1, subscriber::onError);
        };
    }

    private List<String> delete(List<Message> list) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        DeleteMessageBatchRequest withQueueUrl = new DeleteMessageBatchRequest().withQueueUrl(this.queueURL);
        List entries = withQueueUrl.getEntries();
        list.forEach(message -> {
            entries.add(new DeleteMessageBatchRequestEntry().withId(message.getId()).withReceiptHandle(message.getReceipt()));
        });
        List<String> list2 = (List) this.client.deleteMessageBatch(withQueueUrl).getFailed().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        LOGGER.debug("Failed to delete messages from queue: {}: {}", this.queueName, list2);
        return list2;
    }
}
