package org.apache.inlong.manager.service.schedule;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.manager.common.enums.ScheduleStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.schedule.ScheduleClientFactory;
import org.apache.inlong.manager.schedule.ScheduleEngineClient;
import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.class */
public class ScheduleOperatorImpl implements ScheduleOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleOperatorImpl.class);

    @Autowired
    private ScheduleService scheduleService;

    @Autowired
    private InlongGroupExtEntityMapper groupExtMapper;

    @Autowired
    private ScheduleClientFactory scheduleClientFactory;
    private OfflineJobOperator offlineJobOperator;
    private ScheduleEngineClient scheduleEngineClient;

    @Override // org.apache.inlong.manager.service.schedule.ScheduleOperator
    @Transactional(rollbackFor = {Throwable.class})
    public int saveOpt(ScheduleInfoRequest scheduleInfoRequest, String str) {
        int save = this.scheduleService.save(scheduleInfoRequest, str);
        LOGGER.info("Save schedule info success for group {}", scheduleInfoRequest.getInlongGroupId());
        registerScheduleInfoForApprovedGroup((ScheduleInfo) CommonBeanUtils.copyProperties(scheduleInfoRequest, ScheduleInfo::new), str);
        return save;
    }

    private void registerScheduleInfoForApprovedGroup(ScheduleInfo scheduleInfo, String str) {
        String inlongGroupId = scheduleInfo.getInlongGroupId();
        InlongGroupExtEntity selectByUniqueKey = this.groupExtMapper.selectByUniqueKey(inlongGroupId, "register.schedule.status");
        if (selectByUniqueKey == null || !"registered".equalsIgnoreCase(selectByUniqueKey.getKeyValue())) {
            return;
        }
        this.scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), ScheduleStatus.APPROVED, str);
        registerToScheduleEngine(scheduleInfo, str, false);
        LOGGER.info("Register schedule info success for group {}", inlongGroupId);
    }

    private ScheduleEngineClient getScheduleEngineClient() {
        if (this.scheduleEngineClient == null) {
            this.scheduleEngineClient = this.scheduleClientFactory.getInstance();
        }
        return this.scheduleEngineClient;
    }

    @Override // org.apache.inlong.manager.service.schedule.ScheduleOperator
    public Boolean scheduleInfoExist(String str) {
        return this.scheduleService.exist(str);
    }

    @Override // org.apache.inlong.manager.service.schedule.ScheduleOperator
    public ScheduleInfo getScheduleInfo(String str) {
        return this.scheduleService.get(str);
    }

    @Override // org.apache.inlong.manager.service.schedule.ScheduleOperator
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean updateOpt(ScheduleInfoRequest scheduleInfoRequest, String str) {
        if (!scheduleInfoExist(scheduleInfoRequest.getInlongGroupId()).booleanValue()) {
            saveOpt(scheduleInfoRequest, str);
            return true;
        }
        if (!needUpdate((ScheduleInfo) CommonBeanUtils.copyProperties(scheduleInfoRequest, ScheduleInfo::new))) {
            LOGGER.info("schedule info not changed for group {}", scheduleInfoRequest.getInlongGroupId());
            return false;
        }
        boolean booleanValue = this.scheduleService.update(scheduleInfoRequest, str).booleanValue();
        this.scheduleService.updateStatus(scheduleInfoRequest.getInlongGroupId(), ScheduleStatus.UPDATED, str);
        return Boolean.valueOf(booleanValue);
    }

    @Override // org.apache.inlong.manager.service.schedule.ScheduleOperator
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean updateAndRegister(ScheduleInfoRequest scheduleInfoRequest, String str) {
        if (updateOpt(scheduleInfoRequest, str).booleanValue()) {
            return registerToScheduleEngine((ScheduleInfo) CommonBeanUtils.copyProperties(scheduleInfoRequest, ScheduleInfo::new), str, true);
        }
        return false;
    }

    private Boolean registerToScheduleEngine(ScheduleInfo scheduleInfo, String str, boolean z) {
        boolean update = z ? getScheduleEngineClient().update(scheduleInfo) : getScheduleEngineClient().register(scheduleInfo);
        this.scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), ScheduleStatus.REGISTERED, str);
        LOGGER.info("{} schedule info success for group {}", z ? "Update" : "Register", scheduleInfo.getInlongGroupId());
        return Boolean.valueOf(update);
    }

    private boolean needUpdate(ScheduleInfo scheduleInfo) {
        return (scheduleInfo == null || scheduleInfo.equals(getScheduleInfo(scheduleInfo.getInlongGroupId()))) ? false : true;
    }

    @Override // org.apache.inlong.manager.service.schedule.ScheduleOperator
    public Boolean deleteByGroupIdOpt(String str, String str2) {
        return this.scheduleService.deleteByGroupId(str, str2);
    }

    @Override // org.apache.inlong.manager.service.schedule.ScheduleOperator
    public Boolean handleGroupApprove(String str) {
        if (scheduleInfoExist(str).booleanValue()) {
            this.scheduleService.updateStatus(str, ScheduleStatus.APPROVED, null);
            return registerToScheduleEngine(getScheduleInfo(str), null, false);
        }
        LOGGER.warn("schedule info not exist for group {}", str);
        return false;
    }

    @Override // org.apache.inlong.manager.service.schedule.ScheduleOperator
    public Boolean submitOfflineJob(String str, List<InlongStreamInfo> list, Boundaries boundaries) {
        if (this.offlineJobOperator == null) {
            this.offlineJobOperator = OfflineJobOperatorFactory.getOfflineJobOperator();
        }
        try {
            this.offlineJobOperator.submitOfflineJob(str, list, boundaries);
            LOGGER.info("Submit offline job for group {} and stream list {} success.", str, list.stream().map((v0) -> {
                return v0.getName();
            }).collect(Collectors.toList()));
            return true;
        } catch (Exception e) {
            String format = String.format("Submit offline job failed for groupId=%s", str);
            LOGGER.error(format, e);
            throw new BusinessException(format);
        }
    }
}
