package com.github.ideahut.kafka;

import com.github.ideahut.queue.QueueHeader;
import com.github.ideahut.queue.QueueMessage;
import com.github.ideahut.queue.QueueSender;
import com.github.ideahut.task.TaskHandler;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ideahut/kafka/KafkaSender.class */
/**
 * {@link QueueSender} implementation that publishes {@link QueueMessage}s through a
 * {@link KafkaProducer}.
 *
 * <p>Messages are always sent to the first entry of the configured topic list
 * (see {@link #setTopics(List)}). When {@link #isAllowSetupTopic()} is {@code true},
 * {@code KafkaHelper.setupTopic} is invoked with the configured topics before every send.
 *
 * <p>NOTE(review): the producer created by the constructor is never closed by this class;
 * confirm that its lifecycle is handled elsewhere.
 *
 * @param <K> message key type
 * @param <V> message value type
 */
public class KafkaSender<K, V> implements QueueSender<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSender.class);

    /** Placeholder logged for partition/offset when the broker supplied no metadata. */
    private static final int UNKNOWN_POSITION = -1;

    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final KafkaClient kafkaClient;
    private final KafkaProducer<K, V> kafkaProducer;
    private final TaskHandler taskHandler;
    private final Map<String, Object> producerConfigs;
    private List<NewTopic> topics;
    private boolean allowSetupTopic = false;

    /**
     * Creates a sender and its underlying {@link KafkaProducer}.
     *
     * <p>The serializers are resolved through {@code KafkaHelper.getDefaultSerializer}; their
     * class names are written into the producer configuration under {@code key.serializer}
     * and {@code value.serializer}, overriding any such entries present in {@code map}.
     *
     * @param cls         key class used to look up the key serializer
     * @param cls2        value class used to look up the value serializer
     * @param kafkaClient client handed to {@code KafkaHelper.setupTopic} when topic setup is enabled
     * @param map         extra producer properties; may be {@code null}. The map is copied, never modified
     * @param taskHandler handler used by {@link #sendMessage(QueueMessage, boolean)}; may be {@code null}
     * @throws Exception if no serializer can be resolved for the key or the value class
     */
    public KafkaSender(Class<K> cls, Class<V> cls2, KafkaClient kafkaClient, Map<String, Object> map, TaskHandler taskHandler) throws Exception {
        this.keySerializer = KafkaHelper.getDefaultSerializer(cls);
        if (this.keySerializer == null) {
            throw new Exception("KeySerializer is null");
        }
        this.valueSerializer = KafkaHelper.getDefaultSerializer(cls2);
        if (this.valueSerializer == null) {
            throw new Exception("ValueSerializer is null");
        }
        // Work on a private copy so the caller's map is left untouched.
        Map<String, Object> configs = new HashMap<>();
        if (map != null) {
            configs.putAll(map);
        }
        configs.put("key.serializer", this.keySerializer.getClass().getName());
        configs.put("value.serializer", this.valueSerializer.getClass().getName());
        this.kafkaClient = kafkaClient;
        this.producerConfigs = Collections.unmodifiableMap(configs);
        this.kafkaProducer = new KafkaProducer<>(configs);
        this.taskHandler = taskHandler;
    }

    /** @return the serializer resolved for the key class */
    public Serializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    /** @return the serializer resolved for the value class */
    public Serializer<V> getValueSerializer() {
        return this.valueSerializer;
    }

    /** @return the configured topic list (the same instance that was set), or {@code null} if none was set */
    public List<NewTopic> getTopics() {
        return this.topics;
    }

    /**
     * Sets the topic list. Only the first entry is used as the send destination.
     *
     * @param list topics to use; stored as-is without copying
     */
    public void setTopics(List<NewTopic> list) {
        this.topics = list;
    }

    /** @return whether {@code KafkaHelper.setupTopic} is invoked before each send */
    public boolean isAllowSetupTopic() {
        return this.allowSetupTopic;
    }

    /** @param z {@code true} to invoke {@code KafkaHelper.setupTopic} before each send */
    public void setAllowSetupTopic(boolean z) {
        this.allowSetupTopic = z;
    }

    /** @return the task handler passed to the constructor; may be {@code null} */
    public TaskHandler getTaskHandler() {
        return this.taskHandler;
    }

    /** @return the Kafka client passed to the constructor */
    public KafkaClient getKafkaClient() {
        return this.kafkaClient;
    }

    /** @return an unmodifiable view of the effective producer configuration */
    public Map<String, Object> getProducerConfigs() {
        return this.producerConfigs;
    }

    /**
     * Sends the message to the first configured topic.
     *
     * <p>Non-null header values are attached as record headers, encoded as UTF-8. The outcome
     * of the send is only logged by the producer callback; it is not reported to the caller.
     *
     * @param queueMessage message to publish
     * @throws IllegalStateException if no topic has been configured
     */
    @Override
    public void sendMessage(QueueMessage<K, V> queueMessage) {
        // Read the field once so the check and the use below see the same list.
        List<NewTopic> currentTopics = this.topics;
        if (currentTopics == null || currentTopics.isEmpty()) {
            throw new IllegalStateException("No topic configured for KafkaSender");
        }
        if (this.allowSetupTopic) {
            KafkaHelper.setupTopic(LOGGER, this.kafkaClient, currentTopics);
        }

        QueueHeader header = queueMessage.getHeader();
        RecordHeaders recordHeaders = null;
        if (header != null) {
            recordHeaders = new RecordHeaders();
            for (String headerName : header.keySet()) {
                String headerValue = (String) header.get(headerName);
                if (headerValue != null) {
                    // Explicit charset: the platform default would make header bytes host-dependent.
                    recordHeaders.add(headerName, headerValue.getBytes(StandardCharsets.UTF_8));
                }
            }
        }

        // A null partition leaves partition selection to the producer.
        final ProducerRecord<K, V> producerRecord = new ProducerRecord<K, V>(
                currentTopics.get(0).name(), (Integer) null, queueMessage.getKey(), queueMessage.getValue(), recordHeaders);
        this.kafkaProducer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                // Depending on the client version the metadata may be null when the send failed,
                // so fall back to what is known from the record itself.
                String topic = recordMetadata != null ? recordMetadata.topic() : producerRecord.topic();
                int partition = recordMetadata != null ? recordMetadata.partition() : UNKNOWN_POSITION;
                long offset = recordMetadata != null ? recordMetadata.offset() : UNKNOWN_POSITION;
                if (exc != null) {
                    // The trailing Throwable is consumed by SLF4J as the exception to log, so the
                    // last placeholder is filled explicitly with the exception's string form.
                    LOGGER.error("### Message {} failed on topic={}, partition={}, offset={}: {}",
                            producerRecord.value(), topic, partition, offset, exc.toString(), exc);
                } else {
                    LOGGER.debug("### Message {} written on topic={}, partition={}, offset={}",
                            producerRecord.value(), topic, partition, offset);
                }
            }
        });
    }

    /**
     * Sends the message either directly or through the task handler.
     *
     * @param queueMessage message to publish
     * @param z            when {@code true} and a task handler is available, the send is handed to
     *                     {@code TaskHandler.execute}; otherwise it runs on the calling thread
     */
    @Override
    public void sendMessage(final QueueMessage<K, V> queueMessage, boolean z) {
        if (!z || this.taskHandler == null) {
            sendMessage(queueMessage);
            return;
        }
        this.taskHandler.execute(new Runnable() {
            @Override
            public void run() {
                KafkaSender.this.sendMessage(queueMessage);
            }
        });
    }
}
