package com.netflix.conductor.es6.dao.index;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.es6.config.ElasticSearchProperties;
import com.netflix.conductor.es6.dao.query.parser.internal.ParserException;
import com.netflix.conductor.metrics.Monitors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.support.RetryTemplate;

@Trace
/* loaded from: input_file:com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.class */
public class ElasticSearchDAOV6 extends ElasticSearchBaseDAO implements IndexDAO {
    // Document type names. They double as keys into bulkRequests and as metric labels.
    private static final String WORKFLOW_DOC_TYPE = "workflow";
    private static final String TASK_DOC_TYPE = "task";
    private static final String LOG_DOC_TYPE = "task_log";
    private static final String EVENT_DOC_TYPE = "event";
    private static final String MSG_DOC_TYPE = "message";
    // Core thread count of the main indexing executor.
    private static final int CORE_POOL_SIZE = 6;
    // Idle keep-alive of the main indexing executor (the constructor pairs it with TimeUnit.MINUTES).
    private static final long KEEP_ALIVE_TIME = 1;
    // NOTE(review): not referenced in the visible part of this class; presumably the
    // retry-on-conflict count applied when update requests are built - confirm.
    private static final int UPDATE_REQUEST_RETRY_COUNT = 5;
    // Fixed index names derived from the index prefix in the constructor.
    private final String workflowIndexName;
    private final String taskIndexName;
    // Prefixes are used for wildcard searches; the matching *IndexName fields hold the
    // current dated index and are reassigned by updateIndexesNames().
    private final String eventIndexPrefix;
    private String eventIndexName;
    private final String messageIndexPrefix;
    private String messageIndexName;
    private String logIndexName;
    private final String logIndexPrefix;
    // Empty when auto index management is enabled or no override is configured.
    private final String docTypeOverride;
    private final ObjectMapper objectMapper;
    private final Client elasticSearchClient;
    // Runs workflow/task/message indexing work submitted by the async* methods.
    private final ExecutorService executorService;
    // Runs task-log and event-execution indexing work submitted by the async* methods.
    private final ExecutorService logExecutorService;
    // Pending bulk requests keyed by document type.
    private final ConcurrentHashMap<String, BulkRequests> bulkRequests;
    // Number of buffered actions at which a bulk request is sent.
    private final int indexBatchSize;
    // Milliseconds, taken from ElasticSearchProperties.getAsyncBufferFlushTimeout().
    private final long asyncBufferFlushTimeout;
    private final ElasticSearchProperties properties;
    private final RetryTemplate retryTemplate;
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchDAOV6.class);
    private static final String CLASS_NAME = ElasticSearchDAOV6.class.getSimpleName();
    // NOTE(review): not referenced in the visible code; presumably applied to
    // SIMPLE_DATE_FORMAT in a static initializer further down - confirm.
    private static final TimeZone GMT = TimeZone.getTimeZone("GMT");
    // Suffix for dated indexes: year, month, week-of-month. SimpleDateFormat is not thread-safe.
    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMWW");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6$BulkRequests.class */
    /**
     * Holder that pairs a wrapped bulk request builder with a flush timestamp.
     *
     * <p>Every {@link BulkRequestBuilder} handed to this class is wrapped in a new
     * {@code BulkRequestBuilderWrapper} before being stored.
     */
    public static class BulkRequests {
        private long lastFlushTime;
        private BulkRequestBuilderWrapper bulkRequestBuilder;

        /**
         * @param lastFlushTime initial flush timestamp (callers in this file pass
         *     {@code System.currentTimeMillis()})
         * @param bulkRequestBuilder builder to wrap and hold
         */
        BulkRequests(long lastFlushTime, BulkRequestBuilder bulkRequestBuilder) {
            this.bulkRequestBuilder = new BulkRequestBuilderWrapper(bulkRequestBuilder);
            this.lastFlushTime = lastFlushTime;
        }

        /** @return the stored flush timestamp */
        public long getLastFlushTime() {
            return lastFlushTime;
        }

        /** @param lastFlushTime new flush timestamp */
        public void setLastFlushTime(long lastFlushTime) {
            this.lastFlushTime = lastFlushTime;
        }

        /** @return the wrapper around the currently held builder */
        public BulkRequestBuilderWrapper getBulkRequestBuilder() {
            return bulkRequestBuilder;
        }

        /** Replaces the held builder, wrapping the supplied one in a fresh wrapper. */
        public void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
            BulkRequestBuilderWrapper wrapped = new BulkRequestBuilderWrapper(bulkRequestBuilder);
            this.bulkRequestBuilder = wrapped;
        }
    }

    /**
     * Creates the DAO, derives index names from the configured prefix, builds the two
     * bounded worker pools and schedules the periodic bulk-request flush.
     *
     * @param client Elasticsearch client used for all requests
     * @param retryTemplate retry policy used when sending bulk requests
     * @param elasticSearchProperties configuration (index prefix, pool sizes, batch size, ...)
     * @param objectMapper mapper used to (de)serialize indexed documents
     */
    public ElasticSearchDAOV6(Client client, RetryTemplate retryTemplate, ElasticSearchProperties elasticSearchProperties, ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.elasticSearchClient = client;
        // Assigned up front so every final field is set before `this` is handed to the scheduler below.
        this.retryTemplate = retryTemplate;
        this.properties = elasticSearchProperties;
        this.indexPrefix = elasticSearchProperties.getIndexPrefix();
        this.workflowIndexName = getIndexName(WORKFLOW_DOC_TYPE);
        this.taskIndexName = getIndexName(TASK_DOC_TYPE);
        this.logIndexPrefix = this.indexPrefix + "_task_log";
        this.messageIndexPrefix = this.indexPrefix + "_message";
        this.eventIndexPrefix = this.indexPrefix + "_event";
        this.bulkRequests = new ConcurrentHashMap<>();
        this.indexBatchSize = elasticSearchProperties.getIndexBatchSize();
        this.asyncBufferFlushTimeout = elasticSearchProperties.getAsyncBufferFlushTimeout().toMillis();

        // The document type override only applies when indexes are NOT auto-managed.
        String configuredDocType = elasticSearchProperties.getDocumentTypeOverride();
        boolean overrideApplies = !elasticSearchProperties.isAutoIndexManagementEnabled() && StringUtils.isNotBlank(configuredDocType);
        this.docTypeOverride = overrideApplies ? configuredDocType : "";

        int asyncWorkerQueueSize = elasticSearchProperties.getAsyncWorkerQueueSize();
        int asyncMaxPoolSize = elasticSearchProperties.getAsyncMaxPoolSize();
        // Rejected work is dropped (logged and counted) instead of blocking or throwing.
        this.executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, asyncMaxPoolSize, KEEP_ALIVE_TIME, TimeUnit.MINUTES, new LinkedBlockingQueue<>(asyncWorkerQueueSize), (runnable, threadPoolExecutor) -> {
            LOGGER.warn("Request  {} to async dao discarded in executor {}", runnable, threadPoolExecutor);
            Monitors.recordDiscardedIndexingCount("indexQueue");
        });
        this.logExecutorService = new ThreadPoolExecutor(1, 2, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(asyncWorkerQueueSize), (runnable, threadPoolExecutor) -> {
            LOGGER.warn("Request {} to async log dao discarded in executor {}", runnable, threadPoolExecutor);
            Monitors.recordDiscardedIndexingCount("logQueue");
        });
        // NOTE(review): this scheduler is not retained, so shutdown() cannot stop it.
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::flushBulkRequests, 60L, 30L, TimeUnit.SECONDS);
    }

    /**
     * Container pre-destroy hook: shuts down the log executor first, then the main
     * indexing executor.
     */
    @PreDestroy
    private void shutdown() {
        LOGGER.info("Starting graceful shutdown of executor service");
        for (ExecutorService pool : new ExecutorService[] {logExecutorService, executorService}) {
            shutdownExecutorService(pool);
        }
    }

    /**
     * Shuts the given executor down, waiting up to 30 seconds for queued work to finish
     * before forcing termination.
     *
     * <p>If the waiting thread is interrupted the executor is stopped immediately and the
     * interrupt flag is restored.
     *
     * @param executorService executor to stop
     */
    private void shutdownExecutorService(ExecutorService executorService) {
        try {
            executorService.shutdown();
            if (executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                LOGGER.debug("tasks completed, shutting down");
            } else {
                LOGGER.warn("Forcing shutdown after waiting for 30 seconds");
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            // The previous message named an unrelated "scheduledThreadPoolExecutor for delay queue".
            LOGGER.warn("Shutdown interrupted, invoking shutdownNow on executor service");
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Post-construct initialization: waits for the cluster health check, then - only when
     * auto index management is enabled - creates index templates and the workflow and
     * task indexes.
     *
     * @throws Exception if waiting for cluster health fails
     */
    @PostConstruct
    public void setup() throws Exception {
        waitForHealthyCluster();
        if (!properties.isAutoIndexManagementEnabled()) {
            return;
        }
        createIndexesTemplates();
        createWorkflowIndex();
        createTaskIndex();
    }

    /**
     * Issues a cluster health request that waits for green status and blocks until the
     * response arrives.
     *
     * @throws Exception if the health request fails or the wait is interrupted
     */
    private void waitForHealthyCluster() throws Exception {
        elasticSearchClient
                .admin()
                .cluster()
                .prepareHealth(new String[0])
                .setWaitForGreenStatus()
                .execute()
                .get();
    }

    /**
     * Creates the index templates, computes the current dated index names and schedules
     * an hourly refresh of those names.
     *
     * <p>Failures are logged and not propagated.
     */
    private void createIndexesTemplates() {
        try {
            initIndexesTemplates();
            updateIndexesNames();
            // scheduleAtFixedRate cancels all future runs once the task throws, so failures
            // are caught here to keep the hourly index rollover alive.
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
                try {
                    updateIndexesNames();
                } catch (Exception e) {
                    LOGGER.error("Error updating index names", e);
                }
            }, 0L, 1L, TimeUnit.HOURS);
        } catch (Exception e) {
            LOGGER.error("Error creating index templates", e);
        }
    }

    /** Ensures an index template exists for the task-log, event and message document types, in that order. */
    private void initIndexesTemplates() {
        for (String docType : new String[] {LOG_DOC_TYPE, EVENT_DOC_TYPE, MSG_DOC_TYPE}) {
            initIndexTemplate(docType);
        }
    }

    /**
     * Creates the index template {@code template_<str>} if Elasticsearch does not already
     * have one with that name.
     *
     * <p>The template body is loaded via {@code loadTypeMappingSource("/template_<str>.json")}.
     * A failure while putting the template is logged and not propagated.
     *
     * @param str document type the template is named after
     */
    private void initIndexTemplate(String str) {
        String templateName = "template_" + str;
        GetIndexTemplatesResponse existing = (GetIndexTemplatesResponse) this.elasticSearchClient.admin().indices().prepareGetTemplates(new String[]{templateName}).execute().actionGet();
        if (!existing.getIndexTemplates().isEmpty()) {
            return;
        }
        LOGGER.info("Creating the index template '{}'", templateName);
        try {
            // JSON is sent as UTF-8 explicitly; the platform default charset may differ.
            byte[] templateSource = loadTypeMappingSource("/" + templateName + ".json").getBytes(java.nio.charset.StandardCharsets.UTF_8);
            this.elasticSearchClient.admin().indices().preparePutTemplate(templateName).setSource(templateSource, XContentType.JSON).execute().actionGet();
        } catch (Exception e) {
            LOGGER.error("Failed to init {}", templateName, e);
        }
    }

    /**
     * Recomputes the current task-log, event and message index names (creating each
     * index when missing) and stores them, one after another in that order.
     */
    private void updateIndexesNames() {
        logIndexName = updateIndexName(LOG_DOC_TYPE);
        eventIndexName = updateIndexName(EVENT_DOC_TYPE);
        messageIndexName = updateIndexName(MSG_DOC_TYPE);
    }

    /**
     * Builds the dated index name {@code <indexPrefix>_<str>_<date suffix>} for the current
     * time and makes sure that index exists.
     *
     * @param str document type to build the index name for
     * @return the dated index name
     */
    private String updateIndexName(String str) {
        String dateSuffix;
        // SimpleDateFormat is not thread-safe and this instance is a shared static.
        synchronized (SIMPLE_DATE_FORMAT) {
            dateSuffix = SIMPLE_DATE_FORMAT.format(new Date());
        }
        String indexName = this.indexPrefix + "_" + str + "_" + dateSuffix;
        createIndex(indexName);
        return indexName;
    }

    /** Ensures the workflow index exists and that its workflow type mapping is installed. */
    private void createWorkflowIndex() {
        final String index = workflowIndexName;
        createIndex(index);
        addTypeMapping(index, WORKFLOW_DOC_TYPE, "/mappings_docType_workflow.json");
    }

    /** Ensures the task index exists and that its task type mapping is installed. */
    private void createTaskIndex() {
        final String index = taskIndexName;
        createIndex(index);
        addTypeMapping(index, TASK_DOC_TYPE, "/mappings_docType_task.json");
    }

    /**
     * Creates the index {@code str} with the configured shard and replica counts unless
     * it already exists.
     *
     * <p>Existence is probed with a get-index request. If the index turns out to exist
     * when the create request is executed, that is logged and otherwise ignored.
     *
     * @param str name of the index to create
     */
    private void createIndex(String str) {
        try {
            elasticSearchClient.admin().indices().prepareGetIndex().addIndices(new String[]{str}).execute().actionGet();
            return; // index is already there
        } catch (IndexNotFoundException notFound) {
            // fall through and create it
        }

        try {
            Settings.Builder indexSettings = Settings.builder()
                    .put("index.number_of_shards", properties.getIndexShardCount())
                    .put("index.number_of_replicas", properties.getIndexReplicasCount());
            CreateIndexRequest request = new CreateIndexRequest(str);
            request.settings(indexSettings);
            elasticSearchClient.admin().indices().create(request).actionGet();
        } catch (ResourceAlreadyExistsException alreadyExists) {
            LOGGER.error("Failed to update log index name: {}", str, alreadyExists);
        }
    }

    /**
     * Installs the mapping for type {@code str2} on index {@code str} when the index has
     * no mapping for that type yet.
     *
     * <p>A failure while loading or putting the mapping is logged and not propagated.
     *
     * @param str index name
     * @param str2 document type
     * @param str3 path handed to {@code loadTypeMappingSource} to obtain the mapping source
     */
    private void addTypeMapping(String str, String str2, String str3) {
        GetMappingsResponse current = (GetMappingsResponse) elasticSearchClient.admin().indices()
                .prepareGetMappings(new String[]{str})
                .addTypes(new String[]{str2})
                .execute()
                .actionGet();
        if (!current.mappings().isEmpty()) {
            return;
        }
        LOGGER.info("Adding the {} type mappings", str);
        try {
            elasticSearchClient.admin().indices()
                    .preparePutMapping(new String[]{str})
                    .setType(str2)
                    .setSource(loadTypeMappingSource(str3), XContentType.JSON)
                    .execute()
                    .actionGet();
        } catch (Exception e) {
            LOGGER.error("Failed to init index " + str + " mappings", e);
        }
    }

    /**
     * Synchronously writes the workflow summary to the workflow index and records timing
     * and queue-size metrics.
     *
     * <p>Any exception is counted, logged and not propagated.
     *
     * @param workflowSummary summary to index; its workflow id is the document id
     */
    public void indexWorkflow(WorkflowSummary workflowSummary) {
        try {
            final long startedAt = Instant.now().toEpochMilli();
            final String workflowId = workflowSummary.getWorkflowId();
            final byte[] document = objectMapper.writeValueAsBytes(workflowSummary);
            final String docType = StringUtils.isBlank(docTypeOverride) ? WORKFLOW_DOC_TYPE : docTypeOverride;
            UpdateRequest request = buildUpdateRequest(workflowId, document, workflowIndexName, docType);
            elasticSearchClient.update(request).actionGet();
            final long elapsed = Instant.now().toEpochMilli() - startedAt;
            LOGGER.debug("Time taken {} for indexing workflow: {}", elapsed, workflowSummary.getWorkflowId());
            Monitors.recordESIndexTime("index_workflow", WORKFLOW_DOC_TYPE, elapsed);
            int queued = ((ThreadPoolExecutor) executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", queued);
        } catch (Exception e) {
            Monitors.error(CLASS_NAME, "indexWorkflow");
            LOGGER.error("Failed to index workflow: {}", workflowSummary.getWorkflowId(), e);
        }
    }

    /**
     * Runs {@link #indexWorkflow(WorkflowSummary)} on the main indexing executor.
     *
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncIndexWorkflow(WorkflowSummary workflowSummary) {
        Runnable job = () -> indexWorkflow(workflowSummary);
        return CompletableFuture.runAsync(job, executorService);
    }

    /**
     * Queues an upsert of the task summary into the task bulk request and records timing
     * and queue-size metrics.
     *
     * <p>Any exception is logged and not propagated.
     *
     * @param taskSummary summary to index; its task id is the document id
     */
    public void indexTask(TaskSummary taskSummary) {
        try {
            final long startedAt = Instant.now().toEpochMilli();
            final String documentId = taskSummary.getTaskId();
            final byte[] document = objectMapper.writeValueAsBytes(taskSummary);
            final String docType = StringUtils.isBlank(docTypeOverride) ? TASK_DOC_TYPE : docTypeOverride;

            // The same payload serves as partial update and as the document to insert when absent.
            UpdateRequest upsert = new UpdateRequest(taskIndexName, docType, documentId);
            upsert.doc(document, XContentType.JSON);
            upsert.upsert(document, XContentType.JSON);
            indexObject(upsert, TASK_DOC_TYPE);

            final long elapsed = Instant.now().toEpochMilli() - startedAt;
            LOGGER.debug("Time taken {} for  indexing task:{} in workflow: {}", elapsed, taskSummary.getTaskId(), taskSummary.getWorkflowId());
            Monitors.recordESIndexTime("index_task", TASK_DOC_TYPE, elapsed);
            int queued = ((ThreadPoolExecutor) executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", queued);
        } catch (Exception e) {
            LOGGER.error("Failed to index task: {}", taskSummary.getTaskId(), e);
        }
    }

    /**
     * Runs {@link #indexTask(TaskSummary)} on the main indexing executor.
     *
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncIndexTask(TaskSummary taskSummary) {
        Runnable job = () -> indexTask(taskSummary);
        return CompletableFuture.runAsync(job, executorService);
    }

    /**
     * Appends the update request to the pending bulk request for the given document type
     * and sends the bulk request once it holds at least {@code indexBatchSize} actions.
     *
     * @param updateRequest request to buffer
     * @param str document type; key of the pending bulk request
     */
    private void indexObject(UpdateRequest updateRequest, String str) {
        // computeIfAbsent makes creation atomic: a get-then-put could let two threads each
        // install a builder, with the second put discarding requests added to the first.
        BulkRequests pending = this.bulkRequests.computeIfAbsent(str, key -> new BulkRequests(System.currentTimeMillis(), this.elasticSearchClient.prepareBulk()));
        BulkRequestBuilderWrapper builder = pending.getBulkRequestBuilder();
        builder.add(updateRequest);
        if (builder.numberOfActions() >= this.indexBatchSize) {
            indexBulkRequest(str);
        }
    }

    /**
     * Sends the pending bulk request for the given document type, if it has any actions,
     * and replaces it with a fresh empty one.
     *
     * <p>Does nothing when no bulk request is registered for the type or when it is empty.
     *
     * @param str document type; key of the pending bulk request
     */
    private synchronized void indexBulkRequest(String str) {
        BulkRequests pending = this.bulkRequests.get(str);
        if (pending == null) {
            // Nothing was ever queued for this type; previously this case raised an NPE.
            return;
        }
        BulkRequestBuilderWrapper builder = pending.getBulkRequestBuilder();
        if (builder == null || builder.numberOfActions() <= 0) {
            return;
        }
        updateWithRetry(builder, str);
        this.bulkRequests.put(str, new BulkRequests(System.currentTimeMillis(), this.elasticSearchClient.prepareBulk()));
    }

    /**
     * Indexes the given task execution logs into the current task-log index using a single
     * bulk request, waiting up to five seconds for it to complete.
     *
     * <p>An empty list is a no-op. Any exception is logged, together with the task ids of
     * the supplied logs, and not propagated.
     *
     * @param list logs to index
     */
    public void addTaskExecutionLogs(List<TaskExecLog> list) {
        if (list.isEmpty()) {
            return;
        }
        try {
            final long startedAt = Instant.now().toEpochMilli();
            BulkRequestBuilderWrapper bulk = new BulkRequestBuilderWrapper(elasticSearchClient.prepareBulk());
            for (TaskExecLog entry : list) {
                String docType = StringUtils.isBlank(docTypeOverride) ? LOG_DOC_TYPE : docTypeOverride;
                IndexRequest request = new IndexRequest(logIndexName, docType);
                request.source(objectMapper.writeValueAsBytes(entry), XContentType.JSON);
                bulk.add(request);
            }
            bulk.execute().actionGet(5L, TimeUnit.SECONDS);
            final long elapsed = Instant.now().toEpochMilli() - startedAt;
            LOGGER.debug("Time taken {} for indexing taskExecutionLogs", elapsed);
            Monitors.recordESIndexTime("index_task_execution_logs", LOG_DOC_TYPE, elapsed);
            int queued = ((ThreadPoolExecutor) logExecutorService).getQueue().size();
            Monitors.recordWorkerQueueSize("logQueue", queued);
        } catch (Exception e) {
            List<String> taskIds = list.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList());
            LOGGER.error("Failed to index task execution logs for tasks: {}", taskIds, e);
        }
    }

    /**
     * Runs {@link #addTaskExecutionLogs(List)} on the log executor.
     *
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> list) {
        Runnable job = () -> addTaskExecutionLogs(list);
        return CompletableFuture.runAsync(job, logExecutorService);
    }

    /**
     * Searches all task-log indexes for logs of the given task, ordered by
     * {@code createdTime} ascending and capped at the configured task-log result limit.
     *
     * @param str task id to look up
     * @return the matching logs, or {@code null} when the search fails (the failure is logged)
     */
    public List<TaskExecLog> getTaskExecutionLogs(String str) {
        try {
            SearchRequestBuilder request = elasticSearchClient.prepareSearch(new String[]{logIndexPrefix + "*"});
            request.setQuery(boolQueryBuilder("taskId='" + str + "'", "*"));
            request.setTypes(new String[]{StringUtils.isBlank(docTypeOverride) ? LOG_DOC_TYPE : docTypeOverride});
            request.setSize(properties.getTaskLogResultLimit());
            request.addSort(SortBuilders.fieldSort("createdTime").order(SortOrder.ASC));
            SearchResponse response = (SearchResponse) request.execute().actionGet();
            return mapTaskExecLogsResponse(response);
        } catch (Exception e) {
            LOGGER.error("Failed to get task execution logs for task: {}", str, e);
            return null;
        }
    }

    /**
     * Deserializes the source of every search hit into a {@link TaskExecLog}.
     *
     * @param searchResponse response whose hits are converted
     * @return one log per hit, in hit order
     * @throws IOException if a hit's source cannot be deserialized
     */
    private List<TaskExecLog> mapTaskExecLogsResponse(SearchResponse searchResponse) throws IOException {
        final SearchHit[] found = searchResponse.getHits().getHits();
        final List<TaskExecLog> logs = new ArrayList<>(found.length);
        for (int idx = 0; idx < found.length; idx++) {
            String json = found[idx].getSourceAsString();
            logs.add(objectMapper.readValue(json, TaskExecLog.class));
        }
        return logs;
    }

    /**
     * Queues an upsert of the message into the message bulk request, keyed by the message
     * id, and records the time taken.
     *
     * <p>The stored document has the fields {@code messageId}, {@code payload},
     * {@code queue} and {@code created} (current time in milliseconds). Any exception is
     * logged and not propagated.
     *
     * @param str queue name stored with the message
     * @param message message to index
     */
    public void addMessage(String str, Message message) {
        try {
            final long startedAt = Instant.now().toEpochMilli();

            Map<String, Object> document = new HashMap<>();
            document.put("messageId", message.getId());
            document.put("payload", message.getPayload());
            document.put("queue", str);
            document.put("created", System.currentTimeMillis());

            final String docType = StringUtils.isBlank(docTypeOverride) ? MSG_DOC_TYPE : docTypeOverride;
            UpdateRequest upsert = new UpdateRequest(messageIndexName, docType, message.getId());
            upsert.doc(document, XContentType.JSON);
            upsert.upsert(document, XContentType.JSON);
            indexObject(upsert, MSG_DOC_TYPE);

            final long elapsed = Instant.now().toEpochMilli() - startedAt;
            LOGGER.debug("Time taken {} for  indexing message: {}", elapsed, message.getId());
            Monitors.recordESIndexTime("add_message", MSG_DOC_TYPE, elapsed);
        } catch (Exception e) {
            LOGGER.error("Failed to index message: {}", message.getId(), e);
        }
    }

    /**
     * Runs {@link #addMessage(String, Message)} on the main indexing executor.
     *
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncAddMessage(String str, Message message) {
        Runnable job = () -> addMessage(str, message);
        return CompletableFuture.runAsync(job, executorService);
    }

    /**
     * Searches all message indexes for messages of the given queue, ordered by
     * {@code created} ascending.
     *
     * @param str queue name
     * @return the matching messages, or {@code null} when the search fails (the failure is logged)
     */
    public List<Message> getMessages(String str) {
        try {
            SearchRequestBuilder request = elasticSearchClient.prepareSearch(new String[]{messageIndexPrefix + "*"});
            request.setQuery(boolQueryBuilder("queue='" + str + "'", "*"));
            request.setTypes(new String[]{StringUtils.isBlank(docTypeOverride) ? MSG_DOC_TYPE : docTypeOverride});
            request.addSort(SortBuilders.fieldSort("created").order(SortOrder.ASC));
            SearchResponse response = (SearchResponse) request.execute().actionGet();
            return mapGetMessagesResponse(response);
        } catch (Exception e) {
            LOGGER.error("Failed to get messages for queue: {}", str, e);
            return null;
        }
    }

    /**
     * Converts every search hit into a {@link Message} built from the hit's
     * {@code messageId} and {@code payload} fields; the third constructor argument is
     * always {@code null}.
     *
     * @param searchResponse response whose hits are converted
     * @return one message per hit, in hit order
     * @throws IOException if a hit's source cannot be read as a string-to-string map
     */
    private List<Message> mapGetMessagesResponse(SearchResponse searchResponse) throws IOException {
        final SearchHit[] found = searchResponse.getHits().getHits();
        final MapType stringMapType = TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, String.class);
        final List<Message> messages = new ArrayList<>(found.length);
        for (int idx = 0; idx < found.length; idx++) {
            Map<String, String> fields = objectMapper.readValue(found[idx].getSourceAsString(), stringMapType);
            String id = fields.get("messageId");
            String payload = fields.get("payload");
            messages.add(new Message(id, payload, (String) null));
        }
        return messages;
    }

    /**
     * Queues the event execution into the event bulk request and records timing and
     * queue-size metrics.
     *
     * <p>The document id is {@code name.event.messageId.id}. Any exception is logged and
     * not propagated.
     *
     * @param eventExecution execution to index
     */
    public void addEventExecution(EventExecution eventExecution) {
        try {
            final long startedAt = Instant.now().toEpochMilli();
            final byte[] document = objectMapper.writeValueAsBytes(eventExecution);
            final String documentId = eventExecution.getName()
                    + "." + eventExecution.getEvent()
                    + "." + eventExecution.getMessageId()
                    + "." + eventExecution.getId();
            final String docType = StringUtils.isBlank(docTypeOverride) ? EVENT_DOC_TYPE : docTypeOverride;
            UpdateRequest request = buildUpdateRequest(documentId, document, eventIndexName, docType);
            indexObject(request, EVENT_DOC_TYPE);

            final long elapsed = Instant.now().toEpochMilli() - startedAt;
            LOGGER.debug("Time taken {} for indexing event execution: {}", elapsed, eventExecution.getId());
            Monitors.recordESIndexTime("add_event_execution", EVENT_DOC_TYPE, elapsed);
            int queued = ((ThreadPoolExecutor) logExecutorService).getQueue().size();
            Monitors.recordWorkerQueueSize("logQueue", queued);
        } catch (Exception e) {
            LOGGER.error("Failed to index event execution: {}", eventExecution.getId(), e);
        }
    }

    /**
     * Runs {@link #addEventExecution(EventExecution)} on the log executor.
     *
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
        Runnable job = () -> addEventExecution(eventExecution);
        return CompletableFuture.runAsync(job, logExecutorService);
    }

    /**
     * Searches all event indexes for executions of the given event, ordered by
     * {@code created} ascending.
     *
     * @param str event to look up
     * @return the matching executions, or {@code null} when the search fails (the failure is logged)
     */
    public List<EventExecution> getEventExecutions(String str) {
        try {
            SearchRequestBuilder request = elasticSearchClient.prepareSearch(new String[]{eventIndexPrefix + "*"});
            request.setQuery(boolQueryBuilder("event='" + str + "'", "*"));
            request.setTypes(new String[]{StringUtils.isBlank(docTypeOverride) ? EVENT_DOC_TYPE : docTypeOverride});
            request.addSort(SortBuilders.fieldSort("created").order(SortOrder.ASC));
            SearchResponse response = (SearchResponse) request.execute().actionGet();
            return mapEventExecutionsResponse(response);
        } catch (Exception e) {
            LOGGER.error("Failed to get executions for event: {}", str, e);
            return null;
        }
    }

    /**
     * Deserializes the source of every search hit into an {@link EventExecution}.
     *
     * @param searchResponse response whose hits are converted
     * @return one execution per hit, in hit order
     * @throws IOException if a hit's source cannot be deserialized
     */
    private List<EventExecution> mapEventExecutionsResponse(SearchResponse searchResponse) throws IOException {
        final SearchHit[] found = searchResponse.getHits().getHits();
        final List<EventExecution> executions = new ArrayList<>(found.length);
        for (int idx = 0; idx < found.length; idx++) {
            String json = found[idx].getSourceAsString();
            executions.add(objectMapper.readValue(json, EventExecution.class));
        }
        return executions;
    }

    /**
     * Executes the bulk request through the retry template, waiting up to five seconds per
     * attempt, and records the time taken.
     *
     * <p>If execution ultimately fails the error is counted and logged, not propagated.
     *
     * @param bulkRequestBuilderWrapper bulk request to send
     * @param str document type, used for logging and as a metric label
     */
    private void updateWithRetry(BulkRequestBuilderWrapper bulkRequestBuilderWrapper, String str) {
        try {
            final long startedAt = Instant.now().toEpochMilli();
            retryTemplate.execute(context -> (BulkResponse) bulkRequestBuilderWrapper.execute().actionGet(5L, TimeUnit.SECONDS));
            final long elapsed = Instant.now().toEpochMilli() - startedAt;
            LOGGER.debug("Time taken {} for indexing object of type: {}", elapsed, str);
            Monitors.recordESIndexTime("index_object", str, elapsed);
        } catch (Exception e) {
            Monitors.error(CLASS_NAME, "index");
            int actions = bulkRequestBuilderWrapper.numberOfActions();
            LOGGER.error("Failed to index {} for requests", actions, e);
        }
    }

    /**
     * Searches workflow documents, returning {@code String} results.
     *
     * <p>Delegates to {@code search} with the workflow document type and {@code true} for
     * its boolean flag (presumably "ids only" - confirm against {@code search}).
     */
    public SearchResult<String> searchWorkflows(String str, String str2, int i, int i2, List<String> list) {
        final SearchResult<String> matches = search(str, i, i2, list, str2, WORKFLOW_DOC_TYPE, true, String.class);
        return matches;
    }

    /**
     * Searches workflow documents, returning {@link WorkflowSummary} results.
     *
     * <p>Delegates to {@code search} with the workflow document type and {@code false}
     * for its boolean flag.
     */
    public SearchResult<WorkflowSummary> searchWorkflowSummary(String str, String str2, int i, int i2, List<String> list) {
        final SearchResult<WorkflowSummary> matches = search(str, i, i2, list, str2, WORKFLOW_DOC_TYPE, false, WorkflowSummary.class);
        return matches;
    }

    /**
     * Counts workflow documents matching the given query arguments.
     *
     * @return the total hit count reported by {@code count} for the workflow document type
     */
    public long getWorkflowCount(String str, String str2) {
        final long total = count(str, str2, WORKFLOW_DOC_TYPE);
        return total;
    }

    /**
     * Searches task documents, returning {@code String} results.
     *
     * <p>Delegates to {@code search} with the task document type and {@code true} for its
     * boolean flag (presumably "ids only" - confirm against {@code search}).
     */
    public SearchResult<String> searchTasks(String str, String str2, int i, int i2, List<String> list) {
        final SearchResult<String> matches = search(str, i, i2, list, str2, TASK_DOC_TYPE, true, String.class);
        return matches;
    }

    /**
     * Searches task documents, returning {@link TaskSummary} results.
     *
     * <p>Delegates to {@code search} with the task document type and {@code false} for
     * its boolean flag.
     */
    public SearchResult<TaskSummary> searchTaskSummary(String str, String str2, int i, int i2, List<String> list) {
        final SearchResult<TaskSummary> matches = search(str, i, i2, list, str2, TASK_DOC_TYPE, false, TaskSummary.class);
        return matches;
    }

    /**
     * Deletes the workflow document with the given id from the workflow index and records
     * timing and queue-size metrics.
     *
     * <p>An error is logged when Elasticsearch does not report the document as deleted.
     * Any throwable is logged and counted, not propagated.
     *
     * @param str workflow id (document id)
     */
    public void removeWorkflow(String str) {
        try {
            long startedAt = Instant.now().toEpochMilli();
            // Use the same document type the workflow was indexed under (see indexWorkflow).
            String docType = StringUtils.isBlank(this.docTypeOverride) ? WORKFLOW_DOC_TYPE : this.docTypeOverride;
            DeleteResponse response = (DeleteResponse) this.elasticSearchClient.delete(new DeleteRequest(this.workflowIndexName, docType, str)).actionGet();
            // The check was inverted: it reported a failure exactly when the delete succeeded.
            if (response.getResult() != DocWriteResponse.Result.DELETED) {
                LOGGER.error("Index removal failed - document not found by id: {}", str);
            }
            long elapsed = Instant.now().toEpochMilli() - startedAt;
            LOGGER.debug("Time taken {} for removing workflow: {}", Long.valueOf(elapsed), str);
            Monitors.recordESIndexTime("remove_workflow", WORKFLOW_DOC_TYPE, elapsed);
            Monitors.recordWorkerQueueSize("indexQueue", ((ThreadPoolExecutor) this.executorService).getQueue().size());
        } catch (Throwable th) {
            LOGGER.error("Failed to remove workflow {} from index", str, th);
            Monitors.error(CLASS_NAME, "remove");
        }
    }

    /**
     * Runs {@link #removeWorkflow(String)} on the main indexing executor.
     *
     * @return a future that completes when the removal call returns
     */
    public CompletableFuture<Void> asyncRemoveWorkflow(String str) {
        Runnable job = () -> removeWorkflow(str);
        return CompletableFuture.runAsync(job, executorService);
    }

    /**
     * Applies a partial update to the workflow document with the given id, setting each
     * {@code strArr[n]} field to {@code objArr[n]}, and records timing and queue-size metrics.
     *
     * <p>Null values are allowed. When a key appears more than once the last value wins.
     *
     * @param str workflow id (document id)
     * @param strArr field names to update
     * @param objArr new values, positionally matching {@code strArr}
     * @throws IllegalArgumentException if the two arrays differ in length
     */
    public void updateWorkflow(String str, String[] strArr, Object[] objArr) {
        if (strArr.length != objArr.length) {
            throw new IllegalArgumentException("Number of keys and values do not match");
        }
        long startedAt = Instant.now().toEpochMilli();
        // Use the same document type the workflow was indexed under (see indexWorkflow).
        String docType = StringUtils.isBlank(this.docTypeOverride) ? WORKFLOW_DOC_TYPE : this.docTypeOverride;
        UpdateRequest updateRequest = new UpdateRequest(this.workflowIndexName, docType, str);
        // A plain loop instead of Collectors.toMap, which throws NullPointerException on null values.
        Map<String, Object> fields = new HashMap<>();
        for (int idx = 0; idx < strArr.length; idx++) {
            fields.put(strArr[idx], objArr[idx]);
        }
        updateRequest.doc(fields);
        LOGGER.debug("Updating workflow {} in elasticsearch index: {}", str, this.workflowIndexName);
        this.elasticSearchClient.update(updateRequest).actionGet();
        long elapsed = Instant.now().toEpochMilli() - startedAt;
        LOGGER.debug("Time taken {} for updating workflow: {}", Long.valueOf(elapsed), str);
        Monitors.recordESIndexTime("update_workflow", WORKFLOW_DOC_TYPE, elapsed);
        Monitors.recordWorkerQueueSize("indexQueue", ((ThreadPoolExecutor) this.executorService).getQueue().size());
    }

    /**
     * Runs {@link #updateWorkflow(String, String[], Object[])} on the main indexing executor.
     *
     * @return a future that completes when the update call returns, exceptionally if it throws
     */
    public CompletableFuture<Void> asyncUpdateWorkflow(String str, String[] strArr, Object[] objArr) {
        Runnable job = () -> updateWorkflow(str, strArr, objArr);
        return CompletableFuture.runAsync(job, executorService);
    }

    /**
     * Deletes the task document {@code str2} from the task index after checking, via a
     * search, that a task with that id exists for workflow {@code str}.
     *
     * <p>Errors are counted and logged when the task is not found for the workflow, when
     * Elasticsearch does not report the document as deleted, or when any exception occurs;
     * nothing is propagated.
     *
     * @param str workflow id the task must belong to
     * @param str2 task id (document id)
     */
    public void removeTask(String str, String str2) {
        try {
            final long startedAt = Instant.now().toEpochMilli();
            final String docType = StringUtils.isBlank(docTypeOverride) ? TASK_DOC_TYPE : docTypeOverride;

            String ownershipQuery = String.format("(taskId='%s') AND (workflowId='%s')", str2, str);
            SearchResult<String> owned = searchTasks(ownershipQuery, "*", 0, 1, null);
            if (owned.getTotalHits() == 0) {
                LOGGER.error("Task: {} does not belong to workflow: {}", str2, str);
                Monitors.error(CLASS_NAME, "removeTask");
                return;
            }

            DeleteRequest request = new DeleteRequest(taskIndexName, docType, str2);
            DeleteResponse response = (DeleteResponse) elasticSearchClient.delete(request).actionGet();
            final long elapsed = Instant.now().toEpochMilli() - startedAt;
            if (response.getResult() != DocWriteResponse.Result.DELETED) {
                LOGGER.error("Index removal failed - task not found by id: {} of workflow: {}", str2, str);
                Monitors.error(CLASS_NAME, "removeTask");
                return;
            }
            LOGGER.debug("Time taken {} for removing task:{} of workflow: {}", elapsed, str2, str);
            Monitors.recordESIndexTime("remove_task", docType, elapsed);
            int queued = ((ThreadPoolExecutor) executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", queued);
        } catch (Exception e) {
            LOGGER.error("Failed to remove task: {} of workflow: {} from index", str2, str, e);
            Monitors.error(CLASS_NAME, "removeTask");
        }
    }

    /**
     * Runs {@link #removeTask(String, String)} on the main indexing executor.
     *
     * @return a future that completes when the removal call returns
     */
    public CompletableFuture<Void> asyncRemoveTask(String str, String str2) {
        Runnable job = () -> removeTask(str, str2);
        return CompletableFuture.runAsync(job, executorService);
    }

    /**
     * Applies a partial update to the task document {@code str2}, setting each
     * {@code strArr[n]} field to {@code objArr[n]}, and records timing and queue-size metrics.
     *
     * <p>Null values are allowed. When a key appears more than once the last value wins.
     *
     * @param str workflow id, used for logging only
     * @param str2 task id (document id)
     * @param strArr field names to update
     * @param objArr new values, positionally matching {@code strArr}
     * @throws IllegalArgumentException if the two arrays differ in length
     */
    public void updateTask(String str, String str2, String[] strArr, Object[] objArr) {
        if (strArr.length != objArr.length) {
            throw new IllegalArgumentException("Number of keys and values do not match");
        }
        long startedAt = Instant.now().toEpochMilli();
        String docType = StringUtils.isBlank(this.docTypeOverride) ? TASK_DOC_TYPE : this.docTypeOverride;
        UpdateRequest updateRequest = new UpdateRequest(this.taskIndexName, docType, str2);
        // A plain loop instead of Collectors.toMap, which throws NullPointerException on null values.
        Map<String, Object> fields = new HashMap<>();
        for (int idx = 0; idx < strArr.length; idx++) {
            fields.put(strArr[idx], objArr[idx]);
        }
        updateRequest.doc(fields);
        LOGGER.debug("Updating task: {} of workflow: {} in elasticsearch index: {}", new Object[]{str2, str, this.taskIndexName});
        this.elasticSearchClient.update(updateRequest).actionGet();
        long elapsed = Instant.now().toEpochMilli() - startedAt;
        LOGGER.debug("Time taken {} for updating task: {} of workflow: {}", new Object[]{Long.valueOf(elapsed), str2, str});
        Monitors.recordESIndexTime("update_task", docType, elapsed);
        Monitors.recordWorkerQueueSize("indexQueue", ((ThreadPoolExecutor) this.executorService).getQueue().size());
    }

    /**
     * Runs {@code updateTask} asynchronously on this DAO's executor service.
     *
     * @param str id of the workflow that owns the task
     * @param str2 id of the task document to update
     * @param strArr names of the fields to update
     * @param objArr new values, positionally matched to {@code strArr}
     * @return a future that completes when the {@code updateTask} call has returned; it
     *     completes exceptionally if that call throws
     */
    public CompletableFuture<Void> asyncUpdateTask(String str, String str2, String[] strArr, Object[] objArr) {
        Runnable update = () -> updateTask(str, str2, strArr, objArr);
        return CompletableFuture.runAsync(update, this.executorService);
    }

    /**
     * Fetches a single field of a workflow document from the workflow index.
     *
     * @param str id of the workflow document
     * @param str2 name of the source field to fetch
     * @return the field's value converted with {@code toString()}, or {@code null} when the
     *     document does not exist or the field is absent/null
     */
    public String get(String str, String str2) {
        String docType = StringUtils.isBlank(this.docTypeOverride) ? WORKFLOW_DOC_TYPE : this.docTypeOverride;
        // Only the requested field is included in the returned source.
        FetchSourceContext onlyRequestedField = new FetchSourceContext(true, new String[]{str2}, Strings.EMPTY_ARRAY);
        GetRequest request = new GetRequest(this.workflowIndexName, docType, str).fetchSourceContext(onlyRequestedField);
        GetResponse response = (GetResponse) this.elasticSearchClient.get(request).actionGet();

        Object value = response.isExists() ? response.getSourceAsMap().get(str2) : null;
        if (value == null) {
            LOGGER.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", str, this.workflowIndexName);
            return null;
        }
        return value.toString();
    }

    /**
     * Counts the documents matching a query without retrieving any hits.
     *
     * @param str first argument handed to {@code boolQueryBuilder} (presumably the structured
     *     query expression; confirm against that helper)
     * @param str2 second argument handed to {@code boolQueryBuilder} (presumably the free-text
     *     query; confirm against that helper)
     * @param str3 document type to use when no doc type override is configured
     * @return the total hit count reported by Elasticsearch
     * @throws TransientException if building the query raises a {@code ParserException}
     */
    private long count(String str, String str2, String str3) {
        String docType = StringUtils.isBlank(this.docTypeOverride) ? str3 : this.docTypeOverride;
        try {
            SearchRequestBuilder request = this.elasticSearchClient
                    .prepareSearch(getIndexName(docType))
                    .setQuery(boolQueryBuilder(str, str2))
                    .setTypes(docType)
                    .storedFields("_id")
                    // Size 0: only the total is needed, not the hits themselves.
                    .setSize(0);
            return request.get().getHits().getTotalHits();
        } catch (ParserException parseFailure) {
            throw new TransientException(parseFailure.getMessage(), parseFailure);
        }
    }

    /**
     * Executes a paged, optionally sorted search and maps the response to a
     * {@code SearchResult}.
     *
     * @param str first argument handed to {@code boolQueryBuilder} (presumably the structured
     *     query expression; confirm against that helper)
     * @param i offset of the first hit to return
     * @param i2 maximum number of hits to return
     * @param list sort options, each {@code "field"} or {@code "field:ORDER"}; may be null
     * @param str2 second argument handed to {@code boolQueryBuilder} (presumably the free-text
     *     query; confirm against that helper)
     * @param str3 document type to use when no doc type override is configured
     * @param z when true, only the {@code _id} stored field is requested and the results are
     *     document ids
     * @param cls element type of the returned results
     * @return the mapped search result
     * @throws TransientException if building the query raises a {@code ParserException}
     */
    private <T> SearchResult<T> search(String str, int i, int i2, List<String> list, String str2, String str3, boolean z, Class<T> cls) {
        String docType = StringUtils.isBlank(this.docTypeOverride) ? str3 : this.docTypeOverride;
        try {
            SearchRequestBuilder request = this.elasticSearchClient
                    .prepareSearch(getIndexName(docType))
                    .setQuery(boolQueryBuilder(str, str2))
                    .setTypes(docType)
                    .setFrom(i)
                    .setSize(i2);
            if (z) {
                request.storedFields("_id");
            }
            addSortOptions(request, list);
            SearchResponse response = (SearchResponse) request.get();
            return mapSearchResult(response, z, cls);
        } catch (ParserException parseFailure) {
            throw new TransientException(parseFailure.getMessage(), parseFailure);
        }
    }

    /**
     * Adds one sort clause to the search request for every entry in {@code list}.
     *
     * <p>Each entry is either a bare field name (sorted ascending) or {@code "field:ORDER"},
     * where {@code ORDER} must be the exact name of a {@code SortOrder} constant.
     *
     * @param searchRequestBuilder request to add the sort clauses to
     * @param list sort options; when null, no sort clause is added
     * @throws IllegalArgumentException if an order suffix is not a {@code SortOrder} constant
     */
    private void addSortOptions(SearchRequestBuilder searchRequestBuilder, List<String> list) {
        if (list == null) {
            return;
        }
        for (String sortOption : list) {
            // Keep the raw option, the field and the order in separate variables: the order
            // must be parsed from the ORIGINAL option, not from the already-truncated field.
            String field = sortOption;
            SortOrder order = SortOrder.ASC;
            int separator = sortOption.indexOf(':');
            if (separator > 0) {
                field = sortOption.substring(0, separator);
                order = SortOrder.valueOf(sortOption.substring(separator + 1));
            }
            searchRequestBuilder.addSort(field, order);
        }
    }

    /**
     * Converts an Elasticsearch response into a {@code SearchResult}.
     *
     * @param searchResponse response to convert
     * @param z when true, each result is the hit's id cast to {@code cls}; otherwise each
     *     result is the hit's JSON source deserialized into {@code cls}
     * @param cls element type of the results
     * @return a result carrying the total hit count and one element per returned hit; a hit
     *     whose source cannot be deserialized contributes a {@code null} element
     */
    private <T> SearchResult<T> mapSearchResult(SearchResponse searchResponse, boolean z, Class<T> cls) {
        SearchHits hits = searchResponse.getHits();
        SearchHit[] returnedHits = hits.getHits();
        List<T> results = new ArrayList<>(returnedHits.length);
        for (SearchHit hit : returnedHits) {
            if (z) {
                results.add(cls.cast(hit.getId()));
                continue;
            }
            T document;
            try {
                document = this.objectMapper.readValue(hit.getSourceAsString(), cls);
            } catch (JsonProcessingException e) {
                LOGGER.error("Failed to de-serialize elasticsearch from source: {}", hit.getSourceAsString(), e);
                // The failed hit keeps its slot as null so the list size still matches the hits.
                document = null;
            }
            results.add(document);
        }
        return new SearchResult<>(hits.getTotalHits(), results);
    }

    /**
     * Finds ids of workflows that are in a terminal status, are not yet marked archived, and
     * whose {@code endTime} falls in the window ending {@code j} days before today.
     *
     * <p>The window is {@code [today - j - KEEP_ALIVE_TIME, today - j)}. Both bounds are derived
     * from a single clock reading so they stay consistent if the call runs across midnight.
     *
     * @param str name of the index to search
     * @param j age in days after which a workflow becomes archivable
     * @return ids of at most 1000 matching workflows
     */
    public List<String> searchArchivableWorkflows(String str, long j) {
        String docType = StringUtils.isBlank(this.docTypeOverride) ? WORKFLOW_DOC_TYPE : this.docTypeOverride;

        // Read the clock once; two separate LocalDate.now() calls could straddle midnight
        // and silently widen the window to two days.
        LocalDate upperBound = LocalDate.now().minusDays(j);
        // NOTE(review): KEEP_ALIVE_TIME is used here as a number of days - confirm that is intended.
        LocalDate lowerBound = upperBound.minusDays(KEEP_ALIVE_TIME);

        SearchRequestBuilder request = this.elasticSearchClient
                .prepareSearch(str)
                .setTypes(docType)
                .setQuery(QueryBuilders.boolQuery()
                        .must(QueryBuilders.rangeQuery("endTime")
                                .lt(upperBound.toString())
                                .gte(lowerBound.toString()))
                        .should(QueryBuilders.termQuery("status", "COMPLETED"))
                        .should(QueryBuilders.termQuery("status", "FAILED"))
                        .should(QueryBuilders.termQuery("status", "TIMED_OUT"))
                        .should(QueryBuilders.termQuery("status", "TERMINATED"))
                        .mustNot(QueryBuilders.existsQuery("archived"))
                        // At least one of the terminal statuses above must match.
                        .minimumShouldMatch(1))
                .setSize(1000);
        return extractSearchIds(request);
    }

    /**
     * Builds an upsert-style update request: the same JSON payload is used both as the partial
     * document and as the document to insert when none exists yet.
     *
     * @param str id of the document
     * @param bArr JSON-encoded document bytes
     * @param str2 name of the target index
     * @param str3 document type
     * @return the configured request, retrying on version conflicts up to
     *     {@code UPDATE_REQUEST_RETRY_COUNT} times
     */
    private UpdateRequest buildUpdateRequest(String str, byte[] bArr, String str2, String str3) {
        return new UpdateRequest(str2, str3, str)
                .doc(bArr, XContentType.JSON)
                .upsert(bArr, XContentType.JSON)
                .retryOnConflict(UPDATE_REQUEST_RETRY_COUNT);
    }

    /**
     * Executes the given search and collects the id of every returned hit.
     *
     * @param searchRequestBuilder fully configured search request
     * @return the hit ids in the order Elasticsearch returned them; empty when there are no hits
     */
    private List<String> extractSearchIds(SearchRequestBuilder searchRequestBuilder) {
        SearchResponse response = (SearchResponse) searchRequestBuilder.execute().actionGet();
        SearchHit[] hits = response.getHits().getHits();
        // Typed and presized list instead of the previous raw LinkedList.
        List<String> ids = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            ids.add(hit.getId());
        }
        return ids;
    }

    /**
     * Flushes every buffered bulk request that has not been flushed for at least
     * {@code asyncBufferFlushTimeout} (compared against a millisecond clock) and that currently
     * holds at least one action.
     */
    private void flushBulkRequests() {
        for (Map.Entry<?, ?> entry : this.bulkRequests.entrySet()) {
            BulkRequests pending = (BulkRequests) entry.getValue();

            // Skip buffers that were flushed recently.
            if (System.currentTimeMillis() - pending.getLastFlushTime() < this.asyncBufferFlushTimeout) {
                continue;
            }
            // Skip buffers with nothing queued.
            if (pending.getBulkRequestBuilder() == null || pending.getBulkRequestBuilder().numberOfActions() <= 0) {
                continue;
            }

            LOGGER.debug("Flushing bulk request buffer for type {}, size: {}", entry.getKey(), Integer.valueOf(pending.getBulkRequestBuilder().numberOfActions()));
            indexBulkRequest((String) entry.getKey());
        }
    }

    // Class-load-time setup: pin the shared SIMPLE_DATE_FORMAT to the GMT constant so
    // everything it formats uses that zone.
    // NOTE(review): if SIMPLE_DATE_FORMAT is a java.text.SimpleDateFormat it is not
    // thread-safe - confirm how it is shared across the executor threads.
    static {
        SIMPLE_DATE_FORMAT.setTimeZone(GMT);
    }
}
