package org.springframework.data.elasticsearch.core;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.dao.DataAccessException;
import org.springframework.data.convert.EntityReader;
import org.springframework.data.elasticsearch.BulkFailureException;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterConvertCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.support.VersionInfo;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.http.HttpStatus;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

/* loaded from: input_file:org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.class */
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
    private static final Logger QUERY_LOGGER = LoggerFactory.getLogger("org.springframework.data.elasticsearch.core.QUERY");
    private final ReactiveElasticsearchClient client;
    private final ElasticsearchConverter converter;
    private final MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> mappingContext;
    private final ElasticsearchExceptionTranslator exceptionTranslator;
    private final EntityOperations operations;
    protected RequestFactory requestFactory;

    @Nullable
    private WriteRequest.RefreshPolicy refreshPolicy;

    @Nullable
    private IndicesOptions indicesOptions;

    @Nullable
    private ReactiveEntityCallbacks entityCallbacks;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate$DocumentCallback.class */
    public interface DocumentCallback<T> {
        @NonNull
        Mono<T> toEntity(@Nullable Document document);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate$Entities.class */
    public class Entities<T> {
        private final List<T> entities;

        private Entities(List<T> list) {
            Assert.notNull(list, "entities cannot be null");
            this.entities = list;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isEmpty() {
            return this.entities.isEmpty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<IndexQuery> indexQueries() {
            Stream<T> stream = this.entities.stream();
            ReactiveElasticsearchTemplate reactiveElasticsearchTemplate = ReactiveElasticsearchTemplate.this;
            return (List) stream.map(obj -> {
                return reactiveElasticsearchTemplate.getIndexQuery(obj);
            }).collect(Collectors.toList());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public T entityAt(long j) {
            return this.entities.get((int) j);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate$ReadDocumentCallback.class */
    public class ReadDocumentCallback<T> implements DocumentCallback<T> {
        private final EntityReader<? super T, Document> reader;
        private final Class<T> type;
        private final IndexCoordinates index;

        public ReadDocumentCallback(EntityReader<? super T, Document> entityReader, Class<T> cls, IndexCoordinates indexCoordinates) {
            Assert.notNull(entityReader, "reader is null");
            Assert.notNull(cls, "type is null");
            this.reader = entityReader;
            this.type = cls;
            this.index = indexCoordinates;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate.DocumentCallback
        @NonNull
        public Mono<T> toEntity(@Nullable Document document) {
            if (document == null) {
                return Mono.empty();
            }
            return ReactiveElasticsearchTemplate.this.maybeCallAfterConvert(this.reader.read(this.type, document), document, this.index);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate$ReadSearchDocumentCallback.class */
    public class ReadSearchDocumentCallback<T> implements SearchDocumentCallback<T> {
        private final DocumentCallback<T> delegate;
        private final Class<T> type;

        public ReadSearchDocumentCallback(Class<T> cls, IndexCoordinates indexCoordinates) {
            Assert.notNull(cls, "type is null");
            this.delegate = new ReadDocumentCallback(ReactiveElasticsearchTemplate.this.converter, cls, indexCoordinates);
            this.type = cls;
        }

        @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate.SearchDocumentCallback
        public Mono<T> toEntity(SearchDocument searchDocument) {
            return this.delegate.toEntity(searchDocument);
        }

        @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate.SearchDocumentCallback
        public Mono<SearchHit<T>> toSearchHit(SearchDocument searchDocument) {
            return toEntity(searchDocument).map(obj -> {
                return SearchHitMapping.mappingFor(this.type, ReactiveElasticsearchTemplate.this.converter).mapHit(searchDocument, obj);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate$SearchDocumentCallback.class */
    public interface SearchDocumentCallback<T> {
        @NonNull
        Mono<T> toEntity(@NonNull SearchDocument searchDocument);

        @NonNull
        Mono<SearchHit<T>> toSearchHit(@NonNull SearchDocument searchDocument);
    }

    public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient reactiveElasticsearchClient) {
        this(reactiveElasticsearchClient, new MappingElasticsearchConverter(new SimpleElasticsearchMappingContext()));
    }

    public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient reactiveElasticsearchClient, ElasticsearchConverter elasticsearchConverter) {
        this.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE;
        this.indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled();
        Assert.notNull(reactiveElasticsearchClient, "client must not be null");
        Assert.notNull(elasticsearchConverter, "converter must not be null");
        this.client = reactiveElasticsearchClient;
        this.converter = elasticsearchConverter;
        this.mappingContext = elasticsearchConverter.getMappingContext();
        this.exceptionTranslator = new ElasticsearchExceptionTranslator();
        this.operations = new EntityOperations(this.mappingContext);
        this.requestFactory = new RequestFactory(elasticsearchConverter);
        logVersions();
    }

    private void logVersions() {
        getClusterVersion().doOnSuccess(VersionInfo::logVersions).doOnError(th -> {
            VersionInfo.logVersions(null);
        }).subscribe();
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (this.entityCallbacks == null) {
            setEntityCallbacks(ReactiveEntityCallbacks.create(applicationContext));
        }
    }

    public void setRefreshPolicy(@Nullable WriteRequest.RefreshPolicy refreshPolicy) {
        this.refreshPolicy = refreshPolicy;
    }

    @Nullable
    public WriteRequest.RefreshPolicy getRefreshPolicy() {
        return this.refreshPolicy;
    }

    public void setIndicesOptions(@Nullable IndicesOptions indicesOptions) {
        this.indicesOptions = indicesOptions;
    }

    public void setEntityCallbacks(ReactiveEntityCallbacks reactiveEntityCallbacks) {
        Assert.notNull(reactiveEntityCallbacks, "EntityCallbacks must not be null!");
        this.entityCallbacks = reactiveEntityCallbacks;
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public <T> Mono<T> save(T t, IndexCoordinates indexCoordinates) {
        Assert.notNull(t, "Entity must not be null!");
        return maybeCallBeforeConvert(t, indexCoordinates).flatMap(obj -> {
            return doIndex(obj, indexCoordinates);
        }).map(tuple2 -> {
            Object t1 = tuple2.getT1();
            IndexResponse indexResponse = (IndexResponse) tuple2.getT2();
            return updateIndexedObject(t1, IndexedObjectInformation.of(indexResponse.getId(), Long.valueOf(indexResponse.getSeqNo()), Long.valueOf(indexResponse.getPrimaryTerm()), Long.valueOf(indexResponse.getVersion())));
        }).flatMap(obj2 -> {
            return maybeCallAfterSave(obj2, indexCoordinates);
        });
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public <T> Mono<T> save(T t) {
        return save((ReactiveElasticsearchTemplate) t, getIndexCoordinatesFor(t.getClass()));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> mono, Class<T> cls) {
        return saveAll(mono, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> mono, IndexCoordinates indexCoordinates) {
        Assert.notNull(mono, "Entities must not be null!");
        return mono.flatMapMany(collection -> {
            return Flux.fromIterable(collection).concatMap(obj -> {
                return maybeCallBeforeConvert(obj, indexCoordinates);
            });
        }).collectList().map(list -> {
            return new Entities(list);
        }).flatMapMany(entities -> {
            return entities.isEmpty() ? Flux.empty() : doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), indexCoordinates).index().flatMap(tuple2 -> {
                Object entityAt = entities.entityAt(((Long) tuple2.getT1()).longValue());
                DocWriteResponse response = ((BulkItemResponse) tuple2.getT2()).getResponse();
                updateIndexedObject(entityAt, IndexedObjectInformation.of(response.getId(), Long.valueOf(response.getSeqNo()), Long.valueOf(response.getPrimaryTerm()), Long.valueOf(response.getVersion())));
                return maybeCallAfterSave(entityAt, indexCoordinates);
            });
        });
    }

    private <T> T updateIndexedObject(T t, IndexedObjectInformation indexedObjectInformation) {
        this.operations.forEntity(t, this.converter.getConversionService()).populateIdIfNecessary(indexedObjectInformation.getId());
        ElasticsearchPersistentEntity<?> requiredPersistentEntity = getRequiredPersistentEntity(t.getClass());
        PersistentPropertyAccessor propertyAccessor = requiredPersistentEntity.getPropertyAccessor(t);
        if (indexedObjectInformation.getSeqNo() != null && indexedObjectInformation.getPrimaryTerm() != null && requiredPersistentEntity.hasSeqNoPrimaryTermProperty()) {
            propertyAccessor.setProperty(requiredPersistentEntity.getSeqNoPrimaryTermProperty(), new SeqNoPrimaryTerm(indexedObjectInformation.getSeqNo().longValue(), indexedObjectInformation.getPrimaryTerm().longValue()));
        }
        if (indexedObjectInformation.getVersion() != null && requiredPersistentEntity.hasVersionProperty()) {
            propertyAccessor.setProperty(requiredPersistentEntity.m76getVersionProperty(), indexedObjectInformation.getVersion());
        }
        return t;
    }

    private ElasticsearchPersistentEntity<?> getRequiredPersistentEntity(Class<?> cls) {
        return (ElasticsearchPersistentEntity) this.converter.getMappingContext().getRequiredPersistentEntity(cls);
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public <T> Flux<T> multiGet(Query query, Class<T> cls) {
        return multiGet(query, cls, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public <T> Flux<T> multiGet(Query query, Class<T> cls, IndexCoordinates indexCoordinates) {
        Assert.notNull(indexCoordinates, "Index must not be null");
        Assert.notNull(cls, "Class must not be null");
        Assert.notNull(query, "Query must not be null");
        Assert.notEmpty(query.getIds(), "No Id define for Query");
        ReadDocumentCallback readDocumentCallback = new ReadDocumentCallback(this.converter, cls, indexCoordinates);
        MultiGetRequest multiGetRequest = this.requestFactory.multiGetRequest(query, cls, indexCoordinates);
        return Flux.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.multiGet(multiGetRequest);
        })).concatMap(getResult -> {
            return readDocumentCallback.toEntity(DocumentAdapters.from(getResult));
        });
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<Void> bulkUpdate(List<UpdateQuery> list, BulkOptions bulkOptions, IndexCoordinates indexCoordinates) {
        Assert.notNull(list, "List of UpdateQuery must not be null");
        Assert.notNull(bulkOptions, "BulkOptions must not be null");
        Assert.notNull(indexCoordinates, "Index must not be null");
        return doBulkOperation(list, bulkOptions, indexCoordinates).then();
    }

    protected Mono<IndexResponse> doIndex(IndexRequest indexRequest) {
        return Mono.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.index(indexRequest);
        }));
    }

    protected Flux<BulkItemResponse> doBulkOperation(List<?> list, BulkOptions bulkOptions, IndexCoordinates indexCoordinates) {
        BulkRequest prepareWriteRequest = prepareWriteRequest(this.requestFactory.bulkRequest(list, bulkOptions, indexCoordinates));
        return this.client.bulk(prepareWriteRequest).onErrorMap(th -> {
            return new UncategorizedElasticsearchException("Error while bulk for request: " + prepareWriteRequest.toString(), th);
        }).flatMap(this::checkForBulkOperationFailure).flatMapMany(bulkResponse -> {
            return Flux.fromArray(bulkResponse.getItems());
        });
    }

    protected Mono<BulkResponse> checkForBulkOperationFailure(BulkResponse bulkResponse) {
        if (!bulkResponse.hasFailures()) {
            return Mono.just(bulkResponse);
        }
        HashMap hashMap = new HashMap();
        for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
            if (bulkItemResponse.isFailed()) {
                hashMap.put(bulkItemResponse.getId(), bulkItemResponse.getFailureMessage());
            }
        }
        return Mono.error(new BulkFailureException("Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + hashMap + ']', hashMap));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<Boolean> exists(String str, Class<?> cls) {
        return doExists(str, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<Boolean> exists(String str, IndexCoordinates indexCoordinates) {
        return doExists(str, indexCoordinates);
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<Boolean> exists(String str, Class<?> cls, IndexCoordinates indexCoordinates) {
        Assert.notNull(str, "Id must not be null!");
        return doExists(str, indexCoordinates);
    }

    private Mono<Boolean> doExists(String str, IndexCoordinates indexCoordinates) {
        return Mono.defer(() -> {
            return doExists(this.requestFactory.getRequest(str, indexCoordinates));
        });
    }

    protected Mono<Boolean> doExists(GetRequest getRequest) {
        return Mono.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.exists(getRequest);
        })).onErrorReturn(NoSuchIndexException.class, false);
    }

    private <T> Mono<Tuple2<T, IndexResponse>> doIndex(T t, IndexCoordinates indexCoordinates) {
        return Mono.just(t).zipWith(doIndex(prepareIndexRequest(t, this.requestFactory.indexRequest(getIndexQuery(t), indexCoordinates))));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public IndexQuery getIndexQuery(Object obj) {
        Number version;
        SeqNoPrimaryTerm seqNoPrimaryTerm;
        EntityOperations.AdaptibleEntity forEntity = this.operations.forEntity(obj, this.converter.getConversionService());
        Object id = forEntity.getId();
        IndexQuery indexQuery = new IndexQuery();
        if (id != null) {
            indexQuery.setId(id.toString());
        }
        indexQuery.setObject(obj);
        boolean z = false;
        if (forEntity.hasSeqNoPrimaryTerm() && (seqNoPrimaryTerm = forEntity.getSeqNoPrimaryTerm()) != null) {
            indexQuery.setSeqNo(Long.valueOf(seqNoPrimaryTerm.getSequenceNumber()));
            indexQuery.setPrimaryTerm(Long.valueOf(seqNoPrimaryTerm.getPrimaryTerm()));
            z = true;
        }
        if (!z && forEntity.isVersionedEntity() && (version = forEntity.getVersion()) != null) {
            indexQuery.setVersion(Long.valueOf(version.longValue()));
        }
        indexQuery.setRouting(forEntity.getRouting());
        return indexQuery;
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public <T> Mono<T> get(String str, Class<T> cls) {
        return get(str, cls, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public <T> Mono<T> get(String str, Class<T> cls, IndexCoordinates indexCoordinates) {
        Assert.notNull(str, "Id must not be null!");
        ReadDocumentCallback readDocumentCallback = new ReadDocumentCallback(this.converter, cls, indexCoordinates);
        return doGet(str, indexCoordinates).flatMap(getResult -> {
            return readDocumentCallback.toEntity(DocumentAdapters.from(getResult));
        });
    }

    private Mono<GetResult> doGet(String str, IndexCoordinates indexCoordinates) {
        return Mono.defer(() -> {
            return doGet(this.requestFactory.getRequest(str, indexCoordinates));
        });
    }

    protected Mono<GetResult> doGet(GetRequest getRequest) {
        return Mono.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.get(getRequest);
        })).onErrorResume(NoSuchIndexException.class, noSuchIndexException -> {
            return Mono.empty();
        });
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<String> delete(Object obj, IndexCoordinates indexCoordinates) {
        EntityOperations.AdaptibleEntity forEntity = this.operations.forEntity(obj, this.converter.getConversionService());
        return forEntity.getId() == null ? Mono.error(new IllegalArgumentException("entity must have an id")) : Mono.defer(() -> {
            return doDeleteById(this.converter.convertId(forEntity.getId()), forEntity.getRouting(), indexCoordinates);
        });
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<String> delete(Object obj) {
        return delete(obj, getIndexCoordinatesFor(obj.getClass()));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<String> delete(String str, Class<?> cls) {
        Assert.notNull(str, "id must not be null");
        Assert.notNull(cls, "entityType must not be null");
        return delete(str, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<String> delete(String str, IndexCoordinates indexCoordinates) {
        Assert.notNull(str, "id must not be null");
        Assert.notNull(indexCoordinates, "index must not be null");
        return doDeleteById(str, null, indexCoordinates);
    }

    private Mono<String> doDeleteById(String str, @Nullable String str2, IndexCoordinates indexCoordinates) {
        return Mono.defer(() -> {
            return doDelete(prepareDeleteRequest(this.requestFactory.deleteRequest(str, str2, indexCoordinates)));
        });
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<Long> delete(Query query, Class<?> cls, IndexCoordinates indexCoordinates) {
        Assert.notNull(query, "Query must not be null!");
        return doDeleteBy(query, cls, indexCoordinates).map((v0) -> {
            return v0.getDeleted();
        }).next();
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates indexCoordinates) {
        Assert.notNull(updateQuery, "UpdateQuery must not be null");
        Assert.notNull(indexCoordinates, "Index must not be null");
        return Mono.defer(() -> {
            UpdateRequest updateRequest = this.requestFactory.updateRequest(updateQuery, indexCoordinates);
            return Mono.from(execute(reactiveElasticsearchClient -> {
                return reactiveElasticsearchClient.update(updateRequest);
            })).map(updateResponse -> {
                return new UpdateResponse(UpdateResponse.Result.valueOf(updateResponse.getResult().name()));
            });
        });
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveDocumentOperations
    public Mono<Long> delete(Query query, Class<?> cls) {
        return delete(query, cls, getIndexCoordinatesFor(cls));
    }

    private Flux<BulkByScrollResponse> doDeleteBy(Query query, Class<?> cls, IndexCoordinates indexCoordinates) {
        return Flux.defer(() -> {
            return doDeleteBy(prepareDeleteByRequest(this.requestFactory.deleteByQueryRequest(query, cls, indexCoordinates)));
        });
    }

    protected Mono<String> doDelete(DeleteRequest deleteRequest) {
        return Mono.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.delete(deleteRequest);
        })).flatMap(deleteResponse -> {
            return HttpStatus.valueOf(deleteResponse.status().getStatus()).equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.just(deleteResponse.getId());
        }).onErrorResume(NoSuchIndexException.class, noSuchIndexException -> {
            return Mono.empty();
        });
    }

    protected Mono<BulkByScrollResponse> doDeleteBy(DeleteByQueryRequest deleteByQueryRequest) {
        return Mono.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.deleteBy(deleteByQueryRequest);
        })).onErrorResume(NoSuchIndexException.class, noSuchIndexException -> {
            return Mono.empty();
        });
    }

    protected DeleteRequest prepareDeleteRequest(DeleteRequest deleteRequest) {
        return prepareWriteRequest(deleteRequest);
    }

    protected DeleteByQueryRequest prepareDeleteByRequest(DeleteByQueryRequest deleteByQueryRequest) {
        if (this.refreshPolicy != null) {
            deleteByQueryRequest = WriteRequest.RefreshPolicy.NONE.equals(this.refreshPolicy) ? (DeleteByQueryRequest) deleteByQueryRequest.setRefresh(false) : (DeleteByQueryRequest) deleteByQueryRequest.setRefresh(true);
        }
        if (this.indicesOptions != null) {
            deleteByQueryRequest = deleteByQueryRequest.setIndicesOptions(this.indicesOptions);
        }
        return deleteByQueryRequest;
    }

    protected IndexRequest prepareIndexRequest(Object obj, IndexRequest indexRequest) {
        return prepareWriteRequest(indexRequest);
    }

    protected <R extends WriteRequest<R>> R prepareWriteRequest(R r) {
        return this.refreshPolicy == null ? r : (R) r.setRefreshPolicy(this.refreshPolicy);
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public <T> Flux<SearchHit<T>> search(Query query, Class<?> cls, Class<T> cls2, IndexCoordinates indexCoordinates) {
        ReadSearchDocumentCallback readSearchDocumentCallback = new ReadSearchDocumentCallback(cls2, indexCoordinates);
        Flux<SearchDocument> doFind = doFind(query, cls, indexCoordinates);
        readSearchDocumentCallback.getClass();
        return doFind.concatMap(readSearchDocumentCallback::toSearchHit);
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public <T> Flux<SearchHit<T>> search(Query query, Class<?> cls, Class<T> cls2) {
        return search(query, cls, cls2, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> cls, Class<T> cls2) {
        return searchForPage(query, cls, cls2, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> cls, Class<T> cls2, IndexCoordinates indexCoordinates) {
        ReadSearchDocumentCallback readSearchDocumentCallback = new ReadSearchDocumentCallback(cls2, indexCoordinates);
        return doFindForResponse(query, cls, indexCoordinates).flatMap(searchDocumentResponse -> {
            Flux fromIterable = Flux.fromIterable(searchDocumentResponse.getSearchDocuments());
            readSearchDocumentCallback.getClass();
            return fromIterable.flatMap(readSearchDocumentCallback::toEntity).collectList().map(list -> {
                return SearchHitMapping.mappingFor(cls2, this.converter).mapHits(searchDocumentResponse, list);
            });
        }).map(searchHits -> {
            return SearchHitSupport.searchPageFor(searchHits, query.getPageable());
        });
    }

    private Flux<SearchDocument> doFind(Query query, Class<?> cls, IndexCoordinates indexCoordinates) {
        return Flux.defer(() -> {
            SearchRequest prepareSearchRequest = prepareSearchRequest(this.requestFactory.searchRequest(query, cls, indexCoordinates));
            return (query.getPageable().isPaged() || query.isLimiting()) ? doFind(prepareSearchRequest) : doScroll(prepareSearchRequest);
        });
    }

    private Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> cls, IndexCoordinates indexCoordinates) {
        return Mono.defer(() -> {
            return doFindForResponse(prepareSearchRequest(this.requestFactory.searchRequest(query, cls, indexCoordinates)));
        });
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public Flux<Aggregation> aggregate(Query query, Class<?> cls) {
        return aggregate(query, cls, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public Flux<Aggregation> aggregate(Query query, Class<?> cls, IndexCoordinates indexCoordinates) {
        return doAggregate(query, cls, indexCoordinates);
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public Flux<Suggest> suggest(SuggestBuilder suggestBuilder, Class<?> cls) {
        return doSuggest(suggestBuilder, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public Flux<Suggest> suggest(SuggestBuilder suggestBuilder, IndexCoordinates indexCoordinates) {
        return doSuggest(suggestBuilder, indexCoordinates);
    }

    private Flux<Suggest> doSuggest(SuggestBuilder suggestBuilder, IndexCoordinates indexCoordinates) {
        return Flux.defer(() -> {
            SearchRequest searchRequest = this.requestFactory.searchRequest(suggestBuilder, indexCoordinates);
            return Flux.from(execute(reactiveElasticsearchClient -> {
                return reactiveElasticsearchClient.suggest(searchRequest);
            }));
        });
    }

    private Flux<Aggregation> doAggregate(Query query, Class<?> cls, IndexCoordinates indexCoordinates) {
        return Flux.defer(() -> {
            return doAggregate(prepareSearchRequest(this.requestFactory.searchRequest(query, cls, indexCoordinates)));
        });
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public Mono<Long> count(Query query, Class<?> cls) {
        return count(query, cls, getIndexCoordinatesFor(cls));
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveSearchOperations
    public Mono<Long> count(Query query, Class<?> cls, IndexCoordinates indexCoordinates) {
        return doCount(query, cls, indexCoordinates);
    }

    private Mono<Long> doCount(Query query, Class<?> cls, IndexCoordinates indexCoordinates) {
        return Mono.defer(() -> {
            return doCount(prepareSearchRequest(this.requestFactory.searchRequest(query, cls, indexCoordinates)));
        });
    }

    protected Flux<SearchDocument> doFind(SearchRequest searchRequest) {
        if (QUERY_LOGGER.isDebugEnabled()) {
            QUERY_LOGGER.debug("Executing doFind: {}", searchRequest);
        }
        return Flux.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.search(searchRequest);
        })).map(DocumentAdapters::from).onErrorResume(NoSuchIndexException.class, noSuchIndexException -> {
            return Mono.empty();
        });
    }

    protected Mono<SearchDocumentResponse> doFindForResponse(SearchRequest searchRequest) {
        if (QUERY_LOGGER.isDebugEnabled()) {
            QUERY_LOGGER.debug("Executing doFindForResponse: {}", searchRequest);
        }
        return Mono.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.searchForResponse(searchRequest);
        })).map(SearchDocumentResponse::from);
    }

    protected Flux<Aggregation> doAggregate(SearchRequest searchRequest) {
        if (QUERY_LOGGER.isDebugEnabled()) {
            QUERY_LOGGER.debug("Executing doCount: {}", searchRequest);
        }
        return Flux.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.aggregate(searchRequest);
        })).onErrorResume(NoSuchIndexException.class, noSuchIndexException -> {
            return Flux.empty();
        });
    }

    protected Mono<Long> doCount(SearchRequest searchRequest) {
        if (QUERY_LOGGER.isDebugEnabled()) {
            QUERY_LOGGER.debug("Executing doCount: {}", searchRequest);
        }
        return Mono.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.count(searchRequest);
        })).onErrorResume(NoSuchIndexException.class, noSuchIndexException -> {
            return Mono.just(0L);
        });
    }

    protected Flux<SearchDocument> doScroll(SearchRequest searchRequest) {
        if (QUERY_LOGGER.isDebugEnabled()) {
            QUERY_LOGGER.debug("Executing doScroll: {}", searchRequest);
        }
        return Flux.from(execute(reactiveElasticsearchClient -> {
            return reactiveElasticsearchClient.scroll(searchRequest);
        })).map(DocumentAdapters::from).onErrorResume(NoSuchIndexException.class, noSuchIndexException -> {
            return Mono.empty();
        });
    }

    protected SearchRequest prepareSearchRequest(SearchRequest searchRequest) {
        return this.indicesOptions == null ? searchRequest : searchRequest.indicesOptions(this.indicesOptions);
    }

    protected Mono<String> getClusterVersion() {
        try {
            return Mono.from(execute(reactiveElasticsearchClient -> {
                return reactiveElasticsearchClient.info();
            })).map(mainResponse -> {
                return mainResponse.getVersion().toString();
            });
        } catch (Exception e) {
            return Mono.empty();
        }
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations
    public <T> Publisher<T> execute(ReactiveElasticsearchOperations.ClientCallback<Publisher<T>> clientCallback) {
        return Flux.defer(() -> {
            return clientCallback.doWithClient(getClient());
        }).onErrorMap(this::translateException);
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations
    public <T> Publisher<T> executeWithIndicesClient(ReactiveElasticsearchOperations.IndicesClientCallback<Publisher<T>> indicesClientCallback) {
        return Flux.defer(() -> {
            return indicesClientCallback.doWithClient(getIndicesClient());
        }).onErrorMap(this::translateException);
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations
    public ElasticsearchConverter getElasticsearchConverter() {
        return this.converter;
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations
    public ReactiveIndexOperations indexOps(IndexCoordinates indexCoordinates) {
        return new DefaultReactiveIndexOperations(this, indexCoordinates);
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations
    public ReactiveIndexOperations indexOps(Class<?> cls) {
        return new DefaultReactiveIndexOperations(this, cls);
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations
    public IndexCoordinates getIndexCoordinatesFor(Class<?> cls) {
        return getPersistentEntityFor(cls).getIndexCoordinates();
    }

    @Override // org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations
    @Nullable
    public ElasticsearchPersistentEntity<?> getPersistentEntityFor(@Nullable Class<?> cls) {
        if (cls != null) {
            return (ElasticsearchPersistentEntity) this.mappingContext.getPersistentEntity(cls);
        }
        return null;
    }

    protected ReactiveElasticsearchClient getClient() {
        return this.client;
    }

    protected ReactiveElasticsearchClient.Indices getIndicesClient() {
        if (this.client instanceof ReactiveElasticsearchClient.Indices) {
            return (ReactiveElasticsearchClient.Indices) this.client;
        }
        throw new UncategorizedElasticsearchException("No ReactiveElasticsearchClient.Indices implementation available");
    }

    private RuntimeException translateException(Throwable th) {
        RuntimeException runtimeException = th instanceof RuntimeException ? (RuntimeException) th : new RuntimeException(th.getMessage(), th);
        DataAccessException translateExceptionIfPossible = this.exceptionTranslator.translateExceptionIfPossible(runtimeException);
        return translateExceptionIfPossible != null ? translateExceptionIfPossible : runtimeException;
    }

    protected <T> Mono<T> maybeCallBeforeConvert(T t, IndexCoordinates indexCoordinates) {
        return null != this.entityCallbacks ? this.entityCallbacks.callback(ReactiveBeforeConvertCallback.class, t, new Object[]{indexCoordinates}) : Mono.just(t);
    }

    protected <T> Mono<T> maybeCallAfterSave(T t, IndexCoordinates indexCoordinates) {
        return null != this.entityCallbacks ? this.entityCallbacks.callback(ReactiveAfterSaveCallback.class, t, new Object[]{indexCoordinates}) : Mono.just(t);
    }

    protected <T> Mono<T> maybeCallAfterConvert(T t, Document document, IndexCoordinates indexCoordinates) {
        return null != this.entityCallbacks ? this.entityCallbacks.callback(ReactiveAfterConvertCallback.class, t, new Object[]{document, indexCoordinates}) : Mono.just(t);
    }
}
