package dev.profunktor.redis4cats.pubsub.internals;

import cats.effect.ConcurrentEffect;
import cats.effect.ContextShift;
import cats.effect.Sync$;
import cats.effect.concurrent.Ref;
import cats.effect.syntax.BracketOps$;
import cats.syntax.ApplicativeIdOps$;
import cats.syntax.package$all$;
import dev.profunktor.redis4cats.data;
import dev.profunktor.redis4cats.effect.JRFuture$;
import dev.profunktor.redis4cats.effect.Log;
import dev.profunktor.redis4cats.effect.RedisExecutor;
import dev.profunktor.redis4cats.pubsub.SubscribeCommands;
import fs2.Stream;
import fs2.Stream$;
import fs2.concurrent.Topic;
import fs2.internal.FreeC;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import scala.$less$colon$less$;
import scala.Function1;
import scala.Option;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;

/* compiled from: Subscriber.scala */
/* loaded from: input_file:dev/profunktor/redis4cats/pubsub/internals/Subscriber.class */
public class Subscriber<F, K, V> implements SubscribeCommands<Stream, K, V> {
    private final Ref<F, Map<K, Topic<F, Option<V>>>> state;
    private final StatefulRedisPubSubConnection<K, V> subConnection;
    private final ConcurrentEffect<F> evidence$1;
    private final ContextShift<F> evidence$2;
    private final RedisExecutor<F> evidence$3;
    private final Log<F> evidence$4;

    public <F, K, V> Subscriber(Ref<F, Map<K, Topic<F, Option<V>>>> ref, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, RedisExecutor<F> redisExecutor, Log<F> log) {
        this.state = ref;
        this.subConnection = statefulRedisPubSubConnection;
        this.evidence$1 = concurrentEffect;
        this.evidence$2 = contextShift;
        this.evidence$3 = redisExecutor;
        this.evidence$4 = log;
    }

    @Override // dev.profunktor.redis4cats.pubsub.SubscribeCommands
    public Stream subscribe(Object obj) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(package$all$.MODULE$.toFlatMapOps(this.state.get(), this.evidence$1).flatMap(map -> {
            return package$all$.MODULE$.catsSyntaxApply(((Function1) PubSubInternals$.MODULE$.apply(this.state, this.subConnection, this.evidence$1, this.evidence$4).apply(new data.RedisChannel(obj))).apply(map), this.evidence$1).$less$times(JRFuture$.MODULE$.apply(Sync$.MODULE$.apply(this.evidence$1).delay(() -> {
                return r3.subscribe$$anonfun$2$$anonfun$1(r4);
            }), this.evidence$1, this.evidence$2, this.evidence$3));
        })), topic -> {
            return new Stream(subscribe$$anonfun$3(topic));
        });
    }

    @Override // dev.profunktor.redis4cats.pubsub.SubscribeCommands
    public Stream unsubscribe(Object obj) {
        return Stream$.MODULE$.eval(BracketOps$.MODULE$.guarantee$extension(cats.effect.implicits.package$.MODULE$.catsEffectSyntaxBracket(package$all$.MODULE$.toFunctorOps(JRFuture$.MODULE$.apply(Sync$.MODULE$.apply(this.evidence$1).delay(() -> {
            return r5.unsubscribe$$anonfun$1(r6);
        }), this.evidence$1, this.evidence$2, this.evidence$3), this.evidence$1).void(), this.evidence$1), package$all$.MODULE$.toFlatMapOps(this.state.get(), this.evidence$1).flatMap(map -> {
            return package$all$.MODULE$.catsSyntaxApply(map.get(obj).fold(this::unsubscribe$$anonfun$3$$anonfun$1, topic -> {
                return topic.publish1(package$all$.MODULE$.none());
            }), this.evidence$1).$times$greater(this.state.update(map -> {
                return map.$minus(obj);
            }));
        }), this.evidence$1));
    }

    @Override // dev.profunktor.redis4cats.pubsub.SubscribeCommands
    /* renamed from: subscribe, reason: avoid collision after fix types in other method */
    public /* bridge */ /* synthetic */ Stream subscribe2(Object obj) {
        return new Stream(subscribe(obj));
    }

    @Override // dev.profunktor.redis4cats.pubsub.SubscribeCommands
    /* renamed from: unsubscribe, reason: avoid collision after fix types in other method */
    public /* bridge */ /* synthetic */ Stream unsubscribe2(Object obj) {
        return new Stream(unsubscribe(obj));
    }

    private final RedisFuture subscribe$$anonfun$2$$anonfun$1(Object obj) {
        return this.subConnection.async().subscribe(new Object[]{obj});
    }

    private static final /* synthetic */ FreeC subscribe$$anonfun$3(Topic topic) {
        return Stream$.MODULE$.unNone$extension(topic.subscribe(500), $less$colon$less$.MODULE$.refl());
    }

    private final RedisFuture unsubscribe$$anonfun$1(Object obj) {
        return this.subConnection.async().unsubscribe(new Object[]{obj});
    }

    private final Object unsubscribe$$anonfun$3$$anonfun$1() {
        return ApplicativeIdOps$.MODULE$.pure$extension((BoxedUnit) package$all$.MODULE$.catsSyntaxApplicativeId(BoxedUnit.UNIT), this.evidence$1);
    }
}
