package org.redkalex.mq.kafka;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.redkale.mq.spi.MessageAgent;
import org.redkale.mq.spi.MessageClientProducer;
import org.redkale.mq.spi.MessageCoder;
import org.redkale.mq.spi.MessageRecord;
import org.redkale.util.Traces;

/* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageClientProducer.class */
public class KafkaMessageClientProducer extends MessageClientProducer {
    private final AtomicBoolean closed;
    private MessageAgent messageAgent;
    protected Properties config;
    private KafkaProducer<String, MessageRecord> producer;
    protected final ConcurrentHashMap<String, Integer[]> partionsMap;
    private int partitions;

    /* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageClientProducer$MessageRecordSerializer.class */
    public static class MessageRecordSerializer implements Serializer<MessageRecord> {
        private final MessageCoder<MessageRecord> coder;

        public MessageRecordSerializer(MessageCoder<MessageRecord> messageCoder) {
            this.coder = messageCoder;
        }

        public byte[] serialize(String str, MessageRecord messageRecord) {
            return this.coder.encode(messageRecord);
        }
    }

    public KafkaMessageClientProducer(KafkaMessageAgent kafkaMessageAgent, String str, int i) {
        super(str);
        this.closed = new AtomicBoolean();
        this.partionsMap = new ConcurrentHashMap<>();
        this.partitions = i;
        Objects.requireNonNull(kafkaMessageAgent);
        this.messageAgent = kafkaMessageAgent;
        this.config = kafkaMessageAgent.createProducerProperties();
        this.producer = new KafkaProducer<>(this.config, new StringSerializer(), new MessageRecordSerializer(kafkaMessageAgent.getMessageRecordCoder()));
        this.logger.log(Level.INFO, getClass().getSimpleName() + "(mq=" + kafkaMessageAgent.getName() + "， name=" + str + ") started");
    }

    public CompletableFuture<Void> apply(MessageRecord messageRecord) {
        if (this.closed.get()) {
            throw new IllegalStateException(getClass().getSimpleName() + "(name=" + this.name + ") is closed when send " + messageRecord);
        }
        if (this.producer == null) {
            throw new IllegalStateException(getClass().getSimpleName() + "(name=" + this.name + ") not started when send " + messageRecord);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        Integer num = null;
        if (this.partitions > 0) {
            if (messageRecord.getGroupid() != null && !messageRecord.getGroupid().isEmpty()) {
                num = Integer.valueOf(Math.abs(messageRecord.getGroupid().hashCode()) % this.partitions);
            } else if (messageRecord.getUserid() != null) {
                num = Integer.valueOf(Math.abs(messageRecord.getUserid().hashCode()) % this.partitions);
            }
        }
        Integer num2 = num;
        String traceid = messageRecord.getTraceid();
        this.producer.send(new ProducerRecord(messageRecord.getTopic(), num, (Object) null, messageRecord), (recordMetadata, exc) -> {
            Traces.computeIfAbsent(traceid);
            if (exc != null) {
                this.messageAgent.execute(() -> {
                    Traces.computeIfAbsent(traceid);
                    completableFuture.completeExceptionally(exc);
                    Traces.removeTraceid();
                });
            } else {
                this.messageAgent.execute(() -> {
                    Traces.computeIfAbsent(traceid);
                    completableFuture.complete(null);
                    Traces.removeTraceid();
                });
            }
            long currentTimeMillis = System.currentTimeMillis() - messageRecord.getCreateTime();
            if (currentTimeMillis > 1000 && this.logger.isLoggable(Level.FINE)) {
                Logger logger = this.logger;
                logger.log(Level.FINE, getClass().getSimpleName() + "(name=" + this.name + ") (mq.cost-slower = " + currentTimeMillis + " ms)，partition=" + logger + ", msg=" + num2);
            } else if (currentTimeMillis > 100 && this.logger.isLoggable(Level.FINER)) {
                Logger logger2 = this.logger;
                logger2.log(Level.FINER, getClass().getSimpleName() + "(name=" + this.name + ") (mq.cost-slowly = " + currentTimeMillis + " ms)，partition=" + logger2 + ", msg=" + num2);
            } else if (currentTimeMillis > 10 && this.logger.isLoggable(Level.FINEST)) {
                Logger logger3 = this.logger;
                logger3.log(Level.FINEST, getClass().getSimpleName() + "(name=" + this.name + ") (mq.cost-normal = " + currentTimeMillis + " ms)，partition=" + logger3 + ", msg=" + num2);
            }
            Traces.removeTraceid();
        });
        return completableFuture;
    }

    protected Integer[] loadTopicPartition(String str) {
        return this.partionsMap.computeIfAbsent(str, str2 -> {
            try {
                List partitions = ((TopicDescription) ((KafkaFuture) ((KafkaMessageAgent) this.messageAgent).adminClient.describeTopics(Arrays.asList(str2)).topicNameValues().get(str2)).get(6L, TimeUnit.SECONDS)).partitions();
                Integer[] numArr = new Integer[partitions.size()];
                for (int i = 0; i < numArr.length; i++) {
                    numArr[i] = Integer.valueOf(((TopicPartitionInfo) partitions.get(i)).partition());
                }
                Arrays.sort(numArr);
                if (this.logger.isLoggable(Level.FINER)) {
                    this.logger.log(Level.FINER, getClass().getSimpleName() + "(name=" + this.name + ") Topic(" + str2 + ") load partitions = " + partitions);
                }
                return numArr;
            } catch (Exception e) {
                this.logger.log(Level.SEVERE, getClass().getSimpleName() + "(name=" + this.name + ") Topic(" + str2 + ")  load partitions error", (Throwable) e);
                return new Integer[0];
            }
        });
    }

    public void stop() {
        if (this.closed.compareAndSet(false, true)) {
            this.logger.log(Level.INFO, getClass().getSimpleName() + "(name=" + this.name + ") closing");
            if (this.producer != null) {
                this.producer.close();
            }
            this.logger.log(Level.INFO, getClass().getSimpleName() + "(name=" + this.name + ") closed");
        }
    }
}
