package io.janstenpickle.trace4cats.avro.kafka;

import cats.Invariant$;
import cats.Show$;
import cats.UnorderedFoldable$;
import cats.data.NonEmptyList;
import cats.data.NonEmptyList$;
import cats.effect.Concurrent;
import cats.effect.ConcurrentEffect;
import cats.effect.ContextShift;
import cats.effect.Sync;
import cats.effect.Sync$;
import cats.effect.Timer;
import cats.syntax.ApplicativeErrorOps$;
import cats.syntax.EitherOps$;
import cats.syntax.FoldableOps0$;
import cats.syntax.package$applicativeError$;
import cats.syntax.package$either$;
import cats.syntax.package$flatMap$;
import cats.syntax.package$foldable$;
import cats.syntax.package$functor$;
import cats.syntax.package$traverse$;
import fs2.Chunk;
import fs2.Chunk$;
import fs2.RaiseThrowable$;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$PartiallyAppliedFromEither$;
import fs2.internal.FreeC;
import fs2.kafka.AutoOffsetReset$;
import fs2.kafka.CommittableOffsetBatch$;
import fs2.kafka.ConsumerSettings;
import fs2.kafka.ConsumerSettings$;
import fs2.kafka.Deserializer;
import fs2.kafka.Deserializer$;
import fs2.kafka.KafkaConsumer;
import fs2.kafka.KafkaConsumer$;
import fs2.kafka.RecordDeserializer$;
import io.chrisdavenport.log4cats.Logger;
import io.chrisdavenport.log4cats.Logger$;
import io.chrisdavenport.log4cats.SelfAwareStructuredLogger;
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger$;
import io.janstenpickle.trace4cats.avro.AvroInstances$;
import io.janstenpickle.trace4cats.model.CompletedSpan;
import io.janstenpickle.trace4cats.model.TraceId;
import io.janstenpickle.trace4cats.model.TraceId$;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.slf4j.LoggerFactory;
import scala.$less$colon$less$;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.immutable.Nil$;
import scala.runtime.BoxedUnit;

/* compiled from: AvroKafkaConsumer.scala */
/* loaded from: input_file:io/janstenpickle/trace4cats/avro/kafka/AvroKafkaConsumer$.class */
/**
 * Java view of the Scala object compiled from {@code AvroKafkaConsumer.scala}.
 *
 * <p>All members are reached through the {@link #MODULE$} singleton. The class offers
 * Kafka key/value deserializers for trace ids and completed spans, plus two {@code apply}
 * overloads that produce an fs2 stream of {@link CompletedSpan} values read from Kafka.
 *
 * <p>The {@code $anonfun$...} methods are compiler-generated lambda bodies; they are kept
 * with their original signatures so that existing references keep resolving.
 */
public final class AvroKafkaConsumer$ {
    /** The single instance of this module. */
    public static final AvroKafkaConsumer$ MODULE$ = new AvroKafkaConsumer$();

    /**
     * Builds the deserializer used for record keys.
     *
     * <p>The raw key is wrapped with {@code Option.apply} inside a delayed effect and then
     * flat-mapped through {@code TraceId$.apply}.
     *
     * @param sync the {@code Sync} instance used to suspend the conversion
     * @return a deserializer yielding an optional {@link TraceId}
     */
    public <F> Deserializer<F, Option<TraceId>> keyDeserializer(Sync<F> sync) {
        // The inner lambda parameter is named differently from the outer one: Java does not
        // allow a lambda parameter to redeclare a variable of the enclosing scope.
        return Deserializer$.MODULE$.lift(rawKey -> {
            return Sync$.MODULE$.apply(sync).delay(() -> {
                return Option$.MODULE$.apply(rawKey).flatMap(keyBytes -> {
                    return TraceId$.MODULE$.apply(keyBytes);
                });
            });
        }, sync);
    }

    /**
     * Builds the deserializer used for record values.
     *
     * <p>For a present payload, the bytes are read with a {@link GenericDatumReader} created
     * for {@code schema} through a binary decoder, and the result is decoded with
     * {@code AvroInstances$.completedSpanCodec()}. A decoding error is turned into its
     * throwable and raised in {@code F}. Any error raised along that path is then handled by
     * logging a warning ("Failed to decode span") and yielding an empty {@code Option}.
     *
     * @param schema the Avro schema used both for reading and for decoding
     * @param sync   the {@code Sync} instance used to suspend and sequence the work
     * @param logger the logger that receives the warning when decoding fails
     * @return a deserializer yielding an optional {@link CompletedSpan}
     */
    public <F> Deserializer<F, Option<CompletedSpan>> valueDeserializer(Schema schema, Sync<F> sync, Logger<F> logger) {
        // As in keyDeserializer, outer and inner lambda parameters carry distinct names so
        // the nested lambda does not redeclare the enclosing parameter.
        return Deserializer$.MODULE$.lift(rawValue -> {
            return package$traverse$.MODULE$.toTraverseOps(Option$.MODULE$.apply(rawValue), UnorderedFoldable$.MODULE$.catsTraverseForOption()).flatTraverse(payload -> {
                return ApplicativeErrorOps$.MODULE$.handleErrorWith$extension(package$applicativeError$.MODULE$.catsSyntaxApplicativeError(package$flatMap$.MODULE$.toFlatMapOps(Sync$.MODULE$.apply(sync).delay(() -> {
                    return new GenericDatumReader(schema).read((Object) null, DecoderFactory.get().binaryDecoder(payload, (BinaryDecoder) null));
                }), sync).flatMap(obj -> {
                    return Sync$.MODULE$.apply(sync).fromEither(EitherOps$.MODULE$.bimap$extension(package$either$.MODULE$.catsSyntaxEither(AvroInstances$.MODULE$.completedSpanCodec().decode(obj, schema)), avroError -> {
                        return avroError.throwable();
                    }, completedSpan -> {
                        return new Some(completedSpan);
                    }));
                }), sync), th -> {
                    return package$functor$.MODULE$.toFunctorOps(Logger$.MODULE$.apply(logger).warn(th, () -> {
                        return "Failed to decode span";
                    }), sync).as(Option$.MODULE$.empty());
                }, sync);
            }, sync, Invariant$.MODULE$.catsInstancesForOption());
        }, sync);
    }

    /**
     * Creates a Kafka consumer and streams the completed spans it reads.
     *
     * <p>A logger named {@code io.janstenpickle.trace4cats.avro.kafka.AvroKafkaConsumer} is
     * obtained inside a delayed effect, and the rest of the pipeline is built by
     * {@link #$anonfun$apply$2}.
     *
     * @param nonEmptyList     entries joined with {@code ","} and used as the bootstrap servers
     * @param str              the consumer group id
     * @param str2             the topic handed to the consumer-based {@code apply} overload
     * @param function1        transformation applied to the consumer settings before the
     *                         consumer is created
     * @param concurrentEffect effect instance used throughout the pipeline
     * @param contextShift     passed through to the Kafka consumer stream
     * @param timer            passed through to the Kafka consumer stream
     * @return the stream of completed spans
     */
    public <F> FreeC<F, CompletedSpan, BoxedUnit> apply(NonEmptyList<String> nonEmptyList, String str, String str2, Function1<ConsumerSettings<F, Option<TraceId>, Option<CompletedSpan>>, ConsumerSettings<F, Option<TraceId>, Option<CompletedSpan>>> function1, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, Timer<F> timer) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(Sync$.MODULE$.apply(concurrentEffect).delay(() -> {
            return Slf4jLogger$.MODULE$.getLoggerFromSlf4j(LoggerFactory.getLogger("io.janstenpickle.trace4cats.avro.kafka.AvroKafkaConsumer"), concurrentEffect);
        })), selfAwareStructuredLogger -> {
            return new Stream($anonfun$apply$2(concurrentEffect, function1, nonEmptyList, str, contextShift, timer, str2, selfAwareStructuredLogger));
        });
    }

    /**
     * Streams completed spans from an existing Kafka consumer.
     *
     * <p>The consumer is subscribed to {@code str}, its record stream is grouped into chunks,
     * and for every chunk the offsets are committed as one batch before the chunk's record
     * values are emitted. Empty values are dropped from the resulting stream.
     *
     * @param kafkaConsumer the consumer to subscribe and read from
     * @param str           the topic to subscribe to
     * @param concurrent    effect instance used when building the offset batch
     * @param timer         not referenced by this method's body
     * @return the stream of completed spans
     */
    public <F> FreeC<F, CompletedSpan, BoxedUnit> apply(KafkaConsumer<F, Option<TraceId>, Option<CompletedSpan>> kafkaConsumer, String str, Concurrent<F> concurrent, Timer<F> timer) {
        return Stream$.MODULE$.unNone$extension(Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.chunks$extension(Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(kafkaConsumer.subscribeTo(str, Nil$.MODULE$)), boxedUnit -> {
            return new Stream(kafkaConsumer.stream());
        })), chunk -> {
            return new Stream($anonfun$apply$7(concurrent, chunk));
        }), $less$colon$less$.MODULE$.refl());
    }

    /**
     * Returns a function that hands its consumer settings back unchanged.
     *
     * <p>NOTE(review): the name suggests this is the Scala default argument for the fourth
     * parameter of the settings-based {@code apply}; confirm against the Scala source.
     *
     * @return the identity function on consumer settings
     */
    public <F> Function1<ConsumerSettings<F, Option<TraceId>, Option<CompletedSpan>>, ConsumerSettings<F, Option<TraceId>, Option<CompletedSpan>>> apply$default$4() {
        return consumerSettings -> {
            return consumerSettings;
        };
    }

    /**
     * Synthetic lambda body: builds the Kafka consumer stream for a resolved schema.
     *
     * <p>The settings start from the key and value deserializers of this module, use the
     * entries of {@code nonEmptyList} joined with {@code ","} as bootstrap servers,
     * {@code str} as group id and the {@code Latest} auto offset reset, and are then passed
     * through {@code function1}.
     */
    public static final /* synthetic */ FreeC $anonfun$apply$4(ConcurrentEffect concurrentEffect, SelfAwareStructuredLogger selfAwareStructuredLogger, Function1 function1, NonEmptyList nonEmptyList, String str, ContextShift contextShift, Timer timer, Schema schema) {
        return KafkaConsumer$.MODULE$.stream((ConsumerSettings) function1.apply(ConsumerSettings$.MODULE$.apply(concurrentEffect, RecordDeserializer$.MODULE$.lift(concurrentEffect, MODULE$.keyDeserializer(concurrentEffect)), RecordDeserializer$.MODULE$.lift(concurrentEffect, MODULE$.valueDeserializer(schema, concurrentEffect, selfAwareStructuredLogger))).withBootstrapServers(FoldableOps0$.MODULE$.mkString_$extension(package$foldable$.MODULE$.catsSyntaxFoldableOps0(nonEmptyList), ",", Show$.MODULE$.catsShowForString(), NonEmptyList$.MODULE$.catsDataInstancesForNonEmptyList())).withGroupId(str).withAutoOffsetReset(AutoOffsetReset$.MODULE$.Latest())), concurrentEffect, contextShift, timer);
    }

    /** Synthetic lambda body: delegates to the consumer-based {@code apply} overload. */
    public static final /* synthetic */ FreeC $anonfun$apply$5(String str, ConcurrentEffect concurrentEffect, Timer timer, KafkaConsumer kafkaConsumer) {
        return MODULE$.apply(kafkaConsumer, str, concurrentEffect, timer);
    }

    /**
     * Synthetic lambda body: resolves the completed-span schema, then chains consumer
     * creation ({@link #$anonfun$apply$4}) and consumption ({@link #$anonfun$apply$5}).
     *
     * <p>A failure to obtain the schema is converted to its throwable and raised in the
     * stream.
     */
    public static final /* synthetic */ FreeC $anonfun$apply$2(ConcurrentEffect concurrentEffect, Function1 function1, NonEmptyList nonEmptyList, String str, ContextShift contextShift, Timer timer, String str2, SelfAwareStructuredLogger selfAwareStructuredLogger) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.flatMap$extension(Stream$PartiallyAppliedFromEither$.MODULE$.apply$extension(Stream$.MODULE$.fromEither(), EitherOps$.MODULE$.leftMap$extension(package$either$.MODULE$.catsSyntaxEither(AvroInstances$.MODULE$.completedSpanCodec().schema()), avroError -> {
            return avroError.throwable();
        }), RaiseThrowable$.MODULE$.fromApplicativeError(concurrentEffect)), schema -> {
            return new Stream($anonfun$apply$4(concurrentEffect, selfAwareStructuredLogger, function1, nonEmptyList, str, contextShift, timer, schema));
        }), kafkaConsumer -> {
            return new Stream($anonfun$apply$5(str2, concurrentEffect, timer, kafkaConsumer));
        });
    }

    /** Synthetic lambda body: emits the value of every record in {@code chunk} as one chunk. */
    public static final /* synthetic */ FreeC $anonfun$apply$9(Chunk chunk) {
        return Stream$.MODULE$.chunk(chunk.map(committableConsumerRecord -> {
            return (Option) committableConsumerRecord.record().value();
        }));
    }

    /**
     * Synthetic lambda body: commits the offsets of {@code chunk} as a single batch and only
     * afterwards continues with the stream of that chunk's record values.
     */
    public static final /* synthetic */ FreeC $anonfun$apply$7(Concurrent concurrent, Chunk chunk) {
        return Stream$.MODULE$.$greater$greater$extension(Stream$.MODULE$.eval(CommittableOffsetBatch$.MODULE$.fromFoldable(chunk.map(committableConsumerRecord -> {
            return committableConsumerRecord.offset();
        }), concurrent, Chunk$.MODULE$.instance()).commit()), () -> {
            return new Stream($anonfun$apply$9(chunk));
        });
    }

    /** Not instantiable from outside; the only instance is {@link #MODULE$}. */
    private AvroKafkaConsumer$() {
    }
}
