package com.agorapulse.worker.redis;

import com.agorapulse.worker.JobManager;
import com.agorapulse.worker.event.JobExecutorEvent;
import com.agorapulse.worker.executor.DistributedJobExecutor;
import com.agorapulse.worker.executor.ExecutorId;
import com.agorapulse.worker.job.JobRunContext;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Singleton
@Requirements({@Requires(beans = {StatefulRedisConnection.class}, property = "redis.uri"), @Requires(property = "worker.executor.redis.enabled", value = "true", defaultValue = "true")})
/* loaded from: input_file:com/agorapulse/worker/redis/RedisJobExecutor.class */
public class RedisJobExecutor implements DistributedJobExecutor {
    private static final String EXECUTOR_TYPE = "redis";
    private static final String LIBRARY_PREFIX = "APMW::";
    private static final String PREFIX_LEADER = "APMW::LEADER::";
    private static final String PREFIX_COUNT = "APMW::COUNT::";
    private static final int LEADER_INACTIVITY_TIMEOUT = 500;
    private static final int LOCK_TIMEOUT = 60;
    private static final int COUNT_TIMEOUT = 600;
    private static final String DECREASE_JOB_COUNT = "return redis.call('decr', KEYS[1])";
    private final StatefulRedisConnection<String, String> connection;
    private final ExecutorId executorId;
    private final BeanContext beanContext;
    private final JobManager jobManager;
    private final ApplicationEventPublisher<JobExecutorEvent> eventPublisher;
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisJobExecutor.class);
    private static final String LEADER_CHECK = String.join("\n", "redis.call('set', KEYS[1], KEYS[2], 'nx', 'ex', KEYS[3])", "local result = redis.call('get', KEYS[1])", "if result and result == KEYS[2]", "then redis.call('expire', KEYS[1], KEYS[3]) end", "return result");
    private static final String INCREASE_JOB_COUNT = String.join("\n", "redis.call('set', KEYS[1], 0, 'nx', 'ex', KEYS[2])", "return redis.call('incr', KEYS[1])");

    public RedisJobExecutor(StatefulRedisConnection<String, String> statefulRedisConnection, ExecutorId executorId, BeanContext beanContext, JobManager jobManager, ApplicationEventPublisher<JobExecutorEvent> applicationEventPublisher) {
        this.connection = statefulRedisConnection;
        this.executorId = executorId;
        this.beanContext = beanContext;
        this.jobManager = jobManager;
        this.eventPublisher = applicationEventPublisher;
    }

    public <R> Publisher<R> executeOnlyOnLeader(JobRunContext jobRunContext, Callable<R> callable) {
        return readMasterHostname(jobRunContext.getStatus().getName(), this.connection.async()).flatMap(obj -> {
            if (!this.executorId.id().equals(obj)) {
                this.eventPublisher.publishEvent(JobExecutorEvent.leaderOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, jobRunContext.getStatus(), this.executorId.id()));
                return Mono.empty();
            }
            jobRunContext.executed();
            this.eventPublisher.publishEvent(JobExecutorEvent.leaderOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.EXECUTE, jobRunContext.getStatus(), this.executorId.id()));
            return Mono.fromCallable(callable).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobRunContext.getStatus().getName())));
        }).flux();
    }

    public <R> Publisher<R> executeConcurrently(JobRunContext jobRunContext, int i, Callable<R> callable) {
        RedisAsyncCommands async = this.connection.async();
        return readAndIncreaseCurrentCount(jobRunContext.getStatus().getName(), async, i <= 1 ? LOCK_TIMEOUT : COUNT_TIMEOUT).flatMap(l -> {
            if (l.longValue() > i) {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Skipping execution of the job {} as the concurrency level {} is already reached", jobRunContext.getStatus().getName(), Integer.valueOf(i));
                }
                this.eventPublisher.publishEvent(JobExecutorEvent.concurrent(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, jobRunContext.getStatus(), i, this.executorId.id()));
                return decreaseCurrentExecutionCount(jobRunContext.getStatus().getName(), async).flatMap(l -> {
                    return Mono.empty();
                });
            }
            jobRunContext.executed();
            jobRunContext.onFinished(jobRunStatus -> {
                decreaseCurrentExecutionCount(jobRunStatus.getName(), async).subscribe();
            });
            this.eventPublisher.publishEvent(JobExecutorEvent.concurrent(EXECUTOR_TYPE, JobExecutorEvent.Execution.EXECUTE, jobRunContext.getStatus(), i, this.executorId.id()));
            return Mono.fromCallable(callable).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobRunContext.getStatus().getName())));
        }).flux();
    }

    public <R> Publisher<R> executeOnlyOnFollower(JobRunContext jobRunContext, Callable<R> callable) {
        return readMasterHostname(jobRunContext.getStatus().getName(), this.connection.async()).flatMap(obj -> {
            if (!"".equals(obj) && obj.equals(this.executorId.id())) {
                this.eventPublisher.publishEvent(JobExecutorEvent.followerOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, jobRunContext.getStatus(), this.executorId.id()));
                return Mono.empty();
            }
            jobRunContext.executed();
            this.eventPublisher.publishEvent(JobExecutorEvent.followerOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.EXECUTE, jobRunContext.getStatus(), this.executorId.id()));
            return Mono.fromCallable(callable).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobRunContext.getStatus().getName())));
        }).flux();
    }

    public <R> Publisher<R> execute(JobRunContext jobRunContext, Callable<R> callable) {
        return Mono.fromCallable(callable).doFinally(signalType -> {
            jobRunContext.executed();
            this.eventPublisher.publishEvent(new JobExecutorEvent(EXECUTOR_TYPE, JobExecutorEvent.Type.ALWAYS, JobExecutorEvent.Execution.EXECUTE, jobRunContext.getStatus(), 0, this.executorId.id()));
        }).subscribeOn(getScheduler(jobRunContext)).flux();
    }

    private static Mono<Long> readAndIncreaseCurrentCount(String str, RedisAsyncCommands<String, String> redisAsyncCommands, int i) {
        Mono fromFuture = Mono.fromFuture(redisAsyncCommands.eval(INCREASE_JOB_COUNT, ScriptOutputType.INTEGER, new String[]{"APMW::COUNT::" + str, String.valueOf(i)}).toCompletableFuture());
        Class<Long> cls = Long.class;
        Objects.requireNonNull(Long.class);
        return fromFuture.map(cls::cast);
    }

    private static Mono<Long> decreaseCurrentExecutionCount(String str, RedisAsyncCommands<String, String> redisAsyncCommands) {
        Mono fromFuture = Mono.fromFuture(redisAsyncCommands.eval(DECREASE_JOB_COUNT, ScriptOutputType.INTEGER, new String[]{"APMW::COUNT::" + str}).toCompletableFuture());
        Class<Long> cls = Long.class;
        Objects.requireNonNull(Long.class);
        return fromFuture.map(cls::cast);
    }

    private Mono<Object> readMasterHostname(String str, RedisAsyncCommands<String, String> redisAsyncCommands) {
        return Mono.delay(Duration.ofMillis(ThreadLocalRandom.current().nextInt(1, LEADER_INACTIVITY_TIMEOUT))).flatMap(l -> {
            return Mono.fromFuture(redisAsyncCommands.eval(LEADER_CHECK, ScriptOutputType.VALUE, new String[]{"APMW::LEADER::" + str, this.executorId.id(), String.valueOf(LEADER_INACTIVITY_TIMEOUT)}).toCompletableFuture());
        }).defaultIfEmpty("");
    }

    private Scheduler getScheduler(JobRunContext jobRunContext) {
        return Schedulers.fromExecutor(getExecutorService(jobRunContext.getStatus().getName()));
    }

    private ExecutorService getExecutorService(String str) {
        return (ExecutorService) this.jobManager.getJob(str).map((v0) -> {
            return v0.getConfiguration();
        }).map((v0) -> {
            return v0.getScheduler();
        }).flatMap(str2 -> {
            return this.beanContext.findBean(ExecutorService.class, Qualifiers.byName(str2));
        }).or(() -> {
            return this.beanContext.findBean(ExecutorService.class);
        }).orElseThrow(() -> {
            return new IllegalArgumentException("No executor service found for job " + str);
        });
    }
}
