package monix.connect.elasticsearch;

import com.sksamuel.elastic4s.CommonRequestOptions$;
import com.sksamuel.elastic4s.ElasticClient;
import com.sksamuel.elastic4s.ElasticDsl$;
import com.sksamuel.elastic4s.RequestFailure;
import com.sksamuel.elastic4s.RequestSuccess;
import com.sksamuel.elastic4s.Response;
import com.sksamuel.elastic4s.requests.searches.SearchHit;
import com.sksamuel.elastic4s.requests.searches.SearchRequest;
import com.sksamuel.elastic4s.requests.searches.SearchResponse;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.Ack$Continue$;
import monix.execution.Ack$Stop$;
import monix.execution.Cancelable;
import monix.execution.internal.InternalApi;
import monix.reactive.Observable;
import monix.reactive.observers.Subscriber;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Some$;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Future;
import scala.reflect.ManifestFactory$;
import scala.runtime.BoxedUnit;

/* compiled from: ElasticsearchSource.scala */
@InternalApi
/* loaded from: input_file:monix/connect/elasticsearch/ElasticsearchSource.class */
public class ElasticsearchSource extends Observable<SearchHit> {
    private final SearchRequest request;
    private final ElasticClient client;
    private Option<String> scrollId = None$.MODULE$;
    private final String keepAlive;

    public static ElasticsearchSource search(SearchRequest searchRequest, ElasticClient elasticClient) {
        return ElasticsearchSource$.MODULE$.search(searchRequest, elasticClient);
    }

    public ElasticsearchSource(SearchRequest searchRequest, ElasticClient elasticClient) {
        this.request = searchRequest;
        this.client = elasticClient;
        this.keepAlive = (String) searchRequest.keepAlive().getOrElse(ElasticsearchSource::$init$$$anonfun$1);
    }

    public Cancelable unsafeSubscribeFn(Subscriber<SearchHit> subscriber) {
        return fastLoop(Queue$.MODULE$.empty(), subscriber).runToFuture(subscriber.scheduler());
    }

    private Task<BoxedUnit> fastLoop(Queue<SearchHit> queue, Subscriber<SearchHit> subscriber) {
        return fetch(queue, subscriber).flatMap(boxedUnit -> {
            return queue.nonEmpty() ? Task$.MODULE$.deferFuture(() -> {
                return fastLoop$$anonfun$1$$anonfun$1(r1, r2);
            }) : Task$.MODULE$.now(Ack$Stop$.MODULE$);
        }).flatMap(ack -> {
            if (Ack$Continue$.MODULE$.equals(ack)) {
                return fastLoop(queue, subscriber);
            }
            if (!Ack$Stop$.MODULE$.equals(ack)) {
                throw new MatchError(ack);
            }
            subscriber.onComplete();
            return Task$.MODULE$.unit();
        });
    }

    private void populateHandler(Response<SearchResponse> response, Queue<SearchHit> queue, Subscriber<SearchHit> subscriber) {
        if (response instanceof RequestFailure) {
            subscriber.onError(((RequestFailure) response).error().asException());
            return;
        }
        if (!(response instanceof RequestSuccess)) {
            throw new MatchError(response);
        }
        SearchResponse searchResponse = (SearchResponse) ((RequestSuccess) response).result();
        Some scrollId = searchResponse.scrollId();
        if (None$.MODULE$.equals(scrollId)) {
            subscriber.onError(new RuntimeException("Search response did not include a scroll id"));
            return;
        }
        if (!(scrollId instanceof Some)) {
            throw new MatchError(scrollId);
        }
        this.scrollId = Some$.MODULE$.apply((String) scrollId.value());
        if (searchResponse.hits().hits().length == 0 && queue.isEmpty()) {
            subscriber.onComplete();
        } else {
            queue.$plus$plus$eq(Predef$.MODULE$.wrapRefArray(searchResponse.hits().hits()));
        }
    }

    private Task<BoxedUnit> fetch(Queue<SearchHit> queue, Subscriber<SearchHit> subscriber) {
        if (!queue.isEmpty()) {
            return Task$.MODULE$.unit();
        }
        Some some = this.scrollId;
        if (some instanceof Some) {
            return ((Task) this.client.execute(ElasticDsl$.MODULE$.searchScroll((String) some.value()).keepAlive(this.keepAlive), package$.MODULE$.taskExecutor(), package$.MODULE$.taskFunctor(), ElasticDsl$.MODULE$.SearchScrollHandler(), ManifestFactory$.MODULE$.classType(SearchResponse.class), CommonRequestOptions$.MODULE$.defaults())).map(response -> {
                populateHandler(response, queue, subscriber);
            });
        }
        if (None$.MODULE$.equals(some)) {
            return ((Task) this.client.execute(this.request.keepAlive(this.keepAlive), package$.MODULE$.taskExecutor(), package$.MODULE$.taskFunctor(), ElasticDsl$.MODULE$.SearchHandler(), ManifestFactory$.MODULE$.classType(SearchResponse.class), CommonRequestOptions$.MODULE$.defaults())).map(response2 -> {
                populateHandler(response2, queue, subscriber);
            });
        }
        throw new MatchError(some);
    }

    private static final String $init$$$anonfun$1() {
        return "1m";
    }

    private static final Future fastLoop$$anonfun$1$$anonfun$1(Queue queue, Subscriber subscriber) {
        return subscriber.onNext(queue.dequeue());
    }
}
