package org.vertexium.elasticsearch7.bulk;

import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.vertexium.ElementId;
import org.vertexium.ElementLocation;
import org.vertexium.VertexiumException;
import org.vertexium.elasticsearch7.Elasticsearch7SearchIndex;
import org.vertexium.elasticsearch7.IndexRefreshTracker;
import org.vertexium.metric.Histogram;
import org.vertexium.metric.Timer;
import org.vertexium.metric.VertexiumMetricRegistry;
import org.vertexium.util.LimitedLinkedBlockingQueue;
import org.vertexium.util.VertexiumLogger;
import org.vertexium.util.VertexiumLoggerFactory;

/* loaded from: input_file:org/vertexium/elasticsearch7/bulk/BulkUpdateService.class */
public class BulkUpdateService {
    private static final VertexiumLogger LOGGER = VertexiumLoggerFactory.getLogger(BulkUpdateService.class);
    private final Elasticsearch7SearchIndex searchIndex;
    private final IndexRefreshTracker indexRefreshTracker;
    private final LimitedLinkedBlockingQueue<BulkItem> incomingItems = new LimitedLinkedBlockingQueue<>();
    private final OutstandingItemsList outstandingItems = new OutstandingItemsList();
    private final Thread processItemsThread = new Thread(this::processItems);
    private final Timer flushTimer;
    private final Histogram batchSizeHistogram;
    private final Duration bulkRequestTimeout;
    private final ThreadPoolExecutor ioExecutor;
    private final int maxFailCount;
    private final BulkItemBatch batch;
    private volatile boolean shutdown;

    public BulkUpdateService(Elasticsearch7SearchIndex elasticsearch7SearchIndex, IndexRefreshTracker indexRefreshTracker, BulkUpdateServiceConfiguration bulkUpdateServiceConfiguration) {
        this.searchIndex = elasticsearch7SearchIndex;
        this.indexRefreshTracker = indexRefreshTracker;
        this.ioExecutor = new ThreadPoolExecutor(bulkUpdateServiceConfiguration.getCorePoolSize(), bulkUpdateServiceConfiguration.getMaximumPoolSize(), 10L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new LimitedLinkedBlockingQueue(), runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("vertexium-es-processItems-io-" + thread.getId());
            return thread;
        });
        this.processItemsThread.setName("vertexium-es-processItems");
        this.processItemsThread.setDaemon(true);
        this.processItemsThread.start();
        this.bulkRequestTimeout = bulkUpdateServiceConfiguration.getBulkRequestTimeout();
        this.maxFailCount = bulkUpdateServiceConfiguration.getMaxFailCount();
        this.batch = new BulkItemBatch(bulkUpdateServiceConfiguration.getMaxBatchSize(), bulkUpdateServiceConfiguration.getMaxBatchSizeInBytes(), bulkUpdateServiceConfiguration.getBatchWindowTime());
        VertexiumMetricRegistry metricsRegistry = elasticsearch7SearchIndex.getMetricsRegistry();
        this.flushTimer = metricsRegistry.getTimer(BulkUpdateService.class, new String[]{"flush", "timer"});
        this.batchSizeHistogram = metricsRegistry.getHistogram(BulkUpdateService.class, new String[]{"batch", "histogram"});
        String createName = metricsRegistry.createName(BulkUpdateService.class, new String[]{"outstandingItems", "size"});
        OutstandingItemsList outstandingItemsList = this.outstandingItems;
        outstandingItemsList.getClass();
        metricsRegistry.getGauge(createName, outstandingItemsList::size);
    }

    private void processItems() {
        while (!this.shutdown) {
            try {
                BulkItem bulkItem = (BulkItem) this.incomingItems.poll(100L, TimeUnit.MILLISECONDS);
                if (this.batch.shouldFlushByTime()) {
                    flushBatch();
                }
                if (bulkItem != null) {
                    if (filterByRetryTime(bulkItem)) {
                        while (!this.batch.add(bulkItem)) {
                            flushBatch();
                        }
                    }
                }
            } catch (InterruptedException e) {
                return;
            } catch (Exception e2) {
                LOGGER.error("process items failed", e2);
            }
        }
    }

    private void flushBatch() {
        List<BulkItem> itemsAndClear = this.batch.getItemsAndClear();
        if (itemsAndClear.size() > 0) {
            this.ioExecutor.execute(() -> {
                processBatch(itemsAndClear);
            });
        }
    }

    private boolean filterByRetryTime(BulkItem bulkItem) {
        if (bulkItem.getFailCount() == 0 || ((long) (bulkItem.getCreatedOrLastTriedTime() + (10.0d * Math.pow(2.0d, bulkItem.getFailCount())))) <= System.currentTimeMillis()) {
            return true;
        }
        this.incomingItems.add(bulkItem);
        return false;
    }

    private void processBatch(List<BulkItem> list) {
        try {
            this.batchSizeHistogram.update(list.size());
            BulkResponse bulkResponse = (BulkResponse) this.searchIndex.getClient().bulk(bulkItemsToBulkRequest(list)).get(this.bulkRequestTimeout.toMillis(), TimeUnit.MILLISECONDS);
            this.indexRefreshTracker.pushChanges((Set) list.stream().peek((v0) -> {
                v0.updateLastTriedTime();
            }).map((v0) -> {
                return v0.getIndexName();
            }).collect(Collectors.toSet()));
            int i = 0;
            for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                int i2 = i;
                i++;
                BulkItem bulkItem = list.get(i2);
                if (bulkItemResponse.isFailed()) {
                    handleFailure(bulkItem, bulkItemResponse);
                } else {
                    handleSuccess(bulkItem);
                }
            }
        } catch (Exception e) {
            LOGGER.error("bulk request failed", e);
            if (list.size() <= 1) {
                complete(list.get(0), e);
                return;
            }
            Iterator<BulkItem> it = list.iterator();
            while (it.hasNext()) {
                processBatch(Lists.newArrayList(new BulkItem[]{it.next()}));
            }
        }
    }

    private void handleSuccess(BulkItem bulkItem) {
        complete(bulkItem, null);
    }

    private void handleFailure(BulkItem bulkItem, BulkItemResponse bulkItemResponse) {
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
        bulkItem.incrementFailCount();
        if (bulkItem.getFailCount() >= this.maxFailCount) {
            complete(bulkItem, new BulkVertexiumException("fail count exceeded the max number of failures", failure));
            return;
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        try {
            this.searchIndex.handleBulkFailure(bulkItem, bulkItemResponse, atomicBoolean);
            if (atomicBoolean.get()) {
                this.incomingItems.add(bulkItem);
            } else {
                complete(bulkItem, null);
            }
        } catch (Exception e) {
            complete(bulkItem, e);
        }
    }

    private void complete(BulkItem bulkItem, Exception exc) {
        this.outstandingItems.remove(bulkItem);
        if (exc == null) {
            bulkItem.getFuture().complete(null);
        } else {
            bulkItem.getFuture().completeExceptionally(exc);
        }
    }

    private BulkRequest bulkItemsToBulkRequest(List<BulkItem> list) {
        BulkRequestBuilder prepareBulk = this.searchIndex.getClient().prepareBulk();
        Iterator<BulkItem> it = list.iterator();
        while (it.hasNext()) {
            DeleteRequest actionRequest = it.next().getActionRequest();
            if (actionRequest instanceof IndexRequest) {
                prepareBulk.add((IndexRequest) actionRequest);
            } else if (actionRequest instanceof UpdateRequest) {
                prepareBulk.add((UpdateRequest) actionRequest);
            } else {
                if (!(actionRequest instanceof DeleteRequest)) {
                    throw new VertexiumException("unhandled request type: " + actionRequest.getClass().getName());
                }
                prepareBulk.add(actionRequest);
            }
        }
        return prepareBulk.request();
    }

    public void flush() {
        this.flushTimer.time(() -> {
            List<CompletableFuture<Void>> futures = this.outstandingItems.getFutures();
            flushBatch();
            try {
                CompletableFuture.allOf((CompletableFuture[]) futures.toArray(new CompletableFuture[0])).get();
            } catch (Exception e) {
                throw new VertexiumException("failed to wait for flush", e);
            }
        });
    }

    public CompletableFuture<Void> addUpdate(ElementLocation elementLocation, UpdateRequest updateRequest) {
        return add(new UpdateBulkItem(elementLocation, updateRequest));
    }

    public CompletableFuture<Void> addUpdate(ElementLocation elementLocation, String str, String str2, UpdateRequest updateRequest) {
        return add(new UpdateBulkItem(elementLocation, str, str2, updateRequest));
    }

    public CompletableFuture<Void> addDelete(ElementId elementId, String str, DeleteRequest deleteRequest) {
        return add(new DeleteBulkItem(elementId, str, deleteRequest));
    }

    private CompletableFuture<Void> add(BulkItem bulkItem) {
        this.outstandingItems.add(bulkItem);
        this.incomingItems.add(bulkItem);
        return bulkItem.getFuture();
    }

    public void shutdown() {
        this.shutdown = true;
        try {
            this.processItemsThread.join(10000L);
        } catch (InterruptedException e) {
        }
        this.ioExecutor.shutdown();
    }

    public void flushUntilElementIdIsComplete(String str) {
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            CompletableFuture<Void> futureForElementId = this.outstandingItems.getFutureForElementId(str);
            if (futureForElementId == null) {
                break;
            }
            try {
                futureForElementId.get();
            } catch (Exception e) {
                throw new VertexiumException("Failed to flushUntilElementIdIsComplete: " + str, e);
            }
        }
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        if (currentTimeMillis2 > 1000) {
            String format = String.format("flush of %s got stuck for %dms", str, Long.valueOf(currentTimeMillis2));
            if (currentTimeMillis2 > 60000) {
                LOGGER.error("%s", new Object[]{format});
            } else if (currentTimeMillis2 > 10000) {
                LOGGER.warn("%s", new Object[]{format});
            } else {
                LOGGER.info("%s", new Object[]{format});
            }
        }
    }
}
