package org.apache.linkis.engineplugin.elasticsearch.executor;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.utils.OverloadUtils;
import org.apache.linkis.engineconn.common.conf.EngineConnConf;
import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchEngineConsoleConf;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchErrorResponse;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchExecutor;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchJsonResponse;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchResponse;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchTableResponse;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl;
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.LoadResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.storage.LineRecord;
import org.apache.linkis.storage.resultset.table.TableMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:org/apache/linkis/engineplugin/elasticsearch/executor/ElasticSearchEngineConnExecutor.class */
public class ElasticSearchEngineConnExecutor extends ConcurrentComputationExecutor {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchEngineConnExecutor.class);
    private int id;
    private String runType;
    private List<Label<?>> executorLabels;
    private Cache<String, ElasticSearchExecutor> elasticSearchExecutorCache;

    public ElasticSearchEngineConnExecutor(int i, int i2, String str) {
        super(i);
        this.executorLabels = new ArrayList(2);
        this.elasticSearchExecutorCache = CacheBuilder.newBuilder().expireAfterAccess(Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()).longValue(), TimeUnit.MILLISECONDS).removalListener(new RemovalListener<String, ElasticSearchExecutor>() { // from class: org.apache.linkis.engineplugin.elasticsearch.executor.ElasticSearchEngineConnExecutor.1
            public void onRemoval(RemovalNotification<String, ElasticSearchExecutor> removalNotification) {
                ((ElasticSearchExecutor) removalNotification.getValue()).close();
                if (ExecutionNodeStatus.isCompleted(ElasticSearchEngineConnExecutor.this.getTaskById((String) removalNotification.getKey()).getStatus())) {
                    return;
                }
                ElasticSearchEngineConnExecutor.this.killTask((String) removalNotification.getKey());
            }
        }).maximumSize(EngineConnConstant.MAX_TASK_NUM()).build();
        this.id = i2;
        this.runType = str;
    }

    public void init() {
        super.init();
    }

    public ExecuteResponse execute(EngineConnTask engineConnTask) {
        Map<String, String> buildRuntimeParams = buildRuntimeParams(engineConnTask);
        logger.info("The elasticsearch properties is: {}", buildRuntimeParams);
        ElasticSearchExecutorImpl elasticSearchExecutorImpl = new ElasticSearchExecutorImpl(this.runType, buildRuntimeParams);
        try {
            elasticSearchExecutorImpl.open();
            this.elasticSearchExecutorCache.put(engineConnTask.getTaskId(), elasticSearchExecutorImpl);
            return super.execute(engineConnTask);
        } catch (Exception e) {
            logger.error("Execute es code failed, reason:", e);
            return new ErrorExecuteResponse("run es failed", e);
        }
    }

    public ExecuteResponse executeLine(EngineExecutionContext engineExecutionContext, String str) {
        ElasticSearchResponse executeLine = ((ElasticSearchExecutor) this.elasticSearchExecutorCache.getIfPresent((String) engineExecutionContext.getJobId().get())).executeLine(str);
        try {
            if (executeLine instanceof ElasticSearchTableResponse) {
                ElasticSearchTableResponse elasticSearchTableResponse = (ElasticSearchTableResponse) executeLine;
                TableMetaData tableMetaData = new TableMetaData(elasticSearchTableResponse.columns());
                ResultSetWriter createResultSetWriter = engineExecutionContext.createResultSetWriter("2");
                createResultSetWriter.addMetaData(tableMetaData);
                Arrays.asList(elasticSearchTableResponse.records()).forEach(tableRecord -> {
                    try {
                        createResultSetWriter.addRecord(tableRecord);
                    } catch (IOException e) {
                        logger.warn("es addRecord failed", e);
                        throw new RuntimeException("es addRecord failed", e);
                    }
                });
                String resultSetWriter = createResultSetWriter.toString();
                IOUtils.closeQuietly(createResultSetWriter);
                return new AliasOutputExecuteResponse((String) null, resultSetWriter);
            }
            if (!(executeLine instanceof ElasticSearchJsonResponse)) {
                if (!(executeLine instanceof ElasticSearchErrorResponse)) {
                    return new ErrorExecuteResponse("es executeLine failed", (Throwable) null);
                }
                ElasticSearchErrorResponse elasticSearchErrorResponse = (ElasticSearchErrorResponse) executeLine;
                return new ErrorExecuteResponse(elasticSearchErrorResponse.message(), elasticSearchErrorResponse.cause());
            }
            ElasticSearchJsonResponse elasticSearchJsonResponse = (ElasticSearchJsonResponse) executeLine;
            ResultSetWriter createResultSetWriter2 = engineExecutionContext.createResultSetWriter("1");
            createResultSetWriter2.addMetaData((MetaData) null);
            Arrays.stream(elasticSearchJsonResponse.value().split("\\n")).forEach(str2 -> {
                try {
                    createResultSetWriter2.addRecord(new LineRecord(str2));
                } catch (IOException e) {
                    logger.warn("es addRecord failed", e);
                    throw new RuntimeException("es addRecord failed", e);
                }
            });
            String resultSetWriter2 = createResultSetWriter2.toString();
            IOUtils.closeQuietly(createResultSetWriter2);
            return new AliasOutputExecuteResponse((String) null, resultSetWriter2);
        } catch (IOException e) {
            logger.warn("es addMetaData failed", e);
            return new ErrorExecuteResponse("es addMetaData failed", e);
        }
    }

    private Map<String, String> buildRuntimeParams(EngineConnTask engineConnTask) {
        Map<? extends String, ? extends String> map = (Map) engineConnTask.getProperties().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Objects.toString(entry.getValue(), null);
        }));
        Map<String, String> cacheMap = new ElasticSearchEngineConsoleConf().getCacheMap(engineConnTask.getLables());
        if (MapUtils.isNotEmpty(map)) {
            cacheMap.putAll(map);
        }
        return cacheMap;
    }

    public ExecuteResponse executeCompletely(EngineExecutionContext engineExecutionContext, String str, String str2) {
        return null;
    }

    public float progress(String str) {
        return 0.0f;
    }

    public JobProgressInfo[] getProgressInfo(String str) {
        return new JobProgressInfo[0];
    }

    public List<Label<?>> getExecutorLabels() {
        return this.executorLabels;
    }

    public void setExecutorLabels(List<Label<?>> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        this.executorLabels.clear();
        this.executorLabels.addAll(list);
    }

    public NodeResource requestExpectedResource(NodeResource nodeResource) {
        return null;
    }

    public NodeResource getCurrentNodeResource() {
        NodeResourceUtils.appendMemoryUnitIfMissing(EngineConnObject.getEngineCreationContext().getOptions());
        CommonNodeResource commonNodeResource = new CommonNodeResource();
        commonNodeResource.setUsedResource(new LoadResource(OverloadUtils.getProcessMaxMemory(), 1));
        return commonNodeResource;
    }

    public String getId() {
        return Sender.getThisServiceInstance().getInstance() + "_" + this.id;
    }

    public int getConcurrentLimit() {
        return ((Integer) ElasticSearchConfiguration.ENGINE_CONCURRENT_LIMIT.getValue()).intValue();
    }

    public void killTask(String str) {
        ElasticSearchExecutor elasticSearchExecutor = (ElasticSearchExecutor) this.elasticSearchExecutorCache.getIfPresent(str);
        if (elasticSearchExecutor != null) {
            elasticSearchExecutor.close();
        }
        super.killTask(str);
    }

    public void killAll() {
        this.elasticSearchExecutorCache.asMap().values().forEach(elasticSearchExecutor -> {
            elasticSearchExecutor.close();
        });
    }

    public void transformTaskStatus(EngineConnTask engineConnTask, ExecutionNodeStatus executionNodeStatus) {
        super.transformTaskStatus(engineConnTask, executionNodeStatus);
        if (ExecutionNodeStatus.isCompleted(executionNodeStatus)) {
            this.elasticSearchExecutorCache.invalidate(engineConnTask.getTaskId());
        }
    }

    public boolean supportCallBackLogs() {
        return false;
    }
}
