package ru.stm.rpc.kafkaredis.topic;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.KafkaException;
import org.springframework.util.StringUtils;
import ru.stm.platform.StmExecutionError;
import ru.stm.rpc.kafkaredis.config.KafkaRedisRpcProperties;
import ru.stm.rpc.kafkaredis.service.RpcNamespace;
import ru.stm.rpc.kafkaredis.service.RpcTopic;
import ru.stm.rpc.kafkaredis.topic.KafkaTopicState;

/* loaded from: input_file:ru/stm/rpc/kafkaredis/topic/KafkaEnsureTopicHelper.class */
public class KafkaEnsureTopicHelper {
    private static final int KAFKA_NUM_PARTITIONS = 12;
    private static final Logger log = LoggerFactory.getLogger(KafkaEnsureTopicHelper.class);
    private static final int DEFAULT_OPERATION_TIMEOUT = 30;
    private static int operationTimeout = DEFAULT_OPERATION_TIMEOUT;

    public static void handleTopics(KafkaRedisRpcProperties kafkaRedisRpcProperties, Map<String, Collection<KafkaTopicState>> map) {
        kafkaRedisRpcProperties.getNamespace().forEach((str, kafkaRedisRpcItem) -> {
            if (kafkaRedisRpcItem.getProducer() != null && kafkaRedisRpcItem.getProducer().getKafka() != null && !StringUtils.isEmpty(kafkaRedisRpcItem.getProducer().getKafka().getBootstrapServers())) {
                createTopicsIfNeeded(str, (Collection) map.get(str), kafkaRedisRpcItem.getProducer().getKafka().getBootstrapServers());
            }
            if (kafkaRedisRpcItem.getConsumer() == null || kafkaRedisRpcItem.getConsumer().getKafka() == null || StringUtils.isEmpty(kafkaRedisRpcItem.getConsumer().getKafka().getBootstrapServers())) {
                return;
            }
            createTopicsIfNeeded(str, (Collection) map.get(str), kafkaRedisRpcItem.getConsumer().getKafka().getBootstrapServers());
        });
    }

    public static void handleTopics(KafkaRedisRpcProperties kafkaRedisRpcProperties, String str, Collection<KafkaTopicState> collection) {
        handleTopics(kafkaRedisRpcProperties, Collections.singletonMap(str, new ArrayList(collection)));
    }

    public static void handleTopics(Collection<RpcNamespace> collection) {
        collection.forEach(rpcNamespace -> {
            if (rpcNamespace.hasProducer()) {
                createTopicsIfNeeded(rpcNamespace.getName(), createTopicStates(rpcNamespace.topics()), rpcNamespace.getProducerServers());
            }
            if (rpcNamespace.hasConsumer()) {
                if (rpcNamespace.hasProducer() && rpcNamespace.getProducerServers().equals(rpcNamespace.getConsumerServers())) {
                    return;
                }
                createTopicsIfNeeded(rpcNamespace.getName(), createTopicStates(rpcNamespace.topics()), rpcNamespace.getConsumerServers());
            }
        });
    }

    private static Collection<KafkaTopicState> createTopicStates(Collection<RpcTopic> collection) {
        return (Collection) collection.stream().map(rpcTopic -> {
            return rpcTopic.isTransactional() ? KafkaTopicState.transactional(rpcTopic.getTopic()) : KafkaTopicState.standard(rpcTopic.getTopic());
        }).collect(Collectors.toList());
    }

    private static void createTopicsIfNeeded(String str, Collection<KafkaTopicState> collection, String str2) {
        if (collection == null) {
            return;
        }
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", str2);
        List list = (List) collection.stream().map(kafkaTopicState -> {
            short s = (short) (str2.split(",").length >= 2 ? 3 : 1);
            if (!kafkaTopicState.getType().equals(KafkaTopicState.KafkaTopicType.TRANSACTIONAL_KAFKA) && !kafkaTopicState.getType().equals(KafkaTopicState.KafkaTopicType.SIMPLE_KAFKA)) {
                log.error("Invalid Kafka topic config {}", kafkaTopicState);
                throw new StmExecutionError("Invalid Kafka Topic");
            }
            return new NewTopic(kafkaTopicState.getTopicName(), KAFKA_NUM_PARTITIONS, s);
        }).collect(Collectors.toList());
        log.debug("RPC kafka topics NS={}, topics={}, bootstrap={}", new Object[]{str, collection, str2});
        AdminClient create = AdminClient.create(hashMap);
        addTopicsIfNeeded(create, list, new MutableInt(0));
        create.close();
    }

    private static void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> collection, MutableInt mutableInt) {
        if (collection.size() > 0) {
            HashMap hashMap = new HashMap();
            collection.forEach(newTopic -> {
                hashMap.compute(newTopic.name(), (str, newTopic) -> {
                    return newTopic;
                });
            });
            DescribeTopicsResult describeTopics = adminClient.describeTopics((Collection) collection.stream().map((v0) -> {
                return v0.name();
            }).collect(Collectors.toList()));
            ArrayList arrayList = new ArrayList();
            HashMap hashMap2 = new HashMap();
            describeTopics.values().forEach((str, kafkaFuture) -> {
                NewTopic newTopic2 = (NewTopic) hashMap.get(str);
                try {
                    TopicDescription topicDescription = (TopicDescription) kafkaFuture.get(operationTimeout, TimeUnit.SECONDS);
                    if (newTopic2.numPartitions() < topicDescription.partitions().size()) {
                        if (log.isInfoEnabled()) {
                            log.info(String.format("Topic '%s' exists but has a different partition count: %d not %d", str, Integer.valueOf(topicDescription.partitions().size()), Integer.valueOf(newTopic2.numPartitions())));
                        }
                    } else if (newTopic2.numPartitions() > topicDescription.partitions().size()) {
                        if (log.isInfoEnabled()) {
                            log.info(String.format("Topic '%s' exists but has a different partition count: %d not %d, increasing if the broker supports it", str, Integer.valueOf(topicDescription.partitions().size()), Integer.valueOf(newTopic2.numPartitions())));
                        }
                        hashMap2.put(str, NewPartitions.increaseTo(newTopic2.numPartitions()));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e2) {
                    arrayList.add(newTopic2);
                } catch (TimeoutException e3) {
                    throw new KafkaException("Timed out waiting to get existing topics", e3);
                }
            });
            if (arrayList.size() > 0) {
                try {
                    adminClient.createTopics(arrayList).all().get(operationTimeout, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while waiting for topic creation results", e);
                } catch (ExecutionException e2) {
                    log.error("Failed to create topics", e2.getCause());
                    doRetryAlreadyExists(adminClient, collection, mutableInt);
                    return;
                } catch (TimeoutException e3) {
                    throw new KafkaException("Timed out waiting for create topics results", e3);
                }
            }
            if (hashMap2.size() > 0) {
                try {
                    adminClient.createPartitions(hashMap2).all().get(operationTimeout, TimeUnit.SECONDS);
                } catch (InterruptedException e4) {
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while waiting for partition creation results", e4);
                } catch (ExecutionException e5) {
                    log.error("Failed to create partitions", e5.getCause());
                    if (!(e5.getCause() instanceof UnsupportedVersionException)) {
                        throw new KafkaException("Failed to create partitions", e5.getCause());
                    }
                } catch (TimeoutException e6) {
                    throw new KafkaException("Timed out waiting for create partitions results", e6);
                }
            }
        }
    }

    private static void doRetryAlreadyExists(AdminClient adminClient, Collection<NewTopic> collection, MutableInt mutableInt) {
        log.warn("Topics already created. Will run operation one more time");
        mutableInt.increment();
        try {
            Thread.sleep(3000L);
            addTopicsIfNeeded(adminClient, collection, mutableInt);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
