package datadog.trace.agent.common.writer;

import datadog.communication.ddagent.DroppingPolicy;
import datadog.slf4j.Logger;
import datadog.slf4j.LoggerFactory;
import datadog.trace.agent.common.sampling.SingleSpanSampler;
import datadog.trace.agent.common.writer.ddagent.FlushEvent;
import datadog.trace.agent.common.writer.ddagent.Prioritization;
import datadog.trace.agent.common.writer.ddagent.PrioritizationStrategy;
import datadog.trace.agent.core.CoreSpan;
import datadog.trace.agent.core.monitor.HealthMetrics;
import datadog.trace.util.AgentThreadFactory;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;

/* loaded from: input_file:inst/datadog/trace/agent/common/writer/TraceProcessingWorker.classdata */
/**
 * Owns the two trace queues, the prioritization strategy that feeds them and the single thread
 * that consumes them and hands traces to a {@link PayloadDispatcher}.
 *
 * <p>Lifecycle: construct, {@link #start()}, {@link #publish} / {@link #flush} as needed, then
 * {@link #close()}.
 */
public class TraceProcessingWorker implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TraceProcessingWorker.class);

    /** How long {@link #close()} waits for the serializer thread to terminate. */
    private static final long SHUTDOWN_JOIN_TIMEOUT_MILLIS = 800L;

    private final PrioritizationStrategy prioritizationStrategy;
    private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
    private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
    private final TraceSerializingHandler serializingHandler;
    private final Thread serializerThread;
    private final int capacity;
    /** May be {@code null}; every use in this class is null-guarded. */
    private final SpanSamplingWorker spanSamplingWorker;

    /**
     * Consumer loop for the primary and secondary queues. Queue elements are either a
     * {@link List} (forwarded to {@link PayloadDispatcher#addTrace}) or a {@link FlushEvent}
     * (triggers a dispatcher flush); any other element type is ignored.
     */
    public static class TraceSerializingHandler implements Runnable, MessagePassingQueue.Consumer<Object> {
        /** Maximum time a single blocking poll on the primary queue may wait. */
        private static final long PRIMARY_POLL_TIMEOUT_MILLIS = 100L;

        private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
        private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
        private final HealthMetrics healthMetrics;
        /** Flush interval in nanoseconds; {@code Long.MAX_VALUE} when time-based flushing is off. */
        private final long ticksRequiredToFlush;
        private final boolean doTimeFlush;
        private final PayloadDispatcher payloadDispatcher;
        /** {@link System#nanoTime()} reading taken at construction or at the last time-based flush. */
        private long lastTicks;

        /**
         * @param mpscBlockingConsumerArrayQueue queue polled first, with a blocking wait
         * @param mpscBlockingConsumerArrayQueue2 queue polled second, without waiting
         * @param healthMetrics receives a callback whenever processing an element fails
         * @param payloadDispatcher target for traces and flush requests
         * @param j flush interval; a value {@code <= 0} disables time-based flushing
         * @param timeUnit unit of {@code j}
         */
        public TraceSerializingHandler(MpscBlockingConsumerArrayQueue<Object> mpscBlockingConsumerArrayQueue, MpscBlockingConsumerArrayQueue<Object> mpscBlockingConsumerArrayQueue2, HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, long j, TimeUnit timeUnit) {
            this.primaryQueue = mpscBlockingConsumerArrayQueue;
            this.secondaryQueue = mpscBlockingConsumerArrayQueue2;
            this.healthMetrics = healthMetrics;
            this.doTimeFlush = j > 0;
            this.payloadDispatcher = payloadDispatcher;
            if (this.doTimeFlush) {
                this.lastTicks = System.nanoTime();
                this.ticksRequiredToFlush = timeUnit.toNanos(j);
            } else {
                this.ticksRequiredToFlush = Long.MAX_VALUE;
            }
        }

        /**
         * Processes one queue element. A {@link List} is added to the dispatcher; a
         * {@link FlushEvent} flushes the dispatcher and is then synced (presumably releasing the
         * thread waiting in {@link TraceProcessingWorker#flush} - confirm against FlushEvent).
         * Any throwable is logged at debug level and reported to the health metrics rather than
         * propagated, so one bad element cannot stop the consumer loop.
         *
         * @param obj the element taken from a queue
         */
        @SuppressWarnings({"rawtypes", "unchecked"}) // element type is erased on the Object queue
        public void onEvent(Object obj) {
            try {
                if (obj instanceof List) {
                    this.payloadDispatcher.addTrace((List) obj);
                } else if (obj instanceof FlushEvent) {
                    this.payloadDispatcher.flush();
                    ((FlushEvent) obj).sync();
                }
            } catch (Throwable th) {
                reportFailure(obj, th);
            }
        }

        /** Runs the consumer loop until the current thread is interrupted. */
        @Override
        public void run() {
            try {
                runDutyCycle();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            log.debug("Datadog trace processor exited. Publishing traces stopped");
        }

        /**
         * One iteration = blocking poll of the primary queue, non-blocking poll of the secondary
         * queue, then a time-based flush check.
         */
        private void runDutyCycle() throws InterruptedException {
            Thread currentThread = Thread.currentThread();
            while (!currentThread.isInterrupted()) {
                consumeFromPrimaryQueue();
                consumeFromSecondaryQueue();
                flushIfNecessary();
            }
        }

        /** Waits briefly for a primary element; if one arrives, also drains what is queued behind it. */
        private void consumeFromPrimaryQueue() throws InterruptedException {
            Object head = this.primaryQueue.poll(PRIMARY_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (head != null) {
                onEvent(head);
                consumeBatch(this.primaryQueue);
            }
        }

        /** Takes a secondary element if one is immediately available, then drains the rest. */
        private void consumeFromSecondaryQueue() {
            Object head = this.secondaryQueue.poll();
            if (head != null) {
                onEvent(head);
                consumeBatch(this.secondaryQueue);
            }
        }

        /**
         * Flushes the dispatcher when the flush interval has elapsed. A runtime failure is
         * reported the same way {@link #onEvent} reports a failed flush; letting it escape would
         * terminate the consumer thread, because {@link #run()} only handles interruption.
         */
        private void flushIfNecessary() {
            if (!shouldFlush()) {
                return;
            }
            try {
                this.payloadDispatcher.flush();
            } catch (RuntimeException e) {
                reportFailure(null, e);
            }
        }

        /**
         * @return {@code true} when time-based flushing is enabled and more than the configured
         *     interval has passed since the last time this method returned {@code true} (or since
         *     construction); the reference time is advanced as a side effect
         */
        private boolean shouldFlush() {
            if (!this.doTimeFlush) {
                return false;
            }
            long now = System.nanoTime();
            if (now - this.lastTicks <= this.ticksRequiredToFlush) {
                return false;
            }
            this.lastTicks = now;
            return true;
        }

        /** Drains at most the number of elements currently reported by {@code queue.size()}. */
        private void consumeBatch(MessagePassingQueue<Object> queue) {
            queue.drain(this, queue.size());
        }

        /** Drain callback; delegates to {@link #onEvent(Object)}. */
        @Override
        public void accept(Object obj) {
            onEvent(obj);
        }

        /**
         * Logs a processing failure at debug level and forwards it to the health metrics.
         *
         * @param obj the element being processed, or {@code null} when the failure is not tied to
         *     one; only a {@link List} is passed on to the metrics, anything else becomes
         *     {@code null}
         * @param th the failure
         */
        @SuppressWarnings({"rawtypes", "unchecked"}) // element type is erased on the Object queue
        private void reportFailure(Object obj, Throwable th) {
            if (log.isDebugEnabled()) {
                log.debug("Error while serializing trace", th);
            }
            this.healthMetrics.onFailedSerialize(obj instanceof List ? (List) obj : null, th);
        }
    }

    /**
     * Builds the queues, the optional span sampling worker, the prioritization strategy and the
     * (not yet started) serializer thread.
     *
     * @param i capacity used for both queues and passed to {@code SpanSamplingWorker.build}
     * @param healthMetrics health metrics sink shared with the handler and the sampling worker
     * @param payloadDispatcher target for traces and flush requests
     * @param droppingPolicy passed to the sampling worker and the prioritization strategy
     * @param prioritization factory for the {@link PrioritizationStrategy}
     * @param j flush interval; a value {@code <= 0} disables time-based flushing
     * @param timeUnit unit of {@code j}
     * @param singleSpanSampler passed to {@code SpanSamplingWorker.build}
     */
    public TraceProcessingWorker(int i, HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, DroppingPolicy droppingPolicy, Prioritization prioritization, long j, TimeUnit timeUnit, SingleSpanSampler singleSpanSampler) {
        this.capacity = i;
        this.primaryQueue = createQueue(i);
        this.secondaryQueue = createQueue(i);
        this.spanSamplingWorker = SpanSamplingWorker.build(i, this.primaryQueue, this.secondaryQueue, singleSpanSampler, healthMetrics, droppingPolicy);
        this.prioritizationStrategy = prioritization.create(this.primaryQueue, this.secondaryQueue, this.spanSamplingWorker == null ? null : this.spanSamplingWorker.getSpanSamplingQueue(), droppingPolicy);
        this.serializingHandler = new TraceSerializingHandler(this.primaryQueue, this.secondaryQueue, healthMetrics, payloadDispatcher, j, timeUnit);
        this.serializerThread = AgentThreadFactory.newAgentThread(AgentThreadFactory.AgentThread.TRACE_PROCESSOR, this.serializingHandler);
    }

    /** Starts the serializer thread and, when present, the span sampling worker. */
    public void start() {
        this.serializerThread.start();
        if (this.spanSamplingWorker != null) {
            this.spanSamplingWorker.start();
        }
    }

    /**
     * Enqueues a {@link FlushEvent} on the primary queue and waits for it to be processed.
     *
     * <p>The timeout covers both phases: retrying the enqueue while the queue is full and then
     * waiting on the latch. Retrying stops early if the serializer thread is not alive or the
     * calling thread is interrupted. If the event could not be enqueued nothing will ever
     * release the latch, so the method returns {@code false} without waiting.
     *
     * @param j maximum time to wait; non-positive values allow a single enqueue attempt and a
     *     non-blocking latch check
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the latch was released within the timeout, {@code false} on
     *     timeout, failed enqueue or interruption (the interrupt status is preserved)
     */
    public boolean flush(long j, TimeUnit timeUnit) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        FlushEvent flushEvent = new FlushEvent(countDownLatch);
        Thread caller = Thread.currentThread();
        // Clamp so that a hugely negative timeout cannot wrap the deadline arithmetic.
        long deadline = System.nanoTime() + Math.max(0L, timeUnit.toNanos(j));

        boolean offered = this.primaryQueue.offer(flushEvent);
        while (!offered
                && this.serializerThread.isAlive()
                && !caller.isInterrupted()
                && System.nanoTime() - deadline < 0) {
            // Queue is full: give the consumer a chance to make room instead of hot-spinning.
            Thread.yield();
            offered = this.primaryQueue.offer(flushEvent);
        }
        if (!offered) {
            return false;
        }
        try {
            return countDownLatch.await(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            caller.interrupt();
            return false;
        }
    }

    /**
     * Closes the span sampling worker (when present), interrupts the serializer thread and waits
     * a bounded time for it to finish. If the calling thread is interrupted while waiting, the
     * wait is abandoned and the interrupt status is restored.
     */
    @Override
    public void close() {
        if (this.spanSamplingWorker != null) {
            this.spanSamplingWorker.close();
        }
        this.serializerThread.interrupt();
        try {
            this.serializerThread.join(SHUTDOWN_JOIN_TIMEOUT_MILLIS);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt; the caller may depend on it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Delegates to the configured {@link PrioritizationStrategy}.
     *
     * @param t forwarded unchanged as the first argument of the strategy's {@code publish}
     * @param i forwarded unchanged as the second argument
     * @param list forwarded unchanged as the third argument
     * @return the strategy's publish result
     */
    public <T extends CoreSpan<T>> PrioritizationStrategy.PublishResult publish(T t, int i, List<T> list) {
        return this.prioritizationStrategy.publish(t, i, list);
    }

    /** @return the capacity this worker was constructed with */
    public int getCapacity() {
        return this.capacity;
    }

    /** @return the remaining capacity of the primary queue only */
    public long getRemainingCapacity() {
        return this.primaryQueue.remainingCapacity();
    }

    /** Creates a queue with the requested capacity. */
    private static MpscBlockingConsumerArrayQueue<Object> createQueue(int i) {
        return new MpscBlockingConsumerArrayQueue<>(i);
    }
}
