package io.opentelemetry.contrib.kafka;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/opentelemetry/contrib/kafka/KafkaSpanExporter.class */
public class KafkaSpanExporter implements SpanExporter {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSpanExporter.class);
    private final String topicName;
    private final Producer<String, Collection<SpanData>> producer;
    private final ExecutorService executorService;
    private final long timeoutInSeconds;
    private final AtomicBoolean isShutdown = new AtomicBoolean();

    public static KafkaSpanExporterBuilder newBuilder() {
        return new KafkaSpanExporterBuilder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaSpanExporter(String str, Producer<String, Collection<SpanData>> producer, ExecutorService executorService, long j) {
        this.topicName = str;
        this.producer = producer;
        this.executorService = executorService;
        this.timeoutInSeconds = j;
    }

    public CompletableResultCode export(@Nonnull Collection<SpanData> collection) {
        if (this.isShutdown.get()) {
            return CompletableResultCode.ofFailure();
        }
        ProducerRecord producerRecord = new ProducerRecord(this.topicName, collection);
        CompletableResultCode completableResultCode = new CompletableResultCode();
        CompletableFuture.runAsync(() -> {
            this.producer.send(producerRecord, (recordMetadata, exc) -> {
                if (exc == null) {
                    completableResultCode.succeed();
                } else {
                    logger.error(String.format("Error while sending spans to Kafka topic %s", this.topicName), exc);
                    completableResultCode.fail();
                }
            });
        }, this.executorService);
        return completableResultCode;
    }

    public CompletableResultCode flush() {
        CompletableResultCode completableResultCode = new CompletableResultCode();
        Producer<String, Collection<SpanData>> producer = this.producer;
        Objects.requireNonNull(producer);
        CompletableFuture.runAsync(producer::flush, this.executorService).handle((r9, th) -> {
            if (th == null) {
                completableResultCode.succeed();
            } else {
                logger.error(String.format("Error while performing the flush operation on topic %s", this.topicName), th);
                completableResultCode.fail();
            }
            return true;
        });
        return completableResultCode;
    }

    private CompletableResultCode shutdownExecutorService() {
        try {
            this.executorService.shutdown();
            if (!this.executorService.awaitTermination(this.timeoutInSeconds, TimeUnit.SECONDS)) {
                List<Runnable> shutdownNow = this.executorService.shutdownNow();
                if (!shutdownNow.isEmpty()) {
                    logger.error("Shutting down KafkaSpanExporter forced {} tasks to be cancelled.", Integer.valueOf(shutdownNow.size()));
                }
            }
            return CompletableResultCode.ofSuccess();
        } catch (InterruptedException e) {
            logger.error("Error when trying to shutdown KafkaSpanExporter executorService.", e);
            return CompletableResultCode.ofFailure();
        }
    }

    private CompletableResultCode shutdownProducer() {
        try {
            this.producer.close(Duration.ofSeconds(this.timeoutInSeconds));
            return CompletableResultCode.ofSuccess();
        } catch (KafkaException e) {
            logger.error("Error when trying to shutdown KafkaSpanExporter Producer.", e);
            return CompletableResultCode.ofFailure();
        }
    }

    public CompletableResultCode shutdown() {
        if (!this.isShutdown.compareAndSet(false, true)) {
            logger.warn("Calling shutdown() multiple times.");
            return CompletableResultCode.ofSuccess();
        }
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(shutdownExecutorService());
        arrayList.add(shutdownProducer());
        return CompletableResultCode.ofAll(arrayList);
    }
}
