package org.tiogasolutions.notify.kernel.task;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.tiogasolutions.dev.common.exceptions.ApiConflictException;
import org.tiogasolutions.dev.common.exceptions.ApiNotFoundException;
import org.tiogasolutions.dev.domain.query.ListQueryResult;
import org.tiogasolutions.dev.domain.query.QueryResult;
import org.tiogasolutions.notify.kernel.domain.DomainKernel;
import org.tiogasolutions.notify.kernel.event.EventBus;
import org.tiogasolutions.notify.kernel.event.TaskEventListener;
import org.tiogasolutions.notify.kernel.notification.NotificationDomain;
import org.tiogasolutions.notify.pub.domain.DomainProfile;
import org.tiogasolutions.notify.pub.notification.Notification;
import org.tiogasolutions.notify.pub.task.TaskQuery;
import org.tiogasolutions.notify.pub.task.TaskResponse;
import org.tiogasolutions.notify.pub.task.TaskStatus;

/**
 * Dispatches notification tasks to the {@link TaskProcessor} registered for each task's
 * destination provider.
 *
 * <p>Processors are discovered through {@link ServiceLoader} when the bean is constructed and are
 * initialised with the Spring {@link BeanFactory} in {@link #setBeanFactory(BeanFactory)}. Work
 * arrives in two ways:
 * <ul>
 *   <li>{@link #taskCreated(String, TaskEntity, Notification)} handles a single task as soon as
 *       the event bus reports it;</li>
 *   <li>{@link #execute()} sweeps every active notification domain for tasks in
 *       {@link TaskStatus#PENDING}; once {@link #start()} has been called it runs on a
 *       single-threaded scheduler.</li>
 * </ul>
 * Each individual task is then handed to a cached thread pool for processing.
 */
@Named
public class TaskProcessorExecutor implements BeanFactoryAware, TaskEventListener {
    private static final String NAME = TaskProcessorExecutor.class.getSimpleName();
    private static final Logger log = LoggerFactory.getLogger(TaskProcessorExecutor.class);

    /** Delay, in seconds, before the first scheduled sweep after {@link #start()}. */
    private static final long INITIAL_DELAY_SECONDS = 15L;
    /** Delay, in seconds, between the end of one scheduled sweep and the start of the next. */
    private static final long SWEEP_DELAY_SECONDS = 60L;

    private final DomainKernel domainKernel;
    private final Map<TaskProcessorType, TaskProcessor> processorMap = new HashMap<>();
    /** Guards {@link #execute()} against overlapping runs. */
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private final ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();
    /** Handle of the scheduled sweep; {@code null} while stopped. Guarded by {@code this}. */
    private ScheduledFuture<?> executorFuture = null;
    /** Written by the scheduler thread and by start/stop, read by arbitrary callers; hence volatile. */
    private volatile TaskProcessorExecutorStatus executorStatus = TaskProcessorExecutorStatus.STOPPED;

    /**
     * Loads every {@link TaskProcessor} visible to {@link ServiceLoader}, indexes them by type and
     * subscribes this instance to the event bus.
     *
     * @param domainKernel source of notification domains and domain profiles
     * @param eventBus     bus on which this executor registers itself as a listener
     * @throws IllegalArgumentException if two processors report the same type
     */
    @Inject
    public TaskProcessorExecutor(DomainKernel domainKernel, EventBus eventBus) {
        this.domainKernel = domainKernel;

        ServiceLoader<TaskProcessor> loader = ServiceLoader.load(TaskProcessor.class);
        loader.reload();
        for (TaskProcessor taskProcessor : loader) {
            TaskProcessorType type = taskProcessor.getType();
            if (this.processorMap.containsKey(type)) {
                throw new IllegalArgumentException(String.format("The processor type \"%s\" has already been registered.", type));
            }
            this.processorMap.put(type, taskProcessor);
        }

        eventBus.subscribe(this);
    }

    /**
     * @return the most recently recorded status of this executor
     */
    public TaskProcessorExecutorStatus getExecutorStatus() {
        return this.executorStatus;
    }

    /**
     * Passes the bean factory to every registered processor so each can initialise itself.
     *
     * @param beanFactory the owning Spring bean factory
     * @throws BeansException declared by {@link BeanFactoryAware}
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        for (TaskProcessor taskProcessor : this.processorMap.values()) {
            taskProcessor.init(beanFactory);
        }
    }

    /**
     * Schedules the periodic sweep and marks the executor idle.
     *
     * @throws IllegalStateException if the sweep is already scheduled
     */
    public synchronized void start() {
        if (this.executorFuture != null) {
            throw new IllegalStateException(NAME + " is already started.");
        }
        this.executorFuture = this.scheduledExecutorService.scheduleWithFixedDelay(
                this::execute, INITIAL_DELAY_SECONDS, SWEEP_DELAY_SECONDS, TimeUnit.SECONDS);
        this.executorStatus = TaskProcessorExecutorStatus.IDLE;
        log.info(NAME + " started, now idle.");
    }

    /**
     * Cancels the periodic sweep (without interrupting a sweep that is in progress) and marks the
     * executor stopped. Calling this while already stopped is harmless.
     */
    public synchronized void stop() {
        if (this.executorFuture != null) {
            this.executorFuture.cancel(false);
            this.executorFuture = null;
        }
        this.executorStatus = TaskProcessorExecutorStatus.STOPPED;
        log.info(NAME + " stopped.");
    }

    /**
     * Container shutdown hook: stops the sweep and releases both thread pools. Work that was
     * already submitted is allowed to finish; submissions made afterwards are rejected.
     */
    @PreDestroy
    private void shutdown() {
        stop();
        this.scheduledExecutorService.shutdown();
        // The worker pool was previously never shut down, leaving its threads alive after destroy.
        this.threadPoolExecutor.shutdown();
    }

    /**
     * Processes a newly created task immediately instead of waiting for the next sweep. Any
     * failure is logged and never propagated to the event bus.
     *
     * @param str          identifier handed to {@link DomainKernel#notificationDomain}
     *                     (presumably the domain name - confirm against the event publisher)
     * @param taskEntity   the task that was created
     * @param notification the notification the task belongs to; not used here, the notification
     *                     is looked up again by id before processing
     */
    @Override // org.tiogasolutions.notify.kernel.event.TaskEventListener
    public void taskCreated(String str, TaskEntity taskEntity, Notification notification) {
        try {
            NotificationDomain notificationDomain = this.domainKernel.notificationDomain(str);
            Map<String, List<TaskEntity>> tasksByProvider = new HashMap<>();
            tasksByProvider.put(taskEntity.getDestination().getProvider(), Arrays.asList(taskEntity));
            processTasksByProvider(notificationDomain, tasksByProvider);
        } catch (Exception e) {
            log.error("Unexpected exception during processing.", e);
        }
    }

    /**
     * Runs one sweep over all active notification domains. If a sweep is already in progress the
     * call returns immediately. Exceptions are logged, never thrown, and the status is always
     * reset to idle afterwards.
     */
    public void execute() {
        if (!this.running.compareAndSet(false, true)) {
            log.debug(NAME + " already running.");
            return;
        }
        // Fixed: the original message lacked the leading space ("...Executoris executing.").
        log.debug(NAME + " is executing.");
        this.executorStatus = TaskProcessorExecutorStatus.EXECUTING;
        try {
            this.domainKernel.listActiveNotificationDomains().stream().forEach(this::processDomain);
        } catch (Exception e) {
            log.error("Unexpected exception during processing.", e);
        } finally {
            this.executorStatus = TaskProcessorExecutorStatus.IDLE;
            log.debug(NAME + " finished, now idle.");
            this.running.set(false);
        }
    }

    /** Queries the domain for pending tasks and dispatches them grouped by provider. */
    private void processDomain(NotificationDomain notificationDomain) {
        log.debug("Processing all tasks for domain {}.", notificationDomain.getDomainName());
        ListQueryResult<TaskEntity> pendingTasks =
                notificationDomain.query(new TaskQuery().setTaskStatus(TaskStatus.PENDING));
        if (pendingTasks.isNotEmpty()) {
            processTasksByProvider(notificationDomain, mapTasksByProvider(pendingTasks));
        }
    }

    /**
     * Hands each provider's tasks to the matching processor. A provider without a registered
     * processor is logged and skipped; the remaining providers are still processed.
     */
    private void processTasksByProvider(NotificationDomain notificationDomain, Map<String, List<TaskEntity>> tasksByProvider) {
        for (Map.Entry<String, List<TaskEntity>> entry : tasksByProvider.entrySet()) {
            String provider = entry.getKey();
            List<TaskEntity> tasks = entry.getValue();

            TaskProcessor taskProcessor = findTaskProcessor(provider);
            if (taskProcessor == null) {
                log.error("A processor was not found for {}, skipping {} tasks.", provider, tasks.size());
                // Fixed: this used to return, silently dropping every provider still in the map.
                continue;
            }
            processTaskForProvider(notificationDomain, taskProcessor, tasks);
        }
    }

    /**
     * @return the processor registered for the given provider name, or {@code null} if none is.
     *         NOTE(review): behaviour of {@code TaskProcessorType.valueOf} for an unknown name is
     *         not visible from this file - it may throw rather than yield an unmapped type.
     */
    private TaskProcessor findTaskProcessor(String provider) {
        return this.processorMap.get(TaskProcessorType.valueOf(provider));
    }

    /** Groups the queried tasks by the provider name of their destination. */
    private Map<String, List<TaskEntity>> mapTasksByProvider(QueryResult<TaskEntity> queryResult) {
        Map<String, List<TaskEntity>> tasksByProvider = new HashMap<>();
        for (TaskEntity taskEntity : queryResult) {
            try {
                String provider = taskEntity.getDestination().getProvider();
                tasksByProvider.computeIfAbsent(provider, key -> new ArrayList<>()).add(taskEntity);
            } catch (NullPointerException e) {
                // NOTE(review): kept from the original - a task that cannot yield a provider is
                // logged and left out. The source of the null is not identifiable from this file;
                // replace with an explicit check once it is known.
                log.error("Weird bug", e);
            }
        }
        return tasksByProvider;
    }

    /**
     * Submits every task in the list for processing, provided the processor reports it is ready.
     * A failure while preparing one task (for example its notification cannot be loaded, or the
     * worker pool rejects the submission) is logged and does not prevent the remaining tasks from
     * being submitted.
     */
    private void processTaskForProvider(NotificationDomain notificationDomain, TaskProcessor taskProcessor, List<TaskEntity> tasks) {
        log.debug("Processing {} provider's tasks for domain {}.", taskProcessor.getType(), notificationDomain.getDomainName());
        if (!taskProcessor.isReady()) {
            log.warn("The {} provider is not ready to process tasks for the domain {}.", taskProcessor.getType(), notificationDomain.getDomainName());
            return;
        }
        for (TaskEntity taskEntity : tasks) {
            try {
                Notification notification =
                        notificationDomain.findNotificationById(taskEntity.getNotificationId()).toNotification();
                processTask(notificationDomain, taskProcessor, taskEntity, notification);
            } catch (Exception e) {
                // Isolate the failure to this task; previously it aborted the whole batch.
                log.error("Unable to start processing task for domain {}: {}", notificationDomain.getDomainName(), taskEntity.getLabel(), e);
            }
        }
    }

    /** Queues a single task on the worker pool; the actual work happens in {@link #runTask}. */
    private void processTask(NotificationDomain notificationDomain, TaskProcessor taskProcessor, TaskEntity taskEntity, Notification notification) {
        this.threadPoolExecutor.submit(() -> runTask(notificationDomain, taskProcessor, taskEntity, notification));
    }

    /**
     * Worker-thread body for one task: marks it as sending, persists that state, invokes the
     * processor and stores the response. A processor failure is recorded on the task as a failed
     * response. A not-found or conflict error from the store resets the in-memory task to pending
     * and is logged at info level; any other error is logged.
     */
    private void runTask(NotificationDomain notificationDomain, TaskProcessor taskProcessor, TaskEntity taskEntity, Notification notification) {
        String domainName = notificationDomain.getDomainName();
        TaskEntity current = taskEntity;
        try {
            current.sending();
            // Saving first claims the task; a conflict here is reported by the outer handler.
            current = notificationDomain.saveAndReload(current);

            String processorCode = "n/a";
            try {
                processorCode = taskProcessor.getType().getCode();
                DomainProfile domainProfile = this.domainKernel.getOrCreateDomain(domainName);
                log.debug("Begin processing task for domain {} with processor {}: {}", domainName, processorCode, current.getLabel());

                TaskResponse response = taskProcessor.processTask(domainProfile, notification, current.toTask());
                current.response(response);
                notificationDomain.save(current);
                log.debug("Finished processing task for domain {} and processor {} with response action {}: {}",
                        domainName, processorCode, response.getResponseAction(), current.getLabel());
            } catch (Exception e) {
                current.response(TaskResponse.fail("Exception thrown from task processor", e));
                notificationDomain.save(current);
                // The trailing throwable is logged with its stack trace, not as a placeholder value.
                log.error("Exception processing task for domain {} and processor {}: {}", domainName, processorCode, current.getLabel(), e);
            }
        } catch (ApiNotFoundException | ApiConflictException e) {
            current.pending();
            if (e instanceof ApiNotFoundException) {
                log.info("Cannot find task for domain {}: {}", domainName, current.getLabel());
            } else {
                log.info("DB conflict processing task for domain {}, (already processed?): {}", domainName, current.getLabel());
            }
        } catch (Exception e) {
            log.error("Exception setting task to sending", e);
        }
    }
}
