package io.edurt.datacap.server.service.impl;

import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.edurt.datacap.server.body.PipelineBody;
import io.edurt.datacap.server.common.BeanToPropertiesCommon;
import io.edurt.datacap.server.common.PluginCommon;
import io.edurt.datacap.server.common.Response;
import io.edurt.datacap.server.common.ServiceState;
import io.edurt.datacap.server.entity.PipelineEntity;
import io.edurt.datacap.server.entity.SourceEntity;
import io.edurt.datacap.server.plugin.configure.IConfigure;
import io.edurt.datacap.server.plugin.configure.IConfigureExecutor;
import io.edurt.datacap.server.plugin.configure.IConfigureExecutorField;
import io.edurt.datacap.server.plugin.configure.IConfigurePipelineType;
import io.edurt.datacap.server.repository.PipelineRepository;
import io.edurt.datacap.server.repository.SourceRepository;
import io.edurt.datacap.server.security.UserDetailsService;
import io.edurt.datacap.server.service.PipelineService;
import io.edurt.datacap.spi.executor.Executor;
import io.edurt.datacap.spi.executor.Pipeline;
import io.edurt.datacap.spi.executor.PipelineField;
import io.edurt.datacap.spi.executor.PipelineResponse;
import io.edurt.datacap.spi.executor.PipelineState;
import java.io.File;
import java.sql.Timestamp;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

/**
 * {@link PipelineService} implementation that validates a pipeline request, persists a
 * {@link PipelineEntity} describing it, and runs the selected {@link Executor} on a
 * background thread.
 *
 * <p>The background work is handed to an internal cached thread pool; {@link #submit} returns
 * as soon as the pipeline record has been saved, without waiting for the executor to finish.
 */
@Service
public class PipelineServiceImpl implements PipelineService {
    private static final Logger log = LoggerFactory.getLogger(PipelineServiceImpl.class);
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final SourceRepository repository;
    private final Injector injector;
    private final Environment environment;
    private final PipelineRepository pipelineRepository;

    public PipelineServiceImpl(SourceRepository sourceRepository, Injector injector, Environment environment, PipelineRepository pipelineRepository) {
        this.repository = sourceRepository;
        this.injector = injector;
        this.environment = environment;
        this.pipelineRepository = pipelineRepository;
    }

    /**
     * Validates the request, stores a new pipeline record and starts the executor asynchronously.
     *
     * <p>Validation order: both sources must exist; the "from" source must declare a pipeline of
     * type {@code SOURCE} for the requested executor; the "to" source must declare one of type
     * {@code SINK}; finally an {@link Executor} with the requested name must be registered in the
     * injector. The first failing check determines the returned failure response.
     *
     * @param pipelineBody request carrying the executor name, the pipeline content and the
     *                     from/to source ids with their optional configuration overrides
     * @return a success response holding the id of the saved pipeline entity, or a failure
     *         response describing which validation step rejected the request
     */
    @Override // io.edurt.datacap.server.service.PipelineService
    public Response<Object> submit(PipelineBody pipelineBody) {
        Optional<SourceEntity> fromLookup = this.repository.findById(pipelineBody.getFrom().getId());
        Optional<SourceEntity> toLookup = this.repository.findById(pipelineBody.getTo().getId());
        if (!fromLookup.isPresent() || !toLookup.isPresent()) {
            return Response.failure(ServiceState.SOURCE_NOT_FOUND);
        }

        // Validate the input side: the plugin must expose a SOURCE pipeline for this executor.
        SourceEntity fromSource = fromLookup.get();
        IConfigure fromConfigure = PluginCommon.loadYamlConfigure(fromSource.getProtocol(), fromSource.getType(), fromSource.getType(), this.environment);
        if (ObjectUtils.isEmpty(fromConfigure.getPipelines())) {
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE, String.format("Source %s is not supported pipeline, type %s", fromSource.getId(), fromSource.getType()));
        }
        Optional<IConfigureExecutor> fromPipeline = findPipeline(fromConfigure, pipelineBody.getExecutor(), IConfigurePipelineType.SOURCE);
        if (!fromPipeline.isPresent()) {
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE_TYPE, String.format("Source %s type %s is not supported pipeline type %s", fromSource.getId(), fromSource.getType(), IConfigurePipelineType.SOURCE));
        }

        // Validate the output side: the plugin must expose a SINK pipeline for this executor.
        SourceEntity toSource = toLookup.get();
        IConfigure toConfigure = PluginCommon.loadYamlConfigure(toSource.getProtocol(), toSource.getType(), toSource.getType(), this.environment);
        if (ObjectUtils.isEmpty(toConfigure.getPipelines())) {
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE, String.format("Source %s is not supported pipeline, type %s", toSource.getId(), toSource.getType()));
        }
        Optional<IConfigureExecutor> toPipeline = findPipeline(toConfigure, pipelineBody.getExecutor(), IConfigurePipelineType.SINK);
        if (!toPipeline.isPresent()) {
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE_TYPE, String.format("Source %s type %s is not supported pipeline type %s", toSource.getId(), toSource.getType(), IConfigurePipelineType.SINK));
        }

        // Resolve the executor before anything is persisted. Previously a missing executor was
        // only discovered inside the background task, after a success response had been returned.
        Set<Executor> executors = this.injector.getInstance(Key.get(new TypeLiteral<Set<Executor>>() {
        }));
        Optional<Executor> executorLookup = executors.stream()
                .filter(candidate -> candidate.name().equals(pipelineBody.getExecutor()))
                .findFirst();
        if (!executorLookup.isPresent()) {
            // NOTE(review): no dedicated "executor not found" state is visible from this file,
            // so the closest existing state is reused — confirm or introduce a specific one.
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE, String.format("Executor %s is not available", pipelineBody.getExecutor()));
        }
        Executor executor = executorLookup.get();

        // Work on a copy so the caller's request object is not modified when "context" is added.
        Properties fromOverrides = copyOf(pipelineBody.getFrom().getConfigures());
        fromOverrides.setProperty("context", pipelineBody.getContent());
        Properties fromProperties = merge(fromSource, fromPipeline.get().getFields(), fromOverrides);
        PipelineField fromField = PipelineField.builder()
                .type(fromSource.getType())
                .configure(fromProperties)
                .supportOptions(requiredFields(fromPipeline.get().getFields()))
                .build();

        Properties toProperties = merge(toSource, toPipeline.get().getFields(), copyOf(pipelineBody.getTo().getConfigures()));
        PipelineField toField = PipelineField.builder()
                .type(toSource.getType())
                .configure(toProperties)
                .supportOptions(requiredFields(toPipeline.get().getFields()))
                .build();

        // Work directory layout: <data root>/<username>/<timestamp>, where the data root comes
        // from "datacap.executor.data" and falls back to <user.dir>/data.
        String dataRoot = this.environment.getProperty("datacap.executor.data");
        if (StringUtils.isEmpty(dataRoot)) {
            dataRoot = String.join(File.separator, System.getProperty("user.dir"), "data");
        }
        String username = UserDetailsService.getUser().getUsername();
        String timestamp = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMddHHmmssSSS");
        String workDirectory = String.join(File.separator, dataRoot, username, timestamp);
        // Locale.ROOT keeps the lower-cased executor name identical for the pipeline name and
        // the "datacap.executor.<name>.home" lookup regardless of the JVM default locale.
        String executorKey = pipelineBody.getExecutor().toLowerCase(Locale.ROOT);
        String pipelineName = String.join("_", username, executorKey, "from", String.valueOf(fromSource.getId()), "to", String.valueOf(toSource.getId()), timestamp);
        try {
            FileUtils.forceMkdir(new File(workDirectory));
        } catch (Exception e) {
            // Best effort: the submission continues even if the directory could not be created.
            log.warn("Failed to create temporary directory", e);
        }

        Pipeline pipeline = Pipeline.builder()
                .work(workDirectory)
                .home(this.environment.getProperty(String.format("datacap.executor.%s.home", executorKey)))
                .pipelineName(pipelineName)
                .username(username)
                .from(fromField)
                .to(toField)
                .timeout(600L)
                .build();

        PipelineEntity pipelineEntity = new PipelineEntity();
        pipelineEntity.setName(pipelineName);
        pipelineEntity.setContent(pipelineBody.getContent());
        pipelineEntity.setState(PipelineState.CREATED);
        pipelineEntity.setWork(workDirectory);
        pipelineEntity.setStartTime(new Timestamp(System.currentTimeMillis()));
        pipelineEntity.setUser(UserDetailsService.getUser());
        pipelineEntity.setFrom(fromSource);
        pipelineEntity.setFromConfigures(fromProperties);
        pipelineEntity.setTo(toSource);
        pipelineEntity.setToConfigures(toProperties);
        this.pipelineRepository.save(pipelineEntity);

        this.executorService.submit(() -> {
            try {
                pipelineEntity.setState(PipelineState.RUNNING);
                this.pipelineRepository.save(pipelineEntity);
                PipelineResponse response = executor.start(pipeline);
                pipelineEntity.setEndTime(new Timestamp(System.currentTimeMillis()));
                pipelineEntity.setState(response.getState());
                pipelineEntity.setMessage(response.getMessage());
                // NOTE(review): this stores whatever getElapsed() currently returns; presumably
                // the entity derives it from start/end time — confirm against PipelineEntity.
                pipelineEntity.setElapsed(Long.valueOf(pipelineEntity.getElapsed()));
                this.pipelineRepository.save(pipelineEntity);
            } catch (RuntimeException e) {
                // The Future returned by submit() is discarded, so without this the failure
                // would vanish silently.
                // NOTE(review): the record keeps its last saved state; consider persisting a
                // terminal failure state here once the appropriate PipelineState is confirmed.
                log.error("Pipeline {} terminated unexpectedly", pipelineName, e);
            }
        });
        return Response.success(pipelineEntity.getId());
    }

    /**
     * Finds the first pipeline definition of the given type that is bound to the named executor.
     */
    private static Optional<IConfigureExecutor> findPipeline(IConfigure configure, String executorName, IConfigurePipelineType type) {
        return configure.getPipelines().stream()
                .filter(candidate -> candidate.getExecutor().equals(executorName) && candidate.getType().equals(type))
                .findFirst();
    }

    /**
     * Collects the names of all fields flagged as required.
     */
    private static Set<String> requiredFields(List<IConfigureExecutorField> fields) {
        Set<String> required = new HashSet<>();
        for (IConfigureExecutorField field : fields) {
            if (field.isRequired()) {
                required.add(field.getField());
            }
        }
        return required;
    }

    /**
     * Returns a new {@link Properties} holding the direct entries of {@code source}; a
     * {@code null} source yields an empty instance.
     */
    private static Properties copyOf(Properties source) {
        Properties copy = new Properties();
        if (source != null) {
            copy.putAll(source);
        }
        return copy;
    }

    /**
     * Builds the effective configuration for one side of the pipeline.
     *
     * <p>For every declared field the value is taken from {@code properties} (the request
     * overrides) when the field is marked as override, otherwise from the properties derived
     * from {@code sourceEntity}.
     *
     * @param sourceEntity stored source whose bean properties provide the non-override values
     * @param list         field definitions declared by the pipeline configuration
     * @param properties   overrides supplied with the request
     * @return a new {@link Properties} with one entry per declared field
     */
    private Properties merge(SourceEntity sourceEntity, List<IConfigureExecutorField> list, Properties properties) {
        Properties merged = new Properties();
        Properties sourceProperties = BeanToPropertiesCommon.convertBeanToProperties(sourceEntity);
        for (IConfigureExecutorField field : list) {
            Properties lookup = field.isOverride() ? properties : sourceProperties;
            setProperty(field, merged, lookup);
        }
        return merged;
    }

    /**
     * Resolves the value of a single field from {@code properties2} and stores it in
     * {@code properties} under the field name.
     *
     * <p>When the field declares an origin, the origin is used as lookup key; an origin
     * containing {@code |} is split and the values of its first two parts are joined with
     * {@code :}. Without an origin the field name itself is the lookup key. If nothing
     * non-empty is found in the single-key cases, an empty string is stored.
     *
     * @param iConfigureExecutorField field definition being resolved
     * @param properties              target that receives the resolved value
     * @param properties2             lookup source for the value
     */
    private void setProperty(IConfigureExecutorField iConfigureExecutorField, Properties properties, Properties properties2) {
        Object value = "";
        if (ObjectUtils.isNotEmpty(iConfigureExecutorField.getOrigin())) {
            String[] parts = String.valueOf(iConfigureExecutorField.getOrigin()).split("\\|");
            if (parts.length > 1) {
                // A missing part is rendered by String.valueOf as the literal text "null".
                value = String.join(":", String.valueOf(properties2.get(parts[0])), String.valueOf(properties2.get(parts[1])));
            } else if (ObjectUtils.isNotEmpty(properties2.get(iConfigureExecutorField.getOrigin()))) {
                value = properties2.get(iConfigureExecutorField.getOrigin());
            }
        } else if (ObjectUtils.isNotEmpty(properties2.get(iConfigureExecutorField.getField()))) {
            value = properties2.get(iConfigureExecutorField.getField());
        }
        properties.put(iConfigureExecutorField.getField(), value);
    }

    /**
     * Deleting pipelines is not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // io.edurt.datacap.server.service.BaseService
    public Response<Long> delete(Long l) {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}
