package com.agorapulse.worker.queues.redis;

import com.agorapulse.worker.queue.JobQueues;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisClient;
import io.lettuce.core.TransactionResult;
import io.lettuce.core.ZAddArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.support.BoundedAsyncPool;
import io.lettuce.core.support.BoundedPoolConfig;
import io.micronaut.core.type.Argument;
import io.micronaut.jackson.JacksonConfiguration;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/agorapulse/worker/queues/redis/RedisQueues.class */
public class RedisQueues implements JobQueues {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisQueues.class);
    private static final String PREFIX_DATED_QUEUE = "DATED_QUEUE::";
    private final ObjectMapper objectMapper;
    private final BoundedAsyncPool<StatefulRedisConnection<String, String>> pool;

    public RedisQueues(ObjectMapper objectMapper, RedisClient redisClient, RedisPoolConfiguration redisPoolConfiguration) {
        this.objectMapper = objectMapper;
        this.pool = new BoundedAsyncPool<>(new ConnectionFactory(redisClient), BoundedPoolConfig.builder().minIdle(redisPoolConfiguration.getMinIdle()).maxIdle(redisPoolConfiguration.getMaxIdle()).maxTotal(redisPoolConfiguration.getMaxTotal()).testOnAcquire(redisPoolConfiguration.isTestOnAcquire()).testOnCreate(redisPoolConfiguration.isTestOnCreate()).testOnRelease(redisPoolConfiguration.isTestOnRelease()).build());
    }

    public <T> void readMessages(String str, int i, Duration duration, Argument<T> argument, Consumer<T> consumer) {
        TransactionResult withTransaction = withTransaction(redisCommands -> {
            String key = getKey(str);
            redisCommands.zrange(key, 0L, i - 1);
            redisCommands.zremrangebyrank(key, 0L, i - 1);
        });
        if (withTransaction == null) {
            return;
        }
        ((List) withTransaction.get(0)).forEach(str2 -> {
            try {
                consumer.accept(this.objectMapper.readValue(str2, JacksonConfiguration.constructType(argument, this.objectMapper.getTypeFactory())));
            } catch (JsonProcessingException e) {
                throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + str2, e);
            }
        });
    }

    public void sendMessage(String str, Object obj) {
        try {
            String writeValueAsString = this.objectMapper.writeValueAsString(obj);
            withRedis(redisAsyncCommands -> {
                String key = getKey(str);
                redisAsyncCommands.zscore(key, writeValueAsString).thenAccept(d -> {
                    if (d == null) {
                        redisAsyncCommands.zadd(key, System.currentTimeMillis(), writeValueAsString);
                    }
                });
            });
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Cannot write " + obj + " to JSON", e);
        }
    }

    private String getKey(String str) {
        return "DATED_QUEUE::" + str;
    }

    private TransactionResult withTransaction(Consumer<RedisCommands<String, String>> consumer) {
        try {
            StatefulRedisConnection statefulRedisConnection = (StatefulRedisConnection) this.pool.acquire().get();
            RedisCommands<String, String> sync = statefulRedisConnection.sync();
            try {
                sync.multi();
                consumer.accept(sync);
                TransactionResult exec = sync.exec();
                this.pool.release(statefulRedisConnection);
                return exec;
            } catch (Throwable th) {
                this.pool.release(statefulRedisConnection);
                throw th;
            }
        } catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Exception obtaining connection from the pool", e);
            return null;
        }
    }

    private void withRedis(Consumer<RedisAsyncCommands<String, String>> consumer) {
        try {
            StatefulRedisConnection statefulRedisConnection = (StatefulRedisConnection) this.pool.acquire().get();
            try {
                consumer.accept(statefulRedisConnection.async());
                this.pool.release(statefulRedisConnection);
            } catch (Throwable th) {
                this.pool.release(statefulRedisConnection);
                throw th;
            }
        } catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Exception obtaining connection from the pool", e);
        }
    }
}
