package org.apache.inlong.manager.service.resource.queue.kafka;

import java.util.Iterator;
import java.util.List;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link QueueResourceOperator} implementation for Kafka.
 *
 * <p>Topics are managed per inlong stream: creating a queue for a group is a no-op, while
 * creating a queue for a stream creates the topic through {@link KafkaOperator} and saves a
 * system consume record through {@link InlongConsumeService}.
 */
@Service
public class KafkaQueueResourceOperator implements QueueResourceOperator {

    private static final Logger log = LoggerFactory.getLogger(KafkaQueueResourceOperator.class);

    /** Consumer group name format; arguments are the cluster tag and the topic name. */
    public static final String KAFKA_CONSUMER_GROUP = "%s_%s_consumer_group";

    /** Consumer group name format used when querying the latest messages of a topic. */
    public static final String KAFKA_CONSUMER_GROUP_REALTIME_REVIEW = "%s_%s_consumer_group_realtime_review";

    /** MQ / cluster type handled by this operator. */
    private static final String KAFKA_TYPE = "KAFKA";

    /** Topic name format; arguments are the group's MQ resource and the stream's MQ resource. */
    private static final String TOPIC_NAME_FORMAT = "%s.%s";

    @Autowired
    private KafkaOperator kafkaOperator;

    @Autowired
    private InlongStreamService streamService;

    @Autowired
    private InlongConsumeService consumeService;

    @Autowired
    private InlongClusterService clusterService;

    /**
     * Tells whether this operator handles the given MQ type.
     *
     * @param mqType MQ type name, may be null
     * @return {@code true} only when the type is exactly {@code "KAFKA"}
     */
    @Override
    public boolean accept(String mqType) {
        return KAFKA_TYPE.equals(mqType);
    }

    /**
     * Does nothing except logging: Kafka topics are created per stream, not per group.
     *
     * @param groupInfo inlong group info
     * @param operator name of the operator
     */
    @Override
    public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
        log.info("skip to create kafka topic for groupId={}, just create in each inlong stream",
                groupInfo.getInlongGroupId());
    }

    /**
     * Force-deletes the topic of every stream returned by the stream service for the group.
     *
     * @param groupInfo inlong group info, must not be null
     * @param operator name of the operator
     * @throws WorkflowListenerException if listing the streams or deleting any topic fails
     */
    @Override
    public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
        Preconditions.expectNotNull(groupInfo, "inlong group info cannot be null");
        String groupId = groupInfo.getInlongGroupId();
        log.info("begin to delete kafka resource for groupId={}", groupId);

        ClusterInfo clusterInfo = this.clusterService.getOne(groupInfo.getInlongClusterTag(), null, KAFKA_TYPE);
        try {
            List<InlongStreamBriefInfo> streams = this.streamService.getTopicList(groupId);
            if (streams == null || streams.isEmpty()) {
                log.warn("skip to delete kafka topic as no streams for groupId={}", groupId);
                return;
            }
            // The cluster is the same for all streams of the group, so it is looked up only once.
            KafkaClusterInfo kafkaCluster = (KafkaClusterInfo) clusterInfo;
            for (InlongStreamBriefInfo stream : streams) {
                // NOTE(review): the raw MQ resource is used here, whereas the per-stream methods
                // expand a default (stream id) resource to "<group>.<stream>" - confirm that
                // getTopicList already returns fully resolved topic names.
                this.kafkaOperator.forceDeleteTopic(kafkaCluster, stream.getMqResource());
            }
            log.info("success to delete kafka resource for groupId={}, cluster={}", groupId, clusterInfo);
        } catch (Exception e) {
            log.error("failed to delete kafka resource for groupId=" + groupId, e);
            throw new WorkflowListenerException("failed to delete kafka resource: " + e.getMessage());
        }
    }

    /**
     * Creates the Kafka topic for the stream and saves the system consume record for it.
     *
     * @param groupInfo inlong group info, must not be null and must be an {@link InlongKafkaInfo}
     * @param streamInfo inlong stream info, must not be null
     * @param operator name of the operator, must not be blank
     * @throws WorkflowListenerException if the topic cannot be created
     */
    @Override
    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
        Preconditions.expectNotNull(groupInfo, "inlong group info cannot be null");
        Preconditions.expectNotNull(streamInfo, "inlong stream info cannot be null");
        Preconditions.expectNotBlank(operator, ErrorCodeEnum.INVALID_PARAMETER, "operator cannot be null");

        String groupId = streamInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        log.info("begin to create kafka resource for groupId={}, streamId={}", groupId, streamId);
        try {
            InlongKafkaInfo kafkaInfo = (InlongKafkaInfo) groupInfo;
            createKafkaTopic(kafkaInfo, resolveTopicName(kafkaInfo, streamInfo));
            log.info("success to create kafka resource for groupId={}, streamId={}", groupId, streamId);
        } catch (Exception e) {
            String msg = String.format("failed to create kafka topic for groupId=%s, streamId=%s", groupId, streamId);
            log.error(msg, e);
            throw new WorkflowListenerException(msg + ": " + e.getMessage());
        }
    }

    /**
     * Force-deletes the Kafka topic of the stream.
     *
     * @param groupInfo inlong group info, must not be null
     * @param streamInfo inlong stream info, must not be null
     * @param operator name of the operator
     * @throws WorkflowListenerException if the topic cannot be deleted
     */
    @Override
    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
        Preconditions.expectNotNull(groupInfo, "inlong group info cannot be null");
        Preconditions.expectNotNull(streamInfo, "inlong stream info cannot be null");

        String groupId = streamInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        log.info("begin to delete kafka resource for groupId={} streamId={}", groupId, streamId);
        try {
            deleteKafkaTopic(groupInfo, resolveTopicName(groupInfo, streamInfo));
            log.info("success to delete kafka resource for groupId={}, streamId={}", groupId, streamId);
        } catch (Exception e) {
            String msg = String.format("failed to delete kafka topic for groupId=%s, streamId=%s", groupId, streamId);
            log.error(msg, e);
            throw new WorkflowListenerException(msg + ": " + e.getMessage());
        }
    }

    /**
     * Queries the latest messages of the stream's topic using the realtime-review consumer group.
     *
     * @param groupInfo inlong group info
     * @param streamInfo inlong stream info
     * @param messageCount number of messages requested, passed through to {@link KafkaOperator}
     * @return the messages returned by {@link KafkaOperator}
     */
    @Override
    public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
            Integer messageCount) {
        KafkaClusterInfo kafkaCluster = getKafkaCluster(groupInfo);
        String topicName = resolveTopicName(groupInfo, streamInfo);
        String consumerGroup = String.format(KAFKA_CONSUMER_GROUP_REALTIME_REVIEW,
                groupInfo.getInlongClusterTag(), topicName);
        return this.kafkaOperator.queryLatestMessage(kafkaCluster, topicName, consumerGroup, messageCount, streamInfo);
    }

    /**
     * Builds the consumer group name from the group's cluster tag and the stream's MQ resource.
     *
     * @param groupInfo inlong group info
     * @param streamEntity inlong stream entity
     * @param sinkEntity stream sink entity (not used by this implementation)
     * @return the consumer group name in {@link #KAFKA_CONSUMER_GROUP} format
     */
    @Override
    public String getSortConsumeGroup(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity,
            StreamSinkEntity sinkEntity) {
        return String.format(KAFKA_CONSUMER_GROUP, groupInfo.getInlongClusterTag(), streamEntity.getMqResource());
    }

    /**
     * Resolves the Kafka topic name of a stream.
     *
     * <p>When the stream's MQ resource is blank or equal to the stream id, the name is expanded
     * to {@code <group MQ resource>.<stream id>}; otherwise the MQ resource is used as is.
     */
    private static String resolveTopicName(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo) {
        String streamId = streamInfo.getInlongStreamId();
        String mqResource = streamInfo.getMqResource();
        if (StringUtils.isBlank(mqResource) || mqResource.equals(streamId)) {
            // Fall back to the stream id so that a blank resource never yields "<group>.null".
            return String.format(TOPIC_NAME_FORMAT, groupInfo.getMqResource(), streamId);
        }
        return mqResource;
    }

    /** Looks up the Kafka cluster registered under the group's cluster tag. */
    private KafkaClusterInfo getKafkaCluster(InlongGroupInfo groupInfo) {
        return (KafkaClusterInfo) this.clusterService.getOne(groupInfo.getInlongClusterTag(), null, KAFKA_TYPE);
    }

    /**
     * Creates the topic, verifies that it exists afterwards, and saves the system consume record.
     *
     * @throws WorkflowListenerException if the topic does not exist after the create call
     */
    private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String topicName) throws Exception {
        KafkaClusterInfo kafkaCluster = getKafkaCluster(kafkaInfo);
        this.kafkaOperator.createTopic(kafkaInfo, kafkaCluster, topicName);
        if (!this.kafkaOperator.topicIsExists(kafkaCluster, topicName)) {
            String url = kafkaCluster.getUrl();
            log.error("topic={} not exists in {}", topicName, url);
            throw new WorkflowListenerException("topic=" + topicName + " not exists in " + url);
        }

        String consumerGroup = String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName);
        log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
                this.consumeService.saveBySystem(kafkaInfo, topicName, consumerGroup),
                consumerGroup, kafkaInfo.getInlongGroupId(), topicName);
    }

    /** Force-deletes the topic in the Kafka cluster registered under the group's cluster tag. */
    private void deleteKafkaTopic(InlongGroupInfo groupInfo, String topicName) {
        this.kafkaOperator.forceDeleteTopic(getKafkaCluster(groupInfo), topicName);
    }
}
