package com.netflix.conductor.es7.dao.index;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.es7.config.ElasticSearchProperties;
import com.netflix.conductor.es7.dao.query.parser.internal.ParserException;
import com.netflix.conductor.metrics.Monitors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.XContentType;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.support.RetryTemplate;

@Trace
/* loaded from: input_file:com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.class */
public class ElasticSearchRestDAOV7 extends ElasticSearchBaseDAO implements IndexDAO {
    // Core pool size of the general-purpose async indexing executor built in the constructor.
    private static final int CORE_POOL_SIZE = 6;
    // Used as the idle keep-alive (in minutes) of that executor, and also as the period
    // (in hours) of the scheduled index-name refresh in createIndexesTemplates().
    private static final long KEEP_ALIVE_TIME = 1;
    // Document-type names; they double as index-name suffixes, template names and metric tags.
    private static final String WORKFLOW_DOC_TYPE = "workflow";
    private static final String TASK_DOC_TYPE = "task";
    private static final String LOG_DOC_TYPE = "task_log";
    private static final String EVENT_DOC_TYPE = "event";
    private static final String MSG_DOC_TYPE = "message";
    // Fixed index names resolved once in the constructor via getIndexName(...).
    private final String workflowIndexName;
    private final String taskIndexName;
    // "<indexPrefix>_event"; searched with a trailing "*" wildcard.
    private final String eventIndexPrefix;
    // Current dated event index; reassigned by updateIndexesNames().
    private String eventIndexName;
    // "<indexPrefix>_message"; searched with a trailing "*" wildcard.
    private final String messageIndexPrefix;
    // Current dated message index; reassigned by updateIndexesNames().
    private String messageIndexName;
    // Current dated task-log index; reassigned by updateIndexesNames().
    private String logIndexName;
    // "<indexPrefix>_task_log"; searched with a trailing "*" wildcard.
    private final String logIndexPrefix;
    // Value sent as "wait_for_status" when waiting for the cluster in waitForHealthyCluster().
    private final String clusterHealthColor;
    // High-level client used for document index/search/update/delete calls.
    private final RestHighLevelClient elasticSearchClient;
    // Low-level client used for admin-style requests (templates, index creation, health, HEAD checks).
    private final RestClient elasticSearchAdminClient;
    // Executor backing the async workflow/task/message operations.
    private final ExecutorService executorService;
    // Executor backing the async task-log and event-execution operations.
    private final ExecutorService logExecutorService;
    // Pending bulk requests; populated and flushed by code outside this excerpt.
    private final ConcurrentHashMap<String, BulkRequests> bulkRequests;
    private final int indexBatchSize;
    // Taken from the configured flush timeout, truncated to whole seconds.
    private final int asyncBufferFlushTimeout;
    private final ElasticSearchProperties properties;
    private final RetryTemplate retryTemplate;
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDAOV7.class);
    private static final String CLASS_NAME = ElasticSearchRestDAOV7.class.getSimpleName();
    // NOTE(review): not referenced anywhere in the visible part of the file - confirm usage.
    private static final TimeZone GMT = TimeZone.getTimeZone("GMT");
    // Produces the date suffix of the dated indexes (see updateIndexName()).
    // NOTE(review): SimpleDateFormat is not thread-safe and this instance is shared - confirm
    // that concurrent calls to updateIndexName() cannot happen.
    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMWW");
    // Same value as CLASS_NAME; this is the one passed to Monitors.error(...) in this file.
    private static final String className = ElasticSearchRestDAOV7.class.getSimpleName();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7$BulkRequests.class */
    /**
     * Immutable holder pairing a wrapped {@link BulkRequest} with the timestamp value supplied
     * when the holder was created.
     */
    public static class BulkRequests {
        private final BulkRequestWrapper bulkRequest;
        private final long lastFlushTime;

        /**
         * @param j the value later reported by {@link #getLastFlushTime()}
         * @param bulkRequest the bulk request to wrap
         */
        BulkRequests(long j, BulkRequest bulkRequest) {
            this.bulkRequest = new BulkRequestWrapper(bulkRequest);
            this.lastFlushTime = j;
        }

        /** @return the wrapper created around the bulk request passed to the constructor */
        BulkRequestWrapper getBulkRequest() {
            return bulkRequest;
        }

        /** @return the timestamp value passed to the constructor */
        long getLastFlushTime() {
            return lastFlushTime;
        }
    }

    /* loaded from: input_file:com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7$HttpMethod.class */
    /**
     * Holder for the HTTP verb names passed to {@code new Request(...)} in this class. Members of
     * an annotation type are implicitly {@code public static final}.
     */
    private @interface HttpMethod {
        String GET = "GET";
        String HEAD = "HEAD";
        String POST = "POST";
        String PUT = "PUT";
    }

    /**
     * Wires up the DAO: builds the low-level and high-level Elasticsearch clients from the same
     * builder, derives index names and prefixes from the configured index prefix, creates the two
     * bounded async executors, and schedules a periodic bulk-request flush.
     *
     * @param restClientBuilder builder used for both the admin and the high-level client
     * @param retryTemplate retry policy stored for use by the DAO
     * @param elasticSearchProperties configuration (prefix, batch size, pool and queue sizes, ...)
     * @param objectMapper JSON mapper used for document (de)serialization
     */
    public ElasticSearchRestDAOV7(RestClientBuilder restClientBuilder, RetryTemplate retryTemplate, ElasticSearchProperties elasticSearchProperties, ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.elasticSearchAdminClient = restClientBuilder.build();
        this.elasticSearchClient = new RestHighLevelClient(restClientBuilder);
        this.properties = elasticSearchProperties;
        this.retryTemplate = retryTemplate;

        this.clusterHealthColor = elasticSearchProperties.getClusterHealthColor();
        this.bulkRequests = new ConcurrentHashMap<>();
        this.indexBatchSize = elasticSearchProperties.getIndexBatchSize();
        this.asyncBufferFlushTimeout = (int) elasticSearchProperties.getAsyncBufferFlushTimeout().getSeconds();

        // The prefix must be in place before any getIndexName(...) call below.
        this.indexPrefix = elasticSearchProperties.getIndexPrefix();
        this.workflowIndexName = getIndexName(WORKFLOW_DOC_TYPE);
        this.taskIndexName = getIndexName(TASK_DOC_TYPE);
        this.logIndexPrefix = this.indexPrefix + "_task_log";
        this.messageIndexPrefix = this.indexPrefix + "_message";
        this.eventIndexPrefix = this.indexPrefix + "_event";

        int queueCapacity = elasticSearchProperties.getAsyncWorkerQueueSize();
        int maxPoolSize = elasticSearchProperties.getAsyncMaxPoolSize();

        // Rejected work is dropped (not retried); it is only logged and counted.
        this.executorService = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                maxPoolSize,
                KEEP_ALIVE_TIME,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(queueCapacity),
                (task, pool) -> {
                    logger.warn("Request  {} to async dao discarded in executor {}", task, pool);
                    Monitors.recordDiscardedIndexingCount("indexQueue");
                });
        this.logExecutorService = new ThreadPoolExecutor(
                1,
                2,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                (task, pool) -> {
                    logger.warn("Request {} to async log dao discarded in executor {}", task, pool);
                    Monitors.recordDiscardedIndexingCount("logQueue");
                });

        // First flush after 60 seconds, then every 30 seconds.
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::flushBulkRequests, 60L, 30L, TimeUnit.SECONDS);
    }

    /**
     * Pre-destroy hook: shuts down the log executor first, then the general indexing executor.
     */
    @PreDestroy
    private void shutdown() {
        logger.info("Gracefully shutdown executor service");
        for (ExecutorService pool : Arrays.asList(this.logExecutorService, this.executorService)) {
            shutdownExecutorService(pool);
        }
    }

    /**
     * Stops the given executor: requests an orderly shutdown, waits up to 30 seconds for queued
     * work to finish, and forces termination if the wait times out or is interrupted.
     *
     * @param executorService the executor to stop
     */
    private void shutdownExecutorService(ExecutorService executorService) {
        try {
            executorService.shutdown();
            boolean drained = executorService.awaitTermination(30L, TimeUnit.SECONDS);
            if (!drained) {
                logger.warn("Forcing shutdown after waiting for 30 seconds");
                executorService.shutdownNow();
                return;
            }
            logger.debug("tasks completed, shutting down");
        } catch (InterruptedException interrupted) {
            logger.warn("Shutdown interrupted, invoking shutdownNow on scheduledThreadPoolExecutor for delay queue");
            executorService.shutdownNow();
            // Restore the interrupt flag for whoever is driving the shutdown.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Post-construct initialization: waits for the cluster to reach the configured health status
     * and, when automatic index management is enabled, creates the index templates plus the
     * workflow and task indexes.
     *
     * @throws Exception if the cluster health request fails
     */
    @PostConstruct
    public void setup() throws Exception {
        waitForHealthyCluster();
        if (!this.properties.isAutoIndexManagementEnabled()) {
            return;
        }
        createIndexesTemplates();
        createWorkflowIndex();
        createTaskIndex();
    }

    /**
     * Installs the index templates, resolves the current dated index names once, and then
     * schedules an hourly refresh of those names. Any failure is logged and not propagated.
     */
    private void createIndexesTemplates() {
        try {
            initIndexesTemplates();
            updateIndexesNames();
            // Runs immediately and then once per hour (KEEP_ALIVE_TIME is reused as the period).
            Executors.newScheduledThreadPool(1)
                    .scheduleAtFixedRate(() -> updateIndexesNames(), 0L, KEEP_ALIVE_TIME, TimeUnit.HOURS);
        } catch (Exception failure) {
            logger.error("Error creating index templates!", failure);
        }
    }

    /**
     * Ensures an index template exists for the task-log, event and message document types, in
     * that order.
     */
    private void initIndexesTemplates() {
        String[] templatedDocTypes = {LOG_DOC_TYPE, EVENT_DOC_TYPE, MSG_DOC_TYPE};
        for (String docType : templatedDocTypes) {
            initIndexTemplate(docType);
        }
    }

    /**
     * Creates the index template {@code template_<str>} from the classpath resource
     * {@code /template_<str>.json} unless a template with that name already exists.
     *
     * <p>Best-effort: every failure (including a missing classpath resource) is logged and
     * swallowed so that startup can continue.
     *
     * @param str the document type whose template should be installed
     */
    private void initIndexTemplate(String str) {
        String templateName = "template_" + str;
        String templatePath = "/_template/" + templateName;
        try {
            if (!doesResourceNotExist(templatePath)) {
                return;
            }
            logger.info("Creating the index template '{}'", templateName);

            byte[] templateBody;
            // try-with-resources: the classpath stream was previously never closed.
            try (java.io.InputStream resource = ElasticSearchRestDAOV7.class.getResourceAsStream("/" + templateName + ".json")) {
                if (resource == null) {
                    // Report the real cause instead of letting it surface as an opaque NPE.
                    logger.error("Failed to init {}: classpath resource '/{}.json' not found", templateName, templateName);
                    return;
                }
                templateBody = IOUtils.toByteArray(resource);
            }

            Request request = new Request(HttpMethod.PUT, templatePath);
            request.setEntity(new NByteArrayEntity(templateBody, ContentType.APPLICATION_JSON));
            // The response body is not needed; just make sure it is fully consumed and released.
            EntityUtils.consume(this.elasticSearchAdminClient.performRequest(request).getEntity());
        } catch (Exception e) {
            logger.error("Failed to init " + templateName, e);
        }
    }

    /**
     * Re-resolves (and creates if needed) the current dated index for task logs, events and
     * messages. The fields are assigned one at a time, so a failure part-way leaves the earlier
     * ones already updated.
     */
    private void updateIndexesNames() {
        logIndexName = updateIndexName(LOG_DOC_TYPE);
        eventIndexName = updateIndexName(EVENT_DOC_TYPE);
        messageIndexName = updateIndexName(MSG_DOC_TYPE);
    }

    /**
     * Builds the dated index name {@code <indexPrefix>_<str>_<date>} for the current date and
     * makes sure that index exists.
     *
     * @param str the document type
     * @return the dated index name
     * @throws NonTransientException if the index could not be created
     */
    private String updateIndexName(String str) {
        String dateSuffix = SIMPLE_DATE_FORMAT.format(new Date());
        String datedIndexName = this.indexPrefix + "_" + str + "_" + dateSuffix;
        try {
            addIndex(datedIndexName);
        } catch (IOException ioFailure) {
            logger.error("Failed to update log index name: {}", datedIndexName, ioFailure);
            throw new NonTransientException(ioFailure.getMessage(), ioFailure);
        }
        return datedIndexName;
    }

    /**
     * Creates the workflow index with the mappings from {@code /mappings_docType_workflow.json}.
     * An I/O failure is logged and not propagated.
     */
    private void createWorkflowIndex() {
        final String workflowIndex = getIndexName(WORKFLOW_DOC_TYPE);
        try {
            addIndex(workflowIndex, "/mappings_docType_workflow.json");
        } catch (IOException ioFailure) {
            logger.error("Failed to initialize index '{}'", workflowIndex, ioFailure);
        }
    }

    /**
     * Creates the task index with the mappings from {@code /mappings_docType_task.json}.
     * An I/O failure is logged and not propagated.
     */
    private void createTaskIndex() {
        final String taskIndex = getIndexName(TASK_DOC_TYPE);
        try {
            addIndex(taskIndex, "/mappings_docType_task.json");
        } catch (IOException ioFailure) {
            logger.error("Failed to initialize index '{}'", taskIndex, ioFailure);
        }
    }

    /**
     * Issues {@code GET /_cluster/health} with {@code wait_for_status} set to the configured
     * health colour and a 30 second timeout parameter.
     *
     * @throws Exception if the request fails
     */
    private void waitForHealthyCluster() throws Exception {
        Map<String, String> healthParams = new HashMap<>();
        healthParams.put("wait_for_status", this.clusterHealthColor);
        healthParams.put("timeout", "30s");

        Request healthRequest = new Request(HttpMethod.GET, "/_cluster/health");
        healthRequest.addParameters(healthParams);
        this.elasticSearchAdminClient.performRequest(healthRequest);
    }

    /**
     * Creates the index {@code str} with the configured shard/replica counts and the mappings
     * loaded from the classpath resource {@code str2}, unless the index already exists.
     *
     * @param str the index name
     * @param str2 the classpath location of the mappings JSON
     * @throws IOException if the existence check or the creation request fails for any reason
     *     other than the index having been created concurrently
     */
    private void addIndex(String str, String str2) throws IOException {
        createIndexIfAbsent(str, str2);
    }

    /**
     * Creates the index {@code str} with the configured shard/replica counts and no explicit
     * mappings, unless the index already exists.
     *
     * @param str the index name
     * @throws IOException if the existence check or the creation request fails for any reason
     *     other than the index having been created concurrently
     */
    private void addIndex(String str) throws IOException {
        createIndexIfAbsent(str, null);
    }

    /**
     * Shared implementation of both {@code addIndex} overloads.
     *
     * @param indexName the index to create
     * @param mappingResource classpath location of the mappings JSON, or {@code null} to create
     *     the index with settings only
     */
    private void createIndexIfAbsent(String indexName, String mappingResource) throws IOException {
        logger.info("Adding index '{}'...", indexName);
        String resourcePath = "/" + indexName;
        if (!doesResourceNotExist(resourcePath)) {
            logger.info("Index '{}' already exists", indexName);
            return;
        }
        try {
            ObjectNode settings = this.objectMapper.createObjectNode();
            settings.put("number_of_shards", this.properties.getIndexShardCount());
            settings.put("number_of_replicas", this.properties.getIndexReplicasCount());

            ObjectNode body = this.objectMapper.createObjectNode();
            body.set("settings", settings);
            if (mappingResource != null) {
                JsonNode mappings = this.objectMapper.readTree(loadTypeMappingSource(mappingResource));
                body.set("mappings", mappings);
            }

            Request request = new Request(HttpMethod.PUT, resourcePath);
            request.setEntity(new NStringEntity(this.objectMapper.writeValueAsString(body), ContentType.APPLICATION_JSON));
            this.elasticSearchAdminClient.performRequest(request);
            logger.info("Added '{}' index", indexName);
        } catch (ResponseException e) {
            // Another node may have created the index between the HEAD check and the PUT.
            if (!isIndexAlreadyExistsError(e)) {
                throw e;
            }
            logger.info("Index '{}' already exists", indexName);
        }
    }

    /**
     * Tells whether a failed create-index call was rejected only because the index already
     * exists. Accepts both the current error type ({@code resource_already_exists_exception})
     * and the legacy one ({@code index_already_exists_exception}). An unreadable or unexpected
     * error body is treated as "not an already-exists error" so the original exception is kept.
     */
    private boolean isIndexAlreadyExistsError(ResponseException e) {
        Response response = e.getResponse();
        if (response.getStatusLine().getStatusCode() != 400 || response.getEntity() == null) {
            return false;
        }
        try {
            JsonNode root = this.objectMapper.readTree(EntityUtils.toString(response.getEntity()));
            if (root == null) {
                return false;
            }
            // path(...) never returns null, so a missing "error"/"type" simply yields "".
            String errorType = root.path("error").path("type").asText();
            return "resource_already_exists_exception".equals(errorType)
                    || "index_already_exists_exception".equals(errorType);
        } catch (IOException unreadableBody) {
            return false;
        }
    }

    /**
     * Puts the mapping loaded from the classpath resource {@code str3} onto index {@code str},
     * unless the {@code /<index>/_mapping} resource already exists.
     *
     * @param str the index name
     * @param str2 the mapping name, used only in log messages
     * @param str3 the classpath location of the mapping JSON
     * @throws IOException if the existence check, the resource load or the PUT request fails
     */
    private void addMappingToIndex(String str, String str2, String str3) throws IOException {
        logger.info("Adding '{}' mapping to index '{}'...", str2, str);
        String mappingPath = "/" + str + "/_mapping";
        if (!doesResourceNotExist(mappingPath)) {
            logger.info("Mapping '{}' already exists", str2);
            return;
        }
        // NStringEntity encodes with the charset of the content type (UTF-8 for
        // APPLICATION_JSON) instead of the platform default used by String.getBytes().
        NStringEntity mappingEntity = new NStringEntity(loadTypeMappingSource(str3), ContentType.APPLICATION_JSON);
        Request request = new Request(HttpMethod.PUT, mappingPath);
        request.setEntity(mappingEntity);
        this.elasticSearchAdminClient.performRequest(request);
        logger.info("Added '{}' mapping", str2);
    }

    /**
     * Checks for a resource by sending a {@code HEAD} request to the given path.
     *
     * @param str the resource path, e.g. {@code /my_index}
     * @return {@code true} only when the response status is 200
     * @throws IOException if the request fails
     */
    public boolean doesResourceExist(String str) throws IOException {
        Request headRequest = new Request(HttpMethod.HEAD, str);
        Response headResponse = this.elasticSearchAdminClient.performRequest(headRequest);
        int status = headResponse.getStatusLine().getStatusCode();
        return status == 200;
    }

    /**
     * Negation of {@link #doesResourceExist(String)}.
     *
     * @param str the resource path
     * @return {@code true} when the HEAD check does not answer with status 200
     * @throws IOException if the request fails
     */
    public boolean doesResourceNotExist(String str) throws IOException {
        boolean exists = doesResourceExist(str);
        return !exists;
    }

    /**
     * Indexes the workflow summary into the workflow index, using the workflow id as the
     * document id, and records timing and queue-size metrics. Failures are logged and counted,
     * never propagated.
     *
     * @param workflowSummary the summary to index
     */
    public void indexWorkflow(WorkflowSummary workflowSummary) {
        try {
            long startedAt = Instant.now().toEpochMilli();
            String workflowId = workflowSummary.getWorkflowId();
            byte[] document = this.objectMapper.writeValueAsBytes(workflowSummary);
            IndexRequest indexRequest = new IndexRequest(this.workflowIndexName)
                    .id(workflowId)
                    .source(document, XContentType.JSON);
            this.elasticSearchClient.index(indexRequest, RequestOptions.DEFAULT);

            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for indexing workflow: {}", elapsedMillis, workflowId);
            Monitors.recordESIndexTime("index_workflow", WORKFLOW_DOC_TYPE, elapsedMillis);
            int pending = ((ThreadPoolExecutor) this.executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", pending);
        } catch (Exception failure) {
            Monitors.error(className, "indexWorkflow");
            logger.error("Failed to index workflow: {}", workflowSummary.getWorkflowId(), failure);
        }
    }

    /**
     * Runs {@link #indexWorkflow(WorkflowSummary)} on the general indexing executor.
     *
     * @param workflowSummary the summary to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncIndexWorkflow(WorkflowSummary workflowSummary) {
        Runnable job = () -> indexWorkflow(workflowSummary);
        return CompletableFuture.runAsync(job, this.executorService);
    }

    /**
     * Indexes the task summary into the task index under its task id and records timing and
     * queue-size metrics. Failures are logged, never propagated.
     *
     * @param taskSummary the summary to index
     */
    public void indexTask(TaskSummary taskSummary) {
        try {
            long startedAt = Instant.now().toEpochMilli();
            String taskId = taskSummary.getTaskId();
            indexObject(this.taskIndexName, TASK_DOC_TYPE, taskId, taskSummary);

            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for  indexing task:{} in workflow: {}", elapsedMillis, taskId, taskSummary.getWorkflowId());
            Monitors.recordESIndexTime("index_task", TASK_DOC_TYPE, elapsedMillis);
            int pending = ((ThreadPoolExecutor) this.executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", pending);
        } catch (Exception failure) {
            logger.error("Failed to index task: {}", taskSummary.getTaskId(), failure);
        }
    }

    /**
     * Runs {@link #indexTask(TaskSummary)} on the general indexing executor.
     *
     * @param taskSummary the summary to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncIndexTask(TaskSummary taskSummary) {
        Runnable job = () -> indexTask(taskSummary);
        return CompletableFuture.runAsync(job, this.executorService);
    }

    /**
     * Indexes the given task execution logs into the current task-log index with a single bulk
     * request and records timing and queue-size metrics.
     *
     * <p>Logs that cannot be serialized to JSON are skipped (and reported with their cause). If
     * nothing is left to send, no request is issued. Failures of the bulk call itself are logged
     * and not propagated.
     *
     * @param list the logs to index; {@code null} or empty is a no-op
     */
    public void addTaskExecutionLogs(List<TaskExecLog> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        long startedAt = Instant.now().toEpochMilli();
        BulkRequest bulkRequest = new BulkRequest();
        for (TaskExecLog taskExecLog : list) {
            try {
                byte[] document = this.objectMapper.writeValueAsBytes(taskExecLog);
                IndexRequest indexRequest = new IndexRequest(this.logIndexName);
                indexRequest.source(document, XContentType.JSON);
                bulkRequest.add(indexRequest);
            } catch (JsonProcessingException e) {
                // Keep the cause: without it the reason for the skipped log is lost.
                logger.error("Failed to convert task log to JSON for task {}", taskExecLog.getTaskId(), e);
            }
        }
        if (bulkRequest.numberOfActions() == 0) {
            // An empty bulk request is rejected by the client; there is nothing to send anyway.
            logger.warn("No task execution logs could be serialized, skipping bulk index request");
            return;
        }
        try {
            this.elasticSearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for indexing taskExecutionLogs", elapsedMillis);
            Monitors.recordESIndexTime("index_task_execution_logs", LOG_DOC_TYPE, elapsedMillis);
            Monitors.recordWorkerQueueSize("logQueue", ((ThreadPoolExecutor) this.logExecutorService).getQueue().size());
        } catch (Exception e) {
            List<String> taskIds = list.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList());
            logger.error("Failed to index task execution logs for tasks: {}", taskIds, e);
        }
    }

    /**
     * Runs {@link #addTaskExecutionLogs(List)} on the log executor.
     *
     * @param list the logs to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> list) {
        Runnable job = () -> addTaskExecutionLogs(list);
        return CompletableFuture.runAsync(job, this.logExecutorService);
    }

    /**
     * Fetches the execution logs of one task from all task-log indexes, oldest first, capped at
     * the configured task-log result limit.
     *
     * @param str the task id
     * @return the matching logs, or {@code null} if the search failed (the failure is logged)
     */
    public List<TaskExecLog> getTaskExecutionLogs(String str) {
        try {
            SearchSourceBuilder source = new SearchSourceBuilder()
                    .query(boolQueryBuilder("taskId='" + str + "'", "*"))
                    .sort(new FieldSortBuilder("createdTime").order(SortOrder.ASC))
                    .size(this.properties.getTaskLogResultLimit());
            SearchRequest searchRequest = new SearchRequest(this.logIndexPrefix + "*").source(source);
            SearchResponse response = this.elasticSearchClient.search(searchRequest, RequestOptions.DEFAULT);
            return mapTaskExecLogsResponse(response);
        } catch (Exception failure) {
            logger.error("Failed to get task execution logs for task: {}", str, failure);
            return null;
        }
    }

    /**
     * Deserializes every hit of the search response into a {@link TaskExecLog}, preserving hit
     * order.
     *
     * @param searchResponse the response to convert
     * @return one log per hit
     * @throws IOException if a hit's source cannot be deserialized
     */
    private List<TaskExecLog> mapTaskExecLogsResponse(SearchResponse searchResponse) throws IOException {
        SearchHit[] hits = searchResponse.getHits().getHits();
        List<TaskExecLog> logs = new ArrayList<>(hits.length);
        for (int idx = 0; idx < hits.length; idx++) {
            String json = hits[idx].getSourceAsString();
            logs.add(this.objectMapper.readValue(json, TaskExecLog.class));
        }
        return logs;
    }

    /**
     * Fetches the messages recorded for a queue from all message indexes, ordered by ascending
     * {@code created} value.
     *
     * @param str the queue name
     * @return the matching messages, or {@code null} if the search failed (the failure is logged)
     */
    public List<Message> getMessages(String str) {
        try {
            SearchSourceBuilder source = new SearchSourceBuilder()
                    .query(boolQueryBuilder("queue='" + str + "'", "*"))
                    .sort(new FieldSortBuilder("created").order(SortOrder.ASC));
            SearchRequest searchRequest = new SearchRequest(this.messageIndexPrefix + "*").source(source);
            SearchResponse response = this.elasticSearchClient.search(searchRequest, RequestOptions.DEFAULT);
            return mapGetMessagesResponse(response);
        } catch (Exception failure) {
            logger.error("Failed to get messages for queue: {}", str, failure);
            return null;
        }
    }

    /**
     * Converts every hit of the search response into a {@link Message} built from the hit's
     * {@code messageId} and {@code payload} fields; the third constructor argument is always
     * {@code null}.
     *
     * @param searchResponse the response to convert
     * @return one message per hit, in hit order
     * @throws IOException if a hit's source cannot be read as a string-to-string map
     */
    private List<Message> mapGetMessagesResponse(SearchResponse searchResponse) throws IOException {
        MapType stringMapType = TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, String.class);
        SearchHit[] hits = searchResponse.getHits().getHits();
        List<Message> messages = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            Map<String, String> fields = this.objectMapper.readValue(hit.getSourceAsString(), stringMapType);
            String messageId = fields.get("messageId");
            String payload = fields.get("payload");
            messages.add(new Message(messageId, payload, (String) null));
        }
        return messages;
    }

    /**
     * Fetches the executions recorded for an event from all event indexes, ordered by ascending
     * {@code created} value.
     *
     * @param str the event name
     * @return the matching executions, or {@code null} if the search failed (the failure is
     *     logged)
     */
    public List<EventExecution> getEventExecutions(String str) {
        try {
            SearchSourceBuilder source = new SearchSourceBuilder()
                    .query(boolQueryBuilder("event='" + str + "'", "*"))
                    .sort(new FieldSortBuilder("created").order(SortOrder.ASC));
            SearchRequest searchRequest = new SearchRequest(this.eventIndexPrefix + "*").source(source);
            SearchResponse response = this.elasticSearchClient.search(searchRequest, RequestOptions.DEFAULT);
            return mapEventExecutionsResponse(response);
        } catch (Exception failure) {
            logger.error("Failed to get executions for event: {}", str, failure);
            return null;
        }
    }

    /**
     * Deserializes every hit of the search response into an {@link EventExecution}, preserving
     * hit order.
     *
     * @param searchResponse the response to convert
     * @return one execution per hit
     * @throws IOException if a hit's source cannot be deserialized
     */
    private List<EventExecution> mapEventExecutionsResponse(SearchResponse searchResponse) throws IOException {
        SearchHit[] hits = searchResponse.getHits().getHits();
        List<EventExecution> executions = new ArrayList<>(hits.length);
        for (int idx = 0; idx < hits.length; idx++) {
            String json = hits[idx].getSourceAsString();
            executions.add(this.objectMapper.readValue(json, EventExecution.class));
        }
        return executions;
    }

    /**
     * Indexes a queue message into the current message index as a document with the fields
     * {@code messageId}, {@code payload}, {@code queue} and {@code created} (current time in
     * milliseconds), then records the indexing time. Failures are logged, never propagated.
     *
     * @param str the queue name
     * @param message the message to index
     */
    public void addMessage(String str, Message message) {
        try {
            long startedAt = Instant.now().toEpochMilli();

            Map<String, Object> document = new HashMap<>();
            document.put("messageId", message.getId());
            document.put("payload", message.getPayload());
            document.put("queue", str);
            document.put("created", System.currentTimeMillis());
            indexObject(this.messageIndexName, MSG_DOC_TYPE, document);

            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for  indexing message: {}", elapsedMillis, message.getId());
            Monitors.recordESIndexTime("add_message", MSG_DOC_TYPE, elapsedMillis);
        } catch (Exception failure) {
            logger.error("Failed to index message: {}", message.getId(), failure);
        }
    }

    /**
     * Runs {@link #addMessage(String, Message)} on the general indexing executor.
     *
     * @param str the queue name
     * @param message the message to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncAddMessage(String str, Message message) {
        Runnable job = () -> addMessage(str, message);
        return CompletableFuture.runAsync(job, this.executorService);
    }

    /**
     * Indexes an event execution into the current event index under the id
     * {@code <name>.<event>.<messageId>.<id>} and records timing and queue-size metrics.
     * Failures are logged, never propagated.
     *
     * @param eventExecution the execution to index
     */
    public void addEventExecution(EventExecution eventExecution) {
        try {
            long startedAt = Instant.now().toEpochMilli();
            String documentId = eventExecution.getName()
                    + "." + eventExecution.getEvent()
                    + "." + eventExecution.getMessageId()
                    + "." + eventExecution.getId();
            indexObject(this.eventIndexName, EVENT_DOC_TYPE, documentId, eventExecution);

            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for indexing event execution: {}", elapsedMillis, eventExecution.getId());
            Monitors.recordESIndexTime("add_event_execution", EVENT_DOC_TYPE, elapsedMillis);
            int pending = ((ThreadPoolExecutor) this.logExecutorService).getQueue().size();
            Monitors.recordWorkerQueueSize("logQueue", pending);
        } catch (Exception failure) {
            logger.error("Failed to index event execution: {}", eventExecution.getId(), failure);
        }
    }

    /**
     * Runs {@link #addEventExecution(EventExecution)} on the log executor.
     *
     * @param eventExecution the execution to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
        Runnable job = () -> addEventExecution(eventExecution);
        return CompletableFuture.runAsync(job, this.logExecutorService);
    }

    /**
     * Searches the workflow index and returns matching document ids.
     *
     * @param str the structured query expression
     * @param str2 the free-text query
     * @param i the offset of the first result
     * @param i2 the maximum number of results
     * @param list the sort options
     * @return the ids of the matching workflows
     * @throws NonTransientException wrapping any failure of the search
     */
    public SearchResult<String> searchWorkflows(String str, String str2, int i, int i2, List<String> list) {
        final SearchResult<String> matches;
        try {
            matches = searchObjectIdsViaExpression(str, i, i2, list, str2, WORKFLOW_DOC_TYPE);
        } catch (Exception failure) {
            throw new NonTransientException(failure.getMessage(), failure);
        }
        return matches;
    }

    /**
     * Searches the workflow index and returns the matching documents as
     * {@link WorkflowSummary} objects.
     *
     * @param str the structured query expression
     * @param str2 the free-text query
     * @param i the offset of the first result
     * @param i2 the maximum number of results
     * @param list the sort options
     * @return the matching workflow summaries
     * @throws TransientException wrapping any failure of the search
     */
    public SearchResult<WorkflowSummary> searchWorkflowSummary(String str, String str2, int i, int i2, List<String> list) {
        final SearchResult<WorkflowSummary> matches;
        try {
            matches = searchObjectsViaExpression(str, i, i2, list, str2, WORKFLOW_DOC_TYPE, false, WorkflowSummary.class);
        } catch (Exception failure) {
            throw new TransientException(failure.getMessage(), failure);
        }
        return matches;
    }

    /**
     * Resolves the index for the given document type, builds the boolean query from the
     * structured expression and free-text query, and delegates to {@code searchObjects}.
     *
     * @param str the structured query expression
     * @param i the offset of the first result
     * @param i2 the maximum number of results
     * @param list the sort options
     * @param str2 the free-text query
     * @param str3 the document type whose index is searched
     * @param z flag passed through unchanged to {@code searchObjects}
     * @param cls the class the hits are converted to
     * @throws ParserException if the expression cannot be parsed
     * @throws IOException if the search request fails
     */
    private <T> SearchResult<T> searchObjectsViaExpression(String str, int i, int i2, List<String> list, String str2, String str3, boolean z, Class<T> cls) throws ParserException, IOException {
        String targetIndex = getIndexName(str3);
        QueryBuilder query = boolQueryBuilder(str, str2);
        return searchObjects(targetIndex, query, i, i2, list, z, cls);
    }

    /**
     * Searches the task index and returns matching document ids.
     *
     * @param str the structured query expression
     * @param str2 the free-text query
     * @param i the offset of the first result
     * @param i2 the maximum number of results
     * @param list the sort options
     * @return the ids of the matching tasks
     * @throws NonTransientException wrapping any failure of the search
     */
    public SearchResult<String> searchTasks(String str, String str2, int i, int i2, List<String> list) {
        final SearchResult<String> matches;
        try {
            matches = searchObjectIdsViaExpression(str, i, i2, list, str2, TASK_DOC_TYPE);
        } catch (Exception failure) {
            throw new NonTransientException(failure.getMessage(), failure);
        }
        return matches;
    }

    /**
     * Searches the task index and returns the matching documents as {@link TaskSummary}
     * objects.
     *
     * @param str the structured query expression
     * @param str2 the free-text query
     * @param i the offset of the first result
     * @param i2 the maximum number of results
     * @param list the sort options
     * @return the matching task summaries
     * @throws TransientException wrapping any failure of the search
     */
    public SearchResult<TaskSummary> searchTaskSummary(String str, String str2, int i, int i2, List<String> list) {
        final SearchResult<TaskSummary> matches;
        try {
            matches = searchObjectsViaExpression(str, i, i2, list, str2, TASK_DOC_TYPE, false, TaskSummary.class);
        } catch (Exception failure) {
            throw new TransientException(failure.getMessage(), failure);
        }
        return matches;
    }

    /**
     * Deletes the workflow document with the given id from the workflow index and records timing
     * and queue-size metrics. A missing document is logged as an error; an I/O failure is logged
     * and counted, never propagated.
     *
     * @param str the workflow id
     */
    public void removeWorkflow(String str) {
        long startedAt = Instant.now().toEpochMilli();
        try {
            DeleteRequest deleteRequest = new DeleteRequest(this.workflowIndexName, str);
            DocWriteResponse.Result outcome = this.elasticSearchClient.delete(deleteRequest, RequestOptions.DEFAULT).getResult();
            if (DocWriteResponse.Result.NOT_FOUND == outcome) {
                logger.error("Index removal failed - document not found by id: {}", str);
            }
            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for removing workflow: {}", elapsedMillis, str);
            Monitors.recordESIndexTime("remove_workflow", WORKFLOW_DOC_TYPE, elapsedMillis);
            int pending = ((ThreadPoolExecutor) this.executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", pending);
        } catch (IOException ioFailure) {
            logger.error("Failed to remove workflow {} from index", str, ioFailure);
            Monitors.error(className, "remove");
        }
    }

    /**
     * Runs {@link #removeWorkflow(String)} on the general indexing executor.
     *
     * @param str the workflow id
     * @return a future that completes when the removal call returns
     */
    public CompletableFuture<Void> asyncRemoveWorkflow(String str) {
        Runnable job = () -> removeWorkflow(str);
        return CompletableFuture.runAsync(job, this.executorService);
    }

    /**
     * Applies a partial update to the workflow document with the given id: each
     * {@code strArr[n]} field is set to {@code objArr[n]}. Timing and queue-size metrics are
     * recorded on success.
     *
     * <p>Values may be {@code null} (the field is then written as null); if a key is repeated
     * the last value wins. Every failure, including a key/value length mismatch, is logged and
     * counted but not propagated.
     *
     * @param str the workflow id
     * @param strArr the field names to update
     * @param objArr the new values, positionally matching {@code strArr}
     */
    public void updateWorkflow(String str, String[] strArr, Object[] objArr) {
        try {
            if (strArr.length != objArr.length) {
                throw new NonTransientException("Number of keys and values do not match");
            }
            long startedAt = Instant.now().toEpochMilli();
            UpdateRequest updateRequest = new UpdateRequest(this.workflowIndexName, str);
            // A plain loop instead of Collectors.toMap: toMap rejects null values with an NPE
            // and duplicate keys with an IllegalStateException, which made such updates fail.
            Map<String, Object> fields = new HashMap<>();
            for (int idx = 0; idx < strArr.length; idx++) {
                fields.put(strArr[idx], objArr[idx]);
            }
            updateRequest.doc(fields);
            logger.debug("Updating workflow {} with {}", str, fields);
            this.elasticSearchClient.update(updateRequest, RequestOptions.DEFAULT);
            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for updating workflow: {}", elapsedMillis, str);
            Monitors.recordESIndexTime("update_workflow", WORKFLOW_DOC_TYPE, elapsedMillis);
            Monitors.recordWorkerQueueSize("indexQueue", ((ThreadPoolExecutor) this.executorService).getQueue().size());
        } catch (Exception e) {
            logger.error("Failed to update workflow {}", str, e);
            Monitors.error(className, "update");
        }
    }

    /**
     * Deletes a task document from the task index after verifying, via a search, that the task
     * belongs to the given workflow. Timing and queue-size metrics are recorded on success; every
     * failure path is logged and counted, and an I/O failure is not propagated.
     *
     * @param str the workflow id the task is expected to belong to
     * @param str2 the task id
     */
    public void removeTask(String str, String str2) {
        long startedAt = Instant.now().toEpochMilli();
        String ownershipQuery = String.format("(taskId='%s') AND (workflowId='%s')", str2, str);
        if (searchTasks(ownershipQuery, "*", 0, 1, null).getTotalHits() == 0) {
            logger.error("Task: {} does not belong to workflow: {}", str2, str);
            Monitors.error(className, "removeTask");
            return;
        }
        try {
            DeleteRequest deleteRequest = new DeleteRequest(this.taskIndexName, str2);
            DocWriteResponse.Result outcome = this.elasticSearchClient.delete(deleteRequest, RequestOptions.DEFAULT).getResult();
            if (outcome != DocWriteResponse.Result.DELETED) {
                // Report the task id: this message previously printed the workflow id.
                logger.error("Index removal failed - task not found by id: {}", str2);
                Monitors.error(className, "removeTask");
                return;
            }
            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for removing task:{} of workflow: {}", elapsedMillis, str2, str);
            Monitors.recordESIndexTime("remove_task", "", elapsedMillis);
            Monitors.recordWorkerQueueSize("indexQueue", ((ThreadPoolExecutor) this.executorService).getQueue().size());
        } catch (IOException e) {
            logger.error("Failed to remove task {} of workflow: {} from index", str2, str, e);
            Monitors.error(className, "removeTask");
        }
    }

    /**
     * Runs {@link #removeTask(String, String)} on this DAO's executor service.
     *
     * @param str workflow id forwarded to {@code removeTask}
     * @param str2 task id forwarded to {@code removeTask}
     * @return a future that completes when the removal call has returned
     */
    public CompletableFuture<Void> asyncRemoveTask(String str, String str2) {
        Runnable removal = () -> removeTask(str, str2);
        return CompletableFuture.runAsync(removal, this.executorService);
    }

    /**
     * Applies a partial update to the task document with the given task id.
     *
     * <p>Every failure (mismatched array lengths, duplicate keys, Elasticsearch errors) is caught
     * here, logged and counted through {@code Monitors.error}; nothing is rethrown to the caller.
     *
     * @param str id of the owning workflow; used only in log messages
     * @param str2 id of the document in the task index
     * @param strArr names of the fields to update
     * @param objArr new field values, matched to {@code strArr} by position; entries may be
     *     {@code null}
     */
    public void updateTask(String str, String str2, String[] strArr, Object[] objArr) {
        try {
            if (strArr.length != objArr.length) {
                throw new IllegalArgumentException("Number of keys and values do not match");
            }
            long startTime = Instant.now().toEpochMilli();
            UpdateRequest updateRequest = new UpdateRequest(this.taskIndexName, str2);
            // Built with an explicit loop: Collectors.toMap throws NullPointerException for a
            // null value, which made it impossible to set a field to null through this method.
            Map<String, Object> fields = new HashMap<>();
            for (int index = 0; index < strArr.length; index++) {
                if (fields.containsKey(strArr[index])) {
                    // Keep rejecting duplicate keys, as Collectors.toMap did.
                    throw new IllegalStateException("Duplicate key " + strArr[index]);
                }
                fields.put(strArr[index], objArr[index]);
            }
            updateRequest.doc(fields);
            logger.debug("Updating task: {} of workflow: {} with {}", str2, str, fields);
            this.elasticSearchClient.update(updateRequest, RequestOptions.DEFAULT);
            long elapsed = Instant.now().toEpochMilli() - startTime;
            logger.debug("Time taken {} for updating task: {} of workflow: {}", elapsed, str2, str);
            Monitors.recordESIndexTime("update_task", "", elapsed);
            Monitors.recordWorkerQueueSize("indexQueue", ((ThreadPoolExecutor) this.executorService).getQueue().size());
        } catch (Exception e) {
            logger.error("Failed to update task: {} of workflow: {}", str2, str, e);
            Monitors.error(className, "update");
        }
    }

    /**
     * Runs {@link #updateTask(String, String, String[], Object[])} on this DAO's executor service.
     *
     * @param str workflow id forwarded to {@code updateTask}
     * @param str2 task id forwarded to {@code updateTask}
     * @param strArr field names forwarded to {@code updateTask}
     * @param objArr field values forwarded to {@code updateTask}
     * @return a future that completes when the update call has returned
     */
    public CompletableFuture<Void> asyncUpdateTask(String str, String str2, String[] strArr, Object[] objArr) {
        Runnable update = () -> updateTask(str, str2, strArr, objArr);
        return CompletableFuture.runAsync(update, this.executorService);
    }

    /**
     * Runs {@link #updateWorkflow(String, String[], Object[])} on this DAO's executor service.
     *
     * @param str workflow id forwarded to {@code updateWorkflow}
     * @param strArr field names forwarded to {@code updateWorkflow}
     * @param objArr field values forwarded to {@code updateWorkflow}
     * @return a future that completes when the update call has returned
     */
    public CompletableFuture<Void> asyncUpdateWorkflow(String str, String[] strArr, Object[] objArr) {
        Runnable update = () -> updateWorkflow(str, strArr, objArr);
        return CompletableFuture.runAsync(update, this.executorService);
    }

    /**
     * Reads one field of a workflow document and returns it as a string.
     *
     * @param str id of the document in the workflow index
     * @param str2 name of the source field to read
     * @return {@code toString()} of the field value, or {@code null} when the document does not
     *     exist, the field is absent or null, or the lookup fails with an {@link IOException}
     */
    public String get(String str, String str2) {
        try {
            GetRequest lookup = new GetRequest(this.workflowIndexName, str);
            GetResponse response = this.elasticSearchClient.get(lookup, RequestOptions.DEFAULT);
            Object fieldValue = response.isExists() ? response.getSourceAsMap().get(str2) : null;
            if (fieldValue == null) {
                logger.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", str, this.workflowIndexName);
                return null;
            }
            return fieldValue.toString();
        } catch (IOException e) {
            logger.error("Unable to get Workflow: {} from ElasticSearch index: {}", str, this.workflowIndexName, e);
            return null;
        }
    }

    /**
     * Resolves the index for a document type, builds a query from the two query inputs, and
     * returns the ids of the matching documents.
     *
     * @param str first argument of {@code boolQueryBuilder}
     * @param i offset of the first hit to return
     * @param i2 maximum number of hits to return
     * @param list sort options in {@code field} or {@code field:ORDER} form; may be {@code null}
     * @param str2 second argument of {@code boolQueryBuilder}
     * @param str3 document type passed to {@code getIndexName}
     * @return total hit count and the ids of the returned hits
     */
    private SearchResult<String> searchObjectIdsViaExpression(String str, int i, int i2, List<String> list, String str2, String str3) throws ParserException, IOException {
        String indexName = getIndexName(str3);
        QueryBuilder query = boolQueryBuilder(str, str2);
        return searchObjectIds(indexName, query, i, i2, list);
    }

    /**
     * Typed variant: resolves the index for a document type, builds a query from the two query
     * inputs, and returns the matching documents deserialized from their source.
     *
     * @param str first argument of {@code boolQueryBuilder}
     * @param i offset of the first hit to return
     * @param i2 maximum number of hits to return
     * @param list sort options in {@code field} or {@code field:ORDER} form; may be {@code null}
     * @param str2 second argument of {@code boolQueryBuilder}
     * @param str3 document type passed to {@code getIndexName}
     * @param cls class each hit's source is deserialized into
     * @return total hit count and the deserialized hits
     */
    private <T> SearchResult<T> searchObjectIdsViaExpression(String str, int i, int i2, List<String> list, String str2, String str3, Class<T> cls) throws ParserException, IOException {
        String indexName = getIndexName(str3);
        QueryBuilder query = boolQueryBuilder(str, str2);
        final boolean idsOnly = false;
        return searchObjects(indexName, query, i, i2, list, idsOnly, cls);
    }

    /**
     * Searches an index for document ids without any sort options.
     *
     * @param str name of the index to search
     * @param queryBuilder query to run
     * @param i offset of the first hit to return
     * @param i2 maximum number of hits to return
     * @return total hit count and the ids of the returned hits
     */
    private SearchResult<String> searchObjectIds(String str, QueryBuilder queryBuilder, int i, int i2) throws IOException {
        final List<String> noSortOptions = null;
        return searchObjectIds(str, queryBuilder, i, i2, noSortOptions);
    }

    /**
     * Searches an index and collects the ids of the returned hits.
     *
     * @param str name of the index to search
     * @param queryBuilder query to run
     * @param i offset of the first hit to return
     * @param i2 maximum number of hits to return
     * @param list sort options, each either {@code field} (ascending) or {@code field:ORDER}
     *     where {@code ORDER} is a {@code SortOrder} constant name; may be {@code null} or empty
     * @return total hit count and the ids of the returned hits, in response order
     * @throws IOException if the search call fails
     */
    private SearchResult<String> searchObjectIds(String str, QueryBuilder queryBuilder, int i, int i2, List<String> list) throws IOException {
        SearchSourceBuilder source = new SearchSourceBuilder().query(queryBuilder).from(i).size(i2);
        if (list != null) {
            for (String sortOption : list) {
                // A separator at position 0 is not treated as one: the whole string is the field.
                int separator = sortOption.indexOf(':');
                boolean hasExplicitOrder = separator > 0;
                String field = hasExplicitOrder ? sortOption.substring(0, separator) : sortOption;
                SortOrder order = hasExplicitOrder ? SortOrder.valueOf(sortOption.substring(separator + 1)) : SortOrder.ASC;
                source.sort(new FieldSortBuilder(field).order(order));
            }
        }
        SearchRequest request = new SearchRequest(str).source(source);
        SearchResponse response = this.elasticSearchClient.search(request, RequestOptions.DEFAULT);
        List<String> ids = new LinkedList<>();
        response.getHits().forEach(hit -> ids.add(hit.getId()));
        long totalHits = response.getHits().getTotalHits().value;
        return new SearchResult<>(totalHits, ids);
    }

    /**
     * Searches an index and maps the hits either to their ids or to deserialized objects.
     *
     * @param str name of the index to search
     * @param queryBuilder query to run
     * @param i offset of the first hit to return
     * @param i2 maximum number of hits to return
     * @param list sort options, each either {@code field} (ascending) or {@code field:ORDER}
     *     where {@code ORDER} is a {@code SortOrder} constant name; may be {@code null} or empty
     * @param z when {@code true} the document source is not fetched and hit ids are returned
     * @param cls result element type; hit ids are cast to it when {@code z} is {@code true},
     *     otherwise each hit's source is deserialized into it
     * @return total hit count and the mapped hits
     * @throws IOException if the search call fails
     */
    private <T> SearchResult<T> searchObjects(String str, QueryBuilder queryBuilder, int i, int i2, List<String> list, boolean z, Class<T> cls) throws IOException {
        SearchSourceBuilder source = new SearchSourceBuilder().query(queryBuilder).from(i).size(i2);
        if (z) {
            source.fetchSource(false);
        }
        if (list != null) {
            for (String sortOption : list) {
                // A separator at position 0 is not treated as one: the whole string is the field.
                int separator = sortOption.indexOf(':');
                boolean hasExplicitOrder = separator > 0;
                String field = hasExplicitOrder ? sortOption.substring(0, separator) : sortOption;
                SortOrder order = hasExplicitOrder ? SortOrder.valueOf(sortOption.substring(separator + 1)) : SortOrder.ASC;
                source.sort(new FieldSortBuilder(field).order(order));
            }
        }
        SearchRequest request = new SearchRequest(str).source(source);
        SearchResponse response = this.elasticSearchClient.search(request, RequestOptions.DEFAULT);
        return mapSearchResult(response, z, cls);
    }

    /**
     * Converts a search response into a {@code SearchResult}.
     *
     * @param searchResponse response whose hits are mapped
     * @param z when {@code true} each hit contributes its id, cast to {@code cls}; otherwise each
     *     hit's source JSON is deserialized into {@code cls}
     * @param cls element type of the result list
     * @return total hit count and one element per hit; a hit whose source cannot be
     *     deserialized is logged and contributes a {@code null} element
     */
    private <T> SearchResult<T> mapSearchResult(SearchResponse searchResponse, boolean z, Class<T> cls) {
        SearchHits hits = searchResponse.getHits();
        long totalHits = hits.getTotalHits().value;
        List<T> mapped;
        if (z) {
            mapped = Arrays.stream(hits.getHits())
                    .map(hit -> cls.cast(hit.getId()))
                    .collect(Collectors.toList());
        } else {
            mapped = Arrays.stream(hits.getHits())
                    .map(hit -> {
                        try {
                            return this.objectMapper.readValue(hit.getSourceAsString(), cls);
                        } catch (JsonProcessingException e) {
                            logger.error("Failed to de-serialize elasticsearch from source: {}", hit.getSourceAsString(), e);
                            return null;
                        }
                    })
                    .collect(Collectors.toList());
        }
        return new SearchResult<>(totalHits, mapped);
    }

    /**
     * Finds ids of finished workflows whose end time falls in the archival window.
     *
     * <p>A document matches when its {@code endTime} is before the cutoff date (today minus
     * {@code j} days) and on or after the cutoff minus {@code KEEP_ALIVE_TIME} days, its
     * {@code status} is one of COMPLETED, FAILED, TIMED_OUT or TERMINATED, and it has no
     * {@code archived} field. At most 1000 ids are returned.
     *
     * @param str name of the index to search
     * @param j age in days that defines the cutoff date
     * @return matching workflow ids, or an empty list if the search fails with an
     *     {@link IOException}
     */
    public List<String> searchArchivableWorkflows(String str, long j) {
        // Read the clock once so both range bounds derive from the same date; two separate
        // LocalDate.now() calls could straddle midnight and yield an empty range.
        LocalDate cutoff = LocalDate.now().minusDays(j);
        // NOTE(review): KEEP_ALIVE_TIME is used here as a day count for the window width; its
        // name suggests another purpose - confirm the intended value.
        LocalDate windowStart = cutoff.minusDays(KEEP_ALIVE_TIME);
        QueryBuilder query = QueryBuilders.boolQuery()
                .must(QueryBuilders.rangeQuery("endTime").lt(cutoff.toString()).gte(windowStart.toString()))
                .should(QueryBuilders.termQuery("status", "COMPLETED"))
                .should(QueryBuilders.termQuery("status", "FAILED"))
                .should(QueryBuilders.termQuery("status", "TIMED_OUT"))
                .should(QueryBuilders.termQuery("status", "TERMINATED"))
                .mustNot(QueryBuilders.existsQuery("archived"))
                .minimumShouldMatch(1);
        try {
            return searchObjectIds(str, query, 0, 1000).getResults();
        } catch (IOException e) {
            logger.error("Unable to communicate with ES to find archivable workflows", e);
            return Collections.emptyList();
        }
    }

    /**
     * Counts workflow documents matching the given query inputs.
     *
     * @param str first query input, forwarded to {@code getObjectCounts}
     * @param str2 second query input, forwarded to {@code getObjectCounts}
     * @return number of matching workflow documents
     * @throws NonTransientException wrapping any exception raised while counting
     */
    public long getWorkflowCount(String str, String str2) {
        final long matches;
        try {
            matches = getObjectCounts(str, str2, WORKFLOW_DOC_TYPE);
        } catch (Exception e) {
            throw new NonTransientException(e.getMessage(), e);
        }
        return matches;
    }

    /**
     * Issues a count request against the index of the given document type.
     *
     * @param str first argument of {@code boolQueryBuilder}
     * @param str2 second argument of {@code boolQueryBuilder}
     * @param str3 document type passed to {@code getIndexName}
     * @return number of documents matching the built query
     */
    private long getObjectCounts(String str, String str2, String str3) throws ParserException, IOException {
        String indexName = getIndexName(str3);
        QueryBuilder query = boolQueryBuilder(str, str2);
        CountRequest countRequest = new CountRequest(new String[]{indexName}, query);
        return this.elasticSearchClient.count(countRequest, RequestOptions.DEFAULT).getCount();
    }

    /**
     * Finds ids of RUNNING workflows whose {@code updateTime} lies strictly between
     * now minus {@code i} hours and now minus {@code i2} hours.
     *
     * <p>Results are sorted by {@code updateTime} ascending and capped at 5000 ids.
     *
     * @param i hours subtracted from now for the exclusive lower bound
     * @param i2 hours subtracted from now for the exclusive upper bound
     * @return matching workflow ids, or an empty list if the search fails with an
     *     {@link IOException}
     */
    public List<String> searchRecentRunningWorkflows(int i, int i2) {
        DateTime now = new DateTime();
        try {
            QueryBuilder updatedAfter = QueryBuilders.rangeQuery("updateTime").gt(now.minusHours(i));
            QueryBuilder updatedBefore = QueryBuilders.rangeQuery("updateTime").lt(now.minusHours(i2));
            QueryBuilder stillRunning = QueryBuilders.termQuery("status", "RUNNING");
            QueryBuilder query = QueryBuilders.boolQuery()
                    .must(updatedAfter)
                    .must(updatedBefore)
                    .must(stillRunning);
            List<String> sortOptions = Collections.singletonList("updateTime:ASC");
            return searchObjectIds(this.workflowIndexName, query, 0, 5000, sortOptions).getResults();
        } catch (IOException e) {
            logger.error("Unable to communicate with ES to find recent running workflows", e);
            return Collections.emptyList();
        }
    }

    /**
     * Buffers an object for bulk indexing without supplying a document id.
     *
     * @param str name of the target index
     * @param str2 document type, used as the key of the bulk buffer
     * @param obj object to serialize and index
     */
    private void indexObject(String str, String str2, Object obj) {
        final String noDocumentId = null;
        indexObject(str, str2, noDocumentId, obj);
    }

    /**
     * Serializes an object to JSON and appends it to the bulk buffer of its document type,
     * sending the buffer once it holds at least {@code indexBatchSize} actions.
     *
     * <p>A serialization failure is logged together with its cause and the object is skipped.
     *
     * @param str name of the target index
     * @param str2 document type, used as the key of the bulk buffer
     * @param str3 document id to set on the index request; may be {@code null}
     * @param obj object to serialize and index
     */
    private void indexObject(String str, String str2, String str3, Object obj) {
        try {
            byte[] payload = this.objectMapper.writeValueAsBytes(obj);
            IndexRequest indexRequest = new IndexRequest(str);
            indexRequest.id(str3).source(payload, XContentType.JSON);
            // Create the buffer in a single map operation so two threads cannot both see
            // "absent" and overwrite each other's freshly created buffer.
            this.bulkRequests.computeIfAbsent(str2, type -> new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
            this.bulkRequests.get(str2).getBulkRequest().add(indexRequest);
            if (this.bulkRequests.get(str2).getBulkRequest().numberOfActions() >= this.indexBatchSize) {
                indexBulkRequest(str2);
            }
        } catch (JsonProcessingException e) {
            // Pass the exception as the last argument so the cause is logged as well.
            logger.error("Failed to convert {} '{}' to byte string", str2, str3, e);
        }
    }

    /**
     * Sends the buffered bulk request of one document type and replaces the buffer with an empty
     * one stamped with the current time.
     *
     * <p>Does nothing when the buffer's bulk request is {@code null} or holds no actions. The
     * method is {@code synchronized} on this instance and additionally locks the buffered bulk
     * request while sending and swapping.
     *
     * <p>NOTE(review): assumes an entry for {@code str} already exists in {@code bulkRequests};
     * {@code this.bulkRequests.get(str)} is dereferenced without a null check - confirm callers.
     *
     * @param str document type whose buffer is sent
     */
    private synchronized void indexBulkRequest(String str) {
        // Nothing buffered for this type: leave the existing entry (and its timestamp) as is.
        if (this.bulkRequests.get(str).getBulkRequest() == null || this.bulkRequests.get(str).getBulkRequest().numberOfActions() <= 0) {
            return;
        }
        synchronized (this.bulkRequests.get(str).getBulkRequest()) {
            // Send first, then swap in a fresh buffer regardless of what indexWithRetry logged.
            indexWithRetry(this.bulkRequests.get(str).getBulkRequest().get(), "Bulk Indexing " + str, str);
            this.bulkRequests.put(str, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
        }
    }

    /**
     * Executes a bulk request through the retry template and records timing and queue metrics.
     *
     * <p>Any exception is caught, counted via {@code Monitors.error} and logged; nothing is
     * rethrown.
     *
     * @param bulkRequest request to send
     * @param str operation description supplied by the caller; not used in this method
     * @param str2 document type, used for logging and as the metric tag
     */
    private void indexWithRetry(BulkRequest bulkRequest, String str, String str2) {
        try {
            long startedAt = Instant.now().toEpochMilli();
            this.retryTemplate.execute(context -> this.elasticSearchClient.bulk(bulkRequest, RequestOptions.DEFAULT));
            long elapsed = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for indexing object of type: {}", elapsed, str2);
            Monitors.recordESIndexTime("index_object", str2, elapsed);
            ThreadPoolExecutor indexPool = (ThreadPoolExecutor) this.executorService;
            Monitors.recordWorkerQueueSize("indexQueue", indexPool.getQueue().size());
            ThreadPoolExecutor logPool = (ThreadPoolExecutor) this.logExecutorService;
            Monitors.recordWorkerQueueSize("logQueue", logPool.getQueue().size());
        } catch (Exception e) {
            Monitors.error(className, "index");
            logger.error("Failed to index {} for request type: {}", bulkRequest, str2, e);
        }
    }

    /**
     * Sends every non-empty bulk buffer whose last flush is at least
     * {@code asyncBufferFlushTimeout} seconds old.
     */
    private void flushBulkRequests() {
        this.bulkRequests.forEach((docType, pending) -> {
            long sinceLastFlush = System.currentTimeMillis() - pending.getLastFlushTime();
            if (sinceLastFlush < ((long) this.asyncBufferFlushTimeout) * 1000) {
                return; // flushed recently enough
            }
            if (pending.getBulkRequest() == null || pending.getBulkRequest().numberOfActions() <= 0) {
                return; // nothing buffered
            }
            logger.debug("Flushing bulk request buffer for type {}, size: {}", docType, pending.getBulkRequest().numberOfActions());
            indexBulkRequest(docType);
        });
    }

    // Class-initialization step: applies the GMT constant as the time zone of SIMPLE_DATE_FORMAT.
    // NOTE(review): if SIMPLE_DATE_FORMAT is a shared java.text.SimpleDateFormat, it is not
    // thread-safe - confirm how it is used by concurrent callers.
    static {
        SIMPLE_DATE_FORMAT.setTimeZone(GMT);
    }
}
