package com.wu.framework.easy.stereotype.upsert.component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wu.framework.easy.stereotype.upsert.config.UpsertConfig;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

@ConditionalOnProperty(prefix = "spring.kafka", value = {"bootstrap-servers"})
/* loaded from: input_file:com/wu/framework/easy/stereotype/upsert/component/EasyUpsertExtractKafkaProducer.class */
public class EasyUpsertExtractKafkaProducer implements InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(EasyUpsertExtractKafkaProducer.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final KafkaProducer<String, String> kafkaProducer;
    private final UpsertConfig upsertConfig;

    public EasyUpsertExtractKafkaProducer(KafkaProperties kafkaProperties, UpsertConfig upsertConfig) {
        this.upsertConfig = upsertConfig;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        if (kafkaProperties.getBootstrapServers().size() == 1 && ((String) kafkaProperties.getBootstrapServers().get(0)).equals("localhost:9092")) {
            this.kafkaProducer = null;
        } else {
            this.kafkaProducer = new KafkaProducer<>(properties);
        }
    }

    public Future<RecordMetadata> sendAsync(String str, String str2, Object obj) {
        return sendAsync(str, str2, obj, null);
    }

    public Future<RecordMetadata> sendAsync(String str, String str2, Object obj, Callback callback) {
        return this.kafkaProducer.send(new ProducerRecord(str2, obj instanceof String ? (String) obj : toJsonString(obj)), callback);
    }

    public void sendSync(String str, String str2, Object obj) throws InterruptedException, ExecutionException, TimeoutException {
        sendSync(str, str2, obj, 2);
    }

    public void sendSync(String str, String str2, Object obj, int i) throws InterruptedException, ExecutionException, TimeoutException {
        sendAsync(str, str2, obj, null).get(i, TimeUnit.SECONDS);
    }

    private String toJsonString(Object obj) {
        try {
            return OBJECT_MAPPER.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("转换为Json字符串出错:%s" + e.toString());
        }
    }

    public void afterPropertiesSet() throws Exception {
        log.info(" init EasyUpsertExtractKafkaProducer success");
    }
}
