package io.telicent.smart.cache.search.sinks;

import io.telicent.smart.cache.observability.events.CounterEvent;
import io.telicent.smart.cache.observability.events.DurationEvent;
import io.telicent.smart.cache.observability.events.EventListener;
import io.telicent.smart.cache.observability.events.EventUtil;
import io.telicent.smart.cache.observability.events.MetricEvent;
import io.telicent.smart.cache.projectors.Sink;
import io.telicent.smart.cache.projectors.sinks.builder.SinkBuilder;
import io.telicent.smart.cache.search.SearchIndexer;
import io.telicent.smart.cache.search.model.SearchIndexBulkResult;
import io.telicent.smart.cache.search.model.SearchIndexBulkResults;
import io.telicent.smart.cache.sources.Event;
import io.telicent.smart.cache.sources.Header;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/telicent/smart/cache/search/sinks/BulkSearchIndexerSink.class */
public class BulkSearchIndexerSink<TKey, TValue> extends SearchIndexerSink<TKey, TValue> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BulkSearchIndexerSink.class);
    public static final Duration MINIMUM_IDLE_TIME = Duration.ofSeconds(1);
    public static final Duration DEFAULT_IDLE_TIME = Duration.ofMinutes(1);
    private final ExecutorService executor;
    private final IndexingIdleTrigger<TKey, TValue> trigger;
    private final List<Event<TKey, TValue>> items;
    private final int batchSize;
    private final int flushPerBatches;
    private int batches;
    private int idleTriggers;
    private long lastIndexedAt;
    private boolean currentBatchIsForDeletion;

    /* loaded from: input_file:io/telicent/smart/cache/search/sinks/BulkSearchIndexerSink$Builder.class */
    public static final class Builder<TKey, TValue> implements SinkBuilder<Event<TKey, TValue>, BulkSearchIndexerSink<TKey, TValue>> {
        private SearchIndexer<TValue> indexer;
        private Function<TValue, String> idProvider;
        private Function<Event<TKey, TValue>, Boolean> isDeletion = event -> {
            return false;
        };
        private IndexDeletionAction onDelete = IndexDeletionAction.CONTENTS;
        private Duration maxIdleTime = BulkSearchIndexerSink.DEFAULT_IDLE_TIME;
        private int indexBatchSize = 1000;
        private int flushPerBatches = 10;
        private long reportBatchSize = 1000;
        private Sink<Event<TKey, TValue>> deadLetterSink;
        private EventListener<?>[] eventListeners;

        public Builder<TKey, TValue> indexer(SearchIndexer<TValue> searchIndexer) {
            this.indexer = searchIndexer;
            return this;
        }

        public Builder<TKey, TValue> idProvider(Function<TValue, String> function) {
            this.idProvider = function;
            return this;
        }

        public Builder<TKey, TValue> indexBatchSize(int i) {
            this.indexBatchSize = i;
            return this;
        }

        public Builder<TKey, TValue> isDeletionWhen(Function<Event<TKey, TValue>, Boolean> function) {
            this.isDeletion = function;
            return this;
        }

        public Builder<TKey, TValue> noDeletes() {
            this.isDeletion = event -> {
                return false;
            };
            return this;
        }

        public Builder<TKey, TValue> onDeletion(IndexDeletionAction indexDeletionAction) {
            this.onDelete = indexDeletionAction;
            return this;
        }

        public Builder<TKey, TValue> reportBatchSize(long j) {
            this.reportBatchSize = j;
            return this;
        }

        public Builder<TKey, TValue> deadLetterSink(Sink<?> sink) {
            this.deadLetterSink = sink;
            return this;
        }

        public Builder<TKey, TValue> batchSize(int i) {
            return indexBatchSize(i).reportBatchSize(i);
        }

        public Builder<TKey, TValue> flushPerBatches(int i) {
            this.flushPerBatches = i;
            return this;
        }

        public Builder<TKey, TValue> maxIdleTime(Duration duration) {
            this.maxIdleTime = duration;
            return this;
        }

        public Builder<TKey, TValue> eventListener(EventListener<?>... eventListenerArr) {
            this.eventListeners = eventListenerArr;
            return this;
        }

        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public BulkSearchIndexerSink<TKey, TValue> m20build() {
            BulkSearchIndexerSink<TKey, TValue> bulkSearchIndexerSink = new BulkSearchIndexerSink<>(this.indexer, this.idProvider, this.isDeletion, this.onDelete, this.indexBatchSize, this.flushPerBatches, this.reportBatchSize, this.maxIdleTime, this.deadLetterSink);
            if (this.eventListeners != null) {
                Stream stream = Arrays.stream(this.eventListeners);
                Objects.requireNonNull(bulkSearchIndexerSink);
                stream.forEach(bulkSearchIndexerSink::addListener);
            }
            return bulkSearchIndexerSink;
        }
    }

    /* loaded from: input_file:io/telicent/smart/cache/search/sinks/BulkSearchIndexerSink$IndexingIdleTrigger.class */
    private static final class IndexingIdleTrigger<TKey, TValue> implements Runnable {
        private static final Logger LOGGER = LoggerFactory.getLogger(IndexingIdleTrigger.class);
        public static final long IDLE_TRIGGER_CHECK_INTERVAL = 250;
        private final BulkSearchIndexerSink<TKey, TValue> sink;
        private final Duration maxIdleTime;
        private volatile boolean cancelled;

        public IndexingIdleTrigger(BulkSearchIndexerSink<TKey, TValue> bulkSearchIndexerSink, Duration duration) {
            this.sink = bulkSearchIndexerSink;
            this.maxIdleTime = duration;
        }

        public void cancel() {
            this.cancelled = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            LOGGER.debug("Bulk Indexing configured with max idle time of {}", this.maxIdleTime);
            while (!this.cancelled) {
                if (Duration.ofMillis(System.currentTimeMillis() - ((BulkSearchIndexerSink) this.sink).lastIndexedAt).compareTo(this.maxIdleTime) >= 1) {
                    synchronized (((BulkSearchIndexerSink) this.sink).items) {
                        if (!((BulkSearchIndexerSink) this.sink).items.isEmpty()) {
                            LOGGER.debug("Triggering a bulk index as max idle time of {} was exceeded", this.maxIdleTime);
                            ((BulkSearchIndexerSink) this.sink).idleTriggers++;
                            try {
                                this.sink.bulkIndex();
                            } catch (Throwable th) {
                                LOGGER.error("Background idle time bulk indexing failed:", th);
                            }
                        }
                    }
                }
                try {
                    Thread.sleep(250L);
                } catch (InterruptedException e) {
                }
            }
            LOGGER.debug("Bulk Indexing idle trigger cancelled");
        }
    }

    BulkSearchIndexerSink(SearchIndexer<TValue> searchIndexer, Function<TValue, String> function, Function<Event<TKey, TValue>, Boolean> function2, IndexDeletionAction indexDeletionAction, int i, int i2, long j, Duration duration, Sink<Event<TKey, TValue>> sink) {
        super(searchIndexer, function, function2, indexDeletionAction, j, sink);
        this.items = new ArrayList();
        this.lastIndexedAt = System.currentTimeMillis();
        if (i <= 0) {
            throw new IllegalArgumentException("indexBatchSize must be >= 1");
        }
        if (i > j) {
            throw new IllegalArgumentException("indexBatchSize must be < reportBatchSize");
        }
        if (j % i != 0) {
            throw new IllegalArgumentException("reportBatchSize must be a multiple of the indexBatchSize");
        }
        if (i2 <= 0) {
            throw new IllegalArgumentException("flushPerBatches must be >= 1");
        }
        this.batchSize = i;
        this.flushPerBatches = i2;
        duration = duration == null ? DEFAULT_IDLE_TIME : duration;
        boolean z = i > 1;
        if (MINIMUM_IDLE_TIME.compareTo(duration) >= 1) {
            throw new IllegalArgumentException(String.format("maxIdleTime must be a minimum of %,d seconds", Long.valueOf(MINIMUM_IDLE_TIME.getSeconds())));
        }
        this.executor = z ? Executors.newSingleThreadExecutor() : null;
        this.trigger = z ? new IndexingIdleTrigger<>(this, duration) : null;
        if (z) {
            this.executor.submit(this.trigger);
        }
    }

    @Override // io.telicent.smart.cache.search.sinks.SearchIndexerSink
    public void send(Event<TKey, TValue> event) {
        if (this.batchSize == 1) {
            super.send((Event) event);
            return;
        }
        this.tracker.itemReceived();
        synchronized (this.items) {
            boolean booleanValue = this.isDeletion.apply(event).booleanValue();
            if (booleanValue != this.currentBatchIsForDeletion) {
                if (!this.items.isEmpty()) {
                    bulkIndex();
                }
                this.currentBatchIsForDeletion = booleanValue;
            }
            this.items.add(event);
            if (this.items.size() == this.batchSize) {
                bulkIndex();
            }
        }
    }

    private void bulkIndex() {
        SearchIndexBulkResults<TValue> bulkIndex;
        synchronized (this.items) {
            LOGGER.debug("Bulk indexing {} items", Integer.valueOf(this.items.size()));
            List list = this.items.stream().map((v0) -> {
                return v0.value();
            }).toList();
            long currentTimeMillis = System.currentTimeMillis();
            if (this.currentBatchIsForDeletion) {
                switch (this.deleteAction) {
                    case DOCUMENT:
                        bulkIndex = this.indexer.bulkDeleteDocuments(this.idProvider, list);
                        EventUtil.emit(this.eventSupport, metricsFor(bulkIndex, "delete", currentTimeMillis, System.currentTimeMillis()));
                        break;
                    case CONTENTS:
                    default:
                        bulkIndex = this.indexer.bulkDeleteContents(this.idProvider, list);
                        EventUtil.emit(this.eventSupport, metricsFor(bulkIndex, "deletecontents", currentTimeMillis, System.currentTimeMillis()));
                        break;
                }
            } else {
                bulkIndex = this.indexer.bulkIndex(this.idProvider, list);
                EventUtil.emit(this.eventSupport, metricsFor(bulkIndex, "index", currentTimeMillis, System.currentTimeMillis()));
            }
            if (bulkIndex.getSuccessfulCount() > 0) {
                this.tracker.itemsProcessed(bulkIndex.getSuccessfulCount());
            }
            IntStream range = IntStream.range(0, this.items.size());
            SearchIndexBulkResults<TValue> searchIndexBulkResults = bulkIndex;
            Objects.requireNonNull(searchIndexBulkResults);
            IntStream filter = range.filter(searchIndexBulkResults::isSuccessful);
            List<Event<TKey, TValue>> list2 = this.items;
            Objects.requireNonNull(list2);
            Map map = (Map) filter.mapToObj(list2::get).filter(event -> {
                return event.source() != null;
            }).collect(Collectors.groupingBy((v0) -> {
                return v0.source();
            }));
            if (!map.isEmpty()) {
                map.forEach((eventSource, list3) -> {
                    try {
                        eventSource.processed(list3);
                    } catch (Throwable th) {
                        LOGGER.warn("Failed to report items processed to event source {} ({}): {}", new Object[]{eventSource.getClass().getSimpleName(), eventSource, th.getMessage()});
                    }
                });
            }
            if (this.deadLetterSink != null) {
                SearchIndexBulkResults<TValue> searchIndexBulkResults2 = bulkIndex;
                IntStream range2 = IntStream.range(0, this.items.size());
                SearchIndexBulkResults<TValue> searchIndexBulkResults3 = bulkIndex;
                Objects.requireNonNull(searchIndexBulkResults3);
                range2.filter(searchIndexBulkResults3::isFailure).mapToObj(i -> {
                    return new ImmutablePair(this.items.get(i), (SearchIndexBulkResult) searchIndexBulkResults2.getResults().get(i));
                }).forEach(immutablePair -> {
                    Event<TKey, TValue> decorateDeadLetterEventWithMetadata = decorateDeadLetterEventWithMetadata((Event) immutablePair.getLeft(), (SearchIndexBulkResult<?>) immutablePair.getRight());
                    try {
                        this.deadLetterSink.send(decorateDeadLetterEventWithMetadata);
                    } catch (Throwable th) {
                        LOGGER.error("Failed to send index error event [" + String.valueOf(decorateDeadLetterEventWithMetadata) + "] to dead letter sink: ", th);
                    }
                });
            }
            this.items.clear();
            this.batches++;
            if (this.batches % this.flushPerBatches == 0) {
                this.indexer.flush(false);
            }
            this.lastIndexedAt = System.currentTimeMillis();
        }
    }

    private List<MetricEvent> metricsFor(SearchIndexBulkResults<TValue> searchIndexBulkResults, String str, long j, long j2) {
        ArrayList arrayList = new ArrayList();
        if (searchIndexBulkResults.getSuccessfulCount() > 0) {
            arrayList.add(CounterEvent.counterEvent("search.indexer.bulksink." + str + ".success", searchIndexBulkResults.getSuccessfulCount()));
        }
        if (searchIndexBulkResults.getFailureCount() > 0) {
            arrayList.add(CounterEvent.counterEvent("search.indexer.bulksink." + str + ".failure", searchIndexBulkResults.getFailureCount()));
        }
        if (searchIndexBulkResults.size() > 0) {
            arrayList.add(DurationEvent.durationEvent("search.indexer.bulksink." + str + ".duration", j, j2));
        }
        return arrayList;
    }

    private Event<TKey, TValue> decorateDeadLetterEventWithMetadata(Event<TKey, TValue> event, SearchIndexBulkResult<?> searchIndexBulkResult) {
        return event.addHeaders(Stream.of(new Header("Dead-Letter-Reason", searchIndexBulkResult.getReason())));
    }

    public int batchesIndexed() {
        return this.batches;
    }

    public int idleTriggers() {
        return this.idleTriggers;
    }

    @Override // io.telicent.smart.cache.search.sinks.SearchIndexerSink
    public void close() {
        synchronized (this.items) {
            if (!this.items.isEmpty()) {
                bulkIndex();
            }
        }
        if (this.trigger != null) {
            this.trigger.cancel();
            this.executor.shutdownNow();
        }
        this.tracker.reportThroughput();
        this.tracker.reset();
        this.batches = 0;
        this.lastIndexedAt = System.currentTimeMillis();
        this.indexer.flush(true);
    }

    public static <TKey, TValue> Builder<TKey, TValue> createBulk() {
        return new Builder<>();
    }
}
