package zone.cogni.asquare.service.index;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.collections4.ListUtils;
import org.apache.jena.query.Dataset;
import org.apache.jena.query.DatasetFactory;
import org.apache.jena.rdf.model.Model;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import zone.cogni.asquare.service.async.AsyncTaskManager;
import zone.cogni.asquare.service.elasticsearch.Params;
import zone.cogni.asquare.triplestore.RdfStoreService;
import zone.cogni.asquare.triplestore.jenamemory.DatasetRdfStoreService;
import zone.cogni.sem.jena.model.ResultSetDto;

@Service
/* loaded from: input_file:zone/cogni/asquare/service/index/IndexService.class */
public class IndexService {
    /** JSON field read from the stored timestamp document's {@code _source}; holds the last reindex time in epoch milliseconds. */
    public static final String INDEX_TIMESTAMP_MS_NAME = "timestampms";
    /** Document id under which the reindex timestamp is stored in (and read from) the configuration index. */
    public static final String INDEX_TIMESTAMP_PROPERTY_NAME = "lastIndexing";
    /** Not referenced inside this class; presumably the indexed-document field that holds the graph name - TODO confirm. */
    public static final String INDEX_GRAPH_NAME = "graph";
    private static final Logger log = LoggerFactory.getLogger(IndexService.class);
    // Result variables that every resource-listing query must expose (checked in findAllIndexResources).
    private static final String GRAPH_PROP_NAME = "graph";
    private static final String URI_PROP_NAME = "uri";
    private static final String TYPE_PROP_NAME = "type";
    private static final String INDEX_PROP_NAME = "index";
    /** Upper bound on the number of indexing passes made over still-failing resources in one reindex run. */
    private static final int INDEX_TYPE_ATTEMPTS = 5;
    /** Performs the actual per-graph indexing (sync and async variants). */
    private final GraphIndexService graphIndexService;
    /** Source of the elastic store and of the default RDF store used by the deprecated overloads. */
    private final IndexConfigProvider indexConfigProvider;
    /** Name of the index holding the reindex timestamp document; injected from {@code asquare.ap.config} (default {@code config}). */
    private final String configIndexName;
    /** Task manager the reindex iterations wait on before and after submitting graph indexing work. */
    private final AsyncTaskManager indexingGraphTaskExecutor;
    /** Task manager whose busy state is reported by isAsyncIndexBusy(). */
    private final AsyncTaskManager indexingTaskExecutor;
    /** Maximum time in milliseconds to wait for one iteration to drain; injected from {@code index.iterationTimeoutMs} (default 3600000). */
    private final Long iterationTimeoutMs;

    /**
     * Creates the service with its collaborators and configuration values.
     *
     * @param indexConfigProvider provider of the elastic store and the default RDF store
     * @param asyncTaskManager    task manager qualified as {@code indexingGraphTaskExecutor}
     * @param asyncTaskManager2   task manager qualified as {@code indexingTaskExecutor}
     * @param l                   per-iteration timeout in milliseconds ({@code index.iterationTimeoutMs}, default 3600000)
     * @param str                 name of the configuration index ({@code asquare.ap.config}, default {@code config})
     * @param graphIndexService   service that indexes the resources of a single graph
     */
    public IndexService(IndexConfigProvider indexConfigProvider, @Qualifier("indexingGraphTaskExecutor") AsyncTaskManager asyncTaskManager, @Qualifier("indexingTaskExecutor") AsyncTaskManager asyncTaskManager2, @Value("${index.iterationTimeoutMs:3600000}") Long l, @Value("${asquare.ap.config:config}") String str, GraphIndexService graphIndexService) {
        // Collaborators.
        this.indexConfigProvider = indexConfigProvider;
        this.graphIndexService = graphIndexService;
        this.indexingGraphTaskExecutor = asyncTaskManager;
        this.indexingTaskExecutor = asyncTaskManager2;
        // Configuration values.
        this.iterationTimeoutMs = l;
        this.configIndexName = str;
    }

    /**
     * Executes the given query against the store (paginated) and turns every solution into a
     * {@link ResourceIndex}.
     *
     * @param str             query text; its result must expose the variables
     *                        {@code graph}, {@code uri}, {@code type} and {@code index}
     * @param rdfStoreService store the query is executed against
     * @return one {@link ResourceIndex} per query solution
     * @throws IndexException when the result set lacks any of the required variables
     */
    public List<ResourceIndex> findAllIndexResources(String str, RdfStoreService rdfStoreService) {
        ResultSetDto resultSet = IndexUtils.executePaginatedQuery(rdfStoreService, str);
        boolean hasRequiredVars = resultSet.getVars().containsAll(ImmutableList.of(GRAPH_PROP_NAME, URI_PROP_NAME, TYPE_PROP_NAME, INDEX_PROP_NAME));
        if (!hasRequiredVars) {
            throw new IndexException("Query results doesn't contain all required fields: {}, {}, {}, {}", new Object[]{GRAPH_PROP_NAME, URI_PROP_NAME, TYPE_PROP_NAME, INDEX_PROP_NAME});
        }
        return (List) resultSet.stream()
                .map(solution -> ResourceIndex.create(
                        solution.getProperty(GRAPH_PROP_NAME),
                        solution.getProperty(URI_PROP_NAME),
                        solution.getProperty(TYPE_PROP_NAME),
                        solution.getProperty(INDEX_PROP_NAME)))
                .collect(Collectors.toList());
    }

    /**
     * Reports the busy state of the {@code indexingTaskExecutor} task manager.
     *
     * @return the value of {@code isBusy()} on that task manager, boxed
     */
    public Boolean isAsyncIndexBusy() {
        boolean busy = this.indexingTaskExecutor.isBusy();
        return busy;
    }

    /**
     * Reindexes the resources selected by one query, using the RDF store supplied by the
     * index configuration provider. Annotated for execution on {@code indexingTaskExecutor}.
     *
     * @param str query selecting the resources to reindex
     * @deprecated prefer the overload that receives the {@link RdfStoreService} explicitly
     */
    @Async("indexingTaskExecutor")
    @Deprecated
    public void reindexAsync(String str) {
        // The summary map returned by the synchronous variant is intentionally discarded.
        this.reindexSync(str);
    }

    /**
     * Reindexes the resources selected by several queries, one query after another, using the
     * RDF store supplied by the index configuration provider. Annotated for execution on
     * {@code indexingTaskExecutor}.
     *
     * @param strArr queries selecting the resources to reindex
     * @deprecated prefer the overload that receives the {@link RdfStoreService} explicitly
     */
    @Async("indexingTaskExecutor")
    @Deprecated
    public void reindexAsync(String[] strArr) {
        this.reindexSync(strArr);
    }

    /**
     * Reindexes the resources selected by one query against the given store. Annotated for
     * execution on {@code indexingTaskExecutor}.
     *
     * @param rdfStoreService store the query and the indexing read from
     * @param str             query selecting the resources to reindex
     */
    @Async("indexingTaskExecutor")
    public void reindexAsync(RdfStoreService rdfStoreService, String str) {
        // The summary map returned by the synchronous variant is intentionally discarded.
        this.reindexSync(rdfStoreService, str);
    }

    /**
     * Reindexes the resources selected by several queries against the given store, one query
     * after another. Annotated for execution on {@code indexingTaskExecutor}.
     *
     * @param rdfStoreService store the queries and the indexing read from
     * @param strArr          queries selecting the resources to reindex
     */
    @Async("indexingTaskExecutor")
    public void reindexAsync(RdfStoreService rdfStoreService, String[] strArr) {
        this.reindexSync(rdfStoreService, strArr);
    }

    /**
     * Runs a synchronous reindex for each query in turn, using the RDF store supplied by the
     * index configuration provider.
     *
     * @param strArr queries selecting the resources to reindex
     * @deprecated prefer the overload that receives the {@link RdfStoreService} explicitly
     */
    @Deprecated
    public void reindexSync(String[] strArr) {
        Arrays.stream(strArr).forEach(query -> reindexSync(query));
    }

    /**
     * Runs a synchronous reindex against the given store for each query in turn.
     *
     * @param rdfStoreService store the queries and the indexing read from
     * @param strArr          queries selecting the resources to reindex
     */
    public void reindexSync(RdfStoreService rdfStoreService, String[] strArr) {
        Arrays.stream(strArr).forEach(query -> reindexSync(rdfStoreService, query));
    }

    /**
     * Reindexes the resources selected by the query, using the RDF store supplied by the index
     * configuration provider and no resource-list callback.
     *
     * @param str query selecting the resources to reindex
     * @return summary with the keys {@code total}, {@code indexed}, {@code failed} and {@code duration}
     * @deprecated prefer the overload that receives the {@link RdfStoreService} explicitly
     */
    @Deprecated
    public Map<String, Long> reindexSync(String str) {
        RdfStoreService defaultStore = this.indexConfigProvider.getRdfStoreService();
        return reindexSync(defaultStore, str, (Consumer<List<ResourceIndex>>) null);
    }

    /**
     * Reindexes the resources selected by the query against the given store, without a
     * resource-list callback.
     *
     * @param rdfStoreService store the query and the indexing read from
     * @param str             query selecting the resources to reindex
     * @return summary with the keys {@code total}, {@code indexed}, {@code failed} and {@code duration}
     */
    public Map<String, Long> reindexSync(RdfStoreService rdfStoreService, String str) {
        Consumer<List<ResourceIndex>> noCallback = null;
        return reindexSync(rdfStoreService, str, noCallback);
    }

    /**
     * Reindexes the resources selected by the query, using the RDF store supplied by the index
     * configuration provider.
     *
     * @param str      query selecting the resources to reindex
     * @param consumer optional callback that receives the list of resources before indexing starts
     * @return summary with the keys {@code total}, {@code indexed}, {@code failed} and {@code duration}
     * @deprecated prefer the overload that receives the {@link RdfStoreService} explicitly
     */
    @Deprecated
    public Map<String, Long> reindexSync(String str, Consumer<List<ResourceIndex>> consumer) {
        RdfStoreService defaultStore = this.indexConfigProvider.getRdfStoreService();
        return reindexSync(defaultStore, str, consumer);
    }

    /**
     * Reindexes a single named graph: loads the graph into an in-memory dataset, resolves the
     * resources to index with the given queries, removes the documents already indexed for that
     * graph from every affected index and then indexes the resources from the in-memory copy.
     *
     * @param str             name (IRI) of the graph to reindex
     * @param list            queries that list the resources to index; they are run against
     *                        {@code rdfStoreService}, not against the in-memory copy
     * @param rdfStoreService store the graph is loaded from
     * @param consumer        optional callback that receives the resource list before indexing starts
     * @return summary with the keys {@code total}, {@code indexed}, {@code failed} and
     *         {@code duration}; an empty map when no resource was found or when removing the old
     *         documents failed with an {@link IOException}
     */
    public Map<String, Long> reindexSync(String str, List<String> list, RdfStoreService rdfStoreService, Consumer<List<ResourceIndex>> consumer) {
        log.info("Launching reindexing for graph {}", str);
        // NOTE(review): the graph name is concatenated into the query text unescaped, and a
        // SELECT-form query is handed to the CONSTRUCT helper - confirm both are intended.
        Model graphModel = IndexUtils.executePaginatedConstructQuery(rdfStoreService, "select * { graph <" + str + "> { ?x ?y ?z } }");
        log.info("Loaded graph with {} statements", graphModel.size());

        Dataset dataset = DatasetFactory.create();
        dataset.addNamedModel(str, graphModel);
        RdfStoreService graphStore = new DatasetRdfStoreService(dataset);

        List<ResourceIndex> resources = list.stream()
                .flatMap(query -> findAllIndexResources(query, rdfStoreService).stream())
                .distinct()
                .collect(Collectors.toList());
        log.info("Queries processed with {} results", resources.size());

        List<String> indexNames = resources.stream()
                .map(ResourceIndex::getIndex)
                .distinct()
                .collect(Collectors.toList());
        try {
            ObjectMapper mapper = new ObjectMapper();
            // JSON-encode the graph name so quotes or backslashes in it cannot corrupt the query.
            String deleteQueryJson = "{\"query\":{\"bool\":{\"must\":[{\"match\":{\"graph\":"
                    + mapper.writeValueAsString(str)
                    + "}}]}}}";
            ObjectNode deleteQuery = (ObjectNode) mapper.readTree(deleteQueryJson);
            for (String indexName : indexNames) {
                this.indexConfigProvider.getElasticStore().deleteByQuery(indexName, deleteQuery);
            }
            if (resources.isEmpty()) {
                return ImmutableMap.of();
            }
            return reindexSync(resources, graphStore, consumer, graphModel);
        } catch (IOException e) {
            // Keep the cause: without it the failure cannot be diagnosed from the log.
            log.error("Can not remove indexed graph {}", str, e);
            return ImmutableMap.of();
        }
    }

    /**
     * Deletes, from every index referenced by the resources the queries return, the documents
     * whose {@code timestampms} is lower than the given value.
     *
     * @param list            queries listing the indexed resources; only their index names are used
     * @param rdfStoreService store the queries are executed against
     * @param consumer        not used by this method
     * @param j               timestamp threshold in epoch milliseconds (exclusive upper bound)
     */
    public void garbageCollect(List<String> list, RdfStoreService rdfStoreService, Consumer<List<ResourceIndex>> consumer, long j) {
        List<ResourceIndex> resources = list.stream()
                .flatMap(query -> findAllIndexResources(query, rdfStoreService).stream())
                .distinct()
                .collect(Collectors.toList());
        List<String> indexNames = resources.stream()
                .map(ResourceIndex::getIndex)
                .distinct()
                .collect(Collectors.toList());
        try {
            String queryJson = "{  \"query\": {    \"bool\": {      \"must\": [        {          \"range\": {            \"timestampms\": {              \"lt\": " + j + "            }          }        }      ]    }  }}";
            ObjectNode query = (ObjectNode) new ObjectMapper().readTree(queryJson);
            for (String indexName : indexNames) {
                log.info("GC preparing to process index {} with query {}", indexName, queryJson);
                ObjectNode acknowledgement = this.indexConfigProvider.getElasticStore().deleteByQueryWithAck(indexName, query, Params.waitFor());
                if (acknowledgement == null) {
                    continue;
                }
                log.info("GC processed index {} with acknowledgement {}", indexName, acknowledgement);
            }
        } catch (IOException e) {
            log.error("Can not collect garbage: ", e);
        }
    }

    /**
     * Garbage-collects using the reindex timestamp stored in the configuration index: reads the
     * {@code lastIndexing} document and, when it carries an integral {@code timestampms} value,
     * delegates to the overload that takes the timestamp explicitly.
     *
     * <p>Nothing happens when the timestamp document does not exist. Any failure is logged and
     * not propagated.
     *
     * @param list            queries listing the indexed resources
     * @param rdfStoreService store the queries are executed against
     * @param consumer        passed through to the delegate
     */
    public void garbageCollect(List<String> list, RdfStoreService rdfStoreService, Consumer<List<ResourceIndex>> consumer) {
        try {
            ObjectNode timestampDocument = this.indexConfigProvider.getElasticStore().getDocumentById(this.configIndexName, INDEX_TIMESTAMP_PROPERTY_NAME);
            if (timestampDocument == null) {
                // No timestamp was ever written, so there is no threshold to collect against.
                return;
            }
            // path() yields a "missing" node instead of null, so an absent _source or field is
            // reported as an unexpected format rather than surfacing as a NullPointerException.
            JsonNode timestamp = timestampDocument.path("_source").path(INDEX_TIMESTAMP_MS_NAME);
            // Accept every integral node that fits a long: Jackson represents small integers
            // with a different node class than large ones.
            if (timestamp.isIntegralNumber() && timestamp.canConvertToLong()) {
                garbageCollect(list, rdfStoreService, consumer, timestamp.longValue());
            } else {
                log.error("Indexing timestamp has unexpected format {}", timestamp);
            }
        } catch (Exception e) {
            // One placeholder only: the trailing throwable is logged as the exception itself.
            log.error("Can not collect garbage using index {}", this.configIndexName, e);
        }
    }

    /**
     * Stores the given reindex timestamp as the {@code lastIndexing} document of the
     * configuration index. Failures are logged and not propagated.
     *
     * @param j timestamp to store, written to the {@code timestampms} field
     */
    public void updateReindexTimestamp(long j) {
        log.info("Requested reindex timestamp update {}", j);
        String timestampJson = "{\"timestampms\":" + j + "}";
        try {
            this.indexConfigProvider.getElasticStore().indexDocument(this.configIndexName, INDEX_TIMESTAMP_PROPERTY_NAME, new ObjectMapper().readTree(timestampJson));
            log.info("Reindex timestamp created {}", j);
        } catch (Exception e) {
            log.error("Can not write indexing timestamp", e);
        }
    }

    /**
     * Indexes the given resources, retrying the ones that failed for up to
     * {@code INDEX_TYPE_ATTEMPTS} passes, and returns a summary of the run.
     *
     * @param list            resources to index; handed to {@code consumer} first and then to the
     *                        first iteration, which may remove entries from it
     * @param rdfStoreService store the indexing reads from
     * @param consumer        optional callback that receives {@code list} before indexing starts
     * @param model           model passed through to the graph indexing; may be {@code null}
     * @return map with the keys {@code total} (initial number of resources), {@code indexed},
     *         {@code failed} (resources still failing after the last pass) and {@code duration}
     *         (milliseconds)
     */
    private Map<String, Long> reindexSync(List<ResourceIndex> list, RdfStoreService rdfStoreService, Consumer<List<ResourceIndex>> consumer, Model model) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        if (consumer != null) {
            consumer.accept(list);
        }
        long total = list.size();
        long indexed = 0;
        // One timestamp for the whole run, so every pass stamps documents identically.
        long timestamp = ZonedDateTime.now().toInstant().toEpochMilli();

        List<ResourceIndex> pending = list;
        for (int attempt = 0; attempt < INDEX_TYPE_ATTEMPTS && !pending.isEmpty(); attempt++) {
            // Synchronized because the asynchronous graph tasks report failures into it.
            List<ResourceIndex> failedThisPass = Collections.synchronizedList(new ArrayList<>());
            log.info("Indexing {} resources attempt {} with timestamp {}", pending.size(), attempt, timestamp);
            reindexIteration(pending, failedThisPass, rdfStoreService, model, timestamp);
            indexed += pending.size() - failedThisPass.size();
            // Only what failed in this pass is retried in the next one.
            pending = failedThisPass;
        }

        long failed = pending.size();
        for (ResourceIndex resource : pending) {
            log.error("Resource {} of type {} from graph {} failed all reindex attempts.", resource.getUri(), resource.getType(), resource.getGraph());
        }
        stopWatch.stop();
        if (log.isInfoEnabled()) {
            log.info("Reindex summary. Total: {}. Indexed: {}. Failed: {}. Time spent: {}", total, indexed, failed, stopWatch.shortSummary());
        }
        return ImmutableMap.of("total", total, "indexed", indexed, "failed", failed, "duration", stopWatch.getTotalTimeMillis());
    }

    /**
     * Resolves the resources selected by the query and indexes them, with retries, through the
     * asynchronous per-graph indexing.
     *
     * @param rdfStoreService store the query and the indexing read from
     * @param str             query selecting the resources to reindex
     * @param consumer        optional callback that receives the resource list before indexing starts
     * @return summary with the keys {@code total}, {@code indexed}, {@code failed} and {@code duration}
     */
    public Map<String, Long> reindexSync(RdfStoreService rdfStoreService, String str, Consumer<List<ResourceIndex>> consumer) {
        log.info("Launching reindexing for query {}", str);
        List<ResourceIndex> found = findAllIndexResources(str, rdfStoreService);
        List<ResourceIndex> resources = Collections.synchronizedList(found);
        log.info("Query finished with {} results", resources.size());
        // No pre-loaded model is available on this path.
        Model noModel = null;
        return reindexSync(resources, rdfStoreService, consumer, noModel);
    }

    /**
     * Performs one indexing pass: submits every graph's resources to the asynchronous graph
     * indexing, waits for the task manager to drain (bounded by the iteration timeout) and
     * removes from {@code list} the resources of graphs that were still running at the timeout.
     *
     * @param list            resources to index in this pass; modified in place on timeout
     * @param list2           receives the resources whose graph could not be submitted; it is also
     *                        handed to the graph indexing call
     * @param rdfStoreService store the indexing reads from
     * @param model           model passed through to the graph indexing; may be {@code null}
     * @param j               timestamp attached to the indexing parameters
     */
    private void reindexIteration(List<ResourceIndex> list, List<ResourceIndex> list2, RdfStoreService rdfStoreService, Model model, long j) {
        long startedAt = System.currentTimeMillis();
        LinkedHashMap<String, List<ResourceIndex>> resourcesByGraph = groupByGraph(list);
        log.info("Starting iteration. Found {} resources in {} graphs", list.size(), resourcesByGraph.size());

        for (Map.Entry<String, List<ResourceIndex>> entry : resourcesByGraph.entrySet()) {
            String graph = entry.getKey();
            List<ResourceIndex> graphResources = entry.getValue();
            try {
                this.indexingGraphTaskExecutor.awaitPoolIsReady();
                log.info("Starting graph indexing {}", graph);
                this.graphIndexService.indexGraphAsync(graph, graphResources, list2, Params.noRefresh().withTimestamp(j), rdfStoreService, model);
            } catch (Exception e) {
                log.info("Graph {} indexing is failed. Adding to list of failed resources.", graph, e);
                list2.addAll(graphResources);
            }
        }

        log.info("Waiting to stop iteration. Indexing is still busy with {}", String.join(", ", this.indexingGraphTaskExecutor.getExecutionKeysAsStrings()));
        List<String> timedOutGraphs = this.indexingGraphTaskExecutor.awaitBusyWithNotMoreAndNoLongerThan(0, this.iterationTimeoutMs.longValue());
        if (!timedOutGraphs.isEmpty()) {
            for (String timedOutGraph : timedOutGraphs) {
                log.warn("Graph {} failed timeout", timedOutGraph);
            }
            // Keep only the resources whose graph did not time out.
            List<ResourceIndex> remaining = list.stream()
                    .filter(resource -> timedOutGraphs.stream().noneMatch(timedOutGraph -> resource.getGraph().equals(timedOutGraph)))
                    .collect(Collectors.toList());
            list.clear();
            list.addAll(remaining);
        }

        long elapsedMs = System.currentTimeMillis() - startedAt;
        int failedCount = list2.size() + timedOutGraphs.size();
        log.info("Indexing iteration is done. Duration: {}, failed resources: {}", IndexUtils.prettyDurationMs(elapsedMs), failedCount);
    }

    /**
     * Groups the resources by the graph they belong to, keeping graphs in the order in which
     * they are first encountered and resources in their original order within each graph.
     *
     * <p>Each resource is appended to its graph's list in place, so grouping is linear in the
     * number of resources; merging immutable per-resource lists would copy the accumulated list
     * on every addition.
     *
     * @param list resources to group
     * @return insertion-ordered map from graph name to the resources of that graph
     */
    private LinkedHashMap<String, List<ResourceIndex>> groupByGraph(List<ResourceIndex> list) {
        LinkedHashMap<String, List<ResourceIndex>> resourcesByGraph = new LinkedHashMap<>();
        for (ResourceIndex resource : list) {
            resourcesByGraph.computeIfAbsent(resource.getGraph(), graph -> new ArrayList<>()).add(resource);
        }
        return resourcesByGraph;
    }

    /**
     * Resolves the resources selected by the query and indexes them, with retries, through the
     * synchronous per-graph indexing.
     *
     * @param rdfStoreService store the query and the indexing read from
     * @param str             query selecting the resources to reindex
     * @return summary with the keys {@code total}, {@code indexed}, {@code failed} and {@code duration}
     */
    public Map<String, Long> reindexSimple(RdfStoreService rdfStoreService, String str) {
        log.info("Launching reindexing for query {}", str);
        List<ResourceIndex> found = findAllIndexResources(str, rdfStoreService);
        List<ResourceIndex> resources = Collections.synchronizedList(found);
        log.info("Query finished with {} results", resources.size());
        return reindexSimpleExecution(resources, rdfStoreService);
    }

    /**
     * Indexes the given resources synchronously, retrying the ones that failed for up to
     * {@code INDEX_TYPE_ATTEMPTS} passes, and returns a summary of the run.
     *
     * @param list            resources to index
     * @param rdfStoreService store the indexing reads from
     * @return map with the keys {@code total} (initial number of resources), {@code indexed},
     *         {@code failed} (resources still failing after the last pass) and {@code duration}
     *         (milliseconds)
     */
    private Map<String, Long> reindexSimpleExecution(List<ResourceIndex> list, RdfStoreService rdfStoreService) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        long total = list.size();
        long indexed = 0;
        // One timestamp for the whole run, shared by every pass.
        long timestamp = ZonedDateTime.now().toInstant().toEpochMilli();

        List<ResourceIndex> pending = list;
        for (int attempt = 0; attempt < INDEX_TYPE_ATTEMPTS && !pending.isEmpty(); attempt++) {
            List<ResourceIndex> failedThisPass = Collections.synchronizedList(new ArrayList<>());
            log.info("Indexing {} resources attempt {} with timestamp {}", pending.size(), attempt, timestamp);
            reindexSimpleIteration(pending, failedThisPass, rdfStoreService, timestamp);
            indexed += pending.size() - failedThisPass.size();
            // Only what failed in this pass is retried in the next one.
            pending = failedThisPass;
        }

        long failed = pending.size();
        for (ResourceIndex resource : pending) {
            log.error("Resource {} of type {} from graph {} failed all reindex attempts.", resource.getUri(), resource.getType(), resource.getGraph());
        }
        stopWatch.stop();
        if (log.isInfoEnabled()) {
            log.info("Reindex summary. Total: {}. Indexed: {}. Failed: {}. Time spent: {}", total, indexed, failed, stopWatch.shortSummary());
        }
        return ImmutableMap.of("total", total, "indexed", indexed, "failed", failed, "duration", stopWatch.getTotalTimeMillis());
    }

    /**
     * Performs one synchronous indexing pass: indexes the resources graph by graph and records
     * the resources of every graph whose indexing threw an exception.
     *
     * @param list            resources to index in this pass
     * @param list2           receives the resources of graphs whose indexing threw; it is also
     *                        handed to the graph indexing call
     * @param rdfStoreService store the indexing reads from
     * @param j               timestamp attached to the indexing parameters
     */
    private void reindexSimpleIteration(List<ResourceIndex> list, List<ResourceIndex> list2, RdfStoreService rdfStoreService, long j) {
        long startedAt = System.currentTimeMillis();
        LinkedHashMap<String, List<ResourceIndex>> resourcesByGraph = groupByGraph(list);
        log.info("Starting iteration. Found {} resources in {} graphs", list.size(), resourcesByGraph.size());

        for (Map.Entry<String, List<ResourceIndex>> entry : resourcesByGraph.entrySet()) {
            String graph = entry.getKey();
            List<ResourceIndex> graphResources = entry.getValue();
            try {
                log.info("Starting graph indexing {}", graph);
                this.graphIndexService.indexGraphSync(graph, graphResources, list2, Params.noRefresh().withTimestamp(j), rdfStoreService, null);
            } catch (Exception e) {
                log.info("Graph {} indexing is failed. Adding to list of failed resources.", graph, e);
                list2.addAll(graphResources);
            }
        }

        long elapsedMs = System.currentTimeMillis() - startedAt;
        log.info("Indexing iteration is done. Duration: {}, failed resources: {}", IndexUtils.prettyDurationMs(elapsedMs), list2.size());
    }
}
