package org.commonjava.indy.subsys.kafka;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.commonjava.indy.subsys.kafka.conf.KafkaConfig;
import org.commonjava.indy.subsys.kafka.util.LogbackFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Application-scoped wrapper around a {@link KafkaProducer}.
 *
 * <p>When the instance is created by the CDI container, the underlying producer is built in
 * {@link #init()} from the injected {@link KafkaConfig}, and only if that configuration reports
 * itself as enabled. When no producer has been created, {@code send}, {@link #flush()} and
 * {@link #close()} all return without doing anything.
 *
 * <p>Every send registers a callback that logs a failure at error level, or the record's
 * partition and timestamp at trace level.
 */
@ApplicationScoped
public class IndyKafkaProducer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Inject
    private KafkaConfig config;

    // NOTE(review): the value type is Object because the return type of LogbackFormatter.format
    // is not visible from this file; narrow it to String once that is confirmed.
    private KafkaProducer<String, Object> kafkaProducer;

    /** Logs any exception reported by the send callback. */
    private final Consumer<Exception> exceptionHandler =
            exception -> logger.error("Send to Kafka failed", exception);

    /** Completion callback attached to every record handed to the producer. */
    private final Callback callback = (metadata, exception) -> {
        if (exception != null) {
            exceptionHandler.accept(exception);
        } else {
            logger.trace("Message sent to Kafka. Partition:{}, timestamp {}.", metadata.partition(), metadata.timestamp());
        }
    };

    /**
     * No-arg constructor for the CDI container. A managed bean needs a no-arg (or {@code @Inject})
     * constructor, and a normal-scoped bean needs a non-private no-arg constructor so it can be
     * proxied. The producer itself is created afterwards in {@link #init()}.
     */
    public IndyKafkaProducer() {
    }

    /**
     * Creates an instance backed by a producer built directly from the given properties.
     *
     * @param properties configuration passed to the {@link KafkaProducer} constructor
     */
    public IndyKafkaProducer(Properties properties) {
        this.kafkaProducer = new KafkaProducer<>(properties);
    }

    /**
     * Builds the producer from the injected configuration, if that configuration is enabled.
     * If it is not enabled, no producer is created here.
     */
    @PostConstruct
    private void init() {
        if (config.isEnabled()) {
            Properties properties = new Properties();
            properties.putAll(config.getConfiguration());
            kafkaProducer = new KafkaProducer<>(properties);
        }
    }

    /**
     * Hands a message to the producer without a formatter and without waiting on the result.
     *
     * @param topic   topic of the record
     * @param message record value
     * @throws IOException declared by the underlying send path (presumably raised by a formatter;
     *                     none is used here)
     */
    public void send(String topic, String message) throws IOException {
        send(topic, message, (LogbackFormatter) null);
    }

    /**
     * Hands a message to the producer without waiting on the result. The outcome is only
     * reported through the logging callback.
     *
     * @param topic     topic of the record
     * @param message   record value, or the input to {@code formatter} when one is given
     * @param formatter optional formatter applied to {@code message}; may be {@code null}
     * @throws IOException declared by the underlying send path (presumably raised by the formatter)
     */
    public void send(String topic, String message, LogbackFormatter formatter) throws IOException {
        doKafkaSend(topic, message, formatter);
    }

    /**
     * Sends a message without a formatter and waits for the send to complete.
     *
     * @param topic         topic of the record
     * @param message       record value
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @throws IOException          declared by the underlying send path
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the send completed with a failure
     * @throws TimeoutException     if the send did not complete within {@code timeoutMillis}
     */
    public void send(String topic, String message, long timeoutMillis)
            throws IOException, InterruptedException, ExecutionException, TimeoutException {
        send(topic, message, null, timeoutMillis);
    }

    /**
     * Sends a message and waits up to {@code timeoutMillis} milliseconds for the send to
     * complete. Returns immediately when no producer has been created.
     *
     * @param topic         topic of the record
     * @param message       record value, or the input to {@code formatter} when one is given
     * @param formatter     optional formatter applied to {@code message}; may be {@code null}
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @throws IOException          declared by the underlying send path (presumably raised by the formatter)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the send completed with a failure
     * @throws TimeoutException     if the send did not complete within {@code timeoutMillis}
     */
    public void send(String topic, String message, LogbackFormatter formatter, long timeoutMillis)
            throws IOException, InterruptedException, ExecutionException, TimeoutException {
        Future<?> pending = doKafkaSend(topic, message, formatter);
        if (pending != null) {
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Builds the record (formatting the message when a formatter is supplied) and passes it to
     * the producer together with the logging callback.
     *
     * @return the future returned by the producer, or {@code null} when no producer exists
     */
    private Future<?> doKafkaSend(String topic, String message, LogbackFormatter formatter) throws IOException {
        if (kafkaProducer == null) {
            return null;
        }
        ProducerRecord<String, Object> producerRecord =
                new ProducerRecord<>(topic, formatter != null ? formatter.format(message) : message);
        return kafkaProducer.send(producerRecord, callback);
    }

    /** Flushes the underlying producer, if one exists. */
    public void flush() {
        if (kafkaProducer != null) {
            kafkaProducer.flush();
        }
    }

    /**
     * Closes the underlying producer, if one exists. Invoked by the container before the bean
     * is destroyed.
     *
     * @throws IOException declared for compatibility with existing callers; nothing in this
     *                     method visibly raises it
     */
    @PreDestroy
    public void close() throws IOException {
        if (kafkaProducer != null) {
            logger.info("Closing {}", getClass());
            kafkaProducer.close();
        }
    }
}
