package org.apache.inlong.manager.service.listener.queue;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.enums.TaskStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.pojo.workflow.TaskResponse;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/listener/queue/QueueResourceListener.class */
public class QueueResourceListener implements QueueOperateListener {
    private static final Logger log = LoggerFactory.getLogger(QueueResourceListener.class);
    private static final Integer TIMEOUT_SECONDS = 180;
    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(10, 20, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10000), new ThreadFactoryBuilder().setNameFormat("inlong-mq-process-%s").build(), new ThreadPoolExecutor.CallerRunsPolicy());

    @Autowired
    private InlongGroupService groupService;

    @Autowired
    private InlongStreamService streamService;

    @Autowired
    private WorkflowService workflowService;

    @Autowired
    private QueueResourceOperatorFactory queueOperatorFactory;

    /* renamed from: org.apache.inlong.manager.service.listener.queue.QueueResourceListener$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/inlong/manager/service/listener/queue/QueueResourceListener$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$inlong$manager$common$enums$GroupOperateType = new int[GroupOperateType.values().length];

        static {
            try {
                $SwitchMap$org$apache$inlong$manager$common$enums$GroupOperateType[GroupOperateType.INIT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$inlong$manager$common$enums$GroupOperateType[GroupOperateType.DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* renamed from: event, reason: merged with bridge method [inline-methods] */
    public TaskEvent m92event() {
        return TaskEvent.COMPLETE;
    }

    public boolean accept(WorkflowContext workflowContext) {
        return isGroupProcessForm(workflowContext);
    }

    public ListenerResult listen(WorkflowContext workflowContext) throws WorkflowListenerException {
        GroupResourceProcessForm processForm = workflowContext.getProcessForm();
        String inlongGroupId = processForm.getInlongGroupId();
        InlongGroupInfo inlongGroupInfo = this.groupService.get(inlongGroupId);
        if (inlongGroupInfo == null) {
            String str = "inlong group not found with groupId=" + inlongGroupId;
            log.error(str);
            throw new WorkflowListenerException(str);
        }
        processForm.setGroupInfo(inlongGroupInfo);
        processForm.setStreamInfos(this.streamService.list(inlongGroupId));
        String operator = workflowContext.getOperator();
        GroupOperateType groupOperateType = processForm.getGroupOperateType();
        if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroupInfo.getInlongGroupMode()) || InlongConstants.DATASYNC_OFFLINE_MODE.equals(inlongGroupInfo.getInlongGroupMode())) {
            log.warn("skip to execute QueueResourceListener as sync mode {} (1 for realtime sync, 2 for offline sync) for groupId={}", inlongGroupInfo.getInlongGroupMode(), inlongGroupId);
            if (GroupOperateType.INIT.equals(groupOperateType)) {
                createQueueForStreams(inlongGroupInfo, processForm.getStreamInfos(), operator);
            }
            return ListenerResult.success("skip - disable create mq resource for sync mode");
        }
        if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(inlongGroupInfo.getEnableCreateResource())) {
            log.warn("skip to execute QueueResourceListener as disable create resource for groupId={}", inlongGroupId);
            return ListenerResult.success("skip - disable create resource");
        }
        QueueResourceOperator queueResourceOperatorFactory = this.queueOperatorFactory.getInstance(inlongGroupInfo.getMqType());
        switch (AnonymousClass1.$SwitchMap$org$apache$inlong$manager$common$enums$GroupOperateType[groupOperateType.ordinal()]) {
            case 1:
                this.groupService.updateStatus(inlongGroupId, GroupStatus.CONFIG_ING.getCode(), operator);
                queueResourceOperatorFactory.createQueueForGroup(inlongGroupInfo, operator);
                createQueueForStreams(inlongGroupInfo, processForm.getStreamInfos(), operator);
                break;
            case 2:
                this.groupService.updateStatus(inlongGroupId, GroupStatus.CONFIG_DELETING.getCode(), operator);
                queueResourceOperatorFactory.deleteQueueForGroup(inlongGroupInfo, operator);
                break;
            default:
                log.warn("unsupported operate={} for inlong group", groupOperateType);
                break;
        }
        log.info("success to execute QueueResourceListener for groupId={}, operateType={}", inlongGroupId, groupOperateType);
        return ListenerResult.success("success");
    }

    private void createQueueForStreams(InlongGroupInfo inlongGroupInfo, List<InlongStreamInfo> list, String str) {
        String inlongGroupId = inlongGroupInfo.getInlongGroupId();
        log.info("success to start stream process for groupId={}", inlongGroupId);
        for (InlongStreamInfo inlongStreamInfo : list) {
            StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(inlongGroupInfo, inlongStreamInfo, GroupOperateType.INIT);
            String str2 = "failed to start stream process for groupId=" + inlongGroupId + " streamId=" + inlongStreamInfo.getInlongStreamId();
            UserInfo loginUser = LoginUserUtils.getLoginUser();
            try {
                CompletableFuture.supplyAsync(() -> {
                    return this.workflowService.startAsync(ProcessName.CREATE_STREAM_RESOURCE, loginUser, processForm);
                }, EXECUTOR_SERVICE).whenComplete((workflowResult, th) -> {
                    if (th != null) {
                        log.error(str2 + ": " + th.getMessage());
                        throw new WorkflowListenerException(str2, th);
                    }
                    List newTasks = workflowResult.getNewTasks();
                    if (TaskStatus.FAILED == ((TaskResponse) newTasks.get(newTasks.size() - 1)).getStatus()) {
                        log.error(str2);
                        throw new WorkflowListenerException(str2);
                    }
                }).get(TIMEOUT_SECONDS.intValue(), TimeUnit.SECONDS);
            } catch (Exception e) {
                log.error("failed to execute stream process in asynchronously ", e);
                throw new WorkflowListenerException("failed to execute stream process in asynchronously : " + e.getMessage());
            }
        }
        log.info("success to start stream process for groupId={}", inlongGroupId);
    }
}
