package dev.profunktor.redis4cats.streams;

import cats.UnorderedFoldable$;
import cats.effect.kernel.Async;
import cats.effect.kernel.Ref$;
import cats.effect.kernel.Ref$Make$;
import cats.effect.kernel.Resource;
import cats.effect.kernel.Sync;
import cats.syntax.package$all$;
import dev.profunktor.redis4cats.connection.RedisClient;
import dev.profunktor.redis4cats.connection.RedisURI;
import dev.profunktor.redis4cats.effect.Log;
import dev.profunktor.redis4cats.streams.data;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$PartiallyAppliedFromIterator$;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.codec.RedisCodec;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some$;
import scala.Tuple2$;
import scala.collection.IterableOnceOps;
import scala.collection.MapOps;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.util.NotGiven$;

/* compiled from: Fs2Streaming.scala */
/* loaded from: input_file:dev/profunktor/redis4cats/streams/RedisStream.class */
public class RedisStream<F, K, V> implements Streaming<?, K, V> {
    private final RedisRawStreaming<F, K, V> rawStreaming;
    private final Sync<F> evidence$1;
    private final Function1 nextOffset = obj -> {
        return xReadMessage -> {
            return data$StreamingOffset$Custom$.MODULE$.apply(obj, xReadMessage.id());
        };
    };
    private final Function1 offsetsByKey = list -> {
        return list.groupBy(xReadMessage -> {
            return xReadMessage.key();
        }).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Object _1 = tuple2._1();
            List list = (List) tuple2._2();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(_1), list.lastOption().map((Function1) nextOffset().apply(_1)));
        });
    };

    public static <F, K, V> Stream<F, Streaming<?, K, V>> mkMasterReplicaConnection(RedisCodec redisCodec, Seq<RedisURI> seq, Option<ReadFrom> option, Async<F> async, Log<F> log) {
        return RedisStream$.MODULE$.mkMasterReplicaConnection(redisCodec, seq, option, async, log);
    }

    public static <F, K, V> Resource<F, Streaming<?, K, V>> mkMasterReplicaConnectionResource(RedisCodec redisCodec, Seq<RedisURI> seq, Option<ReadFrom> option, Async<F> async, Log<F> log) {
        return RedisStream$.MODULE$.mkMasterReplicaConnectionResource(redisCodec, seq, option, async, log);
    }

    public static <F, K, V> Stream<F, Streaming<?, K, V>> mkStreamingConnection(RedisClient redisClient, RedisCodec redisCodec, Async<F> async, Log<F> log) {
        return RedisStream$.MODULE$.mkStreamingConnection(redisClient, redisCodec, async, log);
    }

    public static <F, K, V> Resource<F, Streaming<?, K, V>> mkStreamingConnectionResource(RedisClient redisClient, RedisCodec redisCodec, Async<F> async, Log<F> log) {
        return RedisStream$.MODULE$.mkStreamingConnectionResource(redisClient, redisCodec, async, log);
    }

    public RedisStream(RedisRawStreaming<F, K, V> redisRawStreaming, Sync<F> sync) {
        this.rawStreaming = redisRawStreaming;
        this.evidence$1 = sync;
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    public /* bridge */ /* synthetic */ Function1 read$default$3() {
        Function1 read$default$3;
        read$default$3 = read$default$3();
        return read$default$3;
    }

    public Function1<K, Function1<data.XReadMessage<K, V>, data.StreamingOffset<K>>> nextOffset() {
        return this.nextOffset;
    }

    public Function1<List<data.XReadMessage<K, V>>, Map<K, Option<data.StreamingOffset<K>>>> offsetsByKey() {
        return this.offsetsByKey;
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    public Function1<?, ?> append() {
        return stream -> {
            return stream.evalMap(xAddMessage -> {
                return this.rawStreaming.xAdd(xAddMessage.key(), xAddMessage.body(), xAddMessage.approxMaxlen(), xAddMessage.minId());
            });
        };
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    public Object read(Set<K> set, int i, Function1<K, data.StreamingOffset<K>> function1, Option<Duration> option, Option<Object> option2) {
        return Stream$.MODULE$.eval(Ref$.MODULE$.of(((IterableOnceOps) set.map(obj -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(obj), function1.apply(obj));
        })).toMap($less$colon$less$.MODULE$.refl()), Ref$Make$.MODULE$.syncInstance(this.evidence$1))).flatMap(ref -> {
            return Stream$.MODULE$.eval(ref.get()).flatMap(map -> {
                return Stream$.MODULE$.eval(this.rawStreaming.xRead(map.values().toSet(), option, option2)).map(list -> {
                    return Tuple2$.MODULE$.apply(list, ((MapOps) offsetsByKey().apply(list)).collect(new RedisStream$$anon$1()).toList());
                }).flatMap(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    List list2 = (List) tuple2._1();
                    return Stream$.MODULE$.eval(package$all$.MODULE$.toTraverseOps(((List) tuple2._2()).map(tuple2 -> {
                        if (tuple2 == null) {
                            throw new MatchError(tuple2);
                        }
                        Object _1 = tuple2._1();
                        data.StreamingOffset streamingOffset = (data.StreamingOffset) tuple2._2();
                        return ref.update(map -> {
                            return map.updated(_1, streamingOffset);
                        });
                    }), UnorderedFoldable$.MODULE$.catsTraverseForList()).sequence($less$colon$less$.MODULE$.refl(), this.evidence$1)).flatMap(list3 -> {
                        return Stream$PartiallyAppliedFromIterator$.MODULE$.apply$extension(Stream$.MODULE$.fromIterator(), list2.iterator(), i, this.evidence$1).map(xReadMessage -> {
                            return xReadMessage;
                        });
                    }, NotGiven$.MODULE$.value());
                }, NotGiven$.MODULE$.value());
            }, NotGiven$.MODULE$.value()).repeat();
        }, NotGiven$.MODULE$.value());
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    public Option<Duration> read$default$4() {
        return Some$.MODULE$.apply(Duration$.MODULE$.Zero());
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    public Option<Object> read$default$5() {
        return None$.MODULE$;
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    /* renamed from: read, reason: avoid collision after fix types in other method */
    public /* bridge */ /* synthetic */ Object read2(Set set, int i, Function1 function1, Option option, Option option2) {
        return read(set, i, function1, (Option<Duration>) option, (Option<Object>) option2);
    }
}
