package io.edurt.datacap.server.scheduled;

import com.google.inject.Injector;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.schedule.ScheduledRunnable;
import io.edurt.datacap.server.common.JSON;
import io.edurt.datacap.server.common.PluginCommon;
import io.edurt.datacap.server.common.Type;
import io.edurt.datacap.server.entity.SourceEntity;
import io.edurt.datacap.server.entity.TemplateSqlEntity;
import io.edurt.datacap.server.itransient.SqlConfigure;
import io.edurt.datacap.server.repository.SourceRepository;
import io.edurt.datacap.server.repository.TemplateSqlRepository;
import io.edurt.datacap.spi.Plugin;
import io.edurt.datacap.spi.model.Configure;
import io.edurt.datacap.spi.model.Response;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * Scheduled task that rebuilds the cached column list of every stored source.
 *
 * <p>For each source returned by the {@link SourceRepository} the task resolves the matching
 * {@link Plugin}, then runs the SQL templates {@code getAllDatabase},
 * {@code getAllTablesFromDatabase} and {@code getAllColumnsFromDatabaseAndTable} in that order.
 * Every discovered column is added as {@code database.table.column} to a Redis set whose key is
 * {@code <source type>_<source id>}; the set is deleted before it is repopulated.
 */
@SuppressFBWarnings({"REC_CATCH_EXCEPTION"})
public class SourceScheduledRunnable extends ScheduledRunnable {
    private static final Logger log = LoggerFactory.getLogger(SourceScheduledRunnable.class);

    /** Template parameter name that carries the database name. */
    public static final String DATABASE = "database";
    /** Template parameter name that carries the {@code database.table} name. */
    public static final String TABLE = "table";

    private static final String GET_ALL_DATABASES = "getAllDatabase";
    private static final String GET_ALL_TABLES = "getAllTablesFromDatabase";
    private static final String GET_ALL_COLUMNS = "getAllColumnsFromDatabaseAndTable";

    private final Injector injector;
    private final SourceRepository sourceRepository;
    private final TemplateSqlRepository templateSqlRepository;
    // Kept as a raw type so the constructor signature stays compatible with existing callers.
    private final RedisTemplate redisTemplate;

    /**
     * Creates the task.
     *
     * @param str name of the scheduled task, passed to the superclass
     * @param injector injector used to look up source plugins
     * @param sourceRepository repository providing the sources to synchronize
     * @param templateSqlRepository repository providing the SQL templates
     * @param redisTemplate Redis access used to store the discovered column names
     */
    public SourceScheduledRunnable(String str, Injector injector, SourceRepository sourceRepository, TemplateSqlRepository templateSqlRepository, RedisTemplate redisTemplate) {
        super(str);
        this.injector = injector;
        this.sourceRepository = sourceRepository;
        this.templateSqlRepository = templateSqlRepository;
        this.redisTemplate = redisTemplate;
    }

    /** Synchronizes every source returned by the source repository, one after another. */
    public void run() {
        this.sourceRepository.findAll().forEach(this::syncSource);
    }

    /**
     * Synchronizes a single source: clears its Redis set and refills it from the source.
     *
     * @param sourceEntity source to synchronize
     */
    private void syncSource(SourceEntity sourceEntity) {
        log.info("==================== {} ---> {} started =================", getName(), sourceEntity.getName());
        Optional<Plugin> plugin = PluginCommon.getPluginByNameAndType(this.injector, sourceEntity.getType(), sourceEntity.getProtocol());
        if (plugin.isPresent()) {
            long startedAt = System.currentTimeMillis();
            String child = String.format("%s_%s", sourceEntity.getName(), sourceEntity.getId());
            log.info("The scheduled task <{}> execution start - child：{} start with {}", getName(), child, startedAt);
            TemplateSqlEntity template = this.templateSqlRepository.findByNameAndPluginContaining(GET_ALL_DATABASES, sourceEntity.getType());
            if (ObjectUtils.isEmpty(template)) {
                log.warn("The scheduled task {} child {} template {} is not available", getName(), sourceEntity.getName(), GET_ALL_DATABASES);
            } else {
                String cacheKey = String.join("_", sourceEntity.getType(), sourceEntity.getId().toString());
                // size() can return null (e.g. inside a pipeline or transaction); treat that as 0.
                Long cached = this.redisTemplate.opsForSet().size(cacheKey);
                long cachedCount = cached == null ? 0L : cached;
                this.redisTemplate.delete(cacheKey);
                log.info("The scheduled task {} child {} type {} find keys counter: {} is cleaned", getName(), sourceEntity.getName(), sourceEntity.getType(), cachedCount);
                processDatabase(sourceEntity, getContent(template, null), plugin.get(), DATABASE, cacheKey);
            }
            long finishedAt = System.currentTimeMillis();
            log.info("The scheduled task <{}> execution end - child：{} end with {} consuming：{} millisecond", getName(), child, finishedAt, finishedAt - startedAt);
        } else {
            log.warn("The scheduled task <{}> source {} protocol {} is not available", getName(), sourceEntity.getType(), sourceEntity.getProtocol());
        }
        log.info("==================== {} ---> {} end =================", getName(), sourceEntity.getName());
    }

    /**
     * Builds the plugin connection settings from the stored source.
     *
     * @param sourceEntity source whose connection attributes are copied
     * @return a new {@link Configure}; an empty database name is mapped to an empty optional
     */
    private Configure getConfigure(SourceEntity sourceEntity) {
        Configure configure = new Configure();
        configure.setHost(sourceEntity.getHost());
        configure.setPort(sourceEntity.getPort());
        configure.setUsername(Optional.ofNullable(sourceEntity.getUsername()));
        configure.setPassword(Optional.ofNullable(sourceEntity.getPassword()));
        configure.setDatabase(Optional.ofNullable(sourceEntity.getDatabase()).filter(StringUtils::isNotEmpty));
        configure.setSsl(Optional.ofNullable(sourceEntity.getSsl()));
        configure.setEnv(Optional.ofNullable(sourceEntity.getConfigures()));
        return configure;
    }

    /**
     * Renders the SQL of a template by substituting the given parameters.
     *
     * <p>The template configure is read as a JSON list of objects with {@code column},
     * {@code type} and {@code expression} entries. For every parameter, the first entry whose
     * {@code column} equals the parameter name (ignoring case) is used: each occurrence of its
     * {@code expression} in the SQL is replaced by the parameter value.
     *
     * @param templateSqlEntity template to render
     * @param map parameter values keyed by column name; may be {@code null} or empty
     * @return the rendered SQL, or the unmodified template content when there is nothing to
     *         substitute or the configure cannot be processed
     */
    private String getContent(TemplateSqlEntity templateSqlEntity, Map<String, String> map) {
        // Nothing to substitute: skip parsing instead of relying on the catch block below.
        if (ObjectUtils.isEmpty(templateSqlEntity.getConfigure()) || map == null || map.isEmpty()) {
            return templateSqlEntity.getContent();
        }
        try {
            @SuppressWarnings("unchecked") // the configure is expected to be a JSON array of objects
            List<Map<String, Object>> configures = (List<Map<String, Object>>) JSON.objectmapper.readValue(templateSqlEntity.getConfigure(), List.class);
            String content = templateSqlEntity.getContent();
            for (Map.Entry<String, String> parameter : map.entrySet()) {
                Optional<SqlConfigure> matched = configures.stream()
                        .filter(item -> String.valueOf(item.get("column")).equalsIgnoreCase(parameter.getKey()))
                        .map(SourceScheduledRunnable::toSqlConfigure)
                        .findFirst();
                if (matched.isPresent()) {
                    content = content.replace(matched.get().getExpression(), String.valueOf(parameter.getValue()));
                }
            }
            return content;
        } catch (Exception e) {
            // Best effort: fall back to the raw template, but keep the cause visible in the log.
            log.warn("Failed to analysis", e);
            return templateSqlEntity.getContent();
        }
    }

    /** Converts one parsed configure entry into a {@link SqlConfigure}. */
    private static SqlConfigure toSqlConfigure(Map<String, Object> item) {
        SqlConfigure sqlConfigure = new SqlConfigure();
        sqlConfigure.setColumn(item.get("column").toString());
        sqlConfigure.setType(Type.valueOf(String.valueOf(item.get("type"))));
        sqlConfigure.setExpression(String.valueOf(item.get("expression")));
        return sqlConfigure;
    }

    /** Returns the first value of a result row, which the callers use as the object name. */
    private static String firstValue(Object row) {
        return (String) ((List<?>) row).get(0);
    }

    /**
     * Lists the databases of a source and synchronizes the tables of each of them.
     *
     * @param sourceEntity source being synchronized
     * @param sql rendered SQL that lists the databases
     * @param plugin plugin used to talk to the source; destroyed before this method returns
     * @param type label written to the log when the query fails
     * @param cacheKey Redis set key that receives the discovered columns
     */
    private void processDatabase(SourceEntity sourceEntity, String sql, Plugin plugin, String type, String cacheKey) {
        Configure configure = getConfigure(sourceEntity);
        try {
            plugin.connect(configure);
            Response response = plugin.execute(sql);
            if (!Boolean.TRUE.equals(response.getIsSuccessful())) {
                log.warn("The scheduled task {} child {} type {} sync data from source is failed {}", getName(), sourceEntity.getName(), type, response.getMessage());
                return;
            }
            TemplateSqlEntity template = this.templateSqlRepository.findByNameAndPluginContaining(GET_ALL_TABLES, sourceEntity.getType());
            if (ObjectUtils.isEmpty(template)) {
                log.warn("The scheduled task {} child {} template {} is not available", getName(), sourceEntity.getName(), GET_ALL_TABLES);
                return;
            }
            for (Object row : response.getColumns()) {
                String database = firstValue(row);
                log.info("The scheduled task {} child {} sync data from source : {}", getName(), sourceEntity.getName(), database);
                Map<String, String> parameters = new HashMap<>();
                parameters.put(DATABASE, database);
                processTable(sourceEntity, getContent(template, parameters), database, plugin, cacheKey);
            }
        } finally {
            // Always release the plugin, even when the query or a nested step throws.
            plugin.destroy();
        }
    }

    /**
     * Lists the tables of one database and synchronizes the columns of each of them.
     *
     * @param sourceEntity source being synchronized
     * @param sql rendered SQL that lists the tables
     * @param database database the tables belong to
     * @param plugin plugin used to talk to the source; destroyed before this method returns
     * @param cacheKey Redis set key that receives the discovered columns
     */
    private void processTable(SourceEntity sourceEntity, String sql, String database, Plugin plugin, String cacheKey) {
        Configure configure = getConfigure(sourceEntity);
        try {
            plugin.connect(configure);
            Response response = plugin.execute(sql);
            if (!Boolean.TRUE.equals(response.getIsSuccessful())) {
                log.warn("The scheduled task {} child {} database {} sync data from source is failed {}", getName(), sourceEntity.getName(), database, response.getMessage());
                return;
            }
            TemplateSqlEntity template = this.templateSqlRepository.findByNameAndPluginContaining(GET_ALL_COLUMNS, sourceEntity.getType());
            if (ObjectUtils.isEmpty(template)) {
                log.warn("The scheduled task {} child {} template {} is not available", getName(), sourceEntity.getName(), GET_ALL_COLUMNS);
                return;
            }
            for (Object row : response.getColumns()) {
                String table = firstValue(row);
                log.info("The scheduled task {} child {} database {} sync data from source is : {}", getName(), sourceEntity.getName(), database, table);
                Map<String, String> parameters = new HashMap<>();
                parameters.put(TABLE, String.join(".", database, table));
                processColumn(sourceEntity, getContent(template, parameters), database, table, plugin, cacheKey);
            }
        } finally {
            // Always release the plugin, even when the query or a nested step throws.
            plugin.destroy();
        }
    }

    /**
     * Lists the columns of one table and stores each as {@code database.table.column} in Redis.
     *
     * <p>The column template is not looked up again here: the only caller, {@code processTable},
     * has already verified that it exists before rendering {@code sql} from it.
     *
     * @param sourceEntity source being synchronized
     * @param sql rendered SQL that lists the columns
     * @param database database the table belongs to
     * @param table table whose columns are listed
     * @param plugin plugin used to talk to the source; destroyed before this method returns
     * @param cacheKey Redis set key that receives the discovered columns
     */
    private void processColumn(SourceEntity sourceEntity, String sql, String database, String table, Plugin plugin, String cacheKey) {
        Configure configure = getConfigure(sourceEntity);
        try {
            plugin.connect(configure);
            Response response = plugin.execute(sql);
            if (!Boolean.TRUE.equals(response.getIsSuccessful())) {
                log.warn("The scheduled task {} child {} database {} table {} sync data from source is failed {}", getName(), sourceEntity.getName(), database, table, response.getMessage());
                return;
            }
            for (Object row : response.getColumns()) {
                String column = firstValue(row);
                log.info("The scheduled task {} child {} database {} table {} sync data from source is : {}", getName(), sourceEntity.getName(), database, table, column);
                this.redisTemplate.opsForSet().add(cacheKey, new Object[]{String.join(".", database, table, column)});
            }
        } finally {
            // Always release the plugin, even when the query or the Redis write throws.
            plugin.destroy();
        }
    }
}
