package org.redkalex.mq.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.redkale.mq.MessageAgent;
import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageProcessor;
import org.redkale.mq.MessageProducer;
import org.redkale.util.AnyValue;
import org.redkale.util.Utility;

/* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageAgent.class */
public class KafkaMessageAgent extends MessageAgent {
    protected String servers;
    protected int checkIntervals = 10;
    protected Properties consumerConfig = new Properties();
    protected Properties producerConfig = new Properties();
    protected AdminClient adminClient;
    protected int partitions;
    protected ScheduledFuture reconnectFuture;
    protected boolean reconnecting;

    public void init(AnyValue anyValue) {
        super.init(anyValue);
        this.servers = anyValue.getAnyValue("servers").getValue("value");
        this.checkIntervals = anyValue.getAnyValue("servers").getIntValue("checkintervals", 10);
        AnyValue anyValue2 = anyValue.getAnyValue("consumer");
        if (anyValue2 != null) {
            for (AnyValue anyValue3 : anyValue2.getAnyValues("property")) {
                this.consumerConfig.put(anyValue3.getValue("name"), anyValue3.getValue("value"));
            }
        }
        AnyValue anyValue4 = anyValue.getAnyValue("producer");
        if (anyValue4 != null) {
            this.partitions = anyValue4.getIntValue("partitions", 0);
            for (AnyValue anyValue5 : anyValue4.getAnyValues("property")) {
                this.producerConfig.put(anyValue5.getValue("name"), anyValue5.getValue("value"));
            }
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.servers);
        this.adminClient = KafkaAdminClient.create(properties);
    }

    public void destroy(AnyValue anyValue) {
        super.destroy(anyValue);
        if (this.adminClient != null) {
            this.adminClient.close();
        }
    }

    public synchronized void startReconnect() {
        if (this.reconnecting) {
            return;
        }
        this.reconnectFuture = this.timeoutExecutor.scheduleAtFixedRate(() -> {
            retryConnect();
        }, 0L, this.checkIntervals, TimeUnit.SECONDS);
    }

    private void retryConnect() {
        if (this.adminClient != null) {
            this.adminClient.close();
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.servers);
        this.adminClient = KafkaAdminClient.create(properties);
        if (queryTopic() != null) {
            this.logger.log(Level.INFO, getClass().getSimpleName() + " resume connect");
            this.reconnecting = false;
            if (this.reconnectFuture != null) {
                this.reconnectFuture.cancel(true);
                this.reconnectFuture = null;
            }
            getAllMessageConsumer().forEach(messageConsumer -> {
                ((KafkaMessageConsumer) messageConsumer).retryConnect();
            });
            getAllMessageProducer().forEach(messageProducer -> {
                ((KafkaMessageProducer) messageProducer).retryConnect();
            });
        }
    }

    public int getCheckIntervals() {
        return this.checkIntervals;
    }

    public ScheduledThreadPoolExecutor getTimeoutExecutor() {
        return this.timeoutExecutor;
    }

    public boolean match(AnyValue anyValue) {
        AnyValue anyValue2;
        return (anyValue == null || (anyValue2 = anyValue.getAnyValue("servers")) == null || anyValue2.getValue("value") == null || anyValue2.getValue("value").contains("pulsar")) ? false : true;
    }

    public boolean createTopic(String... strArr) {
        if (strArr == null || strArr.length < 1) {
            return true;
        }
        try {
            ArrayList arrayList = new ArrayList(strArr.length);
            for (String str : strArr) {
                arrayList.add(new NewTopic(str, Optional.empty(), Optional.empty()));
            }
            this.adminClient.createTopics(arrayList, new CreateTopicsOptions().timeoutMs(3000)).all().get(3L, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            this.logger.log(Level.SEVERE, "createTopic error: " + Arrays.toString(strArr), (Throwable) e);
            return false;
        }
    }

    public boolean deleteTopic(String... strArr) {
        if (strArr == null || strArr.length < 1) {
            return true;
        }
        try {
            this.adminClient.deleteTopics(Utility.ofList(strArr), new DeleteTopicsOptions().timeoutMs(3000)).all().get(3L, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            this.logger.log(Level.SEVERE, "deleteTopic error: " + Arrays.toString(strArr), (Throwable) e);
            return false;
        }
    }

    public List<String> queryTopic() {
        try {
            Collection<TopicListing> collection = (Collection) this.adminClient.listTopics(new ListTopicsOptions().timeoutMs(3000)).listings().get(3L, TimeUnit.SECONDS);
            ArrayList arrayList = new ArrayList(collection.size());
            for (TopicListing topicListing : collection) {
                if (!topicListing.isInternal()) {
                    arrayList.add(topicListing.name());
                }
            }
            return arrayList;
        } catch (Exception e) {
            this.logger.log(Level.SEVERE, "queryTopic error ", (Throwable) e);
            return null;
        }
    }

    public MessageConsumer createConsumer(String[] strArr, String str, MessageProcessor messageProcessor) {
        return new KafkaMessageConsumer(this, strArr, str, messageProcessor, this.servers, this.consumerConfig);
    }

    protected MessageProducer createProducer(String str) {
        return new KafkaMessageProducer(str, this, this.servers, this.partitions, this.producerConfig);
    }
}
