package trace4cats.avro.kafka;

import cats.ApplicativeError;
import cats.Invariant$;
import cats.Show$;
import cats.UnorderedFoldable$;
import cats.data.NonEmptyList;
import cats.data.NonEmptyList$;
import cats.effect.kernel.Async;
import cats.effect.kernel.Sync;
import cats.effect.kernel.Sync$;
import cats.syntax.ApplicativeErrorOps$;
import cats.syntax.EitherOps$;
import cats.syntax.FoldableOps0$;
import cats.syntax.package$applicativeError$;
import cats.syntax.package$either$;
import cats.syntax.package$flatMap$;
import cats.syntax.package$foldable$;
import cats.syntax.package$functor$;
import cats.syntax.package$traverse$;
import fs2.Chunk;
import fs2.Chunk$;
import fs2.RaiseThrowable$;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$OptionStreamOps$;
import fs2.Stream$PartiallyAppliedFromEither$;
import fs2.kafka.AutoOffsetReset$;
import fs2.kafka.CommittableOffsetBatch$;
import fs2.kafka.ConsumerSettings;
import fs2.kafka.ConsumerSettings$;
import fs2.kafka.Deserializer;
import fs2.kafka.Deserializer$;
import fs2.kafka.KafkaConsumer;
import fs2.kafka.KafkaConsumer$;
import fs2.kafka.RecordDeserializer$;
import fs2.kafka.consumer.MkConsumer$;
import java.io.Serializable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.typelevel.log4cats.Logger;
import org.typelevel.log4cats.Logger$;
import org.typelevel.log4cats.slf4j.Slf4jLogger$;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Some$;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.ScalaRunTime$;
import scala.util.NotGiven$;
import trace4cats.avro.AvroInstances$;
import trace4cats.model.CompletedSpan;
import trace4cats.model.TraceId$;

/* compiled from: AvroKafkaConsumer.scala */
/* loaded from: input_file:trace4cats/avro/kafka/AvroKafkaConsumer$.class */
public final class AvroKafkaConsumer$ implements Serializable {
    public static final AvroKafkaConsumer$ MODULE$ = new AvroKafkaConsumer$();

    private AvroKafkaConsumer$() {
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(AvroKafkaConsumer$.class);
    }

    public Deserializer keyDeserializer(Sync sync) {
        return Deserializer$.MODULE$.lift(bArr -> {
            return Sync$.MODULE$.apply(sync).delay(() -> {
                return r1.keyDeserializer$$anonfun$1$$anonfun$1(r2);
            });
        }, sync);
    }

    public <F> Deserializer<F, Option<CompletedSpan>> valueDeserializer(Schema schema, Sync<F> sync, Logger<F> logger) {
        return Deserializer$.MODULE$.lift(bArr -> {
            return package$traverse$.MODULE$.toTraverseOps(Option$.MODULE$.apply(bArr), UnorderedFoldable$.MODULE$.catsTraverseForOption()).flatTraverse(bArr -> {
                return ApplicativeErrorOps$.MODULE$.handleErrorWith$extension(package$applicativeError$.MODULE$.catsSyntaxApplicativeError(package$flatMap$.MODULE$.toFlatMapOps(Sync$.MODULE$.apply(sync).delay(() -> {
                    return r3.valueDeserializer$$anonfun$1$$anonfun$1$$anonfun$1(r4, r5);
                }), sync).flatMap(obj -> {
                    return Sync$.MODULE$.apply(sync).fromEither(EitherOps$.MODULE$.bimap$extension(package$either$.MODULE$.catsSyntaxEither(AvroInstances$.MODULE$.completedSpanCodec().decode(obj, schema)), avroError -> {
                        return avroError.throwable();
                    }, completedSpan -> {
                        return Some$.MODULE$.apply(completedSpan);
                    }));
                }), sync), th -> {
                    return package$functor$.MODULE$.toFunctorOps(Logger$.MODULE$.apply(logger).warn(th, this::valueDeserializer$$anonfun$1$$anonfun$1$$anonfun$3$$anonfun$1), sync).as(Option$.MODULE$.empty());
                }, sync);
            }, sync, Invariant$.MODULE$.catsInstancesForOption());
        }, sync);
    }

    public Stream apply(NonEmptyList nonEmptyList, String str, String str2, Function1 function1, Async async) {
        return Stream$.MODULE$.eval(Slf4jLogger$.MODULE$.create(async, "trace4cats.avro.kafka.AvroKafkaConsumer")).flatMap(selfAwareStructuredLogger -> {
            boolean fromEither = Stream$.MODULE$.fromEither();
            return Stream$PartiallyAppliedFromEither$.MODULE$.apply$extension(fromEither, EitherOps$.MODULE$.leftMap$extension(package$either$.MODULE$.catsSyntaxEither(AvroInstances$.MODULE$.completedSpanCodec().schema()), avroError -> {
                return avroError.throwable();
            }), RaiseThrowable$.MODULE$.fromApplicativeError(async)).flatMap(schema -> {
                Deserializer valueDeserializer = valueDeserializer(schema, async, selfAwareStructuredLogger);
                return KafkaConsumer$.MODULE$.stream((ConsumerSettings) function1.apply(ConsumerSettings$.MODULE$.apply(RecordDeserializer$.MODULE$.lift(async, keyDeserializer(async)), RecordDeserializer$.MODULE$.lift(async, valueDeserializer)).withBootstrapServers(FoldableOps0$.MODULE$.mkString_$extension((NonEmptyList) package$foldable$.MODULE$.catsSyntaxFoldableOps0(nonEmptyList), ",", Show$.MODULE$.catsShowForString(), NonEmptyList$.MODULE$.catsDataInstancesForNonEmptyList())).withGroupId(str).withAutoOffsetReset(AutoOffsetReset$.MODULE$.Latest())), async, MkConsumer$.MODULE$.mkConsumerForSync(async));
            }, NotGiven$.MODULE$.value()).flatMap(kafkaConsumer -> {
                return apply(kafkaConsumer, str2, async);
            }, NotGiven$.MODULE$.value());
        }, NotGiven$.MODULE$.value());
    }

    public Function1 apply$default$4() {
        return consumerSettings -> {
            return consumerSettings;
        };
    }

    public Stream apply(KafkaConsumer kafkaConsumer, String str, ApplicativeError applicativeError) {
        return Stream$OptionStreamOps$.MODULE$.unNone$extension(Stream$.MODULE$.OptionStreamOps(Stream$.MODULE$.eval(kafkaConsumer.subscribeTo(str, ScalaRunTime$.MODULE$.wrapRefArray(new String[0]))).flatMap(boxedUnit -> {
            return kafkaConsumer.stream();
        }, NotGiven$.MODULE$.value()).chunks().flatMap(chunk -> {
            return Stream$.MODULE$.eval(CommittableOffsetBatch$.MODULE$.fromFoldable(chunk.map(committableConsumerRecord -> {
                return committableConsumerRecord.offset();
            }), applicativeError, Chunk$.MODULE$.instance()).commit()).$greater$greater(() -> {
                return r1.apply$$anonfun$3$$anonfun$2(r2);
            }, NotGiven$.MODULE$.value());
        }, NotGiven$.MODULE$.value())));
    }

    private final Option keyDeserializer$$anonfun$1$$anonfun$1(byte[] bArr) {
        return Option$.MODULE$.apply(bArr).flatMap(bArr2 -> {
            return TraceId$.MODULE$.apply(bArr2);
        });
    }

    private final Object valueDeserializer$$anonfun$1$$anonfun$1$$anonfun$1(Schema schema, byte[] bArr) {
        return new GenericDatumReader(schema).read((Object) null, DecoderFactory.get().binaryDecoder(bArr, (BinaryDecoder) null));
    }

    private final String valueDeserializer$$anonfun$1$$anonfun$1$$anonfun$3$$anonfun$1() {
        return "Failed to decode span";
    }

    private final Stream apply$$anonfun$3$$anonfun$2(Chunk chunk) {
        return Stream$.MODULE$.chunk(chunk.map(committableConsumerRecord -> {
            return (Option) committableConsumerRecord.record().value();
        }));
    }
}
