package org.redkalex.mq.kafka;

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.redkale.convert.Convert;
import org.redkale.mq.MessageProducer;
import org.redkale.util.Traces;

/* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageProducer.class */
/**
 * {@link MessageProducer} implementation that publishes messages through a Kafka
 * {@code KafkaProducer<String, byte[]>}.
 *
 * <p>The underlying Kafka producer is created in the constructor from
 * {@code KafkaMessageAgent.createProducerProperties()} and is closed by {@link #stop()}.
 * Every record is keyed with the trace id returned by {@code Traces.currentTraceid()} at the
 * time {@code sendMessage} is called.
 */
public class KafkaMessageProducer implements MessageProducer {
    protected final Logger logger = Logger.getLogger(getClass().getSimpleName());

    /** Flipped to {@code true} exactly once by {@link #stop()}; sends are rejected afterwards. */
    private final AtomicBoolean closed = new AtomicBoolean();

    private final KafkaMessageAgent messageAgent;

    // NOTE(review): never assigned or read inside this class; kept because it is protected and
    // a subclass may rely on it.
    protected Properties config;

    private final KafkaProducer<String, byte[]> producer;

    /**
     * Creates the producer and immediately opens the underlying Kafka producer.
     *
     * @param kafkaMessageAgent agent that supplies the producer properties, the name used in
     *     log/exception messages, and the {@code execute} hook used to complete send futures
     * @throws NullPointerException if {@code kafkaMessageAgent} is {@code null}
     */
    public KafkaMessageProducer(KafkaMessageAgent kafkaMessageAgent) {
        this.messageAgent = Objects.requireNonNull(kafkaMessageAgent);
        this.producer = new KafkaProducer<>(kafkaMessageAgent.createProducerProperties(), new StringSerializer(), new ByteArraySerializer());
        if (this.logger.isLoggable(Level.INFO)) {
            this.logger.log(Level.INFO, describe() + " started");
        }
    }

    /**
     * Serializes {@code obj} with {@link #convertMessage} and sends it to Kafka.
     *
     * @param str topic passed to the {@code ProducerRecord}
     * @param num partition passed to the {@code ProducerRecord}
     * @param convert converter used when {@code obj} is neither {@code byte[]} nor a
     *     {@link CharSequence}
     * @param type type handed to the converter; when {@code null} the converter is called
     *     without a type
     * @param obj message payload
     * @return a future that is completed (normally with {@code null}, or exceptionally with the
     *     exception reported by Kafka) from a task submitted to {@code messageAgent.execute}
     * @throws IllegalStateException if {@link #stop()} has already been called
     */
    public CompletableFuture<Void> sendMessage(String str, Integer num, Convert convert, Type type, Object obj) {
        if (this.closed.get()) {
            throw new IllegalStateException(describe() + " is closed when send " + obj);
        }
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final long startMillis = System.currentTimeMillis();
        final String traceid = Traces.currentTraceid();
        final byte[] payload = convertMessage(convert, type, obj);
        this.producer.send(new ProducerRecord<>(str, num, traceid, payload), (recordMetadata, exc) -> {
            Traces.computeIfAbsent(traceid);
            try {
                completeAsync(future, traceid, exc);
                logSendCost(System.currentTimeMillis() - startMillis, num, obj);
            } finally {
                // Always undo the computeIfAbsent above, even if dispatching or logging throws.
                Traces.removeTraceid();
            }
        });
        return future;
    }

    /**
     * Turns a message payload into the bytes written to Kafka.
     *
     * @param convert converter used for payloads that are neither {@code byte[]} nor a
     *     {@link CharSequence}
     * @param type type handed to the converter, or {@code null} to convert without a type
     * @param obj payload to serialize
     * @return {@code obj} itself when it is a {@code byte[]}; its UTF-8 bytes when it is a
     *     {@link CharSequence}; otherwise the converter's output
     */
    protected byte[] convertMessage(Convert convert, Type type, Object obj) {
        if (obj instanceof byte[]) {
            return (byte[]) obj;
        }
        if (obj instanceof CharSequence) {
            return obj.toString().getBytes(StandardCharsets.UTF_8);
        }
        if (type == null) {
            return convert.convertToBytes(obj);
        }
        return convert.convertToBytes(type, obj);
    }

    /**
     * Marks this producer closed and closes the underlying Kafka producer. Only the first call
     * has any effect; later calls return immediately.
     */
    public void stop() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.logger.log(Level.INFO, describe() + " closing");
        this.producer.close();
        this.logger.log(Level.FINE, describe() + " closed");
    }

    /** Completes {@code future} from a task handed to the agent, with the trace id set around it. */
    private void completeAsync(CompletableFuture<Void> future, String traceid, Throwable error) {
        this.messageAgent.execute(() -> {
            Traces.computeIfAbsent(traceid);
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(null);
            }
            Traces.removeTraceid();
        });
    }

    /**
     * Logs how long a send took: FINE above 1000 ms, FINER above 100 ms, FINEST above 10 ms,
     * each only when that level is enabled; otherwise nothing is logged.
     */
    private void logSendCost(long costMillis, Integer partition, Object message) {
        final Level level;
        final String label;
        if (costMillis > 1000 && this.logger.isLoggable(Level.FINE)) {
            level = Level.FINE;
            label = "mq.cost-slower";
        } else if (costMillis > 100 && this.logger.isLoggable(Level.FINER)) {
            level = Level.FINER;
            label = "mq.cost-slowly";
        } else if (costMillis > 10 && this.logger.isLoggable(Level.FINEST)) {
            level = Level.FINEST;
            label = "mq.cost-normal";
        } else {
            return;
        }
        this.logger.log(level, describe() + " (" + label + " = " + costMillis + " ms)，partition=" + partition + ", msg=" + message);
    }

    /** Builds the {@code SimpleName(name=agentName)} prefix shared by log and exception messages. */
    private String describe() {
        return getClass().getSimpleName() + "(name=" + this.messageAgent.getName() + ")";
    }
}
