package io.choerodon.asgard.saga;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.choerodon.asgard.AsgardApplicationContextHelper;
import io.choerodon.asgard.InstanceCommonUtils;
import io.choerodon.asgard.UpdateTaskInstanceStatusDTO;
import io.choerodon.asgard.saga.SagaDefinition;
import io.choerodon.asgard.saga.annotation.SagaTask;
import io.choerodon.asgard.saga.dto.PollBatchDTO;
import io.choerodon.asgard.saga.dto.PollCodeDTO;
import io.choerodon.asgard.saga.dto.SagaTaskInstanceDTO;
import io.choerodon.asgard.saga.feign.SagaMonitorClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.StringUtils;

/* loaded from: input_file:io/choerodon/asgard/saga/SagaMonitor.class */
public class SagaMonitor {
    private final ChoerodonSagaProperties choerodonSagaProperties;
    private SagaMonitorClient sagaMonitorClient;
    private final Executor executor;
    private final DataSourceTransactionManager transactionManager;
    private final Environment environment;
    private ScheduledExecutorService scheduledExecutorService;
    private Set<SagaTaskInstanceDTO> msgQueue;
    private final AsgardApplicationContextHelper asgardApplicationContextHelper;
    private final SagaTaskInstanceStore taskInstanceStore;
    private static final Logger LOGGER = LoggerFactory.getLogger(SagaMonitor.class);
    static final Map<String, SagaTaskInvokeBean> invokeBeanMap = new HashMap();
    private static Boolean enabledDbRecord = false;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private Set<Long> records = Collections.synchronizedSet(new LinkedHashSet());

    /* loaded from: input_file:io/choerodon/asgard/saga/SagaMonitor$InvokeTask.class */
    private class InvokeTask implements Runnable {
        private final SagaTaskInstanceDTO dto;
        private final SagaTaskInvokeBean invokeBean;
        private final SagaTask sagaTask;

        InvokeTask(SagaTaskInstanceDTO sagaTaskInstanceDTO) {
            this.dto = sagaTaskInstanceDTO;
            this.invokeBean = SagaMonitor.invokeBeanMap.get(sagaTaskInstanceDTO.getSagaCode() + sagaTaskInstanceDTO.getTaskCode());
            this.sagaTask = this.invokeBean.sagaTask;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                invoke(this.dto);
            } catch (Exception e) {
                SagaMonitor.LOGGER.error("sagaMonitor consume message error, cause {}", e);
            } finally {
                SagaMonitor.this.msgQueue.remove(this.dto);
            }
        }

        private void invoke(SagaTaskInstanceDTO sagaTaskInstanceDTO) {
            if (this.sagaTask.enabledDbRecord()) {
                SagaMonitor.this.taskInstanceStore.storeTaskInstance(sagaTaskInstanceDTO.getId());
            }
            DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
            defaultTransactionDefinition.setPropagationBehavior(0);
            defaultTransactionDefinition.setReadOnly(this.sagaTask.transactionReadOnly());
            defaultTransactionDefinition.setIsolationLevel(this.sagaTask.transactionIsolation().value());
            defaultTransactionDefinition.setTimeout(this.sagaTask.transactionTimeout());
            String transactionManager = this.sagaTask.transactionManager();
            DataSourceTransactionManager dataSourceTransactionManager = StringUtils.isEmpty(transactionManager) ? SagaMonitor.this.transactionManager : (PlatformTransactionManager) SagaMonitor.this.asgardApplicationContextHelper.getSpringFactory().getBean(transactionManager, PlatformTransactionManager.class);
            TransactionStatus transaction = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);
            try {
                this.invokeBean.method.setAccessible(true);
                SagaMonitor.this.sagaMonitorClient.updateStatus(sagaTaskInstanceDTO.getId(), new UpdateTaskInstanceStatusDTO(sagaTaskInstanceDTO.getId(), SagaDefinition.TaskInstanceStatus.COMPLETED.name(), InstanceCommonUtils.resultToJson(this.invokeBean.method.invoke(this.invokeBean.object, sagaTaskInstanceDTO.getInput()), SagaMonitor.this.objectMapper), null, null));
                if (this.sagaTask.enabledDbRecord()) {
                    SagaMonitor.this.taskInstanceStore.removeTaskInstance(sagaTaskInstanceDTO.getId());
                }
                dataSourceTransactionManager.commit(transaction);
            } catch (Exception e) {
                dataSourceTransactionManager.rollback(transaction);
                String errorInfoFromException = InstanceCommonUtils.getErrorInfoFromException(e);
                SagaMonitor.LOGGER.warn("sagaMonitor invoke method error, transaction rollback, msg {}, cause {}", sagaTaskInstanceDTO, errorInfoFromException);
                SagaMonitor.this.sagaMonitorClient.updateStatus(sagaTaskInstanceDTO.getId(), new UpdateTaskInstanceStatusDTO(sagaTaskInstanceDTO.getId(), SagaDefinition.TaskInstanceStatus.FAILED.name(), null, errorInfoFromException, null));
                if (this.sagaTask.enabledDbRecord()) {
                    SagaMonitor.this.taskInstanceStore.removeTaskInstance(sagaTaskInstanceDTO.getId());
                }
            }
        }
    }

    /* loaded from: input_file:io/choerodon/asgard/saga/SagaMonitor$UpdateStatusFailedTask.class */
    private class UpdateStatusFailedTask implements Runnable {
        private final long taskInstanceId;

        UpdateStatusFailedTask(long j) {
            this.taskInstanceId = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                SagaMonitor.this.sagaMonitorClient.updateStatus(Long.valueOf(this.taskInstanceId), new UpdateTaskInstanceStatusDTO(Long.valueOf(this.taskInstanceId), SagaDefinition.TaskInstanceStatus.FAILED.name(), null, "error.SagaMonitor.updateStatusFailed", null));
                SagaMonitor.this.taskInstanceStore.removeTaskInstance(Long.valueOf(this.taskInstanceId));
            } catch (Exception e) {
                SagaMonitor.LOGGER.warn("error.SagaMonitor.updateStatusFailed.reRun, {}", e);
            } finally {
                SagaMonitor.this.records.remove(Long.valueOf(this.taskInstanceId));
            }
        }
    }

    public SagaMonitor(ChoerodonSagaProperties choerodonSagaProperties, SagaMonitorClient sagaMonitorClient, Executor executor, DataSourceTransactionManager dataSourceTransactionManager, Environment environment, SagaTaskInstanceStore sagaTaskInstanceStore, AsgardApplicationContextHelper asgardApplicationContextHelper) {
        this.choerodonSagaProperties = choerodonSagaProperties;
        this.sagaMonitorClient = sagaMonitorClient;
        this.executor = executor;
        this.transactionManager = dataSourceTransactionManager;
        this.environment = environment;
        this.asgardApplicationContextHelper = asgardApplicationContextHelper;
        this.taskInstanceStore = sagaTaskInstanceStore;
        this.msgQueue = Collections.synchronizedSet(new LinkedHashSet(choerodonSagaProperties.getMaxPollSize().intValue()));
    }

    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    public void setSagaMonitorClient(SagaMonitorClient sagaMonitorClient) {
        this.sagaMonitorClient = sagaMonitorClient;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void setEnabledDbRecordTrue() {
        enabledDbRecord = true;
    }

    @PostConstruct
    private void start() {
        List list = (List) invokeBeanMap.entrySet().stream().map(entry -> {
            return new PollCodeDTO(((SagaTaskInvokeBean) entry.getValue()).sagaTask.sagaCode(), ((SagaTaskInvokeBean) entry.getValue()).sagaTask.code());
        }).collect(Collectors.toList());
        int intValue = this.choerodonSagaProperties.getMaxPollSize().intValue();
        try {
            String str = InetAddress.getLocalHost().getHostAddress() + ":" + this.environment.getProperty("server.port");
            PollBatchDTO pollBatchDTO = new PollBatchDTO(str, list, Integer.valueOf(intValue));
            LOGGER.info("sagaMonitor prepare to start saga consumer, pollTasks {}, instance {}, maxPollSize {}, ", new Object[]{list, str, Integer.valueOf(intValue)});
            this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
                invokeRunner(pollBatchDTO);
            }, 20000L, this.choerodonSagaProperties.getPollIntervalMs().longValue(), TimeUnit.MILLISECONDS);
        } catch (UnknownHostException e) {
            LOGGER.error("sagaMonitor can't get localhost, failed to start saga consumer. {}", e.getCause());
        }
    }

    public void invokeRunner(PollBatchDTO pollBatchDTO) {
        boolean noNeedUpdateSagaStatus = noNeedUpdateSagaStatus();
        if (!noNeedUpdateSagaStatus || !this.msgQueue.isEmpty()) {
            LOGGER.debug("sagaMonitor skip poll, dbRecordNotEmpty {}, msgQueue {}", Boolean.valueOf(noNeedUpdateSagaStatus), this.msgQueue);
            return;
        }
        try {
            List<SagaTaskInstanceDTO> pollBatch = this.sagaMonitorClient.pollBatch(pollBatchDTO);
            LOGGER.debug("sagaMonitor polled messages, size {} data {}", Integer.valueOf(pollBatch.size()), pollBatch);
            this.msgQueue.addAll(pollBatch);
            this.msgQueue.forEach(sagaTaskInstanceDTO -> {
                this.executor.execute(new InvokeTask(sagaTaskInstanceDTO));
            });
        } catch (Exception e) {
            LOGGER.warn("sagaMonitor poll error {}", e.getMessage());
        }
    }

    private boolean noNeedUpdateSagaStatus() {
        if (!enabledDbRecord.booleanValue()) {
            return true;
        }
        if (!this.records.isEmpty()) {
            return false;
        }
        if (!this.msgQueue.isEmpty()) {
            return true;
        }
        this.records.addAll(this.taskInstanceStore.selectOvertimeTaskInstance());
        if (this.records.isEmpty()) {
            return true;
        }
        this.records.forEach(l -> {
            this.executor.execute(new UpdateStatusFailedTask(l.longValue()));
        });
        return false;
    }
}
