package org.awsutils.sqs.autoconfigure;

import io.vavr.Tuple;
import io.vavr.Tuple2;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.validation.ValidationException;
import jakarta.validation.constraints.NotNull;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.awsutils.common.util.LimitedQueue;
import org.awsutils.common.util.Utils;
import org.awsutils.sqs.autoconfigure.SqsConfig;
import org.awsutils.sqs.client.SyncSqsMessageClient;
import org.awsutils.sqs.config.WorkerNodeCheckFunc;
import org.awsutils.sqs.handler.MessageHandlerFactory;
import org.awsutils.sqs.listener.SqsMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.CollectionUtils;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsClient;

@Configuration
@ConditionalOnBean({TaskScheduler.class, SyncSqsMessageClient.class, MessageHandlerFactory.class, SqsAsyncClient.class})
/* loaded from: input_file:org/awsutils/sqs/autoconfigure/SqsMessageListenerInitializer.class */
public class SqsMessageListenerInitializer {
    private final SqsMessageListenerListProperties sqsMessageListenerListProperties;
    private final SqsCommonProperties sqsCommonProperties;
    private final ApplicationContext applicationContext;
    private final SqsConfig.SqsPropertyFunc1<String, Integer> propertyFunc;
    private final MessageHandlerFactory messageHandlerFactory;
    private final SyncSqsMessageClient syncSqsMessageClient;
    private final SqsClient sqsSyncClient;
    private final SqsListenerScheduleConfig schedulingConfigurer;
    private final Environment environment;
    private final List<ExecutorService> executorServices = new ArrayList();

    @Autowired(required = false)
    private SqsConfig.CommonExecutorService commonExecutorService;

    @Autowired(required = false)
    private WorkerNodeCheckFunc workerNodeCheckFunc;
    private static final String SQS_MESSAGE_LISTENER_KEY = "sqsMessageListener_{0}";
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageListenerInitializer.class);
    private static final Integer DEFAULT_WAIT_TIME_IN_SECONDS = 10;

    /* loaded from: input_file:org/awsutils/sqs/autoconfigure/SqsMessageListenerInitializer$SqsMessageListenerWrapper.class */
    private static class SqsMessageListenerWrapper implements SqsMessageListener {
        private static final long SEMAPHORE_TIMEOUT_IN_SECONDS = 15;
        private final Environment environment;
        private final String numberOfListenersProperty;
        private final Function<Integer, SqsMessageListener> sqsMessageListenerFunc;
        private final List<ExecutorService> executorServices;
        private final String listenerName;
        private List<SqsMessageListener> sqsMessageListeners;
        private ExecutorService executorService;
        private Semaphore semaphore;
        private long lastCheckedTime;
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private static final long TEN_MINUTES_IN_MILLIS = TimeUnit.MINUTES.toMillis(10);

        public SqsMessageListenerWrapper(List<ExecutorService> list, String str, Environment environment, Function<Integer, SqsMessageListener> function, String str2) {
            this.environment = environment;
            this.numberOfListenersProperty = str;
            this.sqsMessageListenerFunc = function;
            this.listenerName = str2;
            int numberOfListeners = getNumberOfListeners();
            this.semaphore = new Semaphore(numberOfListeners);
            this.sqsMessageListeners = new ArrayList();
            this.sqsMessageListeners = (List) IntStream.range(0, numberOfListeners).boxed().map(function).collect(Collectors.toList());
            this.lastCheckedTime = System.currentTimeMillis();
            synchronized (SqsMessageListenerWrapper.class) {
                this.executorService = Executors.newFixedThreadPool(numberOfListeners);
                this.executorServices = list;
                list.add(this.executorService);
            }
        }

        public void receive() {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastCheckedTime >= TEN_MINUTES_IN_MILLIS) {
                checkForUpdates();
                this.lastCheckedTime = currentTimeMillis;
            }
            Utils.executeUsingLock(this.lock.readLock(), () -> {
                this.sqsMessageListeners.stream().map(this::submitJobToListener).forEach(this::waitForCompletion);
            });
        }

        private void checkForUpdates() {
            int numberOfListeners = getNumberOfListeners();
            if (numberOfListeners != this.sqsMessageListeners.size()) {
                SqsMessageListenerInitializer.LOGGER.info("Number of listeners have changed for {} from {} to {}", new Object[]{this.listenerName, Integer.valueOf(this.sqsMessageListeners.size()), Integer.valueOf(numberOfListeners)});
                Utils.executeUsingLock(this.lock.writeLock(), () -> {
                    this.sqsMessageListeners = (List) IntStream.range(0, numberOfListeners).boxed().map(this.sqsMessageListenerFunc).collect(Collectors.toList());
                    this.semaphore = new Semaphore(numberOfListeners);
                    synchronized (SqsMessageListenerWrapper.class) {
                        this.executorServices.remove(this.executorService);
                        this.executorService.shutdown();
                        this.executorService = Executors.newFixedThreadPool(this.sqsMessageListeners.size());
                        this.executorServices.add(this.executorService);
                    }
                });
            }
        }

        private int getNumberOfListeners() {
            if (StringUtils.isEmpty(this.numberOfListenersProperty)) {
                return 1;
            }
            return ((Integer) this.environment.getProperty(this.numberOfListenersProperty, Integer.class)).intValue();
        }

        public void waitForCompletion(Tuple2<Boolean, Future<?>> tuple2) {
            try {
                try {
                    ((Future) tuple2._2()).get();
                    if (((Boolean) tuple2._1()).booleanValue()) {
                        this.semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Utils.handleInterruptedException(e, () -> {
                    });
                    if (((Boolean) tuple2._1()).booleanValue()) {
                        this.semaphore.release();
                    }
                } catch (ExecutionException e2) {
                    Exception exc = (Exception) e2.getCause();
                    if (!(exc instanceof RuntimeException)) {
                        throw new RuntimeException(exc);
                    }
                }
            } catch (Throwable th) {
                if (((Boolean) tuple2._1()).booleanValue()) {
                    this.semaphore.release();
                }
                throw th;
            }
        }

        public Tuple2<Boolean, Future<?>> submitJobToListener(SqsMessageListener sqsMessageListener) {
            try {
                if (!this.semaphore.tryAcquire(SEMAPHORE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)) {
                    return Tuple.of(false, CompletableFuture.completedFuture(null));
                }
                ExecutorService executorService = this.executorService;
                Objects.requireNonNull(sqsMessageListener);
                return Tuple.of(true, executorService.submit(sqsMessageListener::receive));
            } catch (InterruptedException e) {
                return (Tuple2) Utils.handleInterruptedException(e, () -> {
                    return Tuple.of(false, CompletableFuture.completedFuture(null));
                });
            }
        }
    }

    /**
     * Stores the collaborators needed to build and register the configured SQS listeners.
     * No work is done here; registration happens in {@link #init()}.
     *
     * @param sqsMessageListenerListProperties per-listener configuration, keyed by listener key
     * @param sqsCommonProperties              settings shared by all listeners
     * @param applicationContext               context whose bean factory receives the listener beans
     * @param sqsPropertyFunc1                 property reader handed to every listener and to the scheduler
     * @param messageHandlerFactory            factory handed to every listener
     * @param syncSqsMessageClient             message client handed to every listener
     * @param sqsClient                        synchronous SQS client handed to every listener
     * @param sqsListenerScheduleConfig        scheduler configuration the listeners are added to
     * @param environment                      source of the dynamic status / listener-count properties
     */
    public SqsMessageListenerInitializer(SqsMessageListenerListProperties sqsMessageListenerListProperties, SqsCommonProperties sqsCommonProperties, ApplicationContext applicationContext, SqsConfig.SqsPropertyFunc1<String, Integer> sqsPropertyFunc1, MessageHandlerFactory messageHandlerFactory, SyncSqsMessageClient syncSqsMessageClient, SqsClient sqsClient, SqsListenerScheduleConfig sqsListenerScheduleConfig, Environment environment) {
        // Spring infrastructure
        this.applicationContext = applicationContext;
        this.environment = environment;
        this.schedulingConfigurer = sqsListenerScheduleConfig;

        // Listener configuration
        this.sqsMessageListenerListProperties = sqsMessageListenerListProperties;
        this.sqsCommonProperties = sqsCommonProperties;
        this.propertyFunc = sqsPropertyFunc1;

        // SQS collaborators passed through to each listener
        this.sqsSyncClient = sqsClient;
        this.syncSqsMessageClient = syncSqsMessageClient;
        this.messageHandlerFactory = messageHandlerFactory;
    }

    /**
     * Registers one listener bean per configured entry once this configuration has been constructed.
     * Does nothing when no listener properties are present or the listener map is empty.
     *
     * @throws IllegalStateException if the context's bean factory does not implement
     *                               {@link BeanDefinitionRegistry}, which is required to register
     *                               bean definitions programmatically
     */
    @PostConstruct
    public void init() {
        if (this.sqsMessageListenerListProperties == null) {
            return;
        }
        Map<String, SqsMessageListenerProperties> listeners = this.sqsMessageListenerListProperties.getListener();
        if (CollectionUtils.isEmpty(listeners)) {
            return;
        }
        // getAutowireCapableBeanFactory() is not statically a BeanDefinitionRegistry, so the
        // conversion needs an explicit, checked cast.
        Object beanFactory = this.applicationContext.getAutowireCapableBeanFactory();
        if (!(beanFactory instanceof BeanDefinitionRegistry)) {
            throw new IllegalStateException("Cannot register SQS listeners: bean factory " + beanFactory.getClass().getName() + " is not a BeanDefinitionRegistry");
        }
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
        listeners.forEach((listenerKey, listenerProperties) -> registerSqsListener(registry, listenerKey, listenerProperties));
    }

    /**
     * Validates one listener configuration, registers a {@code SqsMessageListenerWrapper} bean for
     * it and adds the resulting bean to the schedule configuration.
     *
     * <p>The bean is registered under the name {@code sqsMessageListener_<str>}. Each delegate
     * listener created by the wrapper is named {@code <listenerName>_<index>}, where the listener
     * name falls back to {@code str} when none is configured. A delegate is considered active when
     * its status property is empty or resolves to enabled and, if a {@link WorkerNodeCheckFunc} bean
     * was injected, that check passes as well.
     *
     * @param beanDefinitionRegistry       registry receiving the wrapper bean definition
     * @param str                          configuration key of the listener
     * @param sqsMessageListenerProperties configuration of the listener
     * @throws ValidationException   if a required ({@code @NotNull}) property is missing
     * @throws IllegalStateException if the common thread pool is requested but no
     *                               {@code CommonExecutorService} bean was injected
     */
    public void registerSqsListener(BeanDefinitionRegistry beanDefinitionRegistry, String str, SqsMessageListenerProperties sqsMessageListenerProperties) {
        validate(sqsMessageListenerProperties);
        // commonExecutorService is injected with required = false; fail at registration time with a
        // clear message instead of with a NullPointerException when the first delegate is built.
        if (this.sqsCommonProperties.isUseCommonThreadPool() && this.commonExecutorService == null) {
            throw new IllegalStateException("Listener '" + str + "' is configured to use the common thread pool, but no CommonExecutorService bean is available");
        }

        final String listenerName = sqsMessageListenerProperties.getListenerName();
        final String effectiveListenerName = !StringUtils.isEmpty(listenerName) ? listenerName : str;
        final String beanName = MessageFormat.format(SQS_MESSAGE_LISTENER_KEY, str);
        final String rateLimiterName = sqsMessageListenerProperties.getRateLimiterName();
        final String statusProperty = sqsMessageListenerProperties.getStatusProperty();
        final Integer waitTimeInSeconds = sqsMessageListenerProperties.getWaitTimeInSeconds();
        final String messageHandlerRateLimiterName = sqsMessageListenerProperties.getMessageHandlerRateLimiterName();

        final WorkerNodeCheckFunc workerNodeCheck = () -> {
            return (StringUtils.isEmpty(statusProperty) || isSqsListenerEnabled(statusProperty))
                    && (this.workerNodeCheckFunc == null || this.workerNodeCheckFunc.check());
        };

        // Invoked by the wrapper once per delegate index, both initially and whenever the configured
        // number of listeners changes.
        final Function<Integer, SqsMessageListener> listenerFactory = num -> {
            return SqsMessageListener.builder()
                    .sqsSyncClient(this.sqsSyncClient)
                    .messageHandlerFactory(this.messageHandlerFactory)
                    .executorService(this.sqsCommonProperties.isUseCommonThreadPool() ? this.commonExecutorService.executorService() : createExecutorService(sqsMessageListenerProperties.getThreadPoolSize()))
                    .rateLimiterName(!StringUtils.isEmpty(rateLimiterName) ? rateLimiterName : null)
                    .maximumNumberOfMessagesKey(sqsMessageListenerProperties.getMaximumNumberOfMessagesKey())
                    .semaphore(new Semaphore(1))
                    .propertyReaderFunction(this.propertyFunc)
                    .syncSqsMessageClient(this.syncSqsMessageClient)
                    .workerNodeCheck(workerNodeCheck)
                    .listenerName(String.format("%s_%d", effectiveListenerName, num))
                    .messageHandlerRateLimiter(!StringUtils.isEmpty(messageHandlerRateLimiterName) ? messageHandlerRateLimiterName : null)
                    .statusProperty(!StringUtils.isEmpty(statusProperty) ? statusProperty : null)
                    .waitTimeInSeconds((waitTimeInSeconds == null || waitTimeInSeconds.intValue() <= 0) ? DEFAULT_WAIT_TIME_IN_SECONDS : waitTimeInSeconds)
                    .queueUrl(sqsMessageListenerProperties.getQueueUrl())
                    .build();
        };

        // Constructor arguments in the order declared by SqsMessageListenerWrapper's constructor.
        final ConstructorArgumentValues constructorArgumentValues = new ConstructorArgumentValues();
        constructorArgumentValues.addIndexedArgumentValue(0, this.executorServices);
        constructorArgumentValues.addIndexedArgumentValue(1, sqsMessageListenerProperties.getNumberOfListenersProperty());
        constructorArgumentValues.addIndexedArgumentValue(2, this.environment);
        constructorArgumentValues.addIndexedArgumentValue(3, listenerFactory);
        constructorArgumentValues.addIndexedArgumentValue(4, effectiveListenerName);

        final GenericBeanDefinition genericBeanDefinition = new GenericBeanDefinition();
        // Reference the class directly rather than by a hand-written name string, so the definition
        // cannot go stale and does not depend on nested-class name resolution.
        genericBeanDefinition.setBeanClass(SqsMessageListenerWrapper.class);
        genericBeanDefinition.setConstructorArgumentValues(constructorArgumentValues);
        beanDefinitionRegistry.registerBeanDefinition(beanName, genericBeanDefinition);

        this.schedulingConfigurer.addListener(this.applicationContext.getBean(beanName, SqsMessageListener.class), sqsMessageListenerProperties.getMaximumNumberOfMessagesKey(), sqsMessageListenerProperties.getScheduleRunIntervalKey(), this.propertyFunc);
    }

    /**
     * Tells whether the listener controlled by the given status property is enabled.
     *
     * @param str name of the boolean environment property to read
     * @return {@code false} only when the property resolves to {@code false}; an unset property
     *         counts as enabled
     */
    private boolean isSqsListenerEnabled(String str) {
        return !Boolean.FALSE.equals(this.environment.getProperty(str, Boolean.class));
    }

    /**
     * Creates a fixed-size thread pool backed by a {@code LimitedQueue} of capacity 1000 and records
     * it in {@code executorServices} so that it is shut down by {@link #cleanUp()}.
     *
     * @param i number of threads, used as both core and maximum pool size
     * @return the newly created executor
     */
    private ExecutorService createExecutorService(int i) {
        LimitedQueue limitedQueue = new LimitedQueue(1000);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(i, i, 60L, TimeUnit.SECONDS, limitedQueue, new LimitedQueue.LimitedQueueRejectedExecutionPolicy()) {
            @Override
            public void shutdown() {
                SqsMessageListenerInitializer.LOGGER.info("Shutting down executor service");
                super.shutdown();
            }
        };
        limitedQueue.setThreadPoolExecutor(threadPoolExecutor);
        // SqsMessageListenerWrapper adds to and removes from this same list while holding the
        // SqsMessageListenerWrapper.class monitor. Use that monitor here as well: locking on this
        // initializer instead would not exclude those updates. Only the list update needs the lock.
        synchronized (SqsMessageListenerWrapper.class) {
            this.executorServices.add(threadPoolExecutor);
        }
        return threadPoolExecutor;
    }

    /**
     * Initiates shutdown of every executor recorded in {@code executorServices} when this bean is
     * destroyed. Shutdown is best effort: a failure for one executor is logged and the remaining
     * executors are still shut down.
     */
    @PreDestroy
    public void cleanUp() {
        // Iterate over a snapshot taken under the monitor that SqsMessageListenerWrapper holds while
        // it adds/removes pools, so a concurrent resize cannot break the iteration.
        final List<ExecutorService> snapshot;
        synchronized (SqsMessageListenerWrapper.class) {
            snapshot = new ArrayList<>(this.executorServices);
        }
        for (ExecutorService executorService : snapshot) {
            try {
                executorService.shutdown();
            } catch (Exception e) {
                LOGGER.warn("Failed to shut down executor service {}", executorService, e);
            }
        }
    }

    /**
     * Checks that every non-static field declared directly on {@code obj}'s class and annotated with
     * {@link NotNull} holds a non-null value.
     *
     * <p>When at least one such field is null, the problems are printed to {@code System.err} and a
     * {@link ValidationException} is thrown whose message is a JSON document with the keys
     * {@code message} and {@code fields}.
     *
     * @param obj object to inspect; fields inherited from superclasses are not inspected
     * @throws ValidationException if any {@code @NotNull} field is null
     */
    public static void validate(Object obj) {
        List<String> missingFields = Arrays.stream(obj.getClass().getDeclaredFields())
                .filter(field -> !Modifier.isStatic(field.getModifiers()))
                .filter(field -> field.getAnnotation(NotNull.class) != null)
                .filter(field -> getFieldValue(obj, field) == null)
                .map(SqsMessageListenerInitializer::describeMissingField)
                .toList();
        if (CollectionUtils.isEmpty(missingFields)) {
            return;
        }
        HashMap<String, Object> errorDetails = new HashMap<>();
        errorDetails.put("message", "Following fields have not been populated");
        errorDetails.put("fields", missingFields);
        logErrorMessageToConsole(missingFields);
        throw new ValidationException(Utils.constructJson(errorDetails));
    }

    /**
     * Returns the text reported for a null {@code @NotNull} field: the annotation's message when one
     * was written explicitly, otherwise a text naming the field. Without this fallback the report
     * would contain the annotation's uninterpolated default template (a {@code {...}} message key),
     * which does not identify the field.
     */
    private static String describeMissingField(Field field) {
        String message = field.getAnnotation(NotNull.class).message();
        boolean isMessageKey = message == null || message.isEmpty() || (message.startsWith("{") && message.endsWith("}"));
        return isMessageKey ? field.getName() + " must not be null" : message;
    }

    /**
     * Reads a field reflectively, making it accessible first so that private fields can be read.
     *
     * @param obj   instance to read from
     * @param field field to read
     * @return the field's current value, possibly {@code null}
     * @throws RuntimeException wrapping the {@link IllegalAccessException} if the read is denied
     */
    private static Object getFieldValue(Object obj, Field field) {
        field.setAccessible(true);
        final Object value;
        try {
            value = field.get(obj);
        } catch (IllegalAccessException accessFailure) {
            throw new RuntimeException(accessFailure);
        }
        return value;
    }

    /**
     * Prints a banner-framed, numbered list of unpopulated fields to {@code System.err}.
     *
     * @param list descriptions of the missing fields, printed as {@code 1: ...}, {@code 2: ...}
     */
    private static void logErrorMessageToConsole(List<String> list) {
        final String banner = "####################### ALL REQUIRED PROPERTIES NOT POPULATED - Stopping Application #######################";

        // Three blank lines of padding above the banner.
        for (int blank = 0; blank < 3; blank++) {
            System.err.println();
        }
        System.err.println(banner);
        System.err.println();
        System.err.println("Following fields not populated, Please add to configuration property/yaml file: ");

        int position = 1;
        for (String missingField : list) {
            System.err.println(position + ": " + missingField);
            position++;
        }

        System.err.println();
        System.err.println(banner);
        // Three blank lines of padding below the banner.
        for (int blank = 0; blank < 3; blank++) {
            System.err.println();
        }
    }
}
