package datadog.trace.agent.common.writer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.slf4j.Logger;
import datadog.slf4j.LoggerFactory;
import datadog.trace.agent.common.util.DaemonThreadFactory;
import datadog.trace.agent.ot.DDSpan;
import datadog.trace.api.Config;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:agent-tooling-and-instrumentation.isolated/datadog/trace/agent/common/writer/DDAgentWriter.classdata */
/**
 * {@link Writer} that buffers traces in a Disruptor ring buffer, serializes them on the ring
 * buffer's consumer, and hands the serialized batches to a {@link DDApi} on a single-threaded
 * scheduled executor.
 *
 * <p>A batch is sent when a flush event is published (by the periodic timer or by {@link
 * #flush()}) or when the accumulated serialized payload reaches {@code FLUSH_PAYLOAD_BYTES}.
 *
 * <p>Flush synchronization uses a {@link Phaser}: the constructor registers one permanent party
 * that stands for the consumer/sender side, and every report cycle arrives on it exactly once. A
 * caller of {@link #flush()} registers temporarily and waits for that arrival.
 */
public class DDAgentWriter implements Writer {
    private static final Logger log = LoggerFactory.getLogger(DDAgentWriter.class);

    /** Default requested ring buffer size (rounded up to a power of two by the constructor). */
    private static final int DISRUPTOR_BUFFER_SIZE = 8192;

    /** Accumulated serialized size, in bytes, at which a batch is sent without waiting for a flush event. */
    private static final int FLUSH_PAYLOAD_BYTES = 5000000;

    /** Default delay, in seconds, between periodic flush events. */
    private static final int FLUSH_PAYLOAD_DELAY = 1;

    /** Stores a trace into a ring buffer slot. */
    private static final EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>> TRANSLATOR =
            new EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>>() {
                @Override
                public void translateTo(Event<List<DDSpan>> event, long sequence, List<DDSpan> trace) {
                    event.data = trace;
                }
            };

    /** Marks a ring buffer slot as a flush request. */
    private static final EventTranslator<Event<List<DDSpan>>> FLUSH_TRANSLATOR =
            new EventTranslator<Event<List<DDSpan>>>() {
                @Override
                public void translateTo(Event<List<DDSpan>> event, long sequence) {
                    event.shouldFlush = true;
                }
            };

    private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-disruptor");
    private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-writer");

    private final Runnable flushTask = new FlushTask();
    private final DDApi api;
    private final int flushFrequencySeconds;
    private final Disruptor<Event<List<DDSpan>>> disruptor;
    private final ScheduledExecutorService scheduledWriterExecutor;
    /** Traces counted since the last batch was handed to the sender (includes dropped traces). */
    private final AtomicInteger traceCount = new AtomicInteger(0);
    /** The most recently scheduled periodic flush, so it can be cancelled when a new one is armed. */
    private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
    private final Phaser apiPhaser;
    private volatile boolean running = false;

    /** Pre-allocates the reusable ring buffer slots. */
    private static class DisruptorEventFactory<T> implements EventFactory<Event<T>> {
        private DisruptorEventFactory() {
        }

        @Override
        public Event<T> newInstance() {
            return new Event<>();
        }
    }

    /**
     * Reusable ring buffer slot. Carries either a payload ({@code data}), a flush request
     * ({@code shouldFlush}), or neither. Not intended for use outside this class.
     */
    public static class Event<T> {
        private volatile boolean shouldFlush = false;
        private volatile T data = null;

        private Event() {
        }
    }

    /** Periodic task that publishes a flush request into the ring buffer. */
    private class FlushTask implements Runnable {
        private FlushTask() {
        }

        @Override
        public void run() {
            disruptor.publishEvent(FLUSH_TRANSLATOR);
        }
    }

    /**
     * Ring buffer consumer: serializes incoming traces, accumulates them, and hands the batch to
     * the sender executor when a flush is requested or the payload grows large enough.
     */
    private class TraceConsumer implements EventHandler<Event<List<DDSpan>>> {
        private List<byte[]> serializedTraces = new ArrayList<>();
        private int payloadSize = 0;

        private TraceConsumer() {
        }

        @Override
        public void onEvent(Event<List<DDSpan>> event, long sequence, boolean endOfBatch) {
            final List<DDSpan> trace = event.data;
            event.data = null; // Release the trace; the slot is reused by the ring buffer.

            // Read and reset the flag up front so a failing report cannot leave a stale flush
            // request behind in a reused slot.
            final boolean flushRequested = event.shouldFlush;
            event.shouldFlush = false;

            if (trace != null) {
                traceCount.incrementAndGet();
                try {
                    final byte[] serialized = api.serializeTrace(trace);
                    payloadSize += serialized.length;
                    serializedTraces.add(serialized);
                } catch (JsonProcessingException e) {
                    log.warn("Error serializing trace", e);
                } catch (Throwable t) {
                    log.debug("Error while serializing trace", t);
                }
            }

            if (flushRequested || payloadSize >= FLUSH_PAYLOAD_BYTES) {
                reportTraces();
            }
        }

        /**
         * Hands the accumulated batch to the sender executor and re-arms the periodic flush.
         *
         * <p>Arrives at the phaser exactly once per call: immediately when there is nothing to
         * send or the executor rejects the task, otherwise from the sender task once the API
         * call has finished (successfully or not).
         */
        private void reportTraces() {
            try {
                if (serializedTraces.isEmpty()) {
                    apiPhaser.arrive(); // Nothing to send; release anyone waiting in flush().
                    return;
                }

                // Detach the batch so the consumer can keep accumulating while it is being sent.
                final List<byte[]> toSend = serializedTraces;
                serializedTraces = new ArrayList<>(toSend.size());
                final int representativeCount = traceCount.getAndSet(0);
                final int sizeInBytes = payloadSize;

                try {
                    scheduledWriterExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                if (api.sendSerializedTraces(representativeCount, sizeInBytes, toSend)) {
                                    log.debug("Successfully sent {} traces to the API", toSend.size());
                                } else {
                                    log.debug(
                                            "Failed to send {} traces (representing {}) of size {} bytes to the API",
                                            toSend.size(),
                                            representativeCount,
                                            sizeInBytes);
                                }
                            } catch (Throwable t) {
                                log.debug("Failed to send traces to the API: {}", t.getMessage());
                            } finally {
                                apiPhaser.arrive(); // Release anyone waiting in flush().
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // The executor was shut down (writer is closing). Without this arrival a
                    // concurrent flush() would wait forever, and the exception would escape
                    // into the Disruptor's event handler.
                    log.debug(
                            "Trace sender is shut down. Dropping {} traces of size {} bytes",
                            toSend.size(),
                            sizeInBytes);
                    apiPhaser.arrive();
                }
            } finally {
                payloadSize = 0;
                scheduleFlush();
            }
        }
    }

    /** Creates a writer that talks to an agent at the default host, port and unix domain socket. */
    public DDAgentWriter() {
        this(new DDApi(Config.DEFAULT_AGENT_HOST, Config.DEFAULT_TRACE_AGENT_PORT, Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET));
    }

    /**
     * Creates a writer with the default buffer size and flush frequency.
     *
     * @param api the API used to serialize and send traces
     */
    public DDAgentWriter(DDApi api) {
        this(api, DISRUPTOR_BUFFER_SIZE, FLUSH_PAYLOAD_DELAY);
    }

    /**
     * @param api the API used to serialize and send traces
     * @param disruptorSize requested ring buffer size; rounded up to the next power of two, minimum 2
     * @param flushFrequencySeconds delay between periodic flushes; values {@code <= 0} disable them
     */
    private DDAgentWriter(DDApi api, int disruptorSize, int flushFrequencySeconds) {
        this.api = api;
        this.flushFrequencySeconds = flushFrequencySeconds;
        this.disruptor =
                new Disruptor<>(
                        new DisruptorEventFactory<List<DDSpan>>(),
                        Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of two.
                        DISRUPTOR_THREAD_FACTORY,
                        ProducerType.MULTI,
                        new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5L)));
        this.disruptor.handleEventsWith(new TraceConsumer());
        this.scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
        this.apiPhaser = new Phaser();
        this.apiPhaser.register(); // The permanent party representing the consumer/sender side.
    }

    /**
     * Offers a trace to the ring buffer without blocking.
     *
     * <p>The trace is ignored when the writer is not running. When the buffer is full the trace
     * is dropped but still counted.
     *
     * @param trace the spans of one trace
     */
    @Override
    public void write(List<DDSpan> trace) {
        if (!running) {
            log.debug("Trace written after shutdown. Ignoring trace: {}", trace);
            return;
        }
        if (!disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace)) {
            traceCount.incrementAndGet();
            log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace);
        }
    }

    /** Counts one trace without writing it. */
    @Override
    public void incrementTraceCount() {
        traceCount.incrementAndGet();
    }

    /** @return the API this writer sends traces through */
    public DDApi getApi() {
        return api;
    }

    /** Starts the ring buffer consumer, begins accepting traces, and arms the periodic flush. */
    @Override
    public void start() {
        disruptor.start();
        running = true;
        scheduleFlush();
    }

    /**
     * Stops accepting traces, flushes what is already buffered, and shuts down the sender
     * executor and the ring buffer.
     *
     * <p>The final flush blocks until the batch has been handed to the API (or the calling thread
     * is interrupted). If the thread is interrupted while waiting, its interrupt status is
     * restored before this method returns.
     */
    @Override
    public void close() {
        final boolean wasRunning = running;
        running = false; // Stop accepting new traces before draining.
        if (wasRunning) {
            // Must happen while the sender executor still accepts tasks. Going through flush()
            // here would be a no-op because it is guarded by the running flag cleared above.
            publishFlushAndAwait();
        }

        // Shut the executor down before the disruptor so no periodic flush event is published
        // into a ring buffer that no longer has a consumer.
        scheduledWriterExecutor.shutdown();
        try {
            scheduledWriterExecutor.awaitTermination(flushFrequencySeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn("Waiting for flush executor shutdown interrupted.", e);
            Thread.currentThread().interrupt();
        }
        disruptor.shutdown();
    }

    /**
     * Requests a flush and blocks until the consumer/sender side reports completion. Does nothing
     * when the writer is not running.
     */
    public void flush() {
        if (running) {
            publishFlushAndAwait();
        }
    }

    /** Publishes a flush request and waits for the next phaser arrival from the consumer/sender side. */
    private void publishFlushAndAwait() {
        log.info("Flushing any remaining traces.");
        // Register so the phase cannot advance without this caller having arrived.
        apiPhaser.register();
        disruptor.publishEvent(FLUSH_TRANSLATOR);
        try {
            // Arrive and leave in one step, then wait for the phase observed at that moment to end.
            apiPhaser.awaitAdvanceInterruptibly(apiPhaser.arriveAndDeregister());
        } catch (InterruptedException e) {
            log.warn("Waiting for flush interrupted.", e);
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public String toString() {
        return "DDAgentWriter { api=" + this.api + " }";
    }

    /**
     * (Re)arms the periodic flush: schedules the flush task {@code flushFrequencySeconds} from now
     * and cancels the previously scheduled one, if any. Does nothing when periodic flushing is
     * disabled or the executor has been shut down.
     *
     * <p>Internal helper; public only for compatibility with existing callers.
     */
    public void scheduleFlush() {
        if (flushFrequencySeconds <= 0 || scheduledWriterExecutor.isShutdown()) {
            return;
        }
        final ScheduledFuture<?> next;
        try {
            next = scheduledWriterExecutor.schedule(flushTask, flushFrequencySeconds, TimeUnit.SECONDS);
        } catch (RejectedExecutionException e) {
            // The executor was shut down between the check above and the schedule call.
            return;
        }
        final ScheduledFuture<?> previous = flushSchedule.getAndSet(next);
        if (previous != null) {
            previous.cancel(true);
        }
    }
}
