package org.redkalex.mq.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.redkale.annotation.ResourceChanged;
import org.redkale.inject.ResourceEvent;
import org.redkale.mq.spi.MessageAgent;
import org.redkale.mq.spi.MessageClientConsumer;
import org.redkale.mq.spi.MessageClientProducer;
import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleException;
import org.redkale.util.Utility;

/* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageAgent.class */
/**
 * {@link MessageAgent} implementation backed by Apache Kafka.
 *
 * <p>The agent reads its broker addresses and optional per-role Kafka properties from an
 * {@link AnyValue} configuration tree, owns a Kafka {@link AdminClient} for topic
 * administration, and creates the Kafka-specific consumers and producers used by the
 * messaging layer.
 */
public class KafkaMessageAgent extends MessageAgent {
    /** Value used for {@code bootstrap.servers} of consumers and producers. */
    protected String servers;

    /** Value used for {@code bootstrap.servers} of the admin client; defaults to {@link #servers}. */
    protected String controllers;

    /** Admin client created in {@link #init(AnyValue)} and closed in {@link #destroy(AnyValue)}. */
    protected AdminClient adminClient;

    /** {@code partitions} attribute of the producer config node (0 when absent). */
    protected int partitions;

    protected MessageClientConsumer httpMessageClientConsumer;

    protected MessageClientConsumer sncpMessageClientConsumer;

    /** {@code checkIntervals} attribute of the servers node; unit is not visible here - TODO confirm. */
    protected int checkIntervals = 10;

    /** User-supplied overrides merged into the admin client properties. */
    protected Properties adminConfig = new Properties();

    /** User-supplied overrides merged into the consumer properties. */
    protected Properties consumerConfig = new Properties();

    /** User-supplied overrides merged into the producer properties. */
    protected Properties producerConfig = new Properties();

    /** Consumers started by {@link #startMessageConsumer()} and not yet stopped. */
    private final List<KafkaMessageConsumer> kafkaConsumers = new ArrayList<>();

    /**
     * Reads the agent configuration, creates the admin client and pings the cluster.
     *
     * <p>Expected layout of {@code anyValue}: a {@code servers} node with a {@code value}
     * attribute (and optional {@code checkIntervals}), an optional {@code controllers} node
     * with a {@code value} attribute, and any number of {@code config} nodes whose
     * {@code type} is {@code consumer}, {@code producer} or {@code admin}, each holding
     * {@code property} children with {@code name}/{@code value} attributes.
     *
     * @param anyValue the agent configuration
     * @throws RedkaleException if the servers address is missing or the cluster ping fails
     */
    public void init(AnyValue anyValue) {
        super.init(anyValue);
        AnyValue serversConf = anyValue.getAnyValue("servers");
        String serversValue = serversConf == null ? null : serversConf.getValue("value");
        if (serversValue == null) {
            // Fail with a readable message instead of an anonymous NullPointerException later on.
            throw new RedkaleException("KafkaMessageAgent requires a 'servers' node with a 'value' attribute");
        }
        this.servers = serversValue;
        AnyValue controllersConf = anyValue.getAnyValue("controllers");
        this.controllers = controllersConf == null ? this.servers : controllersConf.getValue("value", this.servers);
        this.checkIntervals = serversConf.getIntValue("checkIntervals", 10);

        AnyValue[] configs = anyValue.getAnyValues("config");
        if (configs != null) {
            for (AnyValue config : configs) {
                String type = config.getValue("type");
                if ("consumer".equals(type)) {
                    copyProperties(config, this.consumerConfig);
                } else if ("producer".equals(type)) {
                    this.partitions = config.getIntValue("partitions", 0);
                    copyProperties(config, this.producerConfig);
                } else if ("admin".equals(type)) {
                    copyProperties(config, this.adminConfig);
                }
            }
        }

        this.adminClient = KafkaAdminClient.create(createAdminProperties());
        try {
            long start = System.currentTimeMillis();
            this.adminClient.listConsumerGroups().errors().get(6L, TimeUnit.SECONDS);
            this.logger.log(Level.INFO, "KafkaMessageAgent ping cost " + (System.currentTimeMillis() - start) + " ms");
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            // The agent is unusable: release the admin client instead of leaking it.
            AdminClient failedClient = this.adminClient;
            this.adminClient = null;
            try {
                failedClient.close(Duration.ofSeconds(3));
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw new RedkaleException("KafkaMessageAgent controllers: " + this.controllers, e);
        }
    }

    /** Copies every {@code property} child ({@code name} -> {@code value}) of {@code config} into {@code target}. */
    private static void copyProperties(AnyValue config, Properties target) {
        AnyValue[] properties = config.getAnyValues("property");
        if (properties == null) {
            return;
        }
        for (AnyValue property : properties) {
            target.put(property.getValue("name"), property.getValue("value"));
        }
    }

    /**
     * Logs every changed resource; the changes themselves are not applied.
     *
     * @param resourceEventArr the change events to report
     */
    @ResourceChanged
    public void onResourceChange(ResourceEvent[] resourceEventArr) {
        StringBuilder sb = new StringBuilder();
        for (ResourceEvent event : resourceEventArr) {
            sb.append(KafkaMessageAgent.class.getSimpleName())
                    .append(" skip change '")
                    .append(event.name())
                    .append("' to '")
                    .append(event.coverNewValue())
                    .append("'\r\n");
        }
        if (sb.length() > 0) {
            this.logger.log(Level.INFO, sb.toString());
        }
    }

    /**
     * Runs the superclass teardown and then closes the admin client, if one exists.
     *
     * @param anyValue the agent configuration, forwarded to the superclass
     */
    public void destroy(AnyValue anyValue) {
        super.destroy(anyValue);
        if (this.adminClient != null) {
            this.adminClient.close();
        }
    }

    /**
     * Builds the admin client properties: built-in timeouts, then the {@code admin} overrides,
     * then {@code bootstrap.servers} (always taken from {@link #controllers}).
     *
     * @return a new, independent {@link Properties} instance
     */
    protected Properties createAdminProperties() {
        Properties props = new Properties();
        props.put("request.timeout.ms", "6000");
        props.put("default.api.timeout.ms", "6000");
        props.putAll(this.adminConfig);
        props.put("bootstrap.servers", this.controllers);
        return props;
    }

    /**
     * Builds consumer properties: built-in defaults, then the {@code consumer} overrides,
     * then {@code bootstrap.servers} (always taken from {@link #servers}).
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code protected};
     * it is kept {@code public} so existing callers keep compiling.
     *
     * @param str the consumer group id; {@code group.id} is left unset when {@code null}
     * @return a new, independent {@link Properties} instance
     */
    public Properties createConsumerProperties(String str) {
        Properties props = new Properties();
        if (str != null) {
            props.put("group.id", str);
        }
        props.put("auto.offset.reset", "latest");
        props.put("reconnect.backoff.ms", "1000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("enable.auto.commit", "true");
        props.put("request.timeout.ms", "6000");
        props.put("default.api.timeout.ms", "6000");
        props.putAll(this.consumerConfig);
        props.put("bootstrap.servers", this.servers);
        return props;
    }

    /**
     * Builds producer properties: built-in defaults, then the {@code producer} overrides,
     * then {@code bootstrap.servers} (always taken from {@link #servers}).
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code protected};
     * it is kept {@code public} so existing callers keep compiling.
     *
     * @return a new, independent {@link Properties} instance
     */
    public Properties createProducerProperties() {
        Properties props = new Properties();
        props.put("retries", 0);
        props.put("batch.size", 1024);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("acks", "1");
        props.put("request.timeout.ms", "6000");
        props.put("default.api.timeout.ms", "6000");
        props.putAll(this.producerConfig);
        props.put("bootstrap.servers", this.servers);
        return props;
    }

    /** @return the configured {@code checkIntervals} value (10 unless overridden) */
    public int getCheckIntervals() {
        return this.checkIntervals;
    }

    /** @return the {@code timeoutExecutor} inherited from {@link MessageAgent} */
    public ScheduledThreadPoolExecutor getTimeoutExecutor() {
        return this.timeoutExecutor;
    }

    /** @return the admin client, or {@code null} if {@link #init(AnyValue)} has not succeeded */
    public AdminClient getAdminClient() {
        return this.adminClient;
    }

    /** @return the live (not copied) consumer override properties */
    public Properties getConsumerConfig() {
        return this.consumerConfig;
    }

    /** @return the live (not copied) producer override properties */
    public Properties getProducerConfig() {
        return this.producerConfig;
    }

    /** @return the {@code partitions} attribute read from the producer config node */
    public int getPartitions() {
        return this.partitions;
    }

    /** @return the HTTP client consumer, or {@code null} if none was started */
    public MessageClientConsumer getHttpMessageClientConsumer() {
        return this.httpMessageClientConsumer;
    }

    /** @return the SNCP client consumer, or {@code null} if none was started */
    public MessageClientConsumer getSncpMessageClientConsumer() {
        return this.sncpMessageClientConsumer;
    }

    /**
     * Decides whether this agent handles the given configuration.
     *
     * @param anyValue the candidate configuration, may be {@code null}
     * @return {@code true} when {@code type} is {@code kafka} (case-insensitive), or when a
     *     {@code servers} value is present and does not contain {@code "pulsar"}
     */
    public boolean acceptsConf(AnyValue anyValue) {
        if (anyValue == null) {
            return false;
        }
        if ("kafka".equalsIgnoreCase(anyValue.getValue("type"))) {
            return true;
        }
        AnyValue serversConf = anyValue.getAnyValue("servers");
        if (serversConf == null) {
            return false;
        }
        String address = serversConf.getValue("value");
        return address != null && !address.contains("pulsar");
    }

    /**
     * Asynchronously creates the given topics, leaving partition count and replication
     * factor unspecified (both are passed as empty {@link Optional}s).
     *
     * @param strArr topic names; an empty or {@code null} array completes immediately
     * @return a future that completes when the request finishes, or fails with its error
     */
    public CompletableFuture<Void> createTopic(String... strArr) {
        if (Utility.isEmpty(strArr)) {
            return CompletableFuture.completedFuture(null);
        }
        try {
            List<NewTopic> topics = new ArrayList<>(strArr.length);
            for (String topic : strArr) {
                topics.add(new NewTopic(topic, Optional.empty(), Optional.empty()));
            }
            return this.adminClient
                    .createTopics(topics, new CreateTopicsOptions().timeoutMs(3000))
                    .all()
                    .toCompletionStage()
                    .toCompletableFuture();
        } catch (Exception e) {
            this.logger.log(Level.SEVERE, "createTopic error: " + Arrays.toString(strArr), (Throwable) e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Asynchronously deletes the given topics.
     *
     * @param strArr topic names; an empty or {@code null} array completes immediately
     * @return a future that completes when the request finishes, or fails with its error
     */
    public CompletableFuture<Void> deleteTopic(String... strArr) {
        if (Utility.isEmpty(strArr)) {
            return CompletableFuture.completedFuture(null);
        }
        try {
            return this.adminClient
                    .deleteTopics(Utility.ofList(strArr), new DeleteTopicsOptions().timeoutMs(3000))
                    .all()
                    .toCompletionStage()
                    .toCompletableFuture();
        } catch (Exception e) {
            this.logger.log(Level.SEVERE, "deleteTopic error: " + Arrays.toString(strArr), (Throwable) e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Asynchronously lists the names of all topics that Kafka does not flag as internal.
     *
     * @return a future holding the topic names, or failing with the request error
     */
    public CompletableFuture<List<String>> queryTopic() {
        try {
            return this.adminClient
                    .listTopics(new ListTopicsOptions().timeoutMs(3000))
                    .listings()
                    .thenApply(listings -> {
                        List<String> names = new ArrayList<>(listings.size());
                        for (TopicListing listing : listings) {
                            if (!listing.isInternal()) {
                                names.add(listing.name());
                            }
                        }
                        return names;
                    })
                    .toCompletionStage()
                    .toCompletableFuture();
        } catch (Exception e) {
            this.logger.log(Level.SEVERE, "queryTopic error ", (Throwable) e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /** Creates and starts a client consumer for each of the HTTP and SNCP message clients that is non-empty. */
    protected void startMessageClientConsumer() {
        if (!this.httpMessageClient.isEmpty()) {
            this.httpMessageClientConsumer = new KafkaMessageClientConsumer(this, this.httpMessageClient);
            this.httpMessageClientConsumer.start();
        }
        if (!this.sncpMessageClient.isEmpty()) {
            this.sncpMessageClientConsumer = new KafkaMessageClientConsumer(this, this.sncpMessageClient);
            this.sncpMessageClientConsumer.start();
        }
    }

    /** Stops whichever client consumers were created by {@link #startMessageClientConsumer()}. */
    protected void stopMessageClientConsumer() {
        if (this.httpMessageClientConsumer != null) {
            this.httpMessageClientConsumer.stop();
        }
        if (this.sncpMessageClientConsumer != null) {
            this.sncpMessageClientConsumer.stop();
        }
    }

    /** @return a new client producer named {@code redkale-message} using the configured partitions */
    protected MessageClientProducer startMessageClientProducer() {
        return new KafkaMessageClientProducer(this, "redkale-message", this.partitions);
    }

    /** Creates the base message producer unless one already exists. */
    protected void startMessageProducer() {
        if (this.messageBaseProducer == null) {
            this.messageBaseProducer = new KafkaMessageProducer(this);
        }
    }

    /** Stops and discards the base message producer; does nothing when none was started. */
    protected void stopMessageProducer() {
        if (this.messageBaseProducer == null) {
            return;
        }
        ((KafkaMessageProducer) this.messageBaseProducer).stop();
        this.messageBaseProducer = null;
    }

    /**
     * Creates one {@link KafkaMessageConsumer} per entry of {@code messageConsumerMap} and starts
     * them. All consumers are constructed before any is started; each one is registered as soon
     * as it has started, so a failure partway still lets {@link #stopMessageConsumer()} stop
     * those already running.
     */
    protected void startMessageConsumer() {
        List<KafkaMessageConsumer> created = new ArrayList<>();
        this.messageConsumerMap.forEach((key, consumers) -> {
            created.add(new KafkaMessageConsumer(this, key, consumers));
        });
        for (KafkaMessageConsumer consumer : created) {
            consumer.start();
            this.kafkaConsumers.add(consumer);
        }
    }

    /** Stops every registered consumer and forgets them, so a later restart begins clean. */
    protected void stopMessageConsumer() {
        for (KafkaMessageConsumer consumer : this.kafkaConsumers) {
            consumer.stop();
        }
        this.kafkaConsumers.clear();
    }
}
