package com.pinterest.doctorkafka.stats;

import com.pinterest.doctorkafka.BrokerStats;
import com.pinterest.doctorkafka.util.OpenTsdbMetricConverter;
import com.pinterest.doctorkafka.util.OperatorUtil;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/pinterest/doctorkafka/stats/KafkaAvroPublisher.class */
/**
 * Publishes {@link BrokerStats} records to a Kafka topic as Avro binary payloads.
 *
 * <p>Each call to {@link #publish(BrokerStats)} serializes one record, sends it to the
 * destination topic and blocks until the send completes. Success and failure counters are
 * reported through {@link OpenTsdbMetricConverter}, tagged with the local hostname.
 */
public class KafkaAvroPublisher {
    private static final Logger LOG = LogManager.getLogger(KafkaAvroPublisher.class);
    private static final String HOSTNAME = OperatorUtil.getHostname();
    private static final SpecificDatumWriter<BrokerStats> avroEventWriter = new SpecificDatumWriter<>(BrokerStats.SCHEMA$);
    private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();

    /** Producer-config key that selects the security protocol. */
    private static final String SECURITY_PROTOCOL_KEY = "security.protocol";

    private final Producer<byte[], byte[]> kafkaProducer;
    private final String destTopic;

    /**
     * Creates a publisher backed by a new Kafka producer.
     *
     * <p>Base producer properties come from
     * {@link OperatorUtil#createKafkaProducerProperties}; every entry found in the optional
     * configuration file is then applied on top of them. The security protocol is taken from
     * the file's {@code security.protocol} entry and defaults to {@code PLAINTEXT}.
     *
     * @param str  value handed unchanged to {@code OperatorUtil.createKafkaProducerProperties};
     *             presumably the cluster connection string (confirm against that helper)
     * @param str2 destination topic that records are published to
     * @param str3 path of a properties file with producer overrides, or {@code null} for none;
     *             if the file cannot be read the error is logged and no overrides are applied
     * @throws IllegalArgumentException if {@code security.protocol} in the file is not a valid
     *                                  {@link SecurityProtocol} name
     */
    public KafkaAvroPublisher(String str, String str2, String str3) {
        this.destTopic = str2;

        Map<String, String> overrides = loadProducerOverrides(str3);
        String protocolName = overrides.get(SECURITY_PROTOCOL_KEY);
        SecurityProtocol securityProtocol =
            protocolName != null ? SecurityProtocol.valueOf(protocolName) : SecurityProtocol.PLAINTEXT;

        Properties producerProperties = OperatorUtil.createKafkaProducerProperties(str, securityProtocol);
        // File-supplied settings take precedence over the generated defaults.
        producerProperties.putAll(overrides);
        this.kafkaProducer = new KafkaProducer<>(producerProperties);
    }

    /**
     * Reads producer overrides from a properties file.
     *
     * @param configFilePath path of the properties file, or {@code null}
     * @return the string-valued entries of the file; empty when the path is {@code null} or
     *         the file could not be loaded
     */
    private static Map<String, String> loadProducerOverrides(String configFilePath) {
        Map<String, String> overrides = new HashMap<>();
        if (configFilePath == null) {
            return overrides;
        }
        // try-with-resources guarantees the file handle is released even if load() fails.
        try (FileInputStream in = new FileInputStream(configFilePath)) {
            Properties fileProperties = new Properties();
            fileProperties.load(in);
            for (String name : fileProperties.stringPropertyNames()) {
                overrides.put(name, fileProperties.getProperty(name));
            }
        } catch (IOException e) {
            // Best effort: continue with the default producer settings.
            LOG.error("Failed to load configuration file {}", configFilePath, e);
        }
        return overrides;
    }

    /**
     * Serializes the given stats record with Avro and sends it to the destination topic,
     * waiting for the send to complete.
     *
     * <p>The target partition is the broker id modulo the topic's partition count; the record
     * key is {@code <name>_<current time millis>} encoded as UTF-8.
     *
     * @param brokerStats record to publish
     * @throws IOException      declared for source compatibility; failures are currently
     *                          reported as {@link RuntimeException}
     * @throws RuntimeException if serialization or sending fails; the original exception is
     *                          attached as the cause. If the calling thread was interrupted
     *                          while waiting, its interrupt flag is restored first.
     */
    public void publish(BrokerStats brokerStats) throws IOException {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            BinaryEncoder encoder = avroEncoderFactory.binaryEncoder(buffer, null);
            avroEventWriter.write(brokerStats, encoder);
            // The encoder buffers internally; flush before reading the bytes out.
            encoder.flush();

            int partitionCount = this.kafkaProducer.partitionsFor(this.destTopic).size();
            Integer partition = Integer.valueOf(brokerStats.getId().intValue() % partitionCount);
            byte[] key = (brokerStats.getName() + "_" + System.currentTimeMillis())
                .getBytes(StandardCharsets.UTF_8);
            byte[] value = buffer.toByteArray();

            // get() blocks until the send finishes and surfaces any send error.
            this.kafkaProducer.send(new ProducerRecord<>(this.destTopic, partition, key, value)).get();
            OpenTsdbMetricConverter.incr("kafka.stats.collector.success", 1, "host=" + HOSTNAME);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt so callers further up the stack can react to it.
                Thread.currentThread().interrupt();
            }
            LOG.error("Failure in publish stats", e);
            OpenTsdbMetricConverter.incr("kafka.stats.collector.failure", 1, "host=" + HOSTNAME);
            throw new RuntimeException("Avro serialization failure", e);
        }
    }

    /** Closes the underlying Kafka producer. */
    public void close() {
        this.kafkaProducer.close();
    }
}
