package io.choerodon.asgard.schedule;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.choerodon.asgard.AsgardApplicationContextHelper;
import io.choerodon.asgard.InstanceCommonUtils;
import io.choerodon.asgard.UpdateTaskInstanceStatusDTO;
import io.choerodon.asgard.schedule.QuartzDefinition;
import io.choerodon.asgard.schedule.annotation.JobTask;
import io.choerodon.asgard.schedule.dto.ScheduleInstanceConsumerDTO;
import io.choerodon.asgard.schedule.feign.ScheduleMonitorClient;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.StringUtils;

/* loaded from: input_file:io/choerodon/asgard/schedule/ScheduleMonitor.class */
public class ScheduleMonitor {
    private static final Logger log = LoggerFactory.getLogger(ScheduleMonitor.class);
    private static final Map<String, JobTaskInvokeBean> invokeBeanMap = new HashMap();
    private final DataSourceTransactionManager transactionManager;
    private final Environment environment;
    private final Executor executor;
    private ScheduleMonitorClient scheduleMonitorClient;
    private final AsgardApplicationContextHelper applicationContextHelper;
    private final ScheduledExecutorService scheduledExecutorService;
    private final long pollIntervalMs;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private Set<ScheduleInstanceConsumerDTO> msgQueue = Collections.synchronizedSet(new LinkedHashSet(32));

    /* loaded from: input_file:io/choerodon/asgard/schedule/ScheduleMonitor$InvokeTask.class */
    private class InvokeTask implements Runnable {
        private final ScheduleInstanceConsumerDTO dto;
        private final JobTaskInvokeBean invokeBean;
        private final JobTask jobTask;

        InvokeTask(ScheduleInstanceConsumerDTO scheduleInstanceConsumerDTO) {
            this.dto = scheduleInstanceConsumerDTO;
            this.invokeBean = (JobTaskInvokeBean) ScheduleMonitor.invokeBeanMap.get(scheduleInstanceConsumerDTO.getMethod());
            this.jobTask = this.invokeBean.jobTask;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                invoke(this.dto);
            } catch (Exception e) {
                ScheduleMonitor.log.error("scheduleMonitor consume message error, cause {}", e);
            } finally {
                ScheduleMonitor.this.msgQueue.remove(this.dto);
            }
        }

        private void invoke(ScheduleInstanceConsumerDTO scheduleInstanceConsumerDTO) {
            DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
            defaultTransactionDefinition.setPropagationBehavior(0);
            defaultTransactionDefinition.setReadOnly(this.jobTask.transactionReadOnly());
            defaultTransactionDefinition.setIsolationLevel(this.jobTask.transactionIsolation().value());
            defaultTransactionDefinition.setTimeout(this.jobTask.transactionTimeout());
            String transactionManager = this.jobTask.transactionManager();
            DataSourceTransactionManager dataSourceTransactionManager = StringUtils.isEmpty(transactionManager) ? ScheduleMonitor.this.transactionManager : (PlatformTransactionManager) ScheduleMonitor.this.applicationContextHelper.getSpringFactory().getBean(transactionManager, PlatformTransactionManager.class);
            TransactionStatus transaction = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);
            try {
                this.invokeBean.method.setAccessible(true);
                Object invoke = this.invokeBean.method.invoke(this.invokeBean.object, getInputMap(scheduleInstanceConsumerDTO.getExecuteParams()));
                if (invoke != null) {
                    invoke = ScheduleMonitor.this.objectMapper.writeValueAsString(invoke);
                }
                ScheduleMonitor.this.scheduleMonitorClient.updateStatus(scheduleInstanceConsumerDTO.getId(), new UpdateTaskInstanceStatusDTO(scheduleInstanceConsumerDTO.getId(), QuartzDefinition.InstanceStatus.COMPLETED.name(), InstanceCommonUtils.resultToJson(invoke, ScheduleMonitor.this.objectMapper), null, scheduleInstanceConsumerDTO.getObjectVersionNumber()));
                dataSourceTransactionManager.commit(transaction);
            } catch (Exception e) {
                dataSourceTransactionManager.rollback(transaction);
                String errorInfoFromException = InstanceCommonUtils.getErrorInfoFromException(e);
                ScheduleMonitor.log.warn("scheduleMonitor invoke method error, transaction rollback, msg {}, cause {}", scheduleInstanceConsumerDTO, errorInfoFromException);
                ScheduleMonitor.this.scheduleMonitorClient.updateStatus(scheduleInstanceConsumerDTO.getId(), new UpdateTaskInstanceStatusDTO(scheduleInstanceConsumerDTO.getId(), QuartzDefinition.InstanceStatus.FAILED.name(), null, errorInfoFromException, scheduleInstanceConsumerDTO.getObjectVersionNumber()));
            }
        }

        private Map<String, Object> getInputMap(String str) throws IOException {
            return StringUtils.isEmpty(str) ? new HashMap() : (Map) ScheduleMonitor.this.objectMapper.readValue(str, new TypeReference<Map<String, Object>>() { // from class: io.choerodon.asgard.schedule.ScheduleMonitor.InvokeTask.1
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void addInvokeBean(String str, JobTaskInvokeBean jobTaskInvokeBean) {
        invokeBeanMap.put(str, jobTaskInvokeBean);
    }

    public ScheduleMonitor(DataSourceTransactionManager dataSourceTransactionManager, Environment environment, Executor executor, ScheduleMonitorClient scheduleMonitorClient, AsgardApplicationContextHelper asgardApplicationContextHelper, ScheduledExecutorService scheduledExecutorService, long j) {
        this.transactionManager = dataSourceTransactionManager;
        this.environment = environment;
        this.executor = executor;
        this.scheduleMonitorClient = scheduleMonitorClient;
        this.applicationContextHelper = asgardApplicationContextHelper;
        this.scheduledExecutorService = scheduledExecutorService;
        this.pollIntervalMs = j;
    }

    public void setScheduleMonitorClient(ScheduleMonitorClient scheduleMonitorClient) {
        this.scheduleMonitorClient = scheduleMonitorClient;
    }

    @PostConstruct
    public void start() {
        Set set = (Set) invokeBeanMap.entrySet().stream().map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toSet());
        try {
            String str = InetAddress.getLocalHost().getHostAddress() + ":" + this.environment.getProperty("server.port");
            log.info("scheduleMonitor prepare to start schedule consumer, methods {}, instance {}", set, str);
            this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
                invokeRunner(set, str);
            }, 20000L, this.pollIntervalMs, TimeUnit.MILLISECONDS);
        } catch (UnknownHostException e) {
            log.error("scheduleMonitor can't get localhost, failed to start schedule consumer. {}", e.getCause());
        }
    }

    public void invokeRunner(Set<String> set, String str) {
        if (!this.msgQueue.isEmpty()) {
            log.debug("scheduleMonitor skip poll, dbRecordNotEmpty {}, msgQueue {}", this.msgQueue);
            return;
        }
        try {
            List<ScheduleInstanceConsumerDTO> pollBatch = this.scheduleMonitorClient.pollBatch(set, str);
            log.debug("scheduleMonitor polled messages, size {} data {}", Integer.valueOf(pollBatch.size()), pollBatch);
            this.msgQueue.addAll(pollBatch);
            this.msgQueue.forEach(scheduleInstanceConsumerDTO -> {
                this.executor.execute(new InvokeTask(scheduleInstanceConsumerDTO));
            });
        } catch (Exception e) {
            log.warn("scheduleMonitor poll error {}", e.getMessage());
        }
    }
}
