/*
 * Decompiled with CFR 0.152.
 */
package com.agorapulse.worker.redis;

import com.agorapulse.worker.JobConfiguration;
import com.agorapulse.worker.JobInfo;
import com.agorapulse.worker.JobManager;
import com.agorapulse.worker.JobRunStatus;
import com.agorapulse.worker.event.JobExecutorEvent;
import com.agorapulse.worker.executor.DistributedJobExecutor;
import com.agorapulse.worker.executor.ExecutorId;
import com.agorapulse.worker.job.JobRunContext;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Singleton
@Requirements(value={@Requires(beans={StatefulRedisConnection.class}, property="redis.uri"), @Requires(property="worker.executor.redis.enabled", value="true", defaultValue="true")})
public class RedisJobExecutor
implements DistributedJobExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisJobExecutor.class);
    private static final String EXECUTOR_TYPE = "redis";
    private static final String LIBRARY_PREFIX = "APMW::";
    private static final String PREFIX_LEADER = "APMW::LEADER::";
    private static final String PREFIX_COUNT = "APMW::COUNT::";
    private static final int LEADER_INACTIVITY_TIMEOUT = 500;
    private static final int LOCK_TIMEOUT = 60;
    private static final int COUNT_TIMEOUT = 600;
    private static final String LEADER_CHECK = String.join((CharSequence)"\n", "redis.call('set', KEYS[1], KEYS[2], 'nx', 'ex', KEYS[3])", "local result = redis.call('get', KEYS[1])", "if result and result == KEYS[2]", "then redis.call('expire', KEYS[1], KEYS[3]) end", "return result");
    private static final String INCREASE_JOB_COUNT = String.join((CharSequence)"\n", "redis.call('set', KEYS[1], 0, 'nx', 'ex', KEYS[2])", "return redis.call('incr', KEYS[1])");
    private static final String DECREASE_JOB_COUNT = "return redis.call('decr', KEYS[1])";
    private final StatefulRedisConnection<String, String> connection;
    private final ExecutorId executorId;
    private final BeanContext beanContext;
    private final JobManager jobManager;
    private final ApplicationEventPublisher<JobExecutorEvent> eventPublisher;

    public RedisJobExecutor(StatefulRedisConnection<String, String> connection, ExecutorId executorId, BeanContext beanContext, JobManager jobManager, ApplicationEventPublisher<JobExecutorEvent> eventPublisher) {
        this.connection = connection;
        this.executorId = executorId;
        this.beanContext = beanContext;
        this.jobManager = jobManager;
        this.eventPublisher = eventPublisher;
    }

    public <R> Publisher<R> executeOnlyOnLeader(JobRunContext context, Callable<R> supplier) {
        RedisAsyncCommands commands = this.connection.async();
        return this.readMasterHostname(context.getStatus().getName(), (RedisAsyncCommands<String, String>)commands).flatMap(h -> {
            if (this.executorId.id().equals(h)) {
                context.executed();
                this.eventPublisher.publishEvent((Object)JobExecutorEvent.leaderOnly((String)EXECUTOR_TYPE, (JobExecutorEvent.Execution)JobExecutorEvent.Execution.EXECUTE, (JobRunStatus)context.getStatus(), (String)this.executorId.id()));
                return Mono.fromCallable((Callable)supplier).subscribeOn(Schedulers.fromExecutorService((ExecutorService)this.getExecutorService(context.getStatus().getName())));
            }
            this.eventPublisher.publishEvent((Object)JobExecutorEvent.leaderOnly((String)EXECUTOR_TYPE, (JobExecutorEvent.Execution)JobExecutorEvent.Execution.SKIP, (JobRunStatus)context.getStatus(), (String)this.executorId.id()));
            return Mono.empty();
        }).flux();
    }

    public <R> Publisher<R> executeConcurrently(JobRunContext context, int maxConcurrency, Callable<R> supplier) {
        RedisAsyncCommands commands = this.connection.async();
        return RedisJobExecutor.readAndIncreaseCurrentCount(context.getStatus().getName(), (RedisAsyncCommands<String, String>)commands, maxConcurrency <= 1 ? 60 : 600).flatMap(count -> {
            if (count > (long)maxConcurrency) {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Skipping execution of the job {} as the concurrency level {} is already reached", (Object)context.getStatus().getName(), (Object)maxConcurrency);
                }
                this.eventPublisher.publishEvent((Object)JobExecutorEvent.concurrent((String)EXECUTOR_TYPE, (JobExecutorEvent.Execution)JobExecutorEvent.Execution.SKIP, (JobRunStatus)context.getStatus(), (int)maxConcurrency, (String)this.executorId.id()));
                return RedisJobExecutor.decreaseCurrentExecutionCount(context.getStatus().getName(), (RedisAsyncCommands<String, String>)commands).flatMap(decreased -> Mono.empty());
            }
            context.executed();
            context.onFinished(s -> RedisJobExecutor.decreaseCurrentExecutionCount(s.getName(), (RedisAsyncCommands<String, String>)commands).subscribe());
            this.eventPublisher.publishEvent((Object)JobExecutorEvent.concurrent((String)EXECUTOR_TYPE, (JobExecutorEvent.Execution)JobExecutorEvent.Execution.EXECUTE, (JobRunStatus)context.getStatus(), (int)maxConcurrency, (String)this.executorId.id()));
            return Mono.fromCallable((Callable)supplier).subscribeOn(Schedulers.fromExecutorService((ExecutorService)this.getExecutorService(context.getStatus().getName())));
        }).flux();
    }

    public <R> Publisher<R> executeOnlyOnFollower(JobRunContext context, Callable<R> supplier) {
        RedisAsyncCommands commands = this.connection.async();
        return this.readMasterHostname(context.getStatus().getName(), (RedisAsyncCommands<String, String>)commands).flatMap(h -> {
            if (!"".equals(h) && h.equals(this.executorId.id())) {
                this.eventPublisher.publishEvent((Object)JobExecutorEvent.followerOnly((String)EXECUTOR_TYPE, (JobExecutorEvent.Execution)JobExecutorEvent.Execution.SKIP, (JobRunStatus)context.getStatus(), (String)this.executorId.id()));
                return Mono.empty();
            }
            context.executed();
            this.eventPublisher.publishEvent((Object)JobExecutorEvent.followerOnly((String)EXECUTOR_TYPE, (JobExecutorEvent.Execution)JobExecutorEvent.Execution.EXECUTE, (JobRunStatus)context.getStatus(), (String)this.executorId.id()));
            return Mono.fromCallable((Callable)supplier).subscribeOn(Schedulers.fromExecutorService((ExecutorService)this.getExecutorService(context.getStatus().getName())));
        }).flux();
    }

    private static Mono<Long> readAndIncreaseCurrentCount(String jobName, RedisAsyncCommands<String, String> commands, int timeout) {
        return Mono.fromFuture((CompletableFuture)commands.eval(INCREASE_JOB_COUNT, ScriptOutputType.INTEGER, (Object[])new String[]{PREFIX_COUNT + jobName, String.valueOf(timeout)}).toCompletableFuture()).map(Long.class::cast);
    }

    private static Mono<Long> decreaseCurrentExecutionCount(String jobName, RedisAsyncCommands<String, String> commands) {
        return Mono.fromFuture((CompletableFuture)commands.eval(DECREASE_JOB_COUNT, ScriptOutputType.INTEGER, (Object[])new String[]{PREFIX_COUNT + jobName}).toCompletableFuture()).map(Long.class::cast);
    }

    private Mono<Object> readMasterHostname(String jobName, RedisAsyncCommands<String, String> commands) {
        int randomDelay = ThreadLocalRandom.current().nextInt(1, 500);
        return Mono.delay((Duration)Duration.ofMillis(randomDelay)).flatMap(ignored -> Mono.fromFuture((CompletableFuture)commands.eval(LEADER_CHECK, ScriptOutputType.VALUE, (Object[])new String[]{PREFIX_LEADER + jobName, this.executorId.id(), String.valueOf(500)}).toCompletableFuture())).defaultIfEmpty((Object)"");
    }

    private ExecutorService getExecutorService(String jobName) {
        return (ExecutorService)this.jobManager.getJob(jobName).map(JobInfo::getConfiguration).map(JobConfiguration::getScheduler).flatMap(name -> this.beanContext.findBean(ExecutorService.class, Qualifiers.byName((String)name))).or(() -> this.beanContext.findBean(ExecutorService.class)).orElseThrow(() -> new IllegalArgumentException("No executor service found for job " + jobName));
    }
}

