package org.apache.skywalking.library.elasticsearch.bulk;

import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.util.Exceptions;
import io.netty.buffer.Unpooled;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.skywalking.library.elasticsearch.ElasticSearch;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.class */
public final class BulkProcessor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BulkProcessor.class);
    private final ArrayBlockingQueue<Holder> requests;
    private final AtomicReference<ElasticSearch> es;
    private final int bulkActions;
    private final Semaphore semaphore;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor$Holder.class */
    public static class Holder {
        private final CompletableFuture<Void> future;
        private final Object request;

        @Generated
        public Holder(CompletableFuture<Void> completableFuture, Object obj) {
            this.future = completableFuture;
            this.request = obj;
        }
    }

    public static BulkProcessorBuilder builder() {
        return new BulkProcessorBuilder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BulkProcessor(AtomicReference<ElasticSearch> atomicReference, int i, Duration duration, int i2) {
        Objects.requireNonNull(duration, "flushInterval");
        this.es = (AtomicReference) Objects.requireNonNull(atomicReference, "es");
        this.bulkActions = i;
        this.semaphore = new Semaphore(i2 > 0 ? i2 : 1);
        this.requests = new ArrayBlockingQueue<>(i + 1);
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("ElasticSearch BulkProcessor");
            return thread;
        });
        scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduledThreadPoolExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(new RunnableWithExceptionProtection(this::flush, th -> {
            log.error("flush data to ES failure:", th);
        }), 0L, duration.getSeconds(), TimeUnit.SECONDS);
    }

    public CompletableFuture<Void> add(IndexRequest indexRequest) {
        return internalAdd(indexRequest);
    }

    public CompletableFuture<Void> add(UpdateRequest updateRequest) {
        return internalAdd(updateRequest);
    }

    private CompletableFuture<Void> internalAdd(Object obj) {
        Objects.requireNonNull(obj, "request");
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.requests.put(new Holder(completableFuture, obj));
        flushIfNeeded();
        return completableFuture;
    }

    private void flushIfNeeded() {
        if (this.requests.size() >= this.bulkActions) {
            flush();
        }
    }

    public void flush() {
        if (this.requests.isEmpty()) {
            return;
        }
        try {
            this.semaphore.acquire();
            ArrayList arrayList = new ArrayList(this.requests.size());
            this.requests.drainTo(arrayList);
            CompletableFuture<Void> doFlush = doFlush(arrayList);
            doFlush.whenComplete((r3, th) -> {
                this.semaphore.release();
            });
            doFlush.join();
        } catch (InterruptedException e) {
            log.error("Interrupted when trying to get semaphore to execute bulk requests", e);
        }
    }

    private CompletableFuture<Void> doFlush(List<Holder> list) {
        log.debug("Executing bulk with {} requests", Integer.valueOf(list.size()));
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture thenCompose = this.es.get().version().thenCompose(elasticSearchVersion -> {
            try {
                RequestFactory requestFactory = elasticSearchVersion.requestFactory();
                ArrayList arrayList = new ArrayList();
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    arrayList.add(elasticSearchVersion.codec().encode(((Holder) it.next()).request));
                    arrayList.add("\n".getBytes());
                }
                return this.es.get().client().execute(requestFactory.bulk().bulk(Unpooled.wrappedBuffer((byte[][]) arrayList.toArray((Object[]) new byte[0])))).aggregate().thenAccept(aggregatedHttpResponse -> {
                    if (aggregatedHttpResponse.status() != HttpStatus.OK) {
                        throw new RuntimeException(aggregatedHttpResponse.contentUtf8());
                    }
                });
            } catch (Exception e) {
                return (CompletionStage) Exceptions.throwUnsafely(e);
            }
        });
        thenCompose.whenComplete((r5, th) -> {
            if (th != null) {
                list.stream().map(holder -> {
                    return holder.future;
                }).forEach(completableFuture -> {
                    completableFuture.completeExceptionally(th);
                });
                log.error("Failed to execute requests in bulk", th);
            } else {
                log.debug("Succeeded to execute {} requests in bulk", Integer.valueOf(list.size()));
                list.stream().map(holder2 -> {
                    return holder2.future;
                }).forEach(completableFuture2 -> {
                    completableFuture2.complete(null);
                });
            }
        });
        return thenCompose;
    }
}
