package xyz.mytang0.brook.core.monitor;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xyz.mytang0.brook.common.extension.ExtensionDirector;
import xyz.mytang0.brook.common.extension.ExtensionLoader;
import xyz.mytang0.brook.common.utils.ExceptionUtils;
import xyz.mytang0.brook.common.utils.TimeUtils;
import xyz.mytang0.brook.core.FlowExecutor;
import xyz.mytang0.brook.core.exception.FlowException;
import xyz.mytang0.brook.core.lock.FlowLockFacade;
import xyz.mytang0.brook.core.utils.QueueUtils;
import xyz.mytang0.brook.core.utils.ThreadUtils;
import xyz.mytang0.brook.spi.executor.ExecutorFactory;
import xyz.mytang0.brook.spi.queue.QueueService;

/* loaded from: input_file:xyz/mytang0/brook/core/monitor/DelayedTaskMonitor.class */
public class DelayedTaskMonitor {
    private static final Logger log = LoggerFactory.getLogger(DelayedTaskMonitor.class);
    private static final Map<QueueService, DelayedTaskMonitor> monitors = new ConcurrentHashMap();
    private final QueueService queueService;
    private final FlowExecutor<?> flowExecutor;
    private final FlowLockFacade flowLockFacade;
    private final DelayedTaskMonitorProperties monitorProperties;
    private final ScheduledExecutorService unackedExecutor;
    private final ScheduledExecutorService pollingExecutor;
    private volatile boolean inited = false;
    private final String queueName = QueueUtils.getTaskDelayQueueName();
    private final ExecutorService taskExecutor = ((ExecutorFactory) ExtensionDirector.getExtensionLoader(ExecutorFactory.class).getDefaultExtension()).getExecutor(this.queueName);

    public static void init(FlowExecutor<?> flowExecutor, FlowLockFacade flowLockFacade, DelayedTaskMonitorProperties delayedTaskMonitorProperties) {
        if (CollectionUtils.isNotEmpty(delayedTaskMonitorProperties.getQueueProtocols())) {
            ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(QueueService.class);
            new Thread(() -> {
                delayedTaskMonitorProperties.getQueueProtocols().forEach(str -> {
                    if (StringUtils.isNotBlank(str)) {
                        while (!extensionLoader.getSupportedExtensions().contains(str)) {
                            TimeUtils.sleepUninterruptedly(5L, TimeUnit.SECONDS);
                        }
                        try {
                            init((QueueService) ExtensionLoader.getExtension(QueueService.class, str), flowExecutor, flowLockFacade, delayedTaskMonitorProperties);
                        } catch (Exception e) {
                            log.error(String.format("Init %s delayed task monitor exception", str), e);
                        }
                    }
                });
            }).start();
        }
    }

    public static void init(QueueService queueService, FlowExecutor<?> flowExecutor, FlowLockFacade flowLockFacade, DelayedTaskMonitorProperties delayedTaskMonitorProperties) {
        if (monitors.containsKey(queueService)) {
            return;
        }
        synchronized (queueService) {
            if (!monitors.containsKey(queueService)) {
                monitors.computeIfAbsent(queueService, queueService2 -> {
                    return new DelayedTaskMonitor(queueService, flowExecutor, flowLockFacade, delayedTaskMonitorProperties);
                });
            }
        }
    }

    public DelayedTaskMonitor(QueueService queueService, FlowExecutor<?> flowExecutor, FlowLockFacade flowLockFacade, DelayedTaskMonitorProperties delayedTaskMonitorProperties) {
        this.queueService = queueService;
        this.flowExecutor = flowExecutor;
        this.flowLockFacade = flowLockFacade;
        this.monitorProperties = delayedTaskMonitorProperties;
        this.unackedExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.threadsNamed(queueService.getClass().getSimpleName() + "-delayed-task-unacked-%d"));
        this.pollingExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.threadsNamed(queueService.getClass().getSimpleName() + "-delayed-task-polling-%d"));
        initialize();
        log.info("Started queueName:{} queueService:{} delayed task monitor.", this.queueName, queueService);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                this.pollingExecutor.shutdown();
                this.unackedExecutor.shutdown();
            } catch (Exception e) {
            }
        }));
    }

    private void initialize() {
        if (this.inited) {
            return;
        }
        this.inited = true;
        Runnable runnable = () -> {
            try {
                this.queueService.unacked(this.queueName, TimeUtils.currentTimeMillis() - this.monitorProperties.getUnackedTimeoutMs());
            } catch (Throwable th) {
                log.error("Unacked exception", th);
            }
        };
        Runnable runnable2 = new Runnable() { // from class: xyz.mytang0.brook.core.monitor.DelayedTaskMonitor.1
            @Override // java.lang.Runnable
            public void run() {
                long j = 0;
                boolean z = false;
                try {
                    try {
                        if (!DelayedTaskMonitor.this.flowLockFacade.acquireLock(DelayedTaskMonitor.class.getName(), DelayedTaskMonitor.this.monitorProperties.getPollTimeoutMs())) {
                            if (0 != 0) {
                                try {
                                    DelayedTaskMonitor.this.flowLockFacade.releaseLock(DelayedTaskMonitor.class.getName());
                                } catch (Exception e) {
                                }
                            }
                            DelayedTaskMonitor.this.pollingExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
                            return;
                        }
                        z = true;
                        List poll = DelayedTaskMonitor.this.queueService.poll(DelayedTaskMonitor.this.queueName, DelayedTaskMonitor.this.monitorProperties.getPollPerMaxSize(), DelayedTaskMonitor.this.monitorProperties.getPollTimeoutMs(), TimeUnit.MILLISECONDS);
                        if (CollectionUtils.isNotEmpty(poll)) {
                            poll.forEach(queueMessage -> {
                                if (queueMessage == null) {
                                    return;
                                }
                                try {
                                    DelayedTaskMonitor.this.taskExecutor.execute(() -> {
                                        try {
                                            DelayedTaskMonitor.log.info("Execute delayed task: {}", queueMessage.getId());
                                            String[] split = StringUtils.split(queueMessage.getId(), "@");
                                            if (split == null || split.length != 2) {
                                                DelayedTaskMonitor.log.warn("Invalid delayed task: {}", queueMessage.getId());
                                            } else {
                                                DelayedTaskMonitor.this.flowExecutor.executeTask(split[0]);
                                            }
                                            DelayedTaskMonitor.log.info("Execute delayed task: {} completion", queueMessage.getId());
                                            DelayedTaskMonitor.this.queueService.ack(DelayedTaskMonitor.this.queueName, queueMessage.getId());
                                        } catch (Throwable th) {
                                            if (th instanceof FlowException) {
                                                DelayedTaskMonitor.log.error("Execute delayed task:({}) fail, reason:({}), postpone retry", queueMessage.getId(), ExceptionUtils.getMessage(th));
                                            } else {
                                                DelayedTaskMonitor.log.error("Execute delayed task:({}) exception, postpone retry", queueMessage.getId(), th);
                                            }
                                            DelayedTaskMonitor.this.queueService.postpone(DelayedTaskMonitor.this.queueName, queueMessage);
                                        }
                                    });
                                } catch (Throwable th) {
                                    DelayedTaskMonitor.log.error("Submit delayed task:({}) to executor exception", queueMessage.getId(), th);
                                }
                            });
                        }
                        if (1 != 0) {
                            try {
                                DelayedTaskMonitor.this.flowLockFacade.releaseLock(DelayedTaskMonitor.class.getName());
                            } catch (Exception e2) {
                            }
                        }
                        DelayedTaskMonitor.this.pollingExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
                    } catch (Throwable th) {
                        j = DelayedTaskMonitor.this.monitorProperties.getExceptionRetryIntervalMs();
                        DelayedTaskMonitor.log.warn("Poll exception, retry after {} milliseconds", Long.valueOf(j), th);
                        if (z) {
                            try {
                                DelayedTaskMonitor.this.flowLockFacade.releaseLock(DelayedTaskMonitor.class.getName());
                            } catch (Exception e3) {
                            }
                        }
                        DelayedTaskMonitor.this.pollingExecutor.schedule(this, j, TimeUnit.MILLISECONDS);
                    }
                } catch (Throwable th2) {
                    if (z) {
                        try {
                            DelayedTaskMonitor.this.flowLockFacade.releaseLock(DelayedTaskMonitor.class.getName());
                        } catch (Exception e4) {
                        }
                    }
                    DelayedTaskMonitor.this.pollingExecutor.schedule(this, j, TimeUnit.MILLISECONDS);
                    throw th2;
                }
            }
        };
        this.unackedExecutor.scheduleAtFixedRate(runnable, 0L, this.monitorProperties.getUnackedIntervalMs(), TimeUnit.MILLISECONDS);
        this.pollingExecutor.schedule(runnable2, 0L, TimeUnit.MILLISECONDS);
    }
}
