package com.azure.search.documents.implementation.batching;

import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JsonSerializer;
import com.azure.search.documents.implementation.SearchIndexClientImpl;
import com.azure.search.documents.implementation.converters.IndexActionConverter;
import com.azure.search.documents.implementation.util.Utility;
import com.azure.search.documents.models.IndexAction;
import com.azure.search.documents.models.IndexBatchException;
import com.azure.search.documents.models.IndexDocumentsResult;
import com.azure.search.documents.models.IndexingResult;
import com.azure.search.documents.options.OnActionAddedOptions;
import com.azure.search.documents.options.OnActionErrorOptions;
import com.azure.search.documents.options.OnActionSentOptions;
import com.azure.search.documents.options.OnActionSucceededOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/search/documents/implementation/batching/SearchIndexingPublisher.class */
public final class SearchIndexingPublisher<T> {
    private static final double JITTER_FACTOR = 0.05d;
    private static final String BATCH_SIZE_SCALED_DOWN = "Scaling down batch size due to 413 (Payload too large) response.{}Scaled down from {} to {}";
    private static final ClientLogger LOGGER = new ClientLogger(SearchIndexingPublisher.class);
    private final SearchIndexClientImpl restClient;
    private final JsonSerializer serializer;
    private final boolean autoFlush;
    private int batchActionCount;
    private final int maxRetries;
    private final Duration throttlingDelay;
    private final Duration maxThrottlingDelay;
    private final Consumer<OnActionAddedOptions<T>> onActionAddedConsumer;
    private final Consumer<OnActionSentOptions<T>> onActionSentConsumer;
    private final Consumer<OnActionSucceededOptions<T>> onActionSucceededConsumer;
    private final Consumer<OnActionErrorOptions<T>> onActionErrorConsumer;
    private final Function<T, String> documentKeyRetriever;
    private final Function<Integer, Integer> scaleDownFunction = num -> {
        return Integer.valueOf(num.intValue() / 2);
    };
    private final Object actionsMutex = new Object();
    private final LinkedList<TryTrackingIndexAction<T>> actions = new LinkedList<>();
    private final LinkedList<TryTrackingIndexAction<T>> inFlightActions = new LinkedList<>();
    private final Semaphore processingSemaphore = new Semaphore(1);
    volatile AtomicInteger backoffCount = new AtomicInteger();
    volatile Duration currentRetryDelay = Duration.ZERO;

    public SearchIndexingPublisher(SearchIndexClientImpl searchIndexClientImpl, JsonSerializer jsonSerializer, Function<T, String> function, boolean z, int i, int i2, Duration duration, Duration duration2, Consumer<OnActionAddedOptions<T>> consumer, Consumer<OnActionSucceededOptions<T>> consumer2, Consumer<OnActionErrorOptions<T>> consumer3, Consumer<OnActionSentOptions<T>> consumer4) {
        this.documentKeyRetriever = (Function) Objects.requireNonNull(function, "'documentKeyRetriever' cannot be null");
        this.restClient = searchIndexClientImpl;
        this.serializer = jsonSerializer;
        this.autoFlush = z;
        this.batchActionCount = i;
        this.maxRetries = i2;
        this.throttlingDelay = duration;
        this.maxThrottlingDelay = duration2.compareTo(this.throttlingDelay) < 0 ? this.throttlingDelay : duration2;
        this.onActionAddedConsumer = consumer;
        this.onActionSentConsumer = consumer4;
        this.onActionSucceededConsumer = consumer2;
        this.onActionErrorConsumer = consumer3;
    }

    public synchronized Collection<IndexAction<T>> getActions() {
        ArrayList arrayList = new ArrayList();
        Iterator<TryTrackingIndexAction<T>> it = this.inFlightActions.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getAction());
        }
        Iterator<TryTrackingIndexAction<T>> it2 = this.actions.iterator();
        while (it2.hasNext()) {
            arrayList.add(it2.next().getAction());
        }
        return arrayList;
    }

    public int getBatchActionCount() {
        return this.batchActionCount;
    }

    public synchronized Duration getCurrentRetryDelay() {
        return this.currentRetryDelay;
    }

    public synchronized Mono<Void> addActions(Collection<IndexAction<T>> collection, Context context, Runnable runnable) {
        collection.stream().map(indexAction -> {
            return new TryTrackingIndexAction(indexAction, (String) this.documentKeyRetriever.apply(indexAction.getDocument()));
        }).forEach(tryTrackingIndexAction -> {
            if (this.onActionAddedConsumer != null) {
                this.onActionAddedConsumer.accept(new OnActionAddedOptions<>(tryTrackingIndexAction.getAction()));
            }
            this.actions.add(tryTrackingIndexAction);
        });
        LOGGER.verbose("Actions added, new pending queue size: {}.", new Object[]{Integer.valueOf(this.actions.size())});
        if (!this.autoFlush || !batchAvailableForProcessing()) {
            return Mono.empty();
        }
        runnable.run();
        LOGGER.verbose("Adding documents triggered batch size limit, sending documents for indexing.");
        return flush(false, false, context);
    }

    public Mono<Void> flush(boolean z, boolean z2, Context context) {
        if (z) {
            this.processingSemaphore.acquireUninterruptibly();
            return flushLoop(z2, context).doFinally(signalType -> {
                this.processingSemaphore.release();
            });
        }
        if (this.processingSemaphore.tryAcquire()) {
            return flushLoop(z2, context).doFinally(signalType2 -> {
                this.processingSemaphore.release();
            });
        }
        LOGGER.verbose("Batch already in-flight and not waiting for completion. Performing no-op.");
        return Mono.empty();
    }

    private Mono<Void> flushLoop(boolean z, Context context) {
        return createAndProcessBatch(context).expand(indexBatchResponse -> {
            return Flux.defer(() -> {
                return (batchAvailableForProcessing() || z) ? createAndProcessBatch(context) : Flux.empty();
            });
        }).then();
    }

    private Mono<IndexBatchResponse> createAndProcessBatch(Context context) {
        List<TryTrackingIndexAction<T>> createBatch = createBatch();
        return CoreUtils.isNullOrEmpty(createBatch) ? Mono.empty() : sendBatch((List) createBatch.stream().map(tryTrackingIndexAction -> {
            return IndexActionConverter.map(tryTrackingIndexAction.getAction(), this.serializer);
        }).collect(Collectors.toList()), createBatch, context).map(indexBatchResponse -> {
            handleResponse(createBatch, indexBatchResponse);
            return indexBatchResponse;
        });
    }

    private List<TryTrackingIndexAction<T>> createBatch() {
        ArrayList arrayList;
        synchronized (this.actionsMutex) {
            int min = Math.min(this.batchActionCount, this.actions.size() + this.inFlightActions.size());
            arrayList = new ArrayList(min);
            HashSet hashSet = new HashSet(min * 2);
            int fillFromQueue = fillFromQueue(arrayList, this.inFlightActions, min, hashSet);
            if (fillFromQueue == min) {
                reinsertFailedActions(this.inFlightActions);
            } else {
                fillFromQueue(arrayList, this.actions, min - fillFromQueue, hashSet);
            }
        }
        return arrayList;
    }

    private int fillFromQueue(List<TryTrackingIndexAction<T>> list, List<TryTrackingIndexAction<T>> list2, int i, Set<String> set) {
        int i2 = 0;
        int i3 = 0;
        int size = list2.size();
        while (i3 < i && i2 < size) {
            int i4 = i2;
            i2++;
            TryTrackingIndexAction<T> tryTrackingIndexAction = list2.get(i4 - i3);
            if (!set.contains(tryTrackingIndexAction.getKey())) {
                set.add(tryTrackingIndexAction.getKey());
                list.add(list2.remove((i2 - 1) - i3));
                i3++;
            }
        }
        return i3;
    }

    private Mono<IndexBatchResponse> sendBatch(List<com.azure.search.documents.implementation.models.IndexAction> list, List<TryTrackingIndexAction<T>> list2, Context context) {
        LOGGER.verbose("Sending a batch of size {}.", new Object[]{Integer.valueOf(list2.size())});
        if (this.onActionSentConsumer != null) {
            list2.forEach(tryTrackingIndexAction -> {
                this.onActionSentConsumer.accept(new OnActionSentOptions<>(tryTrackingIndexAction.getAction()));
            });
        }
        Mono<Response<IndexDocumentsResult>> indexDocumentsWithResponseAsync = Utility.indexDocumentsWithResponseAsync(this.restClient, list, true, context, LOGGER);
        if (!this.currentRetryDelay.isZero() && !this.currentRetryDelay.isNegative()) {
            indexDocumentsWithResponseAsync = indexDocumentsWithResponseAsync.delaySubscription(this.currentRetryDelay);
        }
        return indexDocumentsWithResponseAsync.map(response -> {
            return new IndexBatchResponse(response.getStatusCode(), ((IndexDocumentsResult) response.getValue()).getResults(), list.size(), false);
        }).doOnCancel(() -> {
            LOGGER.warning("Request was cancelled before response, adding all in-flight documents back to queue.");
            this.inFlightActions.addAll(list2);
        }).onErrorResume(IndexBatchException.class, indexBatchException -> {
            return Mono.just(new IndexBatchResponse(207, indexBatchException.getIndexingResults(), list.size(), true));
        }).onErrorResume(HttpResponseException.class, httpResponseException -> {
            int statusCode = httpResponseException.getResponse().getStatusCode();
            if (statusCode != 413) {
                return Mono.just(new IndexBatchResponse(statusCode, null, list.size(), true));
            }
            int min = Math.min(this.batchActionCount, list.size());
            this.batchActionCount = Math.max(1, this.scaleDownFunction.apply(Integer.valueOf(min)).intValue());
            LOGGER.verbose(BATCH_SIZE_SCALED_DOWN, new Object[]{System.lineSeparator(), Integer.valueOf(min), Integer.valueOf(this.batchActionCount)});
            int size = list.size();
            if (size == 1) {
                return Mono.just(new IndexBatchResponse(statusCode, null, size, true));
            }
            int min2 = Math.min(list.size(), this.batchActionCount);
            List<TryTrackingIndexAction<T>> subList = list2.subList(min2, list2.size());
            reinsertFailedActions(subList);
            subList.clear();
            return sendBatch(list.subList(0, min2), list2, context);
        }).onErrorResume(Exception.class, exc -> {
            return Mono.just(new IndexBatchResponse(0, null, list.size(), true));
        });
    }

    private void handleResponse(List<TryTrackingIndexAction<T>> list, IndexBatchResponse indexBatchResponse) {
        if (indexBatchResponse.getStatusCode() == 413 && indexBatchResponse.getCount() == 1) {
            IndexAction<T> action = list.get(0).getAction();
            if (this.onActionErrorConsumer != null) {
                this.onActionErrorConsumer.accept(new OnActionErrorOptions(action).setThrowable(createDocumentTooLargeException()));
                return;
            }
            return;
        }
        ArrayList arrayList = new ArrayList();
        boolean z = indexBatchResponse.getStatusCode() == 503;
        if (indexBatchResponse.getResults() == null) {
            arrayList.addAll(list);
        } else {
            for (IndexingResult indexingResult : indexBatchResponse.getResults()) {
                String key = indexingResult.getKey();
                TryTrackingIndexAction<T> orElse = list.stream().filter(tryTrackingIndexAction -> {
                    return key.equals(tryTrackingIndexAction.getKey());
                }).findFirst().orElse(null);
                if (orElse == null) {
                    LOGGER.warning("Unable to correlate result key {} to initial document.", new Object[]{key});
                } else if (isSuccess(indexingResult.getStatusCode())) {
                    if (this.onActionSucceededConsumer != null) {
                        this.onActionSucceededConsumer.accept(new OnActionSucceededOptions<>(orElse.getAction()));
                    }
                } else if (isRetryable(indexingResult.getStatusCode())) {
                    z |= indexingResult.getStatusCode() == 503;
                    if (orElse.getTryCount() < this.maxRetries) {
                        orElse.incrementTryCount();
                        arrayList.add(orElse);
                    } else if (this.onActionErrorConsumer != null) {
                        this.onActionErrorConsumer.accept(new OnActionErrorOptions(orElse.getAction()).setThrowable(createDocumentHitRetryLimitException()).setIndexingResult(indexingResult));
                    }
                } else if (this.onActionErrorConsumer != null) {
                    this.onActionErrorConsumer.accept(new OnActionErrorOptions(orElse.getAction()).setIndexingResult(indexingResult));
                }
            }
        }
        if (z) {
            this.currentRetryDelay = calculateRetryDelay(this.backoffCount.getAndIncrement());
        } else {
            this.backoffCount.set(0);
            this.currentRetryDelay = Duration.ZERO;
        }
        if (CoreUtils.isNullOrEmpty(arrayList)) {
            return;
        }
        reinsertFailedActions(arrayList);
    }

    private void reinsertFailedActions(List<TryTrackingIndexAction<T>> list) {
        synchronized (this.actionsMutex) {
            for (int size = list.size() - 1; size >= 0; size--) {
                this.actions.push(list.get(size));
            }
        }
    }

    private boolean batchAvailableForProcessing() {
        return this.actions.size() + this.inFlightActions.size() >= this.batchActionCount;
    }

    private static boolean isSuccess(int i) {
        return i == 200 || i == 201;
    }

    private static boolean isRetryable(int i) {
        return i == 409 || i == 422 || i == 503;
    }

    private Duration calculateRetryDelay(int i) {
        return Duration.ofNanos(Math.min((1 << i) * ThreadLocalRandom.current().nextLong((long) (this.throttlingDelay.toNanos() * 0.95d), (long) (this.throttlingDelay.toNanos() * 1.05d)), this.maxThrottlingDelay.toNanos()));
    }

    private static RuntimeException createDocumentTooLargeException() {
        return new RuntimeException("Document is too large to be indexed and won't be tried again.");
    }

    private static RuntimeException createDocumentHitRetryLimitException() {
        return new RuntimeException("Document has reached retry limit and won't be tried again.");
    }
}
