package xyz.mytang0.brook.core.queue;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import xyz.mytang0.brook.common.extension.Disposable;
import xyz.mytang0.brook.common.metadata.model.QueueMessage;
import xyz.mytang0.brook.core.utils.ThreadUtils;
import xyz.mytang0.brook.spi.queue.QueueService;

/* loaded from: input_file:xyz/mytang0/brook/core/queue/LocalQueueService.class */
/**
 * In-memory {@link QueueService} backed by one bounded {@link ArrayBlockingQueue} per queue name.
 *
 * <p>Messages whose delay is positive are parked in {@code delayedMessages} and moved into their
 * queue by a scheduled task once the delay has elapsed; all other messages are enqueued
 * immediately.
 */
public class LocalQueueService implements QueueService, Disposable {

    /** Capacity of every per-name queue created by {@link #getQueue(String)}. */
    private static final int QUEUE_CAPACITY = 64;

    /** Queue name -> bounded queue of messages that are ready to be polled. */
    private final Map<String, ArrayBlockingQueue<QueueMessage>> queueMap = new ConcurrentHashMap<>();

    /** Message id -> delayed message that has not been moved into its queue yet. */
    private final Map<String, QueueMessage> delayedMessages = new ConcurrentHashMap<>();

    /** Scheduler that moves delayed messages into their queue once the delay has elapsed. */
    private final ScheduledExecutorService delayedDeliver = new ScheduledThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            ThreadUtils.threadsNamed("local-queue-delayed-deliver-%d"));

    /**
     * Enqueues a single message, either immediately or after its configured delay.
     *
     * @param str          name of the target queue
     * @param queueMessage message to enqueue
     * @throws IllegalStateException if the message is enqueued immediately and the queue is full
     */
    public void offer(String str, QueueMessage queueMessage) {
        if (0 < queueMessage.getDelayMs()) {
            delayedDeliver(str, queueMessage, queueMessage.getDelayMs());
        } else {
            getQueue(str).add(queueMessage);
        }
    }

    /**
     * Parks the message and schedules its delivery into the queue after {@code j} milliseconds.
     *
     * <p>If a delayed message with the same id is already pending, the stored message is replaced
     * but no second task is scheduled: the already scheduled task delivers the latest message.
     *
     * @param str          name of the target queue
     * @param queueMessage message to deliver later
     * @param j            delay in milliseconds
     */
    private void delayedDeliver(String str, QueueMessage queueMessage, long j) {
        String id = queueMessage.getId();
        if (this.delayedMessages.put(id, queueMessage) != null) {
            // A delivery task for this id is already pending; it will pick up the new message.
            return;
        }
        try {
            this.delayedDeliver.schedule(() -> deliver(str, id), j, TimeUnit.MILLISECONDS);
        } catch (RuntimeException | Error e) {
            // Scheduling failed (e.g. the executor was shut down): do not leave an orphan entry.
            this.delayedMessages.remove(id);
            throw e;
        }
    }

    /**
     * Moves a pending delayed message into its queue; does nothing if it was removed meanwhile.
     *
     * @param str  name of the target queue
     * @param str2 id of the delayed message
     */
    private void deliver(String str, String str2) {
        QueueMessage pending = this.delayedMessages.remove(str2);
        if (pending != null) {
            // NOTE(review): add() throws when the queue is full; on the scheduler thread that
            // exception is only recorded in the discarded future, so the message is lost.
            getQueue(str).add(pending);
        }
    }

    /**
     * Enqueues every message of the list in iteration order; a null or empty list is ignored.
     *
     * @param str  name of the target queue
     * @param list messages to enqueue
     */
    public void offer(String str, List<QueueMessage> list) {
        if (CollectionUtils.isNotEmpty(list)) {
            for (QueueMessage queueMessage : list) {
                offer(str, queueMessage);
            }
        }
    }

    /**
     * Retrieves up to {@code i} messages, waiting up to the given timeout for the first one.
     *
     * @param str      name of the queue to poll
     * @param i        maximum number of messages to return; values below 1 are treated as 1
     * @param j        how long to wait for the first message
     * @param timeUnit unit of {@code j}
     * @return the polled messages, never containing {@code null}; empty if nothing arrived in
     *         time or the calling thread was interrupted (the interrupt flag is restored)
     */
    public List<QueueMessage> poll(String str, int i, long j, TimeUnit timeUnit) {
        int limit = Math.max(i, 1);
        // The queue never holds more than QUEUE_CAPACITY elements, so do not presize beyond it.
        List<QueueMessage> messages = new ArrayList<>(Math.min(limit, QUEUE_CAPACITY));
        ArrayBlockingQueue<QueueMessage> queue = getQueue(str);
        try {
            // poll() returns null when the timeout elapses; never hand that null to the caller.
            QueueMessage head = queue.poll(j, timeUnit);
            if (head != null) {
                messages.add(head);
            }
            int remaining = limit - messages.size();
            if (remaining > 0) {
                queue.drainTo(messages, remaining);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return messages;
    }

    /**
     * Discards the pending delayed message with the given id, if any.
     *
     * <p>Only the delayed-message store is touched; a message that already sits in the queue is
     * not affected.
     *
     * @param str  name of the queue (not used by this implementation)
     * @param str2 id of the message to discard
     */
    public void remove(String str, String str2) {
        this.delayedMessages.remove(str2);
    }

    /**
     * Discards the pending delayed messages with the given ids; a null or empty list is ignored.
     *
     * @param str  name of the queue
     * @param list ids of the messages to discard
     */
    public void remove(String str, List<String> list) {
        if (CollectionUtils.isNotEmpty(list)) {
            for (String id : list) {
                remove(str, id);
            }
        }
    }

    /**
     * Returns the queue registered under the given name, creating it on first use.
     *
     * @param str name of the queue
     * @return the existing or newly created queue
     */
    private ArrayBlockingQueue<QueueMessage> getQueue(String str) {
        // Plain get first so the common "already exists" path needs no locking.
        ArrayBlockingQueue<QueueMessage> existing = this.queueMap.get(str);
        if (existing != null) {
            return existing;
        }
        // computeIfAbsent is atomic on ConcurrentHashMap, so no extra monitor is required.
        return this.queueMap.computeIfAbsent(str, name -> new ArrayBlockingQueue<>(QUEUE_CAPACITY));
    }

    /**
     * Stops the delayed-delivery scheduler and drops every pending and queued message.
     */
    public void destroy() {
        // shutdown() would still run already scheduled delayed tasks and keep the worker threads
        // alive until the longest delay expires; the pending messages are discarded below anyway,
        // so cancel the tasks outright.
        this.delayedDeliver.shutdownNow();
        this.delayedMessages.clear();
        this.queueMap.values().forEach(ArrayBlockingQueue::clear);
        this.queueMap.clear();
    }
}
