package com.azure.search.documents.implementation.batching;

import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.SharedExecutorService;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JsonSerializer;
import com.azure.search.documents.implementation.SearchIndexClientImpl;
import com.azure.search.documents.implementation.converters.IndexActionConverter;
import com.azure.search.documents.implementation.util.Utility;
import com.azure.search.documents.models.IndexAction;
import com.azure.search.documents.models.IndexBatchException;
import com.azure.search.documents.models.IndexDocumentsResult;
import com.azure.search.documents.models.IndexingResult;
import com.azure.search.documents.options.OnActionAddedOptions;
import com.azure.search.documents.options.OnActionErrorOptions;
import com.azure.search.documents.options.OnActionSentOptions;
import com.azure.search.documents.options.OnActionSucceededOptions;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.util.function.Tuple2;

/* loaded from: input_file:com/azure/search/documents/implementation/batching/SearchIndexingPublisher.class */
public final class SearchIndexingPublisher<T> {
    private static final ClientLogger LOGGER = new ClientLogger(SearchIndexingPublisher.class);
    private final SearchIndexClientImpl restClient;
    private final JsonSerializer serializer;
    private final boolean autoFlush;
    private int batchSize;
    private final int maxRetries;
    private final long throttlingDelayNanos;
    private final long maxThrottlingDelayNanos;
    private final Consumer<OnActionAddedOptions<T>> onActionAdded;
    private final Consumer<OnActionSentOptions<T>> onActionSent;
    private final Consumer<OnActionSucceededOptions<T>> onActionSucceeded;
    private final Consumer<OnActionErrorOptions<T>> onActionError;
    private final Function<T, String> documentKeyRetriever;
    private final Function<Integer, Integer> scaleDownFunction = num -> {
        return Integer.valueOf(num.intValue() / 2);
    };
    private final ReentrantLock lock = new ReentrantLock(true);
    volatile AtomicInteger backoffCount = new AtomicInteger();
    volatile Duration currentRetryDelay = Duration.ZERO;
    private final IndexingDocumentManager<T> documentManager = new IndexingDocumentManager<>();

    public SearchIndexingPublisher(SearchIndexClientImpl searchIndexClientImpl, JsonSerializer jsonSerializer, Function<T, String> function, boolean z, int i, int i2, Duration duration, Duration duration2, Consumer<OnActionAddedOptions<T>> consumer, Consumer<OnActionSucceededOptions<T>> consumer2, Consumer<OnActionErrorOptions<T>> consumer3, Consumer<OnActionSentOptions<T>> consumer4) {
        this.documentKeyRetriever = (Function) Objects.requireNonNull(function, "'documentKeyRetriever' cannot be null");
        this.restClient = searchIndexClientImpl;
        this.serializer = jsonSerializer;
        this.autoFlush = z;
        this.batchSize = i;
        this.maxRetries = i2;
        this.throttlingDelayNanos = duration.toNanos();
        this.maxThrottlingDelayNanos = duration2.compareTo(duration) < 0 ? this.throttlingDelayNanos : duration2.toNanos();
        this.onActionAdded = consumer;
        this.onActionSent = consumer4;
        this.onActionSucceeded = consumer2;
        this.onActionError = consumer3;
    }

    public Collection<IndexAction<T>> getActions() {
        return this.documentManager.getActions();
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public Duration getCurrentRetryDelay() {
        return this.currentRetryDelay;
    }

    public void addActions(Collection<IndexAction<T>> collection, Duration duration, Context context, Runnable runnable) {
        Tuple2<Integer, Boolean> addAndCheckForBatch = this.documentManager.addAndCheckForBatch(collection, this.documentKeyRetriever, this.onActionAdded, this.batchSize);
        LOGGER.verbose("Actions added, new pending queue size: {}.", new Object[]{addAndCheckForBatch.getT1()});
        if (this.autoFlush && ((Boolean) addAndCheckForBatch.getT2()).booleanValue()) {
            runnable.run();
            LOGGER.verbose("Adding documents triggered batch size limit, sending documents for indexing.");
            flush(false, false, duration, context);
        }
    }

    public void flush(boolean z, boolean z2, Duration duration, Context context) {
        if (z) {
            this.lock.lock();
            try {
                flushLoop(z2, duration, context);
                this.lock.unlock();
                return;
            } finally {
            }
        }
        if (!this.lock.tryLock()) {
            LOGGER.verbose("Batch already in-flight and not waiting for completion. Performing no-op.");
            return;
        }
        try {
            flushLoop(z2, duration, context);
            this.lock.unlock();
        } finally {
        }
    }

    private void flushLoop(boolean z, Duration duration, Context context) {
        if (duration == null || duration.isNegative() || duration.isZero()) {
            flushLoopHelper(z, context, null);
            return;
        }
        AtomicReference atomicReference = new AtomicReference();
        try {
            CoreUtils.getResultWithTimeout(SharedExecutorService.getInstance().submit(() -> {
                flushLoopHelper(z, context, atomicReference);
            }), duration);
        } catch (InterruptedException e) {
            throw LOGGER.logExceptionAsError(new RuntimeException(e));
        } catch (ExecutionException e2) {
            Throwable cause = e2.getCause();
            if (cause instanceof Error) {
                throw ((Error) cause);
            }
            if (!(cause instanceof RuntimeException)) {
                throw LOGGER.logExceptionAsError(new RuntimeException(cause));
            }
            throw LOGGER.logExceptionAsError((RuntimeException) cause);
        } catch (TimeoutException e3) {
            this.documentManager.reinsertCancelledActions((List) atomicReference.get());
            throw LOGGER.logExceptionAsError(new RuntimeException(e3));
        }
    }

    private void flushLoopHelper(boolean z, Context context, AtomicReference<List<TryTrackingIndexAction<T>>> atomicReference) {
        List<TryTrackingIndexAction<T>> tryCreateBatch;
        List<TryTrackingIndexAction<T>> tryCreateBatch2 = this.documentManager.tryCreateBatch(this.batchSize, true);
        if (atomicReference != null) {
            atomicReference.set(tryCreateBatch2);
        }
        IndexBatchResponse processBatch = processBatch(tryCreateBatch2, context);
        while (processBatch != null && (tryCreateBatch = this.documentManager.tryCreateBatch(this.batchSize, z)) != null) {
            if (atomicReference != null) {
                atomicReference.set(tryCreateBatch);
            }
            processBatch = processBatch(tryCreateBatch, context);
        }
    }

    private IndexBatchResponse processBatch(List<TryTrackingIndexAction<T>> list, Context context) {
        if (CoreUtils.isNullOrEmpty(list)) {
            return null;
        }
        IndexBatchResponse sendBatch = sendBatch((List) list.stream().map(tryTrackingIndexAction -> {
            return IndexActionConverter.map(tryTrackingIndexAction.getAction(), this.serializer);
        }).collect(Collectors.toList()), list, context);
        handleResponse(list, sendBatch);
        return sendBatch;
    }

    private IndexBatchResponse sendBatch(List<com.azure.search.documents.implementation.models.IndexAction> list, List<TryTrackingIndexAction<T>> list2, Context context) {
        LOGGER.verbose("Sending a batch of size {}.", new Object[]{Integer.valueOf(list2.size())});
        if (this.onActionSent != null) {
            list2.forEach(tryTrackingIndexAction -> {
                this.onActionSent.accept(new OnActionSentOptions<>(tryTrackingIndexAction.getAction()));
            });
        }
        if (!this.currentRetryDelay.isZero() && !this.currentRetryDelay.isNegative()) {
            sleep(this.currentRetryDelay.toMillis());
        }
        try {
            Response<IndexDocumentsResult> indexDocumentsWithResponse = Utility.indexDocumentsWithResponse(this.restClient, list, true, context, LOGGER);
            return new IndexBatchResponse(indexDocumentsWithResponse.getStatusCode(), ((IndexDocumentsResult) indexDocumentsWithResponse.getValue()).getResults(), list.size(), false);
        } catch (IndexBatchException e) {
            return new IndexBatchResponse(207, e.getIndexingResults(), list.size(), true);
        } catch (HttpResponseException e2) {
            int statusCode = e2.getResponse().getStatusCode();
            if (statusCode != 413) {
                return new IndexBatchResponse(statusCode, null, list.size(), true);
            }
            int min = Math.min(this.batchSize, list.size());
            this.batchSize = Math.max(1, this.scaleDownFunction.apply(Integer.valueOf(min)).intValue());
            LOGGER.verbose("Scaling down batch size due to 413 (Payload too large) response.{}Scaled down from {} to {}", new Object[]{System.lineSeparator(), Integer.valueOf(min), Integer.valueOf(this.batchSize)});
            int size = list.size();
            if (size == 1) {
                return new IndexBatchResponse(statusCode, null, size, true);
            }
            int min2 = Math.min(list.size(), this.batchSize);
            List<TryTrackingIndexAction<T>> subList = list2.subList(min2, list2.size());
            this.documentManager.reinsertFailedActions(subList);
            subList.clear();
            return sendBatch(list.subList(0, min2), list2, context);
        } catch (Exception e3) {
            return new IndexBatchResponse(0, null, list.size(), true);
        }
    }

    private void handleResponse(List<TryTrackingIndexAction<T>> list, IndexBatchResponse indexBatchResponse) {
        if (indexBatchResponse.getStatusCode() == 413 && indexBatchResponse.getCount() == 1) {
            IndexAction<T> action = list.get(0).getAction();
            if (this.onActionError != null) {
                this.onActionError.accept(new OnActionErrorOptions(action).setThrowable(SearchBatchingUtils.createDocumentTooLargeException()));
                return;
            }
            return;
        }
        LinkedList linkedList = new LinkedList();
        boolean z = indexBatchResponse.getStatusCode() == 503;
        if (indexBatchResponse.getResults() == null) {
            linkedList.addAll(list);
        } else {
            for (IndexingResult indexingResult : indexBatchResponse.getResults()) {
                String key = indexingResult.getKey();
                TryTrackingIndexAction<T> orElse = list.stream().filter(tryTrackingIndexAction -> {
                    return key.equals(tryTrackingIndexAction.getKey());
                }).findFirst().orElse(null);
                if (orElse == null) {
                    LOGGER.warning("Unable to correlate result key {} to initial document.", new Object[]{key});
                } else if (SearchBatchingUtils.isSuccess(indexingResult.getStatusCode())) {
                    if (this.onActionSucceeded != null) {
                        this.onActionSucceeded.accept(new OnActionSucceededOptions<>(orElse.getAction()));
                    }
                } else if (SearchBatchingUtils.isRetryable(indexingResult.getStatusCode())) {
                    z |= indexingResult.getStatusCode() == 503;
                    if (orElse.getTryCount() < this.maxRetries) {
                        orElse.incrementTryCount();
                        linkedList.add(orElse);
                    } else if (this.onActionError != null) {
                        this.onActionError.accept(new OnActionErrorOptions(orElse.getAction()).setThrowable(SearchBatchingUtils.createDocumentHitRetryLimitException()).setIndexingResult(indexingResult));
                    }
                } else if (this.onActionError != null) {
                    this.onActionError.accept(new OnActionErrorOptions(orElse.getAction()).setIndexingResult(indexingResult));
                }
            }
        }
        if (z) {
            this.currentRetryDelay = SearchBatchingUtils.calculateRetryDelay(this.backoffCount.getAndIncrement(), this.throttlingDelayNanos, this.maxThrottlingDelayNanos);
        } else {
            this.backoffCount.set(0);
            this.currentRetryDelay = Duration.ZERO;
        }
        if (CoreUtils.isNullOrEmpty(linkedList)) {
            return;
        }
        this.documentManager.reinsertFailedActions(linkedList);
    }

    private static void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }
}
