package zipkin2.storage.kafka;

import com.linecorp.armeria.server.Server;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Call;
import zipkin2.CheckResult;
import zipkin2.storage.AutocompleteTags;
import zipkin2.storage.ServiceAndSpanNames;
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.SpanStore;
import zipkin2.storage.StorageComponent;
import zipkin2.storage.Traces;
import zipkin2.storage.kafka.streams.DependencyStorageTopology;
import zipkin2.storage.kafka.streams.SpanAggregationTopology;
import zipkin2.storage.kafka.streams.TraceStorageTopology;

/* loaded from: input_file:lib/zipkin-storage-kafka-0.9.4.jar:zipkin2/storage/kafka/KafkaStorage.class */
public class KafkaStorage extends StorageComponent {
    public static final String HTTP_PATH_PREFIX = "/storage/kafka";
    static final Logger LOG = LoggerFactory.getLogger(KafkaStorage.class);
    final boolean partitioningEnabled;
    final boolean aggregationEnabled;
    final boolean traceByIdQueryEnabled;
    final boolean traceSearchEnabled;
    final boolean dependencyQueryEnabled;
    final List<String> autocompleteKeys;
    final long minTracesStored;
    final String hostname;
    final int httpPort;
    final String partitioningSpansTopic;
    final String aggregationSpansTopic;
    final String aggregationTraceTopic;
    final String aggregationDependencyTopic;
    final String storageSpansTopic;
    final String storageDependencyTopic;
    final Properties adminConfig;
    final Properties producerConfig;
    final Properties aggregationStreamConfig;
    final Properties traceStoreStreamConfig;
    final Properties dependencyStoreStreamConfig;
    final Topology aggregationTopology;
    final Topology traceStoreTopology;
    final Topology dependencyStoreTopology;
    final BiFunction<String, Integer, String> httpBaseUrl;
    volatile AdminClient adminClient;
    volatile Producer<String, byte[]> producer;
    volatile KafkaStreams aggregationStream;
    volatile KafkaStreams traceStoreStream;
    volatile KafkaStreams dependencyStoreStream;
    volatile Server server;
    volatile boolean closeCalled;

    public static KafkaStorageBuilder newBuilder() {
        return new KafkaStorageBuilder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStorage(KafkaStorageBuilder kafkaStorageBuilder) {
        this.partitioningEnabled = kafkaStorageBuilder.spanPartitioning.enabled;
        this.aggregationEnabled = kafkaStorageBuilder.spanAggregation.enabled;
        this.traceByIdQueryEnabled = kafkaStorageBuilder.traceStorage.traceByIdQueryEnabled;
        this.traceSearchEnabled = kafkaStorageBuilder.traceStorage.traceSearchEnabled;
        this.dependencyQueryEnabled = kafkaStorageBuilder.dependencyStorage.enabled;
        this.autocompleteKeys = kafkaStorageBuilder.autocompleteKeys;
        this.partitioningSpansTopic = kafkaStorageBuilder.spanPartitioning.spansTopic;
        this.aggregationSpansTopic = kafkaStorageBuilder.spanAggregation.spansTopic;
        this.aggregationTraceTopic = kafkaStorageBuilder.spanAggregation.traceTopic;
        this.aggregationDependencyTopic = kafkaStorageBuilder.spanAggregation.dependencyTopic;
        this.storageSpansTopic = kafkaStorageBuilder.traceStorage.spansTopic;
        this.storageDependencyTopic = kafkaStorageBuilder.dependencyStorage.dependencyTopic;
        this.minTracesStored = kafkaStorageBuilder.traceStorage.minTracesStored;
        this.httpBaseUrl = kafkaStorageBuilder.httpBaseUrl;
        this.hostname = kafkaStorageBuilder.hostname;
        this.httpPort = kafkaStorageBuilder.serverPort;
        this.adminConfig = kafkaStorageBuilder.adminConfig;
        this.producerConfig = kafkaStorageBuilder.spanPartitioning.producerConfig;
        this.aggregationStreamConfig = kafkaStorageBuilder.spanAggregation.streamConfig;
        this.traceStoreStreamConfig = kafkaStorageBuilder.traceStorage.streamConfig;
        this.dependencyStoreStreamConfig = kafkaStorageBuilder.dependencyStorage.streamConfig;
        this.aggregationTopology = new SpanAggregationTopology(kafkaStorageBuilder.spanAggregation.spansTopic, kafkaStorageBuilder.spanAggregation.traceTopic, kafkaStorageBuilder.spanAggregation.dependencyTopic, kafkaStorageBuilder.spanAggregation.traceTimeout, kafkaStorageBuilder.spanAggregation.enabled).get();
        this.traceStoreTopology = new TraceStorageTopology(kafkaStorageBuilder.traceStorage.spansTopic, this.autocompleteKeys, kafkaStorageBuilder.traceStorage.traceTtl, kafkaStorageBuilder.traceStorage.traceTtlCheckInterval, kafkaStorageBuilder.traceStorage.minTracesStored, kafkaStorageBuilder.traceStorage.traceByIdQueryEnabled, kafkaStorageBuilder.traceStorage.traceSearchEnabled).get();
        this.dependencyStoreTopology = new DependencyStorageTopology(kafkaStorageBuilder.dependencyStorage.dependencyTopic, kafkaStorageBuilder.dependencyStorage.dependencyTtl, kafkaStorageBuilder.dependencyStorage.dependencyWindowSize, kafkaStorageBuilder.dependencyStorage.enabled).get();
    }

    public SpanConsumer spanConsumer() {
        checkResources();
        return this.partitioningEnabled ? new KafkaSpanConsumer(this) : list -> {
            return Call.create((Object) null);
        };
    }

    public SpanStore spanStore() {
        checkResources();
        return new KafkaSpanStore(this);
    }

    public Traces traces() {
        checkResources();
        return new KafkaSpanStore(this);
    }

    public ServiceAndSpanNames serviceAndSpanNames() {
        checkResources();
        return new KafkaSpanStore(this);
    }

    public AutocompleteTags autocompleteTags() {
        checkResources();
        return new KafkaAutocompleteTags(this);
    }

    void checkResources() {
        getAggregationStream();
        getTraceStorageStream();
        getDependencyStorageStream();
    }

    public CheckResult check() {
        try {
            getAdminClient().describeCluster().clusterId().get(1L, TimeUnit.SECONDS);
            KafkaStreams.State state = getAggregationStream().state();
            if (!state.isRunningOrRebalancing()) {
                return CheckResult.failed(new IllegalStateException("Aggregation stream not running. " + state));
            }
            KafkaStreams.State state2 = getTraceStorageStream().state();
            if (!state2.isRunningOrRebalancing()) {
                return CheckResult.failed(new IllegalStateException("Store stream not running. " + state2));
            }
            KafkaStreams.State state3 = getDependencyStorageStream().state();
            return !state3.isRunningOrRebalancing() ? CheckResult.failed(new IllegalStateException("Store stream not running. " + state3)) : CheckResult.OK;
        } catch (Exception e) {
            return CheckResult.failed(e);
        }
    }

    public void close() {
        if (this.closeCalled) {
            return;
        }
        synchronized (this) {
            if (!this.closeCalled) {
                doClose();
                this.closeCalled = true;
            }
        }
    }

    void doClose() {
        try {
            if (this.adminClient != null) {
                this.adminClient.close(Duration.ofSeconds(1L));
            }
            if (this.producer != null) {
                this.producer.close(Duration.ofSeconds(1L));
            }
            if (this.traceStoreStream != null) {
                this.traceStoreStream.close(Duration.ofSeconds(1L));
            }
            if (this.dependencyStoreStream != null) {
                this.dependencyStoreStream.close(Duration.ofSeconds(1L));
            }
            if (this.aggregationStream != null) {
                this.aggregationStream.close(Duration.ofSeconds(1L));
            }
            if (this.server != null) {
                this.server.close();
            }
        } catch (Error | Exception e) {
            LOG.debug("error closing client {}", e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Producer<String, byte[]> getProducer() {
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = new KafkaProducer(this.producerConfig);
                }
            }
        }
        return this.producer;
    }

    AdminClient getAdminClient() {
        if (this.adminClient == null) {
            synchronized (this) {
                if (this.adminClient == null) {
                    this.adminClient = AdminClient.create(this.adminConfig);
                }
            }
        }
        return this.adminClient;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStreams getTraceStorageStream() {
        if (this.traceStoreStream == null) {
            synchronized (this) {
                if (this.traceStoreStream == null) {
                    try {
                        this.traceStoreStream = new KafkaStreams(this.traceStoreTopology, this.traceStoreStreamConfig);
                        this.traceStoreStream.start();
                        LOG.info("Trace storage topology:\n{}", this.traceStoreTopology.describe());
                    } catch (Exception e) {
                        LOG.debug("Error starting trace storage process", e);
                        this.traceStoreStream = null;
                    }
                }
            }
        }
        return this.traceStoreStream;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStreams getDependencyStorageStream() {
        if (this.dependencyStoreStream == null) {
            synchronized (this) {
                if (this.dependencyStoreStream == null) {
                    try {
                        this.dependencyStoreStream = new KafkaStreams(this.dependencyStoreTopology, this.dependencyStoreStreamConfig);
                        this.dependencyStoreStream.start();
                        LOG.info("Dependency storage topology:\n{}", this.dependencyStoreTopology.describe());
                    } catch (Exception e) {
                        LOG.debug("Error starting dependency storage", e);
                        this.dependencyStoreStream = null;
                    }
                }
            }
        }
        return this.dependencyStoreStream;
    }

    KafkaStreams getAggregationStream() {
        if (this.aggregationStream == null) {
            synchronized (this) {
                if (this.aggregationStream == null) {
                    try {
                        this.aggregationStream = new KafkaStreams(this.aggregationTopology, this.aggregationStreamConfig);
                        this.aggregationStream.start();
                        LOG.info("Aggregation topology:\n{}", this.aggregationTopology.describe());
                    } catch (Exception e) {
                        LOG.debug("Error loading aggregation process", e);
                        this.aggregationStream = null;
                    }
                }
            }
        }
        return this.aggregationStream;
    }

    public KafkaStorageHttpService httpService() {
        return new KafkaStorageHttpService(this);
    }

    public String toString() {
        return "KafkaStorage{ bootstrapServers=" + this.adminConfig.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) + ", spanPartitioning{ enabled=" + this.partitioningEnabled + ", spansTopic=" + this.partitioningSpansTopic + "}, spanAggregation{ enabled=" + this.aggregationEnabled + ", spansTopic=" + this.aggregationSpansTopic + ", traceTopic=" + this.aggregationTraceTopic + ", dependencyTopic=" + this.aggregationDependencyTopic + "}, traceStore { traceByIdQueryEnabled=" + this.traceByIdQueryEnabled + ", traceSearchEnabled=" + this.traceSearchEnabled + ", spansTopic=" + this.storageSpansTopic + "}, dependencyStore { dependencyQueryEnabled=" + this.dependencyQueryEnabled + ", dependencyTopic=" + this.storageDependencyTopic + "}}";
    }
}
