package com.rivigo.finance.service.scheduleTask.impl;

import com.rivigo.finance.concurrent.ContextAwareExecutorFactory;
import com.rivigo.finance.constants.Constant;
import com.rivigo.finance.context.UserContext;
import com.rivigo.finance.entity.mongo.ScheduleTask;
import com.rivigo.finance.service.scheduleTask.ScheduleTaskHandler;
import com.rivigo.finance.service.scheduleTask.ScheduleTaskService;
import com.rivigo.finance.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.Metadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:BOOT-INF/lib/rivigo-finance-web-2.3.60.jar:com/rivigo/finance/service/scheduleTask/impl/ScheduleTaskExecutorImpl.class */
public class ScheduleTaskExecutorImpl {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ScheduleTaskExecutorImpl.class);
    private static final int ALLOWED_BATCH_SIZE = 10;
    private ScheduleTaskService scheduleTaskService;
    private ApplicationContext applicationContext;
    private ContextAwareExecutorFactory executorFactory;

    @Autowired
    public ScheduleTaskExecutorImpl(ScheduleTaskService scheduleTaskService, ApplicationContext applicationContext, ContextAwareExecutorFactory contextAwareExecutorFactory) {
        this.scheduleTaskService = scheduleTaskService;
        this.applicationContext = applicationContext;
        this.executorFactory = contextAwareExecutorFactory;
    }

    @PostConstruct
    public void cleanupRunningTasks() {
        this.scheduleTaskService.cleanupRunningTasks();
    }

    @Scheduled(fixedDelay = Metadata.TOPIC_EXPIRY_MS)
    public void executeScheduleTasks() {
        String uuid = UUID.randomUUID().toString();
        log.info("{} Executing scheduled tasks....", uuid);
        List<ScheduleTask> andUpdateRunnableScheduleTasks = this.scheduleTaskService.getAndUpdateRunnableScheduleTasks();
        if (CollectionUtils.isEmpty(andUpdateRunnableScheduleTasks)) {
            log.info("{} No scheduled tasks to execute", uuid);
            return;
        }
        log.info("{} Marked {} tasks as running, will execute them in batch of {}", uuid, Integer.valueOf(andUpdateRunnableScheduleTasks.size()), 10);
        HashMap hashMap = new HashMap();
        int i = 0;
        int i2 = 1;
        for (ScheduleTask scheduleTask : andUpdateRunnableScheduleTasks) {
            log.info("{} Executing task {} with handler {}", uuid, scheduleTask.getTaskId(), scheduleTask.getHandlerClass());
            ScheduleTaskHandler scheduleTaskHandler = (ScheduleTaskHandler) this.applicationContext.getBean(scheduleTask.getHandlerClass());
            HashMap hashMap2 = new HashMap();
            UserContext current = UserContext.current();
            current.setUsername(scheduleTask.getCreatedBy());
            hashMap2.put("payload", scheduleTask.getPayload());
            hashMap2.put(Constant.SCHEDULE_TASK_MILESTONE_ATTRIBUTE_KEY, scheduleTask.getServiceMilestone());
            hashMap2.put(Constant.SCHEDULE_TASK_ID_ATTRIBUTE_KEY, scheduleTask.getTaskId());
            hashMap2.put(Constant.SCHEDULE_TASK_CREATED_AT_ATTRIBUTE_KEY, scheduleTask.getCreatedTimestamp().toString());
            current.setAttributes(hashMap2);
            hashMap.put(scheduleTask.getTaskId(), this.executorFactory.getExecutor(scheduleTask.getHandlerClass()).submit(scheduleTaskHandler));
            i++;
            if (i == 10) {
                log.info("{} Tasks of batch {} are all scheduled. Will join all these threads to fetch the result before starting batch {}", uuid, Integer.valueOf(i2), Integer.valueOf(i2 + 1));
                i = 0;
                hashMap.forEach((str, future) -> {
                    fetchResult(str, future, uuid);
                });
                hashMap = new HashMap();
                log.info("{} Batch {} completed. Starting batch {}", uuid, Integer.valueOf(i2), Integer.valueOf(i2 + 1));
                i2++;
            }
        }
        hashMap.forEach((str2, future2) -> {
            fetchResult(str2, future2, uuid);
        });
    }

    @Scheduled(fixedDelay = Metadata.TOPIC_EXPIRY_MS)
    public void executeScheduleTasksInIsolation() {
        String uuid = UUID.randomUUID().toString();
        log.info("{} Executing scheduled tasks....", uuid);
        List<ScheduleTask> runInIsolationTasks = this.scheduleTaskService.getRunInIsolationTasks();
        if (CollectionUtils.isEmpty(runInIsolationTasks)) {
            log.info("{} No scheduled tasks to execute in isolation", uuid);
            return;
        }
        HashMap hashMap = new HashMap();
        ArrayList<List> arrayList = new ArrayList(((Map) runInIsolationTasks.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getHandlerClass();
        }))).values());
        int size = ((List) arrayList.get(0)).size();
        for (List list : arrayList) {
            if (list.size() > size) {
                size = list.size();
            }
        }
        log.info("{} Marked {} tasks as running, will execute them in isolation", uuid, Integer.valueOf(runInIsolationTasks.size()));
        for (int i = 0; i < size; i++) {
            int i2 = 0;
            int i3 = 1;
            for (List list2 : arrayList) {
                if (list2.size() > i) {
                    ScheduleTask scheduleTask = (ScheduleTask) list2.get(i);
                    log.info("{} Executing task {} with handler {}", uuid, scheduleTask.getTaskId(), scheduleTask.getHandlerClass());
                    ScheduleTaskHandler scheduleTaskHandler = (ScheduleTaskHandler) this.applicationContext.getBean(scheduleTask.getHandlerClass());
                    HashMap hashMap2 = new HashMap();
                    UserContext current = UserContext.current();
                    current.setUsername(scheduleTask.getCreatedBy());
                    hashMap2.put("payload", scheduleTask.getPayload());
                    hashMap2.put(Constant.SCHEDULE_TASK_MILESTONE_ATTRIBUTE_KEY, scheduleTask.getServiceMilestone());
                    hashMap2.put(Constant.SCHEDULE_TASK_ID_ATTRIBUTE_KEY, scheduleTask.getTaskId());
                    hashMap2.put(Constant.SCHEDULE_TASK_CREATED_AT_ATTRIBUTE_KEY, scheduleTask.getCreatedTimestamp().toString());
                    current.setAttributes(hashMap2);
                    i2++;
                    hashMap.put(scheduleTask.getTaskId(), this.executorFactory.getExecutor(scheduleTask.getHandlerClass()).submit(scheduleTaskHandler));
                }
                if (i2 == 10) {
                    log.info("{} Tasks of batch {} are all scheduled. Will join all these threads to fetch the result before starting batch {}", uuid, Integer.valueOf(i3), Integer.valueOf(i3 + 1));
                    i2 = 0;
                    hashMap.forEach((str, future) -> {
                        fetchResult(str, future, uuid);
                    });
                    hashMap = new HashMap();
                    log.info("{} Batch {} completed. Starting batch {}", uuid, Integer.valueOf(i3), Integer.valueOf(i3 + 1));
                    i3++;
                }
            }
            if (i2 < 10 && i2 > 0) {
                log.info("{} Tasks of batch {} are all scheduled. Will join all these threads to fetch the result before starting batch {}", uuid, Integer.valueOf(i3), Integer.valueOf(i3 + 1));
                hashMap.forEach((str2, future2) -> {
                    fetchResult(str2, future2, uuid);
                });
                hashMap = new HashMap();
                log.info("{} Batch {} completed. Starting batch {}", uuid, Integer.valueOf(i3), Integer.valueOf(i3 + 1));
            }
        }
    }

    private void fetchResult(String str, Future<?> future, String str2) {
        try {
            log.info("{} Fetching result for task - {}", str2, str);
            future.get();
            this.scheduleTaskService.markTaskCompleted(str);
            log.info("{} Successfully completed task - {}", str2, str);
        } catch (Exception e) {
            log.warn("{} Exception occoured while executing schedule task {}. Error - {}", str2, str, ExceptionUtils.getStackTrace(e));
            this.scheduleTaskService.markAsFailed(str, e.getMessage());
        }
    }
}
