package datadog.trace.agent.core.datastreams;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.slf4j.Logger;
import datadog.slf4j.LoggerFactory;
import datadog.trace.agent.common.metrics.EventListener;
import datadog.trace.agent.common.metrics.OkHttpSink;
import datadog.trace.agent.common.metrics.Sink;
import datadog.trace.agent.core.DDSpan;
import datadog.trace.agent.core.DDTraceCoreInfo;
import datadog.trace.agent.core.propagation.HttpCodec;
import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.api.TraceConfig;
import datadog.trace.api.WellKnownTags;
import datadog.trace.api.experimental.DataStreamsContextCarrier;
import datadog.trace.api.time.TimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.Backlog;
import datadog.trace.bootstrap.instrumentation.api.InboxItem;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.StatsPoint;
import datadog.trace.util.AgentTaskScheduler;
import datadog.trace.util.AgentThreadFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;

/* loaded from: input_file:trace/datadog/trace/agent/core/datastreams/DefaultDataStreamsMonitoring.classdata */
public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, EventListener {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DefaultDataStreamsMonitoring.class);
    static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5);
    private static final StatsPoint REPORT = new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);
    private static final StatsPoint POISON_PILL = new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);
    private final Map<Long, StatsBucket> timeToBucket;
    private final BlockingQueue<InboxItem> inbox;
    private final DatastreamsPayloadWriter payloadWriter;
    private final DDAgentFeaturesDiscovery features;
    private final TimeSource timeSource;
    private final WellKnownTags wellKnownTags;
    private final Supplier<TraceConfig> traceConfigSupplier;
    private final long bucketDurationNanos;
    private final DataStreamContextInjector injector;
    private final Thread thread;
    private AgentTaskScheduler.Scheduled<DefaultDataStreamsMonitoring> cancellation;
    private volatile long nextFeatureCheck;
    private volatile boolean supportsDataStreams;
    private volatile boolean agentSupportsDataStreams;
    private volatile boolean configSupportsDataStreams;
    private final ConcurrentHashMap<String, SchemaSampler> schemaSamplers;

    /* loaded from: input_file:trace/datadog/trace/agent/core/datastreams/DefaultDataStreamsMonitoring$InboxProcessor.classdata */
    private class InboxProcessor implements Runnable {
        private InboxProcessor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Thread currentThread = Thread.currentThread();
            while (!currentThread.isInterrupted()) {
                try {
                    InboxItem inboxItem = (InboxItem) DefaultDataStreamsMonitoring.this.inbox.take();
                    if (inboxItem == DefaultDataStreamsMonitoring.REPORT) {
                        DefaultDataStreamsMonitoring.this.checkDynamicConfig();
                        if (DefaultDataStreamsMonitoring.this.supportsDataStreams) {
                            DefaultDataStreamsMonitoring.this.flush(DefaultDataStreamsMonitoring.this.timeSource.getCurrentTimeNanos());
                        } else if (DefaultDataStreamsMonitoring.this.timeSource.getCurrentTimeNanos() >= DefaultDataStreamsMonitoring.this.nextFeatureCheck) {
                            DefaultDataStreamsMonitoring.this.checkFeatures();
                        }
                    } else if (inboxItem == DefaultDataStreamsMonitoring.POISON_PILL) {
                        if (DefaultDataStreamsMonitoring.this.supportsDataStreams) {
                            DefaultDataStreamsMonitoring.this.flush(Long.MAX_VALUE);
                        }
                        return;
                    } else if (DefaultDataStreamsMonitoring.this.supportsDataStreams) {
                        if (inboxItem instanceof StatsPoint) {
                            StatsPoint statsPoint = (StatsPoint) inboxItem;
                            ((StatsBucket) DefaultDataStreamsMonitoring.this.timeToBucket.computeIfAbsent(Long.valueOf(DefaultDataStreamsMonitoring.this.currentBucket(statsPoint.getTimestampNanos())), l -> {
                                return new StatsBucket(l.longValue(), DefaultDataStreamsMonitoring.this.bucketDurationNanos);
                            })).addPoint(statsPoint);
                        } else if (inboxItem instanceof Backlog) {
                            Backlog backlog = (Backlog) inboxItem;
                            ((StatsBucket) DefaultDataStreamsMonitoring.this.timeToBucket.computeIfAbsent(Long.valueOf(DefaultDataStreamsMonitoring.this.currentBucket(backlog.getTimestampNanos())), l2 -> {
                                return new StatsBucket(l2.longValue(), DefaultDataStreamsMonitoring.this.bucketDurationNanos);
                            })).addBacklog(backlog);
                        }
                    }
                } catch (InterruptedException e) {
                    currentThread.interrupt();
                } catch (Exception e2) {
                    DefaultDataStreamsMonitoring.log.debug("Error monitoring data streams", (Throwable) e2);
                }
            }
        }
    }

    /* loaded from: input_file:trace/datadog/trace/agent/core/datastreams/DefaultDataStreamsMonitoring$ReportTask.classdata */
    private static final class ReportTask implements AgentTaskScheduler.Task<DefaultDataStreamsMonitoring> {
        private ReportTask() {
        }

        @Override // datadog.trace.util.AgentTaskScheduler.Task
        public void run(DefaultDataStreamsMonitoring defaultDataStreamsMonitoring) {
            defaultDataStreamsMonitoring.report();
        }
    }

    public DefaultDataStreamsMonitoring(Config config, SharedCommunicationObjects sharedCommunicationObjects, TimeSource timeSource, Supplier<TraceConfig> supplier) {
        this(new OkHttpSink(sharedCommunicationObjects.okHttpClient, sharedCommunicationObjects.agentUrl.toString(), DDAgentFeaturesDiscovery.V01_DATASTREAMS_ENDPOINT, false, true, Collections.emptyMap()), sharedCommunicationObjects.featuresDiscovery(config), timeSource, supplier, config);
    }

    public DefaultDataStreamsMonitoring(Sink sink, DDAgentFeaturesDiscovery dDAgentFeaturesDiscovery, TimeSource timeSource, Supplier<TraceConfig> supplier, Config config) {
        this(sink, dDAgentFeaturesDiscovery, timeSource, supplier, config.getWellKnownTags(), new MsgPackDatastreamsPayloadWriter(sink, config.getWellKnownTags(), DDTraceCoreInfo.VERSION, config.getPrimaryTag()), Config.get().getDataStreamsBucketDurationNanoseconds());
    }

    public DefaultDataStreamsMonitoring(Sink sink, DDAgentFeaturesDiscovery dDAgentFeaturesDiscovery, TimeSource timeSource, Supplier<TraceConfig> supplier, WellKnownTags wellKnownTags, DatastreamsPayloadWriter datastreamsPayloadWriter, long j) {
        this.timeToBucket = new HashMap();
        this.inbox = new MpscBlockingConsumerArrayQueue(1024);
        this.supportsDataStreams = false;
        this.agentSupportsDataStreams = false;
        this.configSupportsDataStreams = false;
        this.features = dDAgentFeaturesDiscovery;
        this.timeSource = timeSource;
        this.traceConfigSupplier = supplier;
        this.wellKnownTags = wellKnownTags;
        this.payloadWriter = datastreamsPayloadWriter;
        this.bucketDurationNanos = j;
        this.injector = new DataStreamContextInjector(this);
        this.thread = AgentThreadFactory.newAgentThread(AgentThreadFactory.AgentThread.DATA_STREAMS_MONITORING, new InboxProcessor());
        sink.register(this);
        this.schemaSamplers = new ConcurrentHashMap<>();
    }

    @Override // datadog.trace.agent.core.datastreams.DataStreamsMonitoring
    public void start() {
        if (this.features.getDataStreamsEndpoint() == null) {
            this.features.discoverIfOutdated();
        }
        this.agentSupportsDataStreams = this.features.supportsDataStreams();
        checkDynamicConfig();
        if (!this.configSupportsDataStreams) {
            log.debug("Data streams is disabled");
        } else if (!this.agentSupportsDataStreams) {
            log.debug("Data streams is disabled or not supported by agent");
        }
        this.nextFeatureCheck = this.timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS;
        this.cancellation = AgentTaskScheduler.INSTANCE.scheduleAtFixedRate(new ReportTask(), this, this.bucketDurationNanos, this.bucketDurationNanos, TimeUnit.NANOSECONDS);
        this.thread.start();
    }

    @Override // datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring
    public void add(StatsPoint statsPoint) {
        if (this.thread.isAlive()) {
            this.inbox.offer(statsPoint);
        }
    }

    @Override // datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring
    public int shouldSampleSchema(String str) {
        return this.schemaSamplers.computeIfAbsent(str, str2 -> {
            return new SchemaSampler();
        }).shouldSample(this.timeSource.getCurrentTimeMillis());
    }

    @Override // datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring
    public PathwayContext newPathwayContext() {
        return this.configSupportsDataStreams ? new DefaultPathwayContext(this.timeSource, this.wellKnownTags) : AgentTracer.NoopPathwayContext.INSTANCE;
    }

    @Override // datadog.trace.agent.core.datastreams.DataStreamsMonitoring
    public HttpCodec.Extractor extractor(HttpCodec.Extractor extractor) {
        return new DataStreamContextExtractor(extractor, this.timeSource, this.traceConfigSupplier, this.wellKnownTags);
    }

    @Override // datadog.trace.agent.core.datastreams.DataStreamsMonitoring
    public DataStreamContextInjector injector() {
        return this.injector;
    }

    @Override // datadog.trace.agent.core.datastreams.DataStreamsMonitoring
    public void mergePathwayContextIntoSpan(AgentSpan agentSpan, DataStreamsContextCarrier dataStreamsContextCarrier) {
        if (agentSpan instanceof DDSpan) {
            ((DDSpan) agentSpan).context().mergePathwayContext(DefaultPathwayContext.extract(dataStreamsContextCarrier, DataStreamsContextCarrierAdapter.INSTANCE, this.timeSource, this.wellKnownTags));
        }
    }

    @Override // datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring
    public void trackBacklog(LinkedHashMap<String, String> linkedHashMap, long j) {
        ArrayList arrayList = new ArrayList(linkedHashMap.size());
        for (Map.Entry<String, String> entry : linkedHashMap.entrySet()) {
            String createTag = TagsProcessor.createTag(entry.getKey(), entry.getValue());
            if (createTag != null) {
                arrayList.add(createTag);
            }
        }
        this.inbox.offer(new Backlog(arrayList, j, this.timeSource.getCurrentTimeNanos()));
    }

    @Override // datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring
    public void setCheckpoint(AgentSpan agentSpan, LinkedHashMap<String, String> linkedHashMap, long j, long j2) {
        PathwayContext pathwayContext = agentSpan.context().getPathwayContext();
        if (pathwayContext != null) {
            pathwayContext.setCheckpoint(linkedHashMap, this::add, j, j2);
            if (pathwayContext.getHash() != 0) {
                agentSpan.m1681setTag(DDTags.PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash()));
            }
        }
    }

    @Override // datadog.trace.api.experimental.DataStreamsCheckpointer
    public void setConsumeCheckpoint(String str, String str2, DataStreamsContextCarrier dataStreamsContextCarrier) {
        if (str == null || str.isEmpty() || str2 == null || str2.isEmpty()) {
            log.warn("setConsumeCheckpoint should be called with non-empty type and source");
            return;
        }
        AgentSpan activeSpan = AgentTracer.activeSpan();
        if (activeSpan == null) {
            log.warn("SetConsumeCheckpoint is called with no active span");
            return;
        }
        mergePathwayContextIntoSpan(activeSpan, dataStreamsContextCarrier);
        LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(TagsProcessor.DIRECTION_TAG, TagsProcessor.DIRECTION_IN);
        linkedHashMap.put(TagsProcessor.TOPIC_TAG, str2);
        linkedHashMap.put("type", str);
        setCheckpoint(activeSpan, linkedHashMap, 0L, 0L);
    }

    @Override // datadog.trace.api.experimental.DataStreamsCheckpointer
    public void setProduceCheckpoint(String str, String str2, DataStreamsContextCarrier dataStreamsContextCarrier) {
        if (str == null || str.isEmpty() || str2 == null || str2.isEmpty()) {
            log.warn("SetProduceCheckpoint should be called with non-empty type and target");
            return;
        }
        AgentSpan activeSpan = AgentTracer.activeSpan();
        if (activeSpan == null) {
            log.warn("SetProduceCheckpoint is called with no active span");
            return;
        }
        LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(TagsProcessor.DIRECTION_TAG, TagsProcessor.DIRECTION_OUT);
        linkedHashMap.put(TagsProcessor.TOPIC_TAG, str2);
        linkedHashMap.put("type", str);
        this.injector.injectPathwayContext(activeSpan, dataStreamsContextCarrier, DataStreamsContextCarrierAdapter.INSTANCE, linkedHashMap);
    }

    @Override // datadog.trace.agent.core.datastreams.DataStreamsMonitoring, java.lang.AutoCloseable
    public void close() {
        if (null != this.cancellation) {
            this.cancellation.cancel();
        }
        this.inbox.offer(POISON_PILL);
        try {
            this.thread.join(800L);
        } catch (InterruptedException e) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long currentBucket(long j) {
        return j - (j % this.bucketDurationNanos);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void flush(long j) {
        long currentBucket = currentBucket(j);
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<Long, StatsBucket>> it = this.timeToBucket.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, StatsBucket> next = it.next();
            if (next.getKey().longValue() < currentBucket) {
                it.remove();
                arrayList.add(next.getValue());
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        log.debug("Flushing {} buckets", Integer.valueOf(arrayList.size()));
        this.payloadWriter.writePayload(arrayList);
    }

    @Override // datadog.trace.agent.core.datastreams.DataStreamsMonitoring
    public void clear() {
        this.timeToBucket.clear();
    }

    void report() {
        this.inbox.offer(REPORT);
    }

    @Override // datadog.trace.agent.common.metrics.EventListener
    public void onEvent(EventListener.EventType eventType, String str) {
        switch (eventType) {
            case DOWNGRADED:
                log.debug("Agent downgrade was detected");
                checkFeatures();
                return;
            case BAD_PAYLOAD:
                log.debug("bad metrics payload sent to trace agent: {}", str);
                return;
            case ERROR:
                log.debug("trace agent errored receiving metrics payload: {}", str);
                return;
            default:
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkDynamicConfig() {
        this.configSupportsDataStreams = this.traceConfigSupplier.get().isDataStreamsEnabled();
        this.supportsDataStreams = this.agentSupportsDataStreams && this.configSupportsDataStreams;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkFeatures() {
        boolean z = this.agentSupportsDataStreams;
        this.features.discoverIfOutdated();
        this.agentSupportsDataStreams = this.features.supportsDataStreams();
        if (z && !this.agentSupportsDataStreams && this.configSupportsDataStreams) {
            log.info("Disabling data streams reporting because it is not supported by the agent");
        } else if (!z && this.agentSupportsDataStreams && this.configSupportsDataStreams) {
            log.info("Agent upgrade detected. Enabling data streams because it is now supported");
        } else if (!z && this.agentSupportsDataStreams && !this.configSupportsDataStreams) {
            log.info("Agent upgrade detected. Not enabling data streams because it is disabled by config");
        }
        this.supportsDataStreams = this.agentSupportsDataStreams && this.configSupportsDataStreams;
        this.nextFeatureCheck = this.timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS;
    }
}
