package org.jetlinks.supports.cluster.redis;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.jetlinks.core.cluster.ClusterQueue;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.script.RedisScript;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/cluster/redis/RedisClusterQueue.class */
public class RedisClusterQueue<T> implements ClusterQueue<T> {
    private final String id;
    private ReactiveRedisOperations<String, T> operations;
    private volatile Disposable disposable;
    private volatile Disposable timer;
    private final RedisScript<Long> pushAndPublish;
    private FluxProcessor<T, T> processor = EmitterProcessor.create(512, false);
    private AtomicBoolean polling = new AtomicBoolean(false);
    private int batchSize = 32;
    private volatile float localConsumerPercent = 1.0f;
    private final RedisScript<List<T>> batchPollScript = RedisScript.of("local val = redis.call('lrange',KEYS[1],0," + this.batchSize + ");redis.call('ltrim',KEYS[1]," + (this.batchSize + 1) + ",-1);return val;", List.class);

    public void setLocalConsumerPercent(float f) {
        this.localConsumerPercent = f;
    }

    public RedisClusterQueue(String str, ReactiveRedisOperations<String, T> reactiveRedisOperations) {
        this.id = str;
        this.operations = reactiveRedisOperations;
        this.pushAndPublish = RedisScript.of("local val = redis.call('lpush',KEYS[1],ARGV[1]);redis.call('publish'," + "'queue:data:produced:".concat(str) + "',ARGV[2]);return val;", Long.class);
    }

    protected void startPoll() {
        if (this.disposable == null && this.timer == null) {
            this.disposable = this.operations.listenToChannel(new String[]{"queue:data:produced:".concat(this.id)}).map((v0) -> {
                return v0.getMessage();
            }).subscribe(obj -> {
                doPoll();
            });
            this.timer = Flux.interval(Duration.ofSeconds(5L)).subscribe(l -> {
                doPoll();
            });
        }
    }

    protected void doPoll() {
        if (this.polling.compareAndSet(false, true)) {
            if (this.processor.hasDownstreams()) {
                pollBatch().doOnNext(obj -> {
                    if (this.processor.hasDownstreams()) {
                        this.processor.onNext(obj);
                    } else {
                        this.operations.opsForList().leftPush(this.id, obj).subscribe();
                    }
                }).count().doFinally(signalType -> {
                    this.polling.set(false);
                }).subscribe(l -> {
                    if (l.longValue() >= this.batchSize) {
                        this.polling.set(false);
                        doPoll();
                    }
                });
            } else {
                stopPoll();
            }
        }
    }

    protected void stopPoll() {
        if (this.disposable != null) {
            this.disposable.dispose();
            this.disposable = null;
        }
        if (this.timer != null) {
            this.timer.dispose();
            this.timer = null;
        }
    }

    @Nonnull
    public Flux<T> subscribe() {
        return this.processor.doOnSubscribe(subscription -> {
            startPoll();
        }).doFinally(signalType -> {
            stopPoll();
        });
    }

    public void stop() {
        stopPoll();
    }

    public Mono<Integer> size() {
        return this.operations.opsForList().size(this.id).map((v0) -> {
            return v0.intValue();
        });
    }

    @Nonnull
    public Mono<T> poll() {
        return this.operations.opsForList().leftPop(this.id);
    }

    private Flux<T> pollBatch() {
        return this.operations.execute(this.batchPollScript, Collections.singletonList(this.id)).flatMap((v0) -> {
            return Flux.fromIterable(v0);
        });
    }

    private ReactiveRedisOperations getOperations() {
        return this.operations;
    }

    public Mono<Boolean> add(Publisher<T> publisher) {
        return Flux.from(publisher).flatMap(obj -> {
            if (!this.processor.hasDownstreams() || Math.random() >= this.localConsumerPercent) {
                return getOperations().execute(this.pushAndPublish, Arrays.asList(this.id), Arrays.asList(obj, "1"));
            }
            this.processor.onNext(obj);
            return Mono.just(1);
        }).then(Mono.just(true));
    }

    public Mono<Boolean> addBatch(Publisher<? extends Collection<T>> publisher) {
        return Flux.from(publisher).flatMap(collection -> {
            if (!this.processor.hasDownstreams() || Math.random() >= this.localConsumerPercent) {
                return this.operations.opsForList().leftPushAll(this.id, collection).doOnNext(l -> {
                    getOperations().convertAndSend("queue:data:produced:".concat(this.id), "1");
                });
            }
            FluxProcessor<T, T> fluxProcessor = this.processor;
            fluxProcessor.getClass();
            collection.forEach(fluxProcessor::onNext);
            return Mono.just(1);
        }).then(Mono.just(true));
    }
}
