package dev.profunktor.redis4cats.streams;

import cats.UnorderedFoldable$;
import cats.effect.Concurrent;
import cats.effect.ContextShift;
import cats.effect.Resource;
import cats.effect.concurrent.Ref;
import cats.effect.concurrent.Ref$;
import cats.syntax.package$all$;
import dev.profunktor.redis4cats.connection.RedisClient;
import dev.profunktor.redis4cats.effect.Log;
import dev.profunktor.redis4cats.streams.data;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$PartiallyAppliedFromIterator$;
import fs2.internal.FreeC;
import io.lettuce.core.codec.RedisCodec;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.IterableOnceOps;
import scala.collection.MapOps;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;

/* compiled from: Fs2Streaming.scala */
/* loaded from: input_file:dev/profunktor/redis4cats/streams/RedisStream.class */
public class RedisStream<F, K, V> implements Streaming<Stream, K, V> {
    private final RedisRawStreaming<F, K, V> rawStreaming;
    private final Concurrent<F> evidence$1;
    private final Function1 nextOffset = obj -> {
        return xReadMessage -> {
            return data$StreamingOffset$Custom$.MODULE$.apply(obj, xReadMessage.id());
        };
    };
    private final Function1 offsetsByKey = list -> {
        return list.groupBy(xReadMessage -> {
            return xReadMessage.key();
        }).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Object _1 = tuple2._1();
            List list = (List) tuple2._2();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(_1), list.lastOption().map((Function1) nextOffset().apply(_1)));
        });
    };

    public static FreeC mkMasterReplicaConnection(RedisCodec redisCodec, Seq seq, Option option, Concurrent concurrent, ContextShift contextShift, Log log) {
        return RedisStream$.MODULE$.mkMasterReplicaConnection(redisCodec, seq, option, concurrent, contextShift, log);
    }

    public static Resource mkMasterReplicaConnectionResource(RedisCodec redisCodec, Seq seq, Option option, Concurrent concurrent, ContextShift contextShift, Log log) {
        return RedisStream$.MODULE$.mkMasterReplicaConnectionResource(redisCodec, seq, option, concurrent, contextShift, log);
    }

    public static FreeC mkStreamingConnection(RedisClient redisClient, RedisCodec redisCodec, Concurrent concurrent, ContextShift contextShift, Log log) {
        return RedisStream$.MODULE$.mkStreamingConnection(redisClient, redisCodec, concurrent, contextShift, log);
    }

    public static Resource mkStreamingConnectionResource(RedisClient redisClient, RedisCodec redisCodec, Concurrent concurrent, ContextShift contextShift, Log log) {
        return RedisStream$.MODULE$.mkStreamingConnectionResource(redisClient, redisCodec, concurrent, contextShift, log);
    }

    public <F, K, V> RedisStream(RedisRawStreaming<F, K, V> redisRawStreaming, Concurrent<F> concurrent) {
        this.rawStreaming = redisRawStreaming;
        this.evidence$1 = concurrent;
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    public /* bridge */ /* synthetic */ Function1 read$default$2() {
        Function1 read$default$2;
        read$default$2 = read$default$2();
        return read$default$2;
    }

    public Function1<K, Function1<data.XReadMessage<K, V>, data.StreamingOffset<K>>> nextOffset() {
        return this.nextOffset;
    }

    public Function1<List<data.XReadMessage<K, V>>, Map<K, Option<data.StreamingOffset<K>>>> offsetsByKey() {
        return this.offsetsByKey;
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    public Function1<Stream, Stream> append() {
        return this::append$$anonfun$adapted$1;
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    public Stream read(Set<K> set, Function1<K, data.StreamingOffset<K>> function1) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(Ref$.MODULE$.of(((IterableOnceOps) set.map(obj -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(obj), function1.apply(obj));
        })).toMap($less$colon$less$.MODULE$.refl()), this.evidence$1)), this::read$$anonfun$adapted$1);
    }

    @Override // dev.profunktor.redis4cats.streams.Streaming
    /* renamed from: read, reason: avoid collision after fix types in other method */
    public /* bridge */ /* synthetic */ Stream read2(Set set, Function1 function1) {
        return new Stream(read(set, function1));
    }

    private final /* synthetic */ FreeC append$$anonfun$2(FreeC freeC) {
        return Stream$.MODULE$.evalMap$extension(freeC, xAddMessage -> {
            return this.rawStreaming.xAdd(xAddMessage.key(), xAddMessage.body(), xAddMessage.approxMaxlen());
        });
    }

    private final Object append$$anonfun$adapted$1(Object obj) {
        return new Stream(append$$anonfun$2(obj == null ? null : ((Stream) obj).fs2$Stream$$free()));
    }

    private final /* synthetic */ FreeC read$$anonfun$5$$anonfun$5$$anonfun$5$$anonfun$4(List list, List list2) {
        return Stream$.MODULE$.map$extension(Stream$PartiallyAppliedFromIterator$.MODULE$.apply$extension(Stream$.MODULE$.fromIterator(), list.iterator(), this.evidence$1), xReadMessage -> {
            return xReadMessage;
        });
    }

    private final Object read$$anonfun$6$$anonfun$6$$anonfun$6$$anonfun$adapted$1(List list, List list2) {
        return new Stream(read$$anonfun$5$$anonfun$5$$anonfun$5$$anonfun$4(list, list2));
    }

    private final /* synthetic */ FreeC read$$anonfun$7$$anonfun$7$$anonfun$7(Ref ref, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        List list = (List) tuple2._1();
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(package$all$.MODULE$.toTraverseOps(((List) tuple2._2()).map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            Object _1 = tuple22._1();
            data.StreamingOffset streamingOffset = (data.StreamingOffset) tuple22._2();
            return ref.update(map -> {
                return map.updated(_1, streamingOffset);
            });
        }), UnorderedFoldable$.MODULE$.catsTraverseForList()).sequence($less$colon$less$.MODULE$.refl(), this.evidence$1)), (v2) -> {
            return read$$anonfun$6$$anonfun$6$$anonfun$6$$anonfun$adapted$1(r3, v2);
        });
    }

    private final Object read$$anonfun$8$$anonfun$8$$anonfun$adapted$1(Ref ref, Tuple2 tuple2) {
        return new Stream(read$$anonfun$7$$anonfun$7$$anonfun$7(ref, tuple2));
    }

    private final /* synthetic */ FreeC read$$anonfun$9$$anonfun$9(Ref ref, Map map) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.map$extension(Stream$.MODULE$.eval(this.rawStreaming.xRead(map.values().toSet())), list -> {
            return Tuple2$.MODULE$.apply(list, ((MapOps) offsetsByKey().apply(list)).collect(new RedisStream$$anon$1()).toList());
        }), (v2) -> {
            return read$$anonfun$8$$anonfun$8$$anonfun$adapted$1(r3, v2);
        });
    }

    private final Object $anonfun$adapted$1(Ref ref, Map map) {
        return new Stream(read$$anonfun$9$$anonfun$9(ref, map));
    }

    private final /* synthetic */ FreeC read$$anonfun$10(Ref ref) {
        return new Stream(Stream$.MODULE$.repeat$extension(Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(ref.get()), (v2) -> {
            return $anonfun$adapted$1(r3, v2);
        }))).fs2$Stream$$free();
    }

    private final Object read$$anonfun$adapted$1(Ref ref) {
        return new Stream(read$$anonfun$10(ref));
    }
}
