package com.swak.cache.queue;

import com.google.common.collect.Maps;
import com.swak.cache.spi.DelayedConfigurer;
import com.swak.cache.spi.DelayedQueueManager;
import com.swak.common.exception.ThrowableWrapper;
import com.swak.common.timer.CycleTask;
import com.swak.common.timer.WheelTimerHolder;
import java.lang.Thread;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/swak/cache/queue/RedissonDelayedQueueManager.class */
/**
 * {@link DelayedQueueManager} that keeps one {@link DelayedQueueHandler} per queue name and
 * runs daemon threads which poll those handlers and dispatch the polled events.
 *
 * <p>Two subscription modes exist and are mutually exclusive (both are guarded by the same
 * {@code started} flag, so whichever is called first wins):
 * <ul>
 *   <li>{@link #onSubscribe(long)} - a single thread polls every registered queue; a queue that
 *       returned nothing is skipped until {@link SpinCycle} removes it from the cool-down map.</li>
 *   <li>{@link #onMultiSubscribe(long, String...)} - one thread per queue name, sleeping between
 *       empty polls.</li>
 * </ul>
 *
 * <p>NOTE(review): {@link #stop()} leaves {@code threadToStop} set, so a later subscribe call
 * would start threads that exit immediately; confirm whether restart is meant to be supported.
 */
public class RedissonDelayedQueueManager implements DelayedQueueManager {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RedissonDelayedQueueManager.class);

    /** Pause, in milliseconds, taken by the single subscriber thread after a pass that polled no queue. */
    private static final long IDLE_PAUSE_MILLIS = 1L;

    private final RedissonClient redissonClient;
    /** Queue name to handler registry; never reassigned, mutated only through atomic map operations. */
    private final Map<String, DelayedQueueHandler> delayedQueueHandlers = Maps.newConcurrentMap();
    /** Set by {@link #stop()}; read by every subscriber loop as its exit condition. */
    private volatile boolean threadToStop = false;
    /** Guards against starting the subscriber threads more than once. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Subscriber threads by name, kept so {@link #stop()} can interrupt and join them. */
    private final Map<String, Thread> subscribeThreads = Maps.newConcurrentMap();
    /** Queue name to the timestamp (epoch millis) of its last empty poll; present means "cooling down". */
    private final Map<String, Long> delayedSpin = Maps.newConcurrentMap();
    private final Executor executor = initializeExecutor(DelayedConfigurer.getDelayedConfigurer());

    /**
     * Periodic task that ends the cool-down of queues whose last empty poll is at least
     * {@code spinTime} milliseconds old, making them eligible for polling again.
     */
    protected class SpinCycle extends CycleTask {
        private final long spinTime;

        /**
         * @param j  value passed to {@code CycleTask.config} together with {@link TimeUnit#MILLISECONDS}
         * @param j2 cool-down duration in milliseconds
         */
        public SpinCycle(long j, long j2) {
            super.config(Long.valueOf(j), TimeUnit.MILLISECONDS, true);
            this.spinTime = j2;
        }

        /** Removes every cool-down entry that is at least {@code spinTime} milliseconds old. */
        protected void invoke() throws ThrowableWrapper {
            final long now = System.currentTimeMillis();
            RedissonDelayedQueueManager.this.delayedSpin.entrySet().removeIf(entry -> {
                boolean expired = now - entry.getValue() >= this.spinTime;
                if (expired) {
                    RedissonDelayedQueueManager.log.warn("SpinCycle {}-{}", entry.getValue(), this.spinTime);
                }
                return expired;
            });
        }
    }

    /**
     * @param redissonClient client handed to every handler this manager creates
     */
    public RedissonDelayedQueueManager(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * Returns the handler registered under {@code str}, creating and registering it first if
     * none exists. Creation is atomic: concurrent callers always receive the same instance and
     * at most one handler is constructed per name.
     *
     * @param str queue name
     * @return the registered handler for that name
     */
    @Override // com.swak.cache.spi.DelayedQueueManager
    public DelayedQueueHandler createQueue(String str) {
        // The mapping function must not touch this map again (ConcurrentHashMap forbids
        // recursive updates), so it only constructs the handler.
        return this.delayedQueueHandlers.computeIfAbsent(
                str, name -> new SimpleDelayedQueueHandler(name, this.redissonClient, this.executor));
    }

    /**
     * Adds listeners to the handler of queue {@code str}, creating the handler if needed.
     *
     * @param str                     queue name
     * @param delayedQueueListenerArr listeners forwarded to the handler
     */
    @Override // com.swak.cache.spi.DelayedQueueManager
    public void addListeners(String str, DelayedQueueListener... delayedQueueListenerArr) {
        createQueue(str).addListeners(delayedQueueListenerArr);
    }

    /**
     * Returns the handler of queue {@code str}, creating it if needed.
     *
     * @param str queue name
     * @return the registered handler for that name
     */
    @Override // com.swak.cache.spi.DelayedQueueManager
    public DelayedQueueHandler getDelayedQueue(String str) {
        return createQueue(str);
    }

    /**
     * Starts (once) a single daemon thread that polls every registered queue in turn. A queue
     * whose poll returned nothing is put into cool-down and skipped until {@link SpinCycle}
     * releases it.
     *
     * @param j cool-down duration and {@link SpinCycle} period, in milliseconds
     */
    @Override // com.swak.cache.spi.DelayedQueueManager
    public void onSubscribe(long j) {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        Thread thread = new Thread(() -> {
            while (!this.threadToStop) {
                boolean polledAny = false;
                for (String str : this.delayedQueueHandlers.keySet()) {
                    if (this.delayedSpin.containsKey(str)) {
                        continue;
                    }
                    polledAny = true;
                    if (!onSubscribe(str)) {
                        this.delayedSpin.putIfAbsent(str, Long.valueOf(System.currentTimeMillis()));
                    }
                }
                if (!polledAny) {
                    // Every queue is cooling down (or none is registered): yield the CPU
                    // instead of busy-spinning until SpinCycle releases a queue.
                    try {
                        TimeUnit.MILLISECONDS.sleep(IDLE_PAUSE_MILLIS);
                    } catch (InterruptedException e) {
                        // Interrupts are sent by stop(); keep the flag and leave the loop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        thread.setDaemon(true);
        thread.setName("delayedSingle");
        thread.start();
        // j is a millisecond value everywhere else (SpinCycle period and threshold), so the
        // first run is scheduled in milliseconds too rather than j seconds later.
        WheelTimerHolder.lockWatchWheel().newTimeout(new SpinCycle(j, j), j, TimeUnit.MILLISECONDS);
        this.subscribeThreads.put("SINGLE_SUBSCRIBE", thread);
    }

    /**
     * Starts (once) one daemon thread per queue name. Each thread polls its queue and sleeps
     * {@code j} milliseconds whenever the poll delivered nothing.
     *
     * @param j      sleep between empty polls, in milliseconds
     * @param strArr queue names to subscribe to; nothing happens when null or empty
     */
    @Override // com.swak.cache.spi.DelayedQueueManager
    public void onMultiSubscribe(long j, String... strArr) {
        if (ArrayUtils.isEmpty(strArr) || !this.started.compareAndSet(false, true)) {
            return;
        }
        for (String str : strArr) {
            Thread thread = new Thread(() -> {
                while (!this.threadToStop) {
                    if (onWaitSubscribe(str)) {
                        continue;
                    }
                    try {
                        log.info("[Swak-DelayedQueue] >>>>>>>>>>> Wait {} ms onSubscribe", Long.valueOf(j));
                        TimeUnit.MILLISECONDS.sleep(j);
                    } catch (InterruptedException e) {
                        log.error("onWaitSubscribe error", e);
                        // Interrupts are sent by stop(); keep the flag and leave the loop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            thread.setName(str + "-Thread");
            thread.setDaemon(true);
            thread.start();
            this.subscribeThreads.put(str, thread);
        }
    }

    /**
     * Signals all subscriber threads to stop, waits one second, then interrupts and joins
     * every thread that has not terminated yet. If the calling thread is interrupted while
     * waiting, its interrupt status is restored before this method returns.
     */
    @Override // com.swak.cache.spi.DelayedQueueManager
    public void stop() {
        this.threadToStop = true;
        this.started.set(false);
        boolean interrupted = false;
        try {
            // Grace period so loops can observe threadToStop and finish on their own.
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
            interrupted = true;
        }
        if (!MapUtils.isEmpty(this.subscribeThreads)) {
            for (Map.Entry<String, Thread> entry : this.subscribeThreads.entrySet()) {
                Thread thread = entry.getValue();
                log.info("[Swak-DelayedQueue] >>>>>>>>>>> {}  Thread onSubscribe stop", entry.getKey());
                if (thread.getState() == Thread.State.TERMINATED) {
                    continue;
                }
                thread.interrupt();
                try {
                    thread.join();
                } catch (InterruptedException e2) {
                    log.error(e2.getMessage(), e2);
                    interrupted = true;
                }
            }
            log.info("[Swak-DelayedQueue] >>>>>>>>>>> all onSubscribe stop");
        }
        if (interrupted) {
            // Deferred until after the joins so the remaining threads are still waited for.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the Redisson client this manager was constructed with
     */
    @Override // com.swak.cache.spi.DelayedQueueManager
    public RedissonClient getClient() {
        return this.redissonClient;
    }

    /**
     * Polls the queue registered under {@code str} once and, if an event was returned, hands
     * it to the handler's {@code onSubscribe}.
     *
     * @param str queue name
     * @return {@code true} if an event was polled and dispatched; {@code false} if no handler
     *         is registered under that name or the poll returned nothing
     */
    private boolean onSubscribe(String str) {
        // Plain lookup: polling must never create a handler as a side effect.
        DelayedQueueHandler handler = this.delayedQueueHandlers.get(str);
        if (handler == null) {
            return false;
        }
        DelayEvent<?> event = handler.poll();
        if (event == null) {
            return false;
        }
        handler.onSubscribe(event);
        return true;
    }

    /**
     * Poll-and-dispatch step used by the per-queue threads; identical to
     * {@link #onSubscribe(String)}.
     *
     * @param str queue name
     * @return {@code true} if an event was polled and dispatched
     */
    private boolean onWaitSubscribe(String str) {
        return onSubscribe(str);
    }

    /**
     * Chooses the executor handed to created handlers: the one supplied by the configurer when
     * it provides one, otherwise a new {@link ThreadPoolExecutor}.
     *
     * <p>The fallback pool uses {@code max(1, availableProcessors - 1)} core threads,
     * {@code max(core * 1.5, core + 1)} maximum threads, a 60 second keep-alive and a work
     * queue bounded to 100 tasks.
     *
     * @param delayedConfigurer configurer to query; may be {@code null}
     * @return the configured executor, or the fallback pool
     */
    protected Executor initializeExecutor(DelayedConfigurer delayedConfigurer) {
        Executor configured = Optional.ofNullable(delayedConfigurer)
                .<Executor>map(DelayedConfigurer::getDelayedExecutor)
                .orElse(null);
        if (configured != null) {
            return configured;
        }
        int coreSize = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
        int maxSize = (int) Math.max(coreSize * 1.5d, coreSize + 1);
        return new ThreadPoolExecutor(
                coreSize, maxSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
    }

    /**
     * @return always {@code 0}
     */
    public int priority() {
        return 0;
    }
}
