package org.redkalex.mq.kafka;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.redkale.mq.MessageAgent;
import org.redkale.mq.MessageCoder;
import org.redkale.mq.MessageProducer;
import org.redkale.mq.MessageRecord;

/* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageProducer.class */
public class KafkaMessageProducer extends MessageProducer implements Runnable {
    protected MessageAgent messageAgent;
    protected Properties config;
    protected Thread thread;
    protected CompletableFuture<Void> startFuture;
    protected KafkaProducer<String, MessageRecord> producer;
    protected final ConcurrentHashMap<String, Integer[]> partionsMap;
    protected int partitions;
    protected boolean reconnecting;
    protected final Object resumeLock;
    protected final boolean finest;
    protected final boolean finer;
    protected final boolean fine;

    /* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageProducer$MessageRecordSerializer.class */
    public static class MessageRecordSerializer implements Serializer<MessageRecord> {
        private final MessageCoder<MessageRecord> coder;

        public MessageRecordSerializer(MessageCoder<MessageRecord> messageCoder) {
            this.coder = messageCoder;
        }

        public byte[] serialize(String str, MessageRecord messageRecord) {
            return this.coder.encode(messageRecord);
        }
    }

    public KafkaMessageProducer(String str, MessageAgent messageAgent, String str2, int i, Properties properties) {
        super(str, messageAgent.getLogger());
        this.partionsMap = new ConcurrentHashMap<>();
        this.resumeLock = new Object();
        this.partitions = i;
        Objects.requireNonNull(messageAgent);
        this.messageAgent = messageAgent;
        Properties properties2 = new Properties();
        properties2.put("retries", 0);
        properties2.put("batch.size", 1024);
        properties2.put("linger.ms", 1);
        properties2.put("buffer.memory", 33554432);
        properties2.put("acks", "0");
        properties2.putAll(properties);
        properties2.put("bootstrap.servers", str2);
        this.config = properties2;
        this.finest = this.logger.isLoggable(Level.FINEST);
        this.finer = this.logger.isLoggable(Level.FINER);
        this.fine = this.logger.isLoggable(Level.FINE);
    }

    public void retryConnect() {
    }

    @Override // java.lang.Runnable
    public void run() {
        this.producer = new KafkaProducer<>(this.config, new StringSerializer(), new MessageRecordSerializer(this.messageAgent.getMessageCoder()));
        this.startFuture.complete(null);
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, MessageProducer.class.getSimpleName() + "(name=" + this.name + ") startuped");
        }
    }

    public CompletableFuture<Void> apply(MessageRecord messageRecord) {
        if (this.closed) {
            throw new IllegalStateException(getClass().getSimpleName() + "(name=" + this.name + ") is closed when send " + messageRecord);
        }
        if (this.producer == null) {
            throw new IllegalStateException(getClass().getSimpleName() + "(name=" + this.name + ") not started when send " + messageRecord);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        Integer num = null;
        if (this.partitions > 0) {
            if (messageRecord.getGroupid() != null && !messageRecord.getGroupid().isEmpty()) {
                num = Integer.valueOf(Math.abs(messageRecord.getGroupid().hashCode()) % this.partitions);
            } else if (messageRecord.getUserid() != null) {
                num = Integer.valueOf(Math.abs(messageRecord.getUserid().hashCode()) % this.partitions);
            }
        }
        Integer num2 = num;
        this.producer.send(new ProducerRecord(messageRecord.getTopic(), num, (Object) null, messageRecord), (recordMetadata, exc) -> {
            if (exc != null) {
                completableFuture.completeExceptionally(exc);
            } else {
                completableFuture.complete(null);
            }
            long currentTimeMillis = System.currentTimeMillis() - messageRecord.getCreateTime();
            if (currentTimeMillis > 1000 && this.fine) {
                Logger logger = this.logger;
                logger.log(Level.FINE, "Kafka.producer (mqs.costs = " + currentTimeMillis + " ms)，partition=" + logger + ", msg=" + num2);
            } else if (currentTimeMillis > 100 && this.finer) {
                Logger logger2 = this.logger;
                logger2.log(Level.FINER, "Kafka.producer (mq.costs = " + currentTimeMillis + " ms)，partition=" + logger2 + ", msg=" + num2);
            } else if (this.finest) {
                Logger logger3 = this.logger;
                logger3.log(Level.FINEST, "Kafka.producer (mq.cost = " + currentTimeMillis + " ms)，partition=" + logger3 + ", msg=" + num2);
            }
        });
        return completableFuture;
    }

    protected Integer[] loadTopicPartition(String str) {
        return this.partionsMap.computeIfAbsent(str, str2 -> {
            try {
                List partitions = ((TopicDescription) ((KafkaFuture) ((KafkaMessageAgent) this.messageAgent).adminClient.describeTopics(Arrays.asList(str2)).values().get(str2)).get(6L, TimeUnit.SECONDS)).partitions();
                Integer[] numArr = new Integer[partitions.size()];
                for (int i = 0; i < numArr.length; i++) {
                    numArr[i] = Integer.valueOf(((TopicPartitionInfo) partitions.get(i)).partition());
                }
                Arrays.sort(numArr);
                if (this.logger.isLoggable(Level.FINER)) {
                    this.logger.log(Level.FINER, "Topic(" + str2 + ") load partitions = " + partitions);
                }
                return numArr;
            } catch (Exception e) {
                this.logger.log(Level.SEVERE, "Topic(" + str2 + ")  load partitions error", (Throwable) e);
                return new Integer[0];
            }
        });
    }

    public synchronized CompletableFuture<Void> startup() {
        if (this.startFuture != null) {
            return this.startFuture;
        }
        this.thread = new Thread(this);
        this.thread.setName("MQ-Producer-Thread");
        this.startFuture = new CompletableFuture<>();
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, MessageProducer.class.getSimpleName() + " [" + this.name + "] startuping");
        }
        this.thread.start();
        return this.startFuture;
    }

    public synchronized CompletableFuture<Void> shutdown() {
        if (!this.closed) {
            return CompletableFuture.completedFuture(null);
        }
        this.closed = true;
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, MessageProducer.class.getSimpleName() + " [" + this.name + "] shutdowning");
        }
        if (this.producer != null) {
            this.producer.close();
        }
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, MessageProducer.class.getSimpleName() + " [" + this.name + "] shutdowned");
        }
        return CompletableFuture.completedFuture(null);
    }
}
