package dev.profunktor.redis4cats.pubsub;

import cats.Apply;
import cats.FlatMap;
import cats.effect.kernel.Async;
import cats.effect.kernel.Ref$Make$;
import cats.effect.kernel.Resource;
import cats.effect.package$;
import cats.syntax.package$all$;
import dev.profunktor.redis4cats.connection.RedisClient;
import dev.profunktor.redis4cats.effect.FutureLift;
import dev.profunktor.redis4cats.effect.FutureLift$;
import dev.profunktor.redis4cats.effect.Log;
import dev.profunktor.redis4cats.effect.Log$;
import dev.profunktor.redis4cats.effect.MkRedis;
import dev.profunktor.redis4cats.effect.MkRedis$;
import dev.profunktor.redis4cats.effect.RedisExecutor;
import dev.profunktor.redis4cats.effect.RedisExecutor$;
import dev.profunktor.redis4cats.pubsub.internals.LivePubSubCommands;
import dev.profunktor.redis4cats.pubsub.internals.Publisher;
import dev.profunktor.redis4cats.pubsub.internals.Subscriber;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;

/* compiled from: PubSub.scala */
/* loaded from: input_file:dev/profunktor/redis4cats/pubsub/PubSub$.class */
/**
 * Singleton holder (Scala {@code object PubSub}) with factory methods that build
 * {@link Resource}-managed Redis Pub/Sub connections.
 *
 * <p>Every factory obtains an acquire/release pair from {@link #acquireAndRelease} and hands it to
 * {@code Resource.make}, so each opened connection is paired with the release function that closes it.
 */
public final class PubSub$ {
    public static final PubSub$ MODULE$ = new PubSub$();

    /**
     * Builds the acquire effect and the release function for one Pub/Sub connection.
     *
     * @param redisClient   client whose underlying Lettuce client and URI are used to connect
     * @param redisCodec    codec passed to {@code connectPubSubAsync}
     * @param apply         {@code Apply} instance used to sequence "close" before "log" on release
     * @param futureLift    lifts the Lettuce futures into {@code F}
     * @param log           logger used to report the release
     * @param redisExecutor executor through which the connect and close calls are lifted
     * @return a pair {@code (acquire, release)}: {@code acquire} opens a connection with
     *         {@code connectPubSubAsync}; {@code release} calls {@code closeAsync()} on a given
     *         connection and then logs {@code "Releasing PubSub connection: <uri>"}
     */
    public <F, K, V> Tuple2<F, Function1<StatefulRedisPubSubConnection<K, V>, F>> acquireAndRelease(RedisClient redisClient, RedisCodec<K, V> redisCodec, Apply<F> apply, FutureLift<F> futureLift, Log<F> log, RedisExecutor<F> redisExecutor) {
        return new Tuple2<>(FutureLift$.MODULE$.apply(futureLift).liftConnectionFuture(RedisExecutor$.MODULE$.apply(redisExecutor).lift(() -> {
            return redisClient.underlying().connectPubSubAsync(redisCodec, redisClient.uri().underlying());
        }), redisExecutor), statefulRedisPubSubConnection -> {
            // `$times$greater` is the `*>` operator: run the close effect, then the log effect.
            return package$all$.MODULE$.catsSyntaxApply(FutureLift$.MODULE$.apply(futureLift).liftCompletableFuture(RedisExecutor$.MODULE$.apply(redisExecutor).lift(() -> {
                return statefulRedisPubSubConnection.closeAsync();
            }), redisExecutor), apply).$times$greater(Log$.MODULE$.apply(log).info(() -> {
                return new StringBuilder(29).append("Releasing PubSub connection: ").append(redisClient.uri().underlying()).toString();
            }));
        });
    }

    /**
     * Creates a resource yielding combined publish/subscribe commands.
     *
     * <p>A new executor is obtained from {@code mkRedis}, a {@code Ref} is initialised with an empty
     * map, and two separate connections are acquired with the same acquire/release pair. Both
     * connections, together with the {@code Ref}, are handed to {@link LivePubSubCommands}.
     *
     * @param redisClient client used to open the connections
     * @param redisCodec  codec for keys and values
     * @param async       {@code Async} instance for {@code F}
     * @param futureLift  lifts the Lettuce futures into {@code F}
     * @param log         logger
     * @param mkRedis     supplies the executor resource
     * @return a resource that releases both connections when it is finalised
     */
    public <F, K, V> Resource<F, PubSubCommands<?, K, V>> mkPubSubConnection(RedisClient redisClient, RedisCodec<K, V> redisCodec, Async<F> async, FutureLift<F> futureLift, Log<F> log, MkRedis<F> mkRedis) {
        return MkRedis$.MODULE$.apply(mkRedis).newExecutor().flatMap(redisExecutor -> {
            Tuple2 pair = MODULE$.acquireAndRelease(redisClient, redisCodec, async, futureLift, log, redisExecutor);
            if (pair == null) {
                // Mirrors the Scala tuple pattern match; acquireAndRelease always returns a tuple.
                throw new MatchError(pair);
            }
            Object acquire = pair._1();
            Function1 release = (Function1) pair._2();
            return package$.MODULE$.Resource().eval(package$.MODULE$.Ref().of(Predef$.MODULE$.Map().empty(), Ref$Make$.MODULE$.concurrentInstance(async))).flatMap(ref -> {
                // The two lambdas must use distinct parameter names: a nested lambda may not
                // redeclare a name from its enclosing scope, and each connection has to reach
                // the constructor exactly once.
                // NOTE(review): first-acquired is assumed to be the subscriber connection and
                // second-acquired the publisher connection — confirm against LivePubSubCommands.
                return package$.MODULE$.Resource().make(acquire, release, async).flatMap(subConnection -> {
                    return package$.MODULE$.Resource().make(acquire, release, async).map(pubConnection -> {
                        return new LivePubSubCommands(ref, subConnection, pubConnection, async, redisExecutor, log);
                    });
                });
            });
        });
    }

    /**
     * Creates a resource yielding publish-only commands backed by a single connection.
     *
     * @param redisClient client used to open the connection
     * @param redisCodec  codec for keys and values
     * @param flatMap     {@code FlatMap} instance for {@code F}
     * @param futureLift  lifts the Lettuce futures into {@code F}
     * @param log         logger
     * @param mkRedis     supplies the executor resource
     * @return a resource wrapping a {@link Publisher} over the acquired connection
     */
    public <F, K, V> Resource<F, PublishCommands<?, K, V>> mkPublisherConnection(RedisClient redisClient, RedisCodec<K, V> redisCodec, FlatMap<F> flatMap, FutureLift<F> futureLift, Log<F> log, MkRedis<F> mkRedis) {
        return MkRedis$.MODULE$.apply(mkRedis).newExecutor().flatMap(redisExecutor -> {
            Tuple2 pair = MODULE$.acquireAndRelease(redisClient, redisCodec, flatMap, futureLift, log, redisExecutor);
            if (pair == null) {
                // Mirrors the Scala tuple pattern match; acquireAndRelease always returns a tuple.
                throw new MatchError(pair);
            }
            return package$.MODULE$.Resource().make(pair._1(), (Function1) pair._2(), flatMap).map(statefulRedisPubSubConnection -> {
                return new Publisher(statefulRedisPubSubConnection, flatMap, futureLift, redisExecutor);
            });
        });
    }

    /**
     * Creates a resource yielding subscribe-only commands backed by a single connection.
     *
     * <p>Unlike the other factories, no {@code MkRedis} is passed in: the executor comes from
     * {@code MkRedis.forAsync(async, log)}. A {@code Ref} initialised with an empty map is shared
     * with the resulting {@link Subscriber}.
     *
     * @param redisClient client used to open the connection
     * @param redisCodec  codec for keys and values
     * @param async       {@code Async} instance for {@code F}
     * @param futureLift  lifts the Lettuce futures into {@code F}
     * @param log         logger
     * @return a resource wrapping a {@link Subscriber} over the acquired connection
     */
    public <F, K, V> Resource<F, SubscribeCommands<?, K, V>> mkSubscriberConnection(RedisClient redisClient, RedisCodec<K, V> redisCodec, Async<F> async, FutureLift<F> futureLift, Log<F> log) {
        return MkRedis$.MODULE$.apply(MkRedis$.MODULE$.forAsync(async, log)).newExecutor().flatMap(redisExecutor -> {
            Tuple2 pair = MODULE$.acquireAndRelease(redisClient, redisCodec, async, futureLift, log, redisExecutor);
            if (pair == null) {
                // Mirrors the Scala tuple pattern match; acquireAndRelease always returns a tuple.
                throw new MatchError(pair);
            }
            Object acquire = pair._1();
            Function1 release = (Function1) pair._2();
            return package$.MODULE$.Resource().eval(package$.MODULE$.Ref().of(Predef$.MODULE$.Map().empty(), Ref$Make$.MODULE$.concurrentInstance(async))).flatMap(ref -> {
                return package$.MODULE$.Resource().make(acquire, release, async).map(statefulRedisPubSubConnection -> {
                    return new Subscriber(ref, statefulRedisPubSubConnection, async, futureLift, log, redisExecutor);
                });
            });
        });
    }

    /** Private: the only instance is {@link #MODULE$}. */
    private PubSub$() {
    }
}
