package org.hswebframework.task.cluster.redisson;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.hswebframework.task.cluster.Queue;
import org.redisson.api.RBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/hswebframework/task/cluster/redisson/RedissonQueue.class */
/**
 * {@link Queue} implementation that delegates storage to a Redisson {@link RBlockingQueue}.
 *
 * <p>Elements can be fetched explicitly with {@link #poll(long, TimeUnit)}, or pushed to
 * callbacks registered through {@link #consume(Consumer)}. The first registration starts a
 * background pipeline on the supplied {@link ExecutorService}; that pipeline repeatedly calls
 * {@link #take()} and hands every non-null element to all registered callbacks.
 *
 * <p>Callbacks may be invoked concurrently from several threads because the pipeline is a
 * parallel stream.
 *
 * @param <T> element type stored in the queue
 */
public class RedissonQueue<T> implements Queue<T> {
    private static final Logger log = LoggerFactory.getLogger(RedissonQueue.class);

    private final RBlockingQueue<T> realQueue;
    private final ExecutorService executorService;

    /** Written by {@code startConsumer()}/{@code close()}, read from pipeline threads. */
    private volatile boolean running;

    /** Pipeline source and its driving task; both guarded by {@code this}. */
    private Stream<T> stream;
    private Future<?> stage;

    /**
     * Registered callbacks. A copy-on-write set lets {@link #consume(Consumer)} add callbacks
     * while pipeline threads are iterating, without a ConcurrentModificationException.
     */
    private final Set<Consumer<T>> consumers = new CopyOnWriteArraySet<>();

    /**
     * Creates a queue facade.
     *
     * @param rBlockingQueue  the Redisson queue every operation is delegated to
     * @param executorService executor that runs the background consumer pipeline
     */
    public RedissonQueue(RBlockingQueue<T> rBlockingQueue, ExecutorService executorService) {
        this.realQueue = rBlockingQueue;
        this.executorService = executorService;
    }

    /**
     * Appends an element to the underlying queue.
     *
     * @param t element to add
     * @return the result reported by the underlying queue's {@code add}
     */
    @Override // org.hswebframework.task.cluster.Queue
    public boolean add(T t) {
        return this.realQueue.add(t);
    }

    /**
     * Registers a callback for queue elements and starts the background pipeline if it is not
     * already running.
     *
     * @param consumer callback invoked for every element taken from the queue
     */
    @Override // org.hswebframework.task.cluster.Queue
    public synchronized void consume(Consumer<T> consumer) {
        this.consumers.add(consumer);
        if (!this.running) {
            startConsumer();
        }
    }

    /**
     * Blocks until an element is available in the underlying queue.
     *
     * @return the element removed from the queue
     * @throws IllegalStateException if the waiting thread is interrupted; the interrupt flag is
     *                               restored before the exception is thrown
     */
    protected T take() {
        try {
            return this.realQueue.take();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers and abort the pipeline instead of spinning.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for queue data", e);
        }
    }

    /**
     * Starts the background pipeline: an endless parallel stream fed by {@link #take()} whose
     * non-null elements are dispatched to every registered callback.
     *
     * <p>The task is submitted with {@link ExecutorService#submit(Runnable)} so that
     * {@code Future.cancel(true)} in {@link #close()} interrupts the driving thread;
     * {@code CompletableFuture.cancel} never interrupts.
     */
    protected void startConsumer() {
        this.running = true;
        // Captured locally so the task never observes close() clearing the field.
        Stream<T> source = Stream.generate(this::take).filter(Objects::nonNull);
        this.stream = source;
        this.stage = this.executorService.submit(() -> {
            try {
                source.parallel().forEach(this::dispatch);
            } catch (RuntimeException e) {
                // Termination caused by close() is expected; anything else must not be silent.
                if (this.running) {
                    log.error("queue consumer stopped unexpectedly", e);
                }
            }
        });
    }

    /**
     * Retrieves and removes the head of the queue, waiting up to the given time.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the head of the queue, or whatever the underlying queue returns on timeout
     * @throws IllegalStateException if the waiting thread is interrupted; the interrupt flag is
     *                               restored before the exception is thrown
     */
    @Override // org.hswebframework.task.cluster.Queue
    public T poll(long j, TimeUnit timeUnit) {
        try {
            return this.realQueue.poll(j, timeUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling queue data", e);
        }
    }

    /**
     * Expires the underlying queue after one millisecond and stops the background pipeline.
     *
     * <p>NOTE: cancelling interrupts only the executor thread driving the stream; parallel
     * workers blocked in {@link #take()} on other threads are not interrupted by this call.
     */
    @Override // org.hswebframework.task.cluster.Queue
    public synchronized void close() {
        this.realQueue.expire(1L, TimeUnit.MILLISECONDS);
        this.running = false;
        if (this.stream != null) {
            this.stream.close();
            this.stream = null;
        }
        if (this.stage != null) {
            this.stage.cancel(true);
            this.stage = null;
        }
    }

    /** Hands one element to every registered callback, isolating failures per callback. */
    private void dispatch(T item) {
        for (Consumer<T> callback : this.consumers) {
            try {
                callback.accept(item);
            } catch (Exception e) {
                log.warn("accept queue data error:{}", e.getMessage(), e);
            }
        }
    }
}
