package kamon.instrumentation.kafka.client;

import com.typesafe.config.Config;
import kamon.Kamon$;
import kamon.context.Context;
import kamon.context.Storage;
import kamon.instrumentation.kafka.client.KafkaInstrumentation;
import kamon.trace.Span;
import kamon.trace.Span$;
import kamon.trace.Span$Link$Kind$FollowsFrom$;
import kamon.trace.SpanBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.runtime.BoxedUnit;
import scala.util.control.NonFatal$;

/* compiled from: KafkaInstrumentation.scala */
/* loaded from: input_file:kamon/instrumentation/kafka/client/KafkaInstrumentation$.class */
public final class KafkaInstrumentation$ {
    public static final KafkaInstrumentation$ MODULE$ = new KafkaInstrumentation$();
    private static volatile KafkaInstrumentation.Settings _settings = MODULE$.readSettings(Kamon$.MODULE$.config());

    static {
        Kamon$.MODULE$.onReconfigure(config -> {
            $anonfun$new$1(config);
            return BoxedUnit.UNIT;
        });
    }

    private KafkaInstrumentation.Settings _settings() {
        return _settings;
    }

    private void _settings_$eq(KafkaInstrumentation.Settings settings) {
        _settings = settings;
    }

    public KafkaInstrumentation.Settings settings() {
        return _settings();
    }

    private KafkaInstrumentation.Settings readSettings(Config config) {
        Config config2 = config.getConfig("kamon.instrumentation.kafka.client");
        return new KafkaInstrumentation.Settings(config2.getBoolean("tracing.start-trace-on-producer"), config2.getBoolean("tracing.continue-trace-on-consumer"), config2.getBoolean("tracing.use-delayed-spans"));
    }

    public <K, V> Context extractContext(ConsumerRecord<K, V> consumerRecord) {
        return KafkaInstrumentation$Syntax$.MODULE$.context$extension(Syntax(consumerRecord));
    }

    public ConsumerRecord<?, ?> Syntax(ConsumerRecord<?, ?> consumerRecord) {
        return consumerRecord;
    }

    public <T> T runWithConsumerSpan(ConsumerRecord<?, ?> consumerRecord, Function0<T> function0) {
        return (T) runWithConsumerSpan(consumerRecord, "consumer.process", true, function0);
    }

    public <T> T runWithConsumerSpan(ConsumerRecord<?, ?> consumerRecord, String str, Function0<T> function0) {
        return (T) runWithConsumerSpan(consumerRecord, str, true, function0);
    }

    public <T> T runWithConsumerSpan(ConsumerRecord<?, ?> consumerRecord, String str, boolean z, Function0<T> function0) {
        Context context$extension = KafkaInstrumentation$Syntax$.MODULE$.context$extension(Syntax(consumerRecord));
        Context currentContext = context$extension.nonEmpty() ? context$extension : Kamon$.MODULE$.currentContext();
        Span consumerSpan = consumerSpan(consumerRecord, str);
        Storage.Scope storeContext = Kamon$.MODULE$.storeContext(currentContext.withEntry(Span$.MODULE$.Key(), consumerSpan));
        try {
            try {
                return (T) function0.apply();
            } catch (Throwable th) {
                if (th != null) {
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (!unapply.isEmpty()) {
                        Throwable th2 = (Throwable) unapply.get();
                        consumerSpan.fail(th2.getMessage(), th2);
                        throw th2;
                    }
                }
                throw th;
            }
        } finally {
            if (z) {
                consumerSpan.finish();
            }
            storeContext.close();
        }
    }

    public Span consumerSpan(ConsumerRecord<?, ?> consumerRecord) {
        return consumerSpan(consumerRecord, "consumer.process");
    }

    public Span consumerSpan(ConsumerRecord<?, ?> consumerRecord, String str) {
        SpanBuilder tag = Kamon$.MODULE$.consumerSpanBuilder(str, "kafka.consumer").tag("kafka.topic", consumerRecord.topic()).tag("kafka.partition", consumerRecord.partition()).tag("kafka.offset", consumerRecord.offset()).tag("kafka.timestamp", consumerRecord.timestamp()).tag("kafka.timestamp-type", consumerRecord.timestampType().name);
        Option$.MODULE$.apply(consumerRecord.key()).foreach(obj -> {
            return tag.tag("kafka.key", obj.toString());
        });
        if (consumerRecord instanceof ConsumedRecordData) {
            ConsumedRecordData consumedRecordData = (ConsumedRecordData) consumerRecord;
            Span span = (Span) consumedRecordData.incomingContext().get(Span$.MODULE$.Key());
            tag.tag("kafka.group-id", (String) consumedRecordData.consumerInfo().groupId().getOrElse(() -> {
                return "unknown";
            })).tag("kafka.client-id", consumedRecordData.consumerInfo().clientId()).tag("kafka.poll-time", consumedRecordData.nanosSincePollStart());
            if (span.isEmpty()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else if (settings().continueTraceOnConsumer()) {
                tag.asChildOf(span);
            } else {
                tag.link(span, Span$Link$Kind$FollowsFrom$.MODULE$);
            }
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        return settings().useDelayedSpans() ? tag.delay(Kamon$.MODULE$.clock().toInstant(consumerRecord.timestamp() * 1000000)).start() : tag.start();
    }

    public static final /* synthetic */ void $anonfun$new$1(Config config) {
        MODULE$._settings_$eq(MODULE$.readSettings(config));
    }

    private KafkaInstrumentation$() {
    }
}
