package org.apache.inlong.manager.service.resource.sink.kafka;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.class */
public class KafkaResourceOperator extends AbstractStandaloneSinkResourceOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaResourceOperator.class);

    @Autowired
    private StreamSinkService sinkService;

    @Override // org.apache.inlong.manager.service.resource.sink.SinkResourceOperator
    public Boolean accept(String str) {
        return Boolean.valueOf("KAFKA".equals(str));
    }

    @Override // org.apache.inlong.manager.service.resource.sink.SinkResourceOperator
    public void createSinkResource(SinkInfo sinkInfo) {
        LOGGER.info("begin to create kafka topic for sinkId={}", sinkInfo.getId());
        KafkaSinkDTO fromJson = KafkaSinkDTO.getFromJson(sinkInfo.getExtParams());
        String topicName = fromJson.getTopicName();
        Integer partitionNum = fromJson.getPartitionNum();
        Preconditions.expectNotBlank(topicName, ErrorCodeEnum.INVALID_PARAMETER, "topic name cannot be empty");
        Preconditions.expectNotNull(partitionNum, ErrorCodeEnum.INVALID_PARAMETER, "partition cannot be empty");
        try {
            Admin kafkaAdmin = getKafkaAdmin(fromJson.getBootstrapServers());
            Throwable th = null;
            try {
                try {
                    if (!isTopicExists(kafkaAdmin, topicName, partitionNum)) {
                        ((KafkaFuture) kafkaAdmin.createTopics(Collections.singleton(new NewTopic(topicName, Optional.of(partitionNum), Optional.empty()))).values().get(topicName)).get();
                    }
                    this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode().intValue(), "create kafka topic success");
                    LOGGER.info("success to create kafka topic [{}] for sinkInfo={}", topicName, sinkInfo);
                    if (kafkaAdmin != null) {
                        if (0 != 0) {
                            try {
                                kafkaAdmin.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            kafkaAdmin.close();
                        }
                    }
                    assignCluster(sinkInfo);
                } finally {
                }
            } finally {
            }
        } catch (Throwable th3) {
            LOGGER.error("create kafka topic error, ", th3);
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode().intValue(), th3.getMessage());
            throw new WorkflowException("create kafka topic failed, reason: " + th3.getMessage());
        }
    }

    private boolean isTopicExists(Admin admin, String str, Integer num) throws Exception {
        if (!((Map) admin.listTopics().namesToListings().get()).containsKey(str)) {
            LOGGER.info("kafka topic {} not existed", str);
            return false;
        }
        TopicDescription topicDescription = (TopicDescription) ((KafkaFuture) admin.describeTopics(Collections.singletonList(str)).values().get(str)).get();
        if (topicDescription.partitions().size() == num.intValue()) {
            LOGGER.info(String.format("kafka topic=%s already exist with partition num=%s, no need to create", str, num));
            return true;
        }
        String format = String.format("kafka topic=%s already exist with partition num=%s, but the requested partition num=%s", str, Integer.valueOf(topicDescription.partitions().size()), num);
        LOGGER.error(format);
        throw new IllegalArgumentException(format);
    }

    private Admin getKafkaAdmin(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        return Admin.create(properties);
    }
}
