package datadog.trace.instrumentation.kafka_clients;

import datadog.trace.api.Config;
import datadog.trace.api.Functions;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.MessagingClientDecorator;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.record.TimestampType;

/* loaded from: input_file:inst/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.classdata */
/**
 * Span decorator used by the Kafka client instrumentation.
 *
 * <p>Three shared instances are exposed ({@link #PRODUCER_DECORATE}, {@link #CONSUMER_DECORATE}
 * and {@link #BROKER_DECORATE}); each one carries its own span kind, span type and service name.
 * The {@code on*} callbacks copy topic, partition, offset and related record metadata onto a span.
 */
public class KafkaDecorator extends MessagingClientDecorator {
    // ---------------------------------------------------------------------------------------
    // Compile-time constants.
    // ---------------------------------------------------------------------------------------
    private static final String KAFKA = "kafka";
    public static final String KAFKA_PRODUCED_KEY = "x_datadog_kafka_produced";

    // ---------------------------------------------------------------------------------------
    // Run-time initialised statics.
    //
    // Static initialisers execute in textual order. The order below is deliberate:
    // KAFKA_LEGACY_TRACING has to be assigned before TIME_IN_QUEUE_ENABLED and before the three
    // decorator singletons, all of which read it. Do not reorder these declarations.
    // ---------------------------------------------------------------------------------------
    public static final CharSequence JAVA_KAFKA = UTF8BytesString.create("java-kafka");
    public static final CharSequence KAFKA_CONSUME =
            UTF8BytesString.create(SpanNaming.instance().namingSchema().messaging().inboundOperation(KAFKA));
    public static final CharSequence KAFKA_POLL = UTF8BytesString.create("kafka.poll");
    public static final CharSequence KAFKA_PRODUCE =
            UTF8BytesString.create(SpanNaming.instance().namingSchema().messaging().outboundOperation(KAFKA));
    public static final CharSequence KAFKA_DELIVER = UTF8BytesString.create("kafka.deliver");
    public static final boolean KAFKA_LEGACY_TRACING = Config.get().isKafkaLegacyTracingEnabled();

    /** Result of {@code Config.isTimeInQueueEnabled}, called with the negated legacy-tracing flag. */
    public static final boolean TIME_IN_QUEUE_ENABLED =
            Config.get().isTimeInQueueEnabled(!KAFKA_LEGACY_TRACING, KAFKA);

    /** Cache of producer resource names, keyed by topic. */
    private static final DDCache<CharSequence, CharSequence> PRODUCER_RESOURCE_NAME_CACHE =
            DDCaches.newFixedSizeCache(32);
    private static final Functions.Prefix PRODUCER_PREFIX = new Functions.Prefix("Produce Topic ");

    /** Cache of consumer resource names, keyed by topic. */
    private static final DDCache<CharSequence, CharSequence> CONSUMER_RESOURCE_NAME_CACHE =
            DDCaches.newFixedSizeCache(32);

    /** Weak-key cache of the joined bootstrap server list, keyed by producer configuration. */
    private static final DDCache<ProducerConfig, CharSequence> PRODUCER_BOOTSTRAP_SERVERS_CACHE =
            DDCaches.newFixedSizeWeakKeyCache(16);

    /** Joins the {@code bootstrap.servers} entries of a producer configuration with commas. */
    private static final Function<ProducerConfig, CharSequence> BOOTSTRAP_SERVERS_JOINER =
            config -> String.join(",", config.getList("bootstrap.servers"));

    private static final Functions.Prefix CONSUMER_PREFIX = new Functions.Prefix("Consume Topic ");

    public static final KafkaDecorator PRODUCER_DECORATE =
            new KafkaDecorator(
                    Tags.SPAN_KIND_PRODUCER,
                    InternalSpanTypes.MESSAGE_PRODUCER,
                    SpanNaming.instance().namingSchema().messaging().outboundService(KAFKA, KAFKA_LEGACY_TRACING));
    public static final KafkaDecorator CONSUMER_DECORATE =
            new KafkaDecorator(
                    Tags.SPAN_KIND_CONSUMER,
                    InternalSpanTypes.MESSAGE_CONSUMER,
                    SpanNaming.instance().namingSchema().messaging().inboundService(KAFKA, KAFKA_LEGACY_TRACING));
    public static final KafkaDecorator BROKER_DECORATE =
            new KafkaDecorator(
                    Tags.SPAN_KIND_BROKER,
                    InternalSpanTypes.MESSAGE_BROKER,
                    SpanNaming.instance().namingSchema().messaging().timeInQueueService(KAFKA));

    // ---------------------------------------------------------------------------------------
    // Per-instance state.
    // ---------------------------------------------------------------------------------------
    private final String spanKind;
    private final CharSequence spanType;
    private final String serviceName;

    /**
     * Creates a decorator with fixed span metadata.
     *
     * @param str value returned by {@link #spanKind()}
     * @param charSequence value returned by {@link #spanType()}
     * @param str2 value returned by {@link #service()}
     */
    protected KafkaDecorator(String str, CharSequence charSequence, String str2) {
        this.spanKind = str;
        this.spanType = charSequence;
        this.serviceName = str2;
    }

    /** Returns the span type supplied at construction (overrides {@code BaseDecorator}). */
    @Override
    protected CharSequence spanType() {
        return this.spanType;
    }

    /**
     * Returns the single instrumentation name, {@code "kafka"} (overrides {@code BaseDecorator}).
     * The decompiler recorded that this method's access modifier was changed from protected.
     */
    @Override
    public String[] instrumentationNames() {
        final String[] names = {KAFKA};
        return names;
    }

    /** Returns the service name supplied at construction (overrides {@code ClientDecorator}). */
    @Override
    protected String service() {
        return this.serviceName;
    }

    /**
     * Returns the component name, {@link #JAVA_KAFKA} (overrides {@code BaseDecorator}).
     * The decompiler recorded that this method's access modifier was changed from protected.
     */
    @Override
    public CharSequence component() {
        return JAVA_KAFKA;
    }

    /** Returns the span kind supplied at construction (overrides {@code ClientDecorator}). */
    @Override
    protected String spanKind() {
        return this.spanKind;
    }

    /**
     * Decorates a consumer span with details of the consumed record. Does nothing when the record
     * is {@code null}.
     *
     * @param agentSpan span to decorate
     * @param consumerRecord record being consumed; may be {@code null}
     * @param str written as the consumer-group tag when non-null
     * @param str2 written as the bootstrap-servers tag when non-null
     */
    public void onConsume(AgentSpan agentSpan, ConsumerRecord consumerRecord, String str, String str2) {
        if (consumerRecord == null) {
            return;
        }

        // A record without a topic falls back to the generic "kafka" name.
        final String topic;
        if (consumerRecord.topic() == null) {
            topic = KAFKA;
        } else {
            topic = consumerRecord.topic();
        }
        final CharSequence resourceName = CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, CONSUMER_PREFIX);
        agentSpan.setResourceName(resourceName);

        agentSpan.setTag("partition", consumerRecord.partition());
        agentSpan.setTag(InstrumentationTags.OFFSET, consumerRecord.offset());

        if (str != null) {
            agentSpan.setTag(InstrumentationTags.CONSUMER_GROUP, str);
        }
        if (str2 != null) {
            agentSpan.setTag(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS, str2);
        }

        // Queue time is only reported when the record carries a timestamp.
        if (consumerRecord.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
            return;
        }
        final long spanStartMillis = TimeUnit.NANOSECONDS.toMillis(agentSpan.getStartTime());
        final long recordMillis = consumerRecord.timestamp();
        // Clamp at zero so a record timestamp later than the span start never yields a negative value.
        final long queueTimeMillis = Math.max(0L, spanStartMillis - recordMillis);
        agentSpan.setTag(InstrumentationTags.RECORD_QUEUE_TIME_MS, queueTimeMillis);
    }

    /**
     * Decorates a time-in-queue span: the resource name becomes the record's topic (or
     * {@code "kafka"} when the topic is {@code null}), and the service name is set to the same
     * value when {@code Config.isMessageBrokerSplitByDestination()} is true. Does nothing when the
     * record is {@code null}.
     *
     * @param agentSpan span to decorate
     * @param consumerRecord record the span relates to; may be {@code null}
     */
    public void onTimeInQueue(AgentSpan agentSpan, ConsumerRecord consumerRecord) {
        if (consumerRecord == null) {
            return;
        }

        final String topic;
        if (consumerRecord.topic() == null) {
            topic = KAFKA;
        } else {
            topic = consumerRecord.topic();
        }

        // Typed as CharSequence so the same setResourceName overload as before is selected.
        final CharSequence resourceName = topic;
        agentSpan.setResourceName(resourceName);

        if (Config.get().isMessageBrokerSplitByDestination()) {
            agentSpan.setServiceName(topic);
        }
    }

    /**
     * Decorates a producer span with details of the record being sent. Does nothing when the
     * record is {@code null}.
     *
     * @param agentSpan span to decorate
     * @param producerRecord record being produced; may be {@code null}
     * @param producerConfig producer configuration used to derive the bootstrap-servers tag; may be
     *     {@code null}, in which case that tag is not written
     */
    public void onProduce(AgentSpan agentSpan, ProducerRecord producerRecord, ProducerConfig producerConfig) {
        if (producerRecord == null) {
            return;
        }

        // The partition is optional on a producer record.
        if (producerRecord.partition() != null) {
            agentSpan.setTag("partition", (Number) producerRecord.partition());
        }

        if (producerConfig != null) {
            final CharSequence bootstrapServers =
                    PRODUCER_BOOTSTRAP_SERVERS_CACHE.computeIfAbsent(producerConfig, BOOTSTRAP_SERVERS_JOINER);
            agentSpan.setTag(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
        }

        // A record without a topic falls back to the generic "kafka" name.
        final String topic;
        if (producerRecord.topic() == null) {
            topic = KAFKA;
        } else {
            topic = producerRecord.topic();
        }
        final CharSequence resourceName = PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX);
        agentSpan.setResourceName(resourceName);
    }
}
