package dev.profunktor.redis4cats.pubsub.internals;

import cats.Applicative;
import cats.Applicative$;
import cats.effect.ConcurrentEffect;
import cats.effect.Sync$;
import cats.effect.concurrent.Ref;
import cats.effect.syntax.package$effect$;
import cats.syntax.package$all$;
import dev.profunktor.redis4cats.data;
import dev.profunktor.redis4cats.effect.Log;
import dev.profunktor.redis4cats.effect.Log$;
import fs2.concurrent.Topic;
import fs2.concurrent.Topic$;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;

/* compiled from: PubSubInternals.scala */
/* loaded from: input_file:dev/profunktor/redis4cats/pubsub/internals/PubSubInternals$.class */
/**
 * Decompiled form of the Scala singleton object {@code PubSubInternals}
 * (see the "compiled from: PubSubInternals.scala" header above).
 *
 * <p>It provides two things:
 * <ul>
 *   <li>{@link #defaultListener} - builds a Lettuce {@code RedisPubSubListener} that forwards
 *       messages received on one channel into an fs2 {@code Topic};</li>
 *   <li>{@link #apply} - builds a curried function that, for a channel and a map of known topics,
 *       either returns the existing topic or creates one, registers a listener for it on the
 *       connection and records it in the shared {@code Ref}.</li>
 * </ul>
 *
 * <p>NOTE(review): this is decompiler output, not hand-written Java. Raw types, the
 * {@code $anonfun$...} methods and the {@code name$N} fields are artifacts of the Scala compiler;
 * several constructs below are unlikely to be accepted by javac as written.
 */
public final class PubSubInternals$ {
    /** The singleton instance; assigned by the private constructor, which the static initializer invokes. */
    public static PubSubInternals$ MODULE$;

    static {
        // Instantiating the class is what populates MODULE$ (see the private constructor).
        new PubSubInternals$();
    }

    /**
     * Creates a listener that publishes messages arriving on {@code k} to {@code topic}.
     *
     * @param k                the channel key the listener filters on
     * @param topic            the topic that receives each matching message, wrapped in an {@code Option}
     * @param concurrentEffect the effect instance used to convert the publish action to {@code IO} and run it
     * @return a listener whose subscription callbacks are no-ops and whose message callbacks publish to {@code topic}
     */
    public <F, K, V> RedisPubSubListener<K, V> defaultListener(final K k, final Topic<F, Option<V>> topic, final ConcurrentEffect<F> concurrentEffect) {
        return new RedisPubSubListener<K, V>(k, topic, concurrentEffect) { // from class: dev.profunktor.redis4cats.pubsub.internals.PubSubInternals$$anon$1
            // Copies of the enclosing method's arguments, populated in the initializer block at the bottom.
            private final Object channel$1;
            private final Topic topic$1;
            private final ConcurrentEffect evidence$1$1;

            /**
             * Publishes {@code v} to the topic when {@code k2} equals the captured channel;
             * messages for any other channel are ignored.
             */
            public void message(K k2, V v) {
                if (BoxesRunTime.equals(k2, this.channel$1)) {
                    // The publish effect is converted to IO and started with unsafeRunAsync; the callback
                    // discards the Either outcome, so nothing in this method reacts to the result.
                    package$effect$.MODULE$.toEffectOps(this.topic$1.publish1(Option$.MODULE$.apply(v)), this.evidence$1$1).toIO().unsafeRunAsync(either -> {
                        $anonfun$message$1(either);
                        return BoxedUnit.UNIT;
                    });
                }
            }

            /**
             * Three-argument overload: delegates to {@link #message(Object, Object)} using the second
             * key ({@code k3}) and ignoring the first ({@code k2}).
             * NOTE(review): presumably k2 is the subscription pattern and k3 the channel, per the
             * Lettuce listener contract - confirm against the Lettuce API.
             */
            public void message(K k2, K k3, V v) {
                message(k3, v);
            }

            // The four subscription-lifecycle callbacks below are intentionally empty.
            public void psubscribed(K k2, long j) {
            }

            public void subscribed(K k2, long j) {
            }

            public void unsubscribed(K k2, long j) {
            }

            public void punsubscribed(K k2, long j) {
            }

            /** Completion callback for the async publish; deliberately does nothing with the result. */
            public static final /* synthetic */ void $anonfun$message$1(Either either) {
            }

            {
                // Capture the enclosing method's arguments into the fields used by message(...).
                this.channel$1 = k;
                this.topic$1 = topic;
                this.evidence$1$1 = concurrentEffect;
            }
        };
    }

    /**
     * Builds a curried function {@code channel -> (topics -> F[topic])}.
     *
     * <p>The outer function unwraps the {@code RedisChannel} via {@code underlying()} and hands the
     * raw key, together with all arguments of this method, to {@link #$anonfun$apply$1}.
     *
     * @param ref                           shared reference holding the channel-to-topic map; updated when a topic is created
     * @param statefulRedisPubSubConnection connection on which a listener is registered for a newly created topic
     * @param concurrentEffect              effect instance used to build and sequence the returned effect
     * @param log                           logger used to report listener creation
     * @return a function from channel to a function from the current topic map to an effect yielding the topic
     */
    public <F, K, V> Function1<data.RedisChannel<K>, Function1<Map<K, Topic<F, Option<V>>>, F>> apply(Ref<F, Map<K, Topic<F, Option<V>>>> ref, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, ConcurrentEffect<F> concurrentEffect, Log<F> log) {
        return obj -> {
            return $anonfun$apply$1(concurrentEffect, log, statefulRedisPubSubConnection, ref, ((data.RedisChannel) obj).underlying());
        };
    }

    /**
     * Synthetic body of the function returned by {@link #apply}: given the raw channel key
     * {@code obj}, returns a function that resolves the topic for that key from a supplied map.
     *
     * <p>If the map already contains the key, the existing topic is returned via {@code pure}.
     * Otherwise a new topic is created with {@code None} as its initial value and, as a side
     * effect of that creation ({@code flatTap}), the following are sequenced in order:
     * an info log line, registration of a default listener on the connection, and an update of
     * {@code ref} that stores the new topic under the key.
     *
     * <p>NOTE(review): the lookup is done against the map passed to the returned function, while
     * the insert goes through {@code ref.update}. Whether two callers can both observe the key as
     * absent and each create a topic and listener depends on how callers obtain that map - confirm.
     */
    public static final /* synthetic */ Function1 $anonfun$apply$1(ConcurrentEffect concurrentEffect, Log log, StatefulRedisPubSubConnection statefulRedisPubSubConnection, Ref ref, Object obj) {
        return map -> {
            Option option = map.get(obj);
            // Deferred "key absent" branch: only evaluated by option.fold below when no topic exists yet.
            Function0 function0 = () -> {
                return package$all$.MODULE$.toFlatMapOps(Topic$.MODULE$.apply(None$.MODULE$, concurrentEffect), concurrentEffect).flatTap(topic -> {
                    RedisPubSubListener defaultListener = MODULE$.defaultListener(obj, topic, concurrentEffect);
                    // log.info *> delay(addListener) *> ref.update(...): three effects chained with *>.
                    return package$all$.MODULE$.catsSyntaxApply(package$all$.MODULE$.catsSyntaxApply(Log$.MODULE$.apply(log).info(() -> {
                        // 31 is the length of the literal prefix appended first.
                        return new StringBuilder(31).append("Creating listener for channel: ").append(new data.RedisChannel(obj)).toString();
                    }), concurrentEffect).$times$greater(Sync$.MODULE$.apply(concurrentEffect).delay(() -> {
                        // Wrapped in delay, so the listener is added when the effect runs, not when it is built.
                        statefulRedisPubSubConnection.addListener(defaultListener);
                    })), concurrentEffect).$times$greater(ref.update(map -> {
                        // NOTE(review): this parameter reuses the name of the enclosing lambda's
                        // parameter `map` (decompiler artifact); here it is the Ref's current value.
                        return map.updated(obj, topic);
                    }));
                });
            };
            Applicative apply = Applicative$.MODULE$.apply(concurrentEffect);
            // fold(ifEmpty, ifDefined): create-and-register when absent, otherwise lift the existing topic.
            return option.fold(function0, topic -> {
                return apply.pure(topic);
            });
        };
    }

    /** Private constructor: publishes the sole instance through {@link #MODULE$}. */
    private PubSubInternals$() {
        MODULE$ = this;
    }
}
