package dev.profunktor.redis4cats.pubsub;

import cats.effect.ConcurrentEffect;
import cats.effect.ContextShift;
import cats.effect.Resource;
import cats.effect.Resource$;
import cats.effect.Sync$;
import cats.effect.concurrent.Ref$;
import cats.syntax.package$all$;
import dev.profunktor.redis4cats.connection.RedisClient;
import dev.profunktor.redis4cats.effect.JRFuture$;
import dev.profunktor.redis4cats.effect.Log;
import dev.profunktor.redis4cats.effect.Log$;
import dev.profunktor.redis4cats.effect.RedisExecutor;
import dev.profunktor.redis4cats.effect.RedisExecutor$;
import dev.profunktor.redis4cats.pubsub.internals.LivePubSubCommands;
import dev.profunktor.redis4cats.pubsub.internals.Publisher;
import dev.profunktor.redis4cats.pubsub.internals.Subscriber;
import fs2.Stream;
import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.runtime.ModuleSerializationProxy;

/* compiled from: PubSub.scala */
/* loaded from: input_file:dev/profunktor/redis4cats/pubsub/PubSub$.class */
/**
 * Singleton holder (Scala module class) with factory methods that build pub/sub
 * command interfaces on top of a {@link RedisClient}.
 *
 * <p>Each {@code mk*Connection} method returns a {@link Resource} that first creates a
 * {@link RedisExecutor}, then acquires one or two pub/sub connections using the
 * acquire/release pair produced by {@link #acquireAndRelease}.
 */
public final class PubSub$ implements Serializable {
    /** The single instance of this module class. */
    public static final PubSub$ MODULE$ = new PubSub$();

    private PubSub$() {
    }

    /** Serializes the module as a proxy naming this class rather than by field contents. */
    private Object writeReplace() {
        return new ModuleSerializationProxy(PubSub$.class);
    }

    /**
     * Builds the acquire effect and the release function for one pub/sub connection.
     *
     * @param redisClient      client whose underlying lettuce client and URI are used to connect
     * @param redisCodec       codec handed to {@code connectPubSubAsync}
     * @param concurrentEffect effect type-class instance used to suspend and sequence the work
     * @param contextShift     passed through to the {@code JRFuture} conversions
     * @param log              logger that receives the info message emitted on release
     * @param redisExecutor    passed through to the {@code JRFuture} conversions
     * @return a pair whose first element is the suspended connect effect and whose second
     *         element maps a connection to an effect that calls {@code closeAsync()} and then
     *         logs "Releasing PubSub connection: ..." at info level
     */
    public <F, K, V> Tuple2<Object, Function1<StatefulRedisPubSubConnection<K, V>, Object>> acquireAndRelease(RedisClient redisClient, RedisCodec redisCodec, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, Log<F> log, RedisExecutor<F> redisExecutor) {
        // The connect call is wrapped in delay(...) so it is only started when the effect runs.
        Object acquire = JRFuture$.MODULE$.fromConnectionFuture(
                Sync$.MODULE$.apply(concurrentEffect).delay(() -> {
                    return $anonfun$1(redisClient, redisCodec);
                }),
                concurrentEffect, contextShift, redisExecutor);

        Function1<StatefulRedisPubSubConnection<K, V>, Object> release = connection -> {
            Object close = JRFuture$.MODULE$.fromCompletableFuture(
                    Sync$.MODULE$.apply(concurrentEffect).delay(() -> {
                        return $anonfun$3$$anonfun$1(connection);
                    }),
                    concurrentEffect, contextShift, redisExecutor);
            // $times$greater is the encoded name of Scala's `*>`: run the close, then the log.
            return package$all$.MODULE$.catsSyntaxApply(close, concurrentEffect).$times$greater(Log$.MODULE$.apply(log).info(() -> {
                return $anonfun$4$$anonfun$2(redisClient);
            }));
        };

        return Tuple2$.MODULE$.apply(acquire, release);
    }

    /**
     * Creates a resource yielding a {@link LivePubSubCommands} backed by two independently
     * acquired pub/sub connections and a shared state {@code Ref} initialised with an empty map.
     *
     * @return a resource that releases both connections (via the release function from
     *         {@link #acquireAndRelease}) when it is finalized
     * @throws MatchError if {@link #acquireAndRelease} returns {@code null}
     */
    public <F, K, V> Resource<F, PubSubCommands<Stream, K, V>> mkPubSubConnection(RedisClient redisClient, RedisCodec redisCodec, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, Log<F> log) {
        return RedisExecutor$.MODULE$.make(contextShift, concurrentEffect).flatMap(redisExecutor -> {
            Tuple2 pair = acquireAndRelease(redisClient, redisCodec, concurrentEffect, contextShift, log, redisExecutor);
            if (pair == null) {
                throw new MatchError(pair);
            }
            Object acquire = pair._1();
            Function1 release = (Function1) pair._2();
            return Resource$.MODULE$.eval(Ref$.MODULE$.of(Predef$.MODULE$.Map().empty(), concurrentEffect), concurrentEffect).flatMap(state -> {
                // Two separate Resource.make calls => two distinct connections.
                // NOTE(review): presumably the first is used for subscribing and the second
                // for publishing — confirm against the LivePubSubCommands constructor.
                return Resource$.MODULE$.make(acquire, release, concurrentEffect).flatMap(firstConnection -> {
                    return Resource$.MODULE$.make(acquire, release, concurrentEffect).map(secondConnection -> {
                        return new LivePubSubCommands(state, firstConnection, secondConnection, concurrentEffect, contextShift, redisExecutor, log);
                    }, concurrentEffect);
                });
            });
        });
    }

    /**
     * Creates a resource yielding a {@link Publisher} backed by a single pub/sub connection.
     *
     * @return a resource that releases the connection when it is finalized
     * @throws MatchError if {@link #acquireAndRelease} returns {@code null}
     */
    public <F, K, V> Resource<F, PublishCommands<Stream, K, V>> mkPublisherConnection(RedisClient redisClient, RedisCodec redisCodec, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, Log<F> log) {
        return RedisExecutor$.MODULE$.make(contextShift, concurrentEffect).flatMap(redisExecutor -> {
            Tuple2 pair = acquireAndRelease(redisClient, redisCodec, concurrentEffect, contextShift, log, redisExecutor);
            if (pair == null) {
                throw new MatchError(pair);
            }
            Object acquire = pair._1();
            Function1 release = (Function1) pair._2();
            return Resource$.MODULE$.make(acquire, release, concurrentEffect).map(connection -> {
                return new Publisher(connection, concurrentEffect, contextShift, redisExecutor);
            }, concurrentEffect);
        });
    }

    /**
     * Creates a resource yielding a {@link Subscriber} backed by a single pub/sub connection
     * and a state {@code Ref} initialised with an empty map.
     *
     * @return a resource that releases the connection when it is finalized
     * @throws MatchError if {@link #acquireAndRelease} returns {@code null}
     */
    public <F, K, V> Resource<F, SubscribeCommands<Stream, K, V>> mkSubscriberConnection(RedisClient redisClient, RedisCodec redisCodec, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, Log<F> log) {
        return RedisExecutor$.MODULE$.make(contextShift, concurrentEffect).flatMap(redisExecutor -> {
            Tuple2 pair = acquireAndRelease(redisClient, redisCodec, concurrentEffect, contextShift, log, redisExecutor);
            if (pair == null) {
                throw new MatchError(pair);
            }
            Object acquire = pair._1();
            Function1 release = (Function1) pair._2();
            return Resource$.MODULE$.eval(Ref$.MODULE$.of(Predef$.MODULE$.Map().empty(), concurrentEffect), concurrentEffect).flatMap(state -> {
                return Resource$.MODULE$.make(acquire, release, concurrentEffect).map(connection -> {
                    return new Subscriber(state, connection, concurrentEffect, contextShift, redisExecutor, log);
                }, concurrentEffect);
            });
        });
    }

    /** Starts an asynchronous pub/sub connect on the underlying client using the client's URI. */
    private final ConnectionFuture $anonfun$1(RedisClient redisClient, RedisCodec redisCodec) {
        return redisClient.underlying().connectPubSubAsync(redisCodec, redisClient.uri().underlying());
    }

    /** Starts an asynchronous close of the given connection. */
    private final CompletableFuture $anonfun$3$$anonfun$1(StatefulRedisPubSubConnection statefulRedisPubSubConnection) {
        return statefulRedisPubSubConnection.closeAsync();
    }

    /** Builds the info-level message logged after a connection has been released. */
    private final String $anonfun$4$$anonfun$2(RedisClient redisClient) {
        return "Releasing PubSub connection: " + redisClient.uri().underlying();
    }
}
