package com.expedia.www.haystack.agent.dispatcher;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.expedia.www.haystack.agent.core.Dispatcher;
import com.expedia.www.haystack.agent.core.config.ConfigurationHelpers;
import com.expedia.www.haystack.agent.core.metrics.SharedMetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.Validate;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/expedia/www/haystack/agent/dispatcher/KafkaDispatcher.class */
public class KafkaDispatcher implements Dispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDispatcher.class);
    private static final String PRODUCER_TOPIC = "producerTopic";
    private final Timer dispatchTimer = SharedMetricRegistry.newTimer("kafka.dispatch.timer");
    private final Meter dispatchFailure = SharedMetricRegistry.newMeter("kafka.dispatch.failure");
    private KafkaProducer<byte[], byte[]> producer;
    private String topic;

    public String getName() {
        return "kafka";
    }

    public void dispatch(byte[] bArr, byte[] bArr2) throws Exception {
        Timer.Context time = this.dispatchTimer.time();
        this.producer.send(new ProducerRecord(this.topic, bArr, bArr2), (recordMetadata, exc) -> {
            time.close();
            if (exc != null) {
                this.dispatchFailure.mark();
                LOGGER.error("Fail to produce the record to kafka with exception", exc);
            }
        });
    }

    public void initialize(Config config) {
        Validate.notNull(config.getString("bootstrap.servers"));
        setTopic(config.getString(PRODUCER_TOPIC));
        setKafkaProducer(new KafkaProducer<>(ConfigurationHelpers.generatePropertiesFromMap(ConfigurationHelpers.convertToPropertyMap(config)), new ByteArraySerializer(), new ByteArraySerializer()));
    }

    public void close() {
        LOGGER.info("Closing the kafka dispatcher now...");
        if (this.producer != null) {
            this.producer.flush();
            this.producer.close(10L, TimeUnit.SECONDS);
            this.producer = null;
        }
    }

    @VisibleForTesting
    void setKafkaProducer(KafkaProducer<byte[], byte[]> kafkaProducer) {
        this.producer = kafkaProducer;
    }

    @VisibleForTesting
    void setTopic(String str) {
        this.topic = str;
    }
}
