package dev.profunktor.redis4cats.interpreter.pubsub;

import cats.effect.ConcurrentEffect;
import cats.effect.ContextShift;
import cats.effect.Sync$;
import cats.effect.concurrent.Ref;
import cats.effect.concurrent.Ref$;
import cats.syntax.package$all$;
import dev.profunktor.redis4cats.algebra.PubSubCommands;
import dev.profunktor.redis4cats.algebra.PublishCommands;
import dev.profunktor.redis4cats.algebra.SubscribeCommands;
import dev.profunktor.redis4cats.connection.RedisClient;
import dev.profunktor.redis4cats.effect.JRFuture$;
import dev.profunktor.redis4cats.effect.Log;
import dev.profunktor.redis4cats.effect.Log$;
import fs2.Stream;
import fs2.Stream$;
import fs2.internal.FreeC;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;

/* compiled from: PubSub.scala */
/* loaded from: input_file:dev/profunktor/redis4cats/interpreter/pubsub/PubSub$.class */
/**
 * Singleton holder for the factory methods that build Redis pub/sub streams.
 *
 * <p>This is the Java view of a Scala module (see {@code PubSub.scala}); the
 * {@code $anonfun$...} methods are the compiler-generated bodies of the nested
 * stream steps and must keep their names and signatures.
 */
public final class PubSub$ {
    /** The single instance of this module; created once at class initialisation. */
    public static final PubSub$ MODULE$ = new PubSub$();

    private PubSub$() {
    }

    /**
     * Builds the acquire effect and the release function for one pub/sub connection.
     *
     * @param redisClient      client whose underlying driver and URI are used to connect
     * @param redisCodec       codec handed to {@code connectPubSubAsync}
     * @param concurrentEffect effect type class used to suspend the driver calls
     * @param contextShift     passed through to the {@code JRFuture$} conversions
     * @param log              logger used to report the release of a connection
     * @return a pair of (effect that opens a connection, function that closes a given
     *         connection and then logs an info message containing the client's URI)
     */
    public <F, K, V> Tuple2<F, Function1<StatefulRedisPubSubConnection<K, V>, F>> acquireAndRelease(RedisClient redisClient, RedisCodec<K, V> redisCodec, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, Log<F> log) {
        // The connect call is wrapped in delay(...) so it only runs when the effect is evaluated.
        F acquire = JRFuture$.MODULE$.fromConnectionFuture(
                Sync$.MODULE$.apply(concurrentEffect).delay(() -> redisClient.underlying().connectPubSubAsync(redisCodec, redisClient.uri().underlying())),
                concurrentEffect,
                contextShift);

        // Close asynchronously first, then (*>) emit the log line.
        Function1<StatefulRedisPubSubConnection<K, V>, F> release = connection ->
                package$all$.MODULE$.catsSyntaxApply(
                        JRFuture$.MODULE$.fromCompletableFuture(
                                Sync$.MODULE$.apply(concurrentEffect).delay(() -> connection.closeAsync()),
                                concurrentEffect,
                                contextShift),
                        concurrentEffect)
                        .$times$greater(Log$.MODULE$.apply(log).info(() -> "Releasing PubSub connection: " + redisClient.uri().underlying()));

        return new Tuple2<>(acquire, release);
    }

    /**
     * Creates a stream that emits a {@code LivePubSubCommands} backed by a fresh
     * {@code Ref} (initialised with an empty map) and two separately bracketed
     * connections obtained from {@link #acquireAndRelease}.
     *
     * @return the stream's underlying {@code FreeC} program
     */
    public <F, K, V> FreeC<Nothing$, PubSubCommands<?, K, V>, BoxedUnit> mkPubSubConnection(RedisClient redisClient, RedisCodec<K, V> redisCodec, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, Log<F> log) {
        Tuple2<F, Function1<StatefulRedisPubSubConnection<K, V>, F>> resource = acquireAndRelease(redisClient, redisCodec, concurrentEffect, contextShift, log);
        F acquire = resource._1();
        Function1<StatefulRedisPubSubConnection<K, V>, F> release = resource._2();
        return Stream$.MODULE$.flatMap$extension(
                Stream$.MODULE$.eval(Ref$.MODULE$.of(Predef$.MODULE$.Map().empty(), concurrentEffect)),
                state -> new Stream($anonfun$mkPubSubConnection$1(acquire, release, concurrentEffect, contextShift, log, state)));
    }

    /**
     * Creates a stream that brackets a single connection from
     * {@link #acquireAndRelease} and maps it to a {@code Publisher}.
     *
     * @return the stream's underlying {@code FreeC} program
     */
    public <F, K, V> FreeC<Nothing$, PublishCommands<?, K, V>, BoxedUnit> mkPublisherConnection(RedisClient redisClient, RedisCodec<K, V> redisCodec, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, Log<F> log) {
        Tuple2<F, Function1<StatefulRedisPubSubConnection<K, V>, F>> resource = acquireAndRelease(redisClient, redisCodec, concurrentEffect, contextShift, log);
        F acquire = resource._1();
        Function1<StatefulRedisPubSubConnection<K, V>, F> release = resource._2();
        return Stream$.MODULE$.map$extension(
                Stream$.MODULE$.bracket(acquire, release),
                connection -> new Publisher(connection, concurrentEffect, contextShift));
    }

    /**
     * Creates a stream that allocates a fresh {@code Ref} (initialised with an empty
     * map), brackets a single connection from {@link #acquireAndRelease} and maps
     * both to a {@code Subscriber}.
     *
     * @return the stream's underlying {@code FreeC} program
     */
    public <F, K, V> FreeC<Nothing$, SubscribeCommands<?, K, V>, BoxedUnit> mkSubscriberConnection(RedisClient redisClient, RedisCodec<K, V> redisCodec, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, Log<F> log) {
        Tuple2<F, Function1<StatefulRedisPubSubConnection<K, V>, F>> resource = acquireAndRelease(redisClient, redisCodec, concurrentEffect, contextShift, log);
        F acquire = resource._1();
        Function1<StatefulRedisPubSubConnection<K, V>, F> release = resource._2();
        return Stream$.MODULE$.flatMap$extension(
                Stream$.MODULE$.eval(Ref$.MODULE$.of(Predef$.MODULE$.Map().empty(), concurrentEffect)),
                state -> new Stream($anonfun$mkSubscriberConnection$1(acquire, release, concurrentEffect, contextShift, log, state)));
    }

    // Innermost step of mkPubSubConnection: emits the commands object built from the
    // shared state and both connections.
    // NOTE(review): presumably one connection is used for subscribing and the other for
    // publishing; confirm against the LivePubSubCommands constructor.
    public static final /* synthetic */ FreeC $anonfun$mkPubSubConnection$3(Ref state, StatefulRedisPubSubConnection firstConnection, ConcurrentEffect concurrentEffect, ContextShift contextShift, Log log, StatefulRedisPubSubConnection secondConnection) {
        return Stream$.MODULE$.emit(new LivePubSubCommands(state, firstConnection, secondConnection, concurrentEffect, contextShift, log));
    }

    // Middle step of mkPubSubConnection: brackets the second connection while the first
    // one is already held, then continues with step $3.
    public static final /* synthetic */ FreeC $anonfun$mkPubSubConnection$2(Object acquire, Function1 release, Ref state, ConcurrentEffect concurrentEffect, ContextShift contextShift, Log log, StatefulRedisPubSubConnection firstConnection) {
        return Stream$.MODULE$.flatMap$extension(
                Stream$.MODULE$.bracket(acquire, release),
                secondConnection -> new Stream($anonfun$mkPubSubConnection$3(state, firstConnection, concurrentEffect, contextShift, log, secondConnection)));
    }

    // Outer step of mkPubSubConnection: brackets the first connection, then continues
    // with step $2 (which brackets a second one using the same acquire/release pair).
    public static final /* synthetic */ FreeC $anonfun$mkPubSubConnection$1(Object acquire, Function1 release, ConcurrentEffect concurrentEffect, ContextShift contextShift, Log log, Ref state) {
        return Stream$.MODULE$.flatMap$extension(
                Stream$.MODULE$.bracket(acquire, release),
                firstConnection -> new Stream($anonfun$mkPubSubConnection$2(acquire, release, state, concurrentEffect, contextShift, log, firstConnection)));
    }

    // Body of mkSubscriberConnection's flatMap: brackets one connection and wraps it,
    // together with the shared state, in a Subscriber.
    public static final /* synthetic */ FreeC $anonfun$mkSubscriberConnection$1(Object acquire, Function1 release, ConcurrentEffect concurrentEffect, ContextShift contextShift, Log log, Ref state) {
        return Stream$.MODULE$.map$extension(
                Stream$.MODULE$.bracket(acquire, release),
                connection -> new Subscriber(state, connection, concurrentEffect, contextShift, log));
    }
}
