/*
 * Decompiled with CFR 0.152.
 */
package com.agorapulse.worker.queues.redis;

import com.agorapulse.worker.queue.JobQueues;
import com.agorapulse.worker.queues.redis.ConnectionFactory;
import com.agorapulse.worker.queues.redis.RedisPoolConfiguration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import io.lettuce.core.RedisClient;
import io.lettuce.core.TransactionResult;
import io.lettuce.core.ZAddArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.support.AsyncObjectFactory;
import io.lettuce.core.support.BoundedAsyncPool;
import io.lettuce.core.support.BoundedPoolConfig;
import io.micronaut.core.type.Argument;
import io.micronaut.jackson.JacksonConfiguration;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisQueues
implements JobQueues {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisQueues.class);
    private static final String PREFIX_DATED_QUEUE = "DATED_QUEUE::";
    private final ObjectMapper objectMapper;
    private final BoundedAsyncPool<StatefulRedisConnection<String, String>> pool;

    public RedisQueues(ObjectMapper objectMapper, RedisClient client, RedisPoolConfiguration redisPoolConfiguration) {
        this.objectMapper = objectMapper;
        BoundedPoolConfig config = BoundedPoolConfig.builder().minIdle(redisPoolConfiguration.getMinIdle()).maxIdle(redisPoolConfiguration.getMaxIdle()).maxTotal(redisPoolConfiguration.getMaxTotal()).testOnAcquire(redisPoolConfiguration.isTestOnAcquire()).testOnCreate(redisPoolConfiguration.isTestOnCreate()).testOnRelease(redisPoolConfiguration.isTestOnRelease()).build();
        this.pool = new BoundedAsyncPool((AsyncObjectFactory)new ConnectionFactory(client), config);
    }

    public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument, Consumer<T> action) {
        TransactionResult result = this.withTransaction(redisCommands -> {
            String key = this.getKey(queueName);
            redisCommands.zrange((Object)key, 0L, (long)(maxNumberOfMessages - 1));
            redisCommands.zremrangebyrank((Object)key, 0L, (long)(maxNumberOfMessages - 1));
        });
        if (result == null) {
            return;
        }
        Object firstResponse = result.get(0);
        if (!(firstResponse instanceof List)) {
            throw new IllegalStateException("There result is not a list of Strings. Got: " + firstResponse);
        }
        List messages = (List)firstResponse;
        messages.forEach(body -> {
            try {
                action.accept(this.objectMapper.readValue(body, JacksonConfiguration.constructType((Argument)argument, (TypeFactory)this.objectMapper.getTypeFactory())));
            }
            catch (JsonProcessingException e) {
                if (argument.equalsType(Argument.STRING)) {
                    action.accept(body);
                }
                throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e);
            }
        });
    }

    public void sendMessage(String queueName, Object result) {
        try {
            String item = this.objectMapper.writeValueAsString(result);
            this.sendRawMessage(queueName, item);
        }
        catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Cannot write " + result + " to JSON", e);
        }
    }

    public void sendRawMessage(String queueName, Object result) {
        String item = result.toString();
        this.withRedis(redisCommands -> {
            String key = this.getKey(queueName);
            redisCommands.zscore((Object)key, (Object)item).thenAccept(zscore -> {
                if (zscore == null) {
                    long time = System.currentTimeMillis();
                    redisCommands.zadd((Object)key, (double)time, (Object)item);
                }
            });
        });
    }

    private String getKey(String queueName) {
        return PREFIX_DATED_QUEUE + queueName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TransactionResult withTransaction(Consumer<RedisCommands<String, String>> action) {
        TransactionResult transactionResult;
        StatefulRedisConnection connection = (StatefulRedisConnection)this.pool.acquire().get();
        RedisCommands sync = connection.sync();
        try {
            sync.multi();
            action.accept((RedisCommands<String, String>)sync);
            transactionResult = sync.exec();
        }
        catch (Throwable throwable) {
            try {
                this.pool.release((Object)connection);
                throw throwable;
            }
            catch (InterruptedException | ExecutionException e) {
                LOGGER.error("Exception obtaining connection from the pool", (Throwable)e);
                return null;
            }
        }
        this.pool.release((Object)connection);
        return transactionResult;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void withRedis(Consumer<RedisAsyncCommands<String, String>> action) {
        try {
            StatefulRedisConnection connection = (StatefulRedisConnection)this.pool.acquire().get();
            RedisAsyncCommands sync = connection.async();
            try {
                action.accept((RedisAsyncCommands<String, String>)sync);
            }
            finally {
                this.pool.release((Object)connection);
            }
        }
        catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Exception obtaining connection from the pool", (Throwable)e);
        }
    }
}

