package io.telicent.smart.cache.search.sinks;

import io.telicent.smart.cache.observability.events.ComponentEventSource;
import io.telicent.smart.cache.observability.events.CounterEvent;
import io.telicent.smart.cache.observability.events.DurationEvent;
import io.telicent.smart.cache.observability.events.EventListener;
import io.telicent.smart.cache.observability.events.EventSourceSupport;
import io.telicent.smart.cache.observability.events.EventUtil;
import io.telicent.smart.cache.observability.events.MetricEvent;
import io.telicent.smart.cache.projectors.Sink;
import io.telicent.smart.cache.projectors.sinks.builder.SinkBuilder;
import io.telicent.smart.cache.projectors.utils.ThroughputTracker;
import io.telicent.smart.cache.search.SearchIndexer;
import io.telicent.smart.cache.sources.Event;
import io.telicent.smart.cache.sources.Header;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/telicent/smart/cache/search/sinks/SearchIndexerSink.class */
/**
 * A {@link Sink} that passes the value of each received {@link Event} to a {@link SearchIndexer}, either indexing
 * it or, when the configured deletion detection function says so, deleting it from the index.
 * <p>
 * Metric events are emitted for every received, successfully handled and failed event. When a dead letter sink is
 * configured, events whose handling throws a {@link RuntimeException} are forwarded to it (decorated with a
 * {@code Dead-Letter-Reason} header) instead of the exception being propagated.
 * </p>
 *
 * @param <TKey>   Event key type
 * @param <TValue> Event value type, i.e. the type of document the indexer accepts
 */
public class SearchIndexerSink<TKey, TValue> implements Sink<Event<TKey, TValue>>, ComponentEventSource<MetricEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SearchIndexerSink.class);

    private static final String METRIC_RECEIVED = "search.indexer.sink.received";
    private static final String METRIC_SUCCESS = "search.indexer.sink.success";
    private static final String METRIC_SUCCESS_DURATION = "search.indexer.sink.success.duration";
    private static final String METRIC_FAILURE = "search.indexer.sink.failure";
    private static final String METRIC_FAILURE_DURATION = "search.indexer.sink.failure.duration";
    private static final String DEAD_LETTER_REASON_HEADER = "Dead-Letter-Reason";

    protected final SearchIndexer<TValue> indexer;
    protected final Function<TValue, String> idProvider;
    protected final Function<Event<TKey, TValue>, Boolean> isDeletion;
    protected final IndexDeletionAction deleteAction;
    protected final ThroughputTracker tracker;
    /** Optional sink for events that could not be handled; {@code null} means failures are rethrown. */
    protected final Sink<Event<TKey, TValue>> deadLetterSink;
    protected final EventSourceSupport<MetricEvent> eventSupport = new EventSourceSupport<>();

    /**
     * Builder for {@link SearchIndexerSink} instances.
     * <p>
     * Defaults: no event is treated as a deletion, the deletion action is {@link IndexDeletionAction#CONTENTS},
     * the throughput report batch size is 1000 and there is no dead letter sink.
     * </p>
     *
     * @param <TKey>   Event key type
     * @param <TValue> Event value type
     */
    public static final class Builder<TKey, TValue> implements SinkBuilder<Event<TKey, TValue>, SearchIndexerSink<TKey, TValue>> {
        private SearchIndexer<TValue> indexer;
        private Function<TValue, String> idProvider;
        private Function<Event<TKey, TValue>, Boolean> isDeletion = event -> false;
        private IndexDeletionAction onDelete = IndexDeletionAction.CONTENTS;
        private long reportBatchSize = 1000;
        private Sink<Event<TKey, TValue>> deadLetterSink;

        /**
         * Sets the search indexer that receives the documents.
         *
         * @param searchIndexer Search indexer, required by the time the sink is built
         * @return This builder
         */
        public Builder<TKey, TValue> indexer(SearchIndexer<TValue> searchIndexer) {
            this.indexer = searchIndexer;
            return this;
        }

        /**
         * Sets the function used to derive a document ID from an event value.
         *
         * @param function ID provider, required by the time the sink is built
         * @return This builder
         */
        public Builder<TKey, TValue> idProvider(Function<TValue, String> function) {
            this.idProvider = function;
            return this;
        }

        /**
         * Sets the function that decides whether an event represents a deletion.
         *
         * @param function Deletion detection function, must not be {@code null} by the time the sink is built
         * @return This builder
         */
        public Builder<TKey, TValue> isDeletionWhen(Function<Event<TKey, TValue>, Boolean> function) {
            this.isDeletion = function;
            return this;
        }

        /**
         * Configures the sink to never treat an event as a deletion.
         *
         * @return This builder
         */
        public Builder<TKey, TValue> noDeletes() {
            this.isDeletion = event -> false;
            return this;
        }

        /**
         * Sets the action taken when an event is detected as a deletion.
         *
         * @param indexDeletionAction Deletion action; {@code null} is treated as
         *                            {@link IndexDeletionAction#CONTENTS} by the sink
         * @return This builder
         */
        public Builder<TKey, TValue> onDeletion(IndexDeletionAction indexDeletionAction) {
            this.onDelete = indexDeletionAction;
            return this;
        }

        /**
         * Sets the batch size handed to the throughput tracker for progress reporting.
         *
         * @param j Reporting batch size
         * @return This builder
         */
        public Builder<TKey, TValue> reportBatchSize(long j) {
            this.reportBatchSize = j;
            return this;
        }

        /**
         * Sets the sink that receives events which could not be handled by the indexer.
         *
         * @param sink Dead letter sink, or {@code null} to have failures rethrown by the sink
         * @return This builder
         */
        @SuppressWarnings("unchecked")
        public Builder<TKey, TValue> deadLetterSink(Sink<?> sink) {
            // The parameter is declared with a wildcard, so an explicit (unchecked) cast is needed to store it in
            // the typed field. NOTE(review): callers are expected to supply a sink that accepts this builder's
            // event type - confirm against the original (non-decompiled) signature.
            this.deadLetterSink = (Sink<Event<TKey, TValue>>) sink;
            return this;
        }

        /**
         * Builds the sink from the current builder state.
         *
         * @return New search indexer sink
         * @throws NullPointerException If the indexer, ID provider or deletion detection function is {@code null}
         */
        public SearchIndexerSink<TKey, TValue> build() {
            return new SearchIndexerSink<>(this.indexer, this.idProvider, this.isDeletion, this.onDelete,
                                           this.reportBatchSize, this.deadLetterSink);
        }

        /**
         * Decompiler-generated alias of {@link #build()}, retained so existing references keep working.
         *
         * @return New search indexer sink
         */
        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public SearchIndexerSink<TKey, TValue> m25build() {
            return build();
        }
    }

    /**
     * Creates a new sink.
     *
     * @param searchIndexer       Search indexer, must not be {@code null}
     * @param idProvider          Function deriving a document ID from an event value, must not be {@code null}
     * @param isDeletion          Function deciding whether an event is a deletion, must not be {@code null}
     * @param indexDeletionAction Action to take for deletions; {@code null} falls back to
     *                            {@link IndexDeletionAction#CONTENTS}
     * @param reportBatchSize     Batch size for throughput reporting
     * @param deadLetterSink      Optional dead letter sink, may be {@code null}
     * @throws NullPointerException If any required argument is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public SearchIndexerSink(SearchIndexer<TValue> searchIndexer, Function<TValue, String> idProvider, Function<Event<TKey, TValue>, Boolean> isDeletion, IndexDeletionAction indexDeletionAction, long reportBatchSize, Sink<Event<TKey, TValue>> deadLetterSink) {
        Objects.requireNonNull(searchIndexer, "Search Indexer cannot be null");
        Objects.requireNonNull(idProvider, "ID Provider function cannot be null");
        Objects.requireNonNull(isDeletion, "Deletion detection function cannot be null");
        this.indexer = searchIndexer;
        this.idProvider = idProvider;
        this.isDeletion = isDeletion;
        // A null action would otherwise only surface as a NullPointerException on the first deletion event, so
        // fall back to the same default the builder uses
        this.deleteAction = Objects.requireNonNullElse(indexDeletionAction, IndexDeletionAction.CONTENTS);
        this.tracker = ThroughputTracker.create()
                                        .logger(LOGGER)
                                        .reportBatchSize(reportBatchSize)
                                        .inSeconds()
                                        .action("Indexed")
                                        .itemsName("Documents")
                                        .metricsLabel("indexed_documents")
                                        .build();
        this.deadLetterSink = deadLetterSink;
    }

    /**
     * Indexes the value of the given event, or deletes it when the deletion detection function returns true.
     * <p>
     * On success the event's source, if any, is told the event has been processed. On a {@link RuntimeException}
     * the event is forwarded to the dead letter sink when one is configured, otherwise the exception is rethrown.
     * </p>
     *
     * @param event Event to handle
     * @throws RuntimeException If handling fails and no dead letter sink is configured
     */
    @Override
    public void send(Event<TKey, TValue> event) {
        this.tracker.itemReceived();
        EventUtil.emit(this.eventSupport, new MetricEvent[]{CounterEvent.counterEvent(METRIC_RECEIVED)});
        final long startTime = System.currentTimeMillis();
        try {
            if (this.isDeletion.apply(event)) {
                if (this.deleteAction == IndexDeletionAction.DOCUMENT) {
                    this.indexer.deleteDocument(this.idProvider.apply(event.value()));
                } else {
                    // CONTENTS, and any other action, deletes only the contents supplied in the event value
                    this.indexer.deleteContents(this.idProvider, event.value());
                }
            } else {
                this.indexer.index(this.idProvider, event.value());
            }
            this.tracker.itemProcessed();
            EventUtil.emit(this.eventSupport, new MetricEvent[]{
                    CounterEvent.counterEvent(METRIC_SUCCESS),
                    DurationEvent.durationEvent(METRIC_SUCCESS_DURATION, startTime, System.currentTimeMillis())
            });
            if (event.source() != null) {
                event.source().processed(List.of(event));
            }
        } catch (RuntimeException e) {
            EventUtil.emit(this.eventSupport, new MetricEvent[]{
                    CounterEvent.counterEvent(METRIC_FAILURE),
                    DurationEvent.durationEvent(METRIC_FAILURE_DURATION, startTime, System.currentTimeMillis())
            });
            if (this.deadLetterSink == null) {
                throw e;
            }
            // The trailing throwable is not consumed by a placeholder, so SLF4J logs it with its stack trace
            LOGGER.warn("Unable to send event [{}] to indexer, and will be sent to dead letter sink: ", event, e);
            this.deadLetterSink.send(decorateDeadLetterEventWithMetadata(event, e));
        }
    }

    /**
     * Adds a {@code Dead-Letter-Reason} header describing the failure to the given event.
     *
     * @param event Event that could not be handled
     * @param th    Failure that prevented handling
     * @return Result of adding the header to the event
     */
    private Event<TKey, TValue> decorateDeadLetterEventWithMetadata(Event<TKey, TValue> event, Throwable th) {
        // Throwable.getMessage() may be null (e.g. a bare NullPointerException); fall back to toString() so the
        // header always carries a usable reason
        final String message = th.getMessage();
        final String reason = message != null ? message : th.toString();
        return event.addHeaders(Stream.of(new Header(DEAD_LETTER_REASON_HEADER, reason)));
    }

    /**
     * Reports and resets the throughput tracker, then flushes the indexer.
     */
    public void close() {
        this.tracker.reportThroughput();
        this.tracker.reset();
        this.indexer.flush(true);
    }

    /**
     * Creates a new builder for a search indexer sink.
     *
     * @param <TKey>   Event key type
     * @param <TValue> Event value type
     * @return New builder
     */
    public static <TKey, TValue> Builder<TKey, TValue> create() {
        return new Builder<>();
    }

    /**
     * Registers a listener for the metric events emitted by this sink.
     *
     * @param eventListener Listener to add
     */
    public void addListener(EventListener eventListener) {
        this.eventSupport.addListener(eventListener);
    }

    /**
     * Unregisters a previously registered listener.
     *
     * @param eventListener Listener to remove
     * @return Value returned by the underlying event support for the removal
     */
    public boolean removeListener(EventListener eventListener) {
        return this.eventSupport.removeListener(eventListener);
    }
}
