package com.github.takezoe.akka.stream.elasticsearch;

import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import com.github.takezoe.akka.stream.elasticsearch.ElasticsearchFlowStage;
import java.util.Map;
import org.apache.http.Header;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.collection.mutable.Queue;
import scala.concurrent.Future$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import spray.json.JsArray;
import spray.json.JsObject$;
import spray.json.JsString;
import spray.json.JsValue;
import spray.json.package$;

/* compiled from: ElasticsearchFlowStage.scala */
/* loaded from: input_file:com/github/takezoe/akka/stream/elasticsearch/ElasticsearchFlowStage$$anon$1.class */
public final class ElasticsearchFlowStage$$anon$1 extends GraphStageLogic implements ResponseListener, InHandler, OutHandler {
    private ElasticsearchFlowStage.State state;
    private final Queue<IncomingMessage<T>> queue;
    private final AsyncCallback<Throwable> failureHandler;
    private final AsyncCallback<Response> responseHandler;
    private final /* synthetic */ ElasticsearchFlowStage $outer;

    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    private ElasticsearchFlowStage.State state() {
        return this.state;
    }

    private void state_$eq(ElasticsearchFlowStage.State state) {
        this.state = state;
    }

    private Queue<IncomingMessage<T>> queue() {
        return this.queue;
    }

    private AsyncCallback<Throwable> failureHandler() {
        return this.failureHandler;
    }

    private AsyncCallback<Response> responseHandler() {
        return this.responseHandler;
    }

    public void preStart() {
        pull(this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$in());
    }

    private void tryPull() {
        if (queue().size() >= this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$settings.bufferSize() || isClosed(this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$in()) || hasBeenPulled(this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$in())) {
            return;
        }
        pull(this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$in());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleFailure(Throwable th) {
        failStage(th);
    }

    private void handleSuccess() {
        completeStage();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleResponse(Response response) {
        Vector vector = (Vector) ((JsArray) package$.MODULE$.pimpString(EntityUtils.toString(response.getEntity())).parseJson().asJsObject().fields().apply("items")).elements().flatMap(jsValue -> {
            String value = ((JsString) ((JsValue) jsValue.asJsObject().fields().apply("index")).asJsObject().fields().apply("result")).value();
            if (value != null ? !value.equals("created") : "created" != 0) {
                if (value != null ? !value.equals("updated") : "updated" != 0) {
                    return Option$.MODULE$.option2Iterable(new Some(value));
                }
            }
            return Option$.MODULE$.option2Iterable(None$.MODULE$);
        }, Vector$.MODULE$.canBuildFrom());
        if (vector.nonEmpty()) {
            failStage(new IllegalStateException(vector.mkString("\n")));
        }
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$settings.bufferSize()).flatMap(obj -> {
            return $anonfun$handleResponse$2(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        if (indexedSeq.isEmpty()) {
            if (ElasticsearchFlowStage$Finished$.MODULE$.equals(state())) {
                handleSuccess();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                state_$eq(ElasticsearchFlowStage$Idle$.MODULE$);
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        } else {
            sendBulkUpdateRequest(indexedSeq);
        }
        push(this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$out(), Future$.MODULE$.successful(response));
    }

    public void onFailure(Exception exc) {
        failureHandler().invoke(exc);
    }

    public void onSuccess(Response response) {
        responseHandler().invoke(response);
    }

    private void sendBulkUpdateRequest(Seq<IncomingMessage<T>> seq) {
        this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$client.performRequestAsync("POST", "/_bulk", (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Nil$.MODULE$)).asJava(), new StringEntity(((TraversableOnce) seq.map(incomingMessage -> {
            return JsObject$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("index"), JsObject$.MODULE$.apply(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Option[]{Option$.MODULE$.apply(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("_index"), new JsString(this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$indexName))), Option$.MODULE$.apply(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("_type"), new JsString(this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$typeName))), incomingMessage.id().map(str -> {
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("_id"), new JsString(str));
            })})).flatten(option -> {
                return Option$.MODULE$.option2Iterable(option);
            })))})).toString() + "\n" + this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$writer.convert(incomingMessage.source());
        }, Seq$.MODULE$.canBuildFrom())).mkString("", "\n", "\n")), this, new Header[]{new BasicHeader("Content-Type", "application/x-ndjson")});
    }

    public void onPull() {
        tryPull();
    }

    public void onPush() {
        queue().enqueue(Predef$.MODULE$.wrapRefArray(new IncomingMessage[]{(IncomingMessage) grab(this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$in())}));
        if (ElasticsearchFlowStage$Idle$.MODULE$.equals(state())) {
            state_$eq(ElasticsearchFlowStage$Sending$.MODULE$);
            sendBulkUpdateRequest((IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), this.$outer.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$settings.bufferSize()).flatMap(obj -> {
                return $anonfun$onPush$1(this, BoxesRunTime.unboxToInt(obj));
            }, IndexedSeq$.MODULE$.canBuildFrom()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        tryPull();
    }

    public void onUpstreamFailure(Throwable th) {
        handleFailure(th);
    }

    public void onUpstreamFinish() {
        ElasticsearchFlowStage.State state = state();
        if (ElasticsearchFlowStage$Idle$.MODULE$.equals(state)) {
            handleSuccess();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (ElasticsearchFlowStage$Sending$.MODULE$.equals(state)) {
            state_$eq(ElasticsearchFlowStage$Finished$.MODULE$);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            if (!ElasticsearchFlowStage$Finished$.MODULE$.equals(state)) {
                throw new MatchError(state);
            }
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ boolean $anonfun$handleResponse$3(IncomingMessage incomingMessage) {
        return true;
    }

    public static final /* synthetic */ Iterable $anonfun$handleResponse$2(ElasticsearchFlowStage$$anon$1 elasticsearchFlowStage$$anon$1, int i) {
        return Option$.MODULE$.option2Iterable(elasticsearchFlowStage$$anon$1.queue().dequeueFirst(incomingMessage -> {
            return BoxesRunTime.boxToBoolean($anonfun$handleResponse$3(incomingMessage));
        }));
    }

    public static final /* synthetic */ boolean $anonfun$onPush$2(IncomingMessage incomingMessage) {
        return true;
    }

    public static final /* synthetic */ Iterable $anonfun$onPush$1(ElasticsearchFlowStage$$anon$1 elasticsearchFlowStage$$anon$1, int i) {
        return Option$.MODULE$.option2Iterable(elasticsearchFlowStage$$anon$1.queue().dequeueFirst(incomingMessage -> {
            return BoxesRunTime.boxToBoolean($anonfun$onPush$2(incomingMessage));
        }));
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ElasticsearchFlowStage$$anon$1(ElasticsearchFlowStage<T> elasticsearchFlowStage) {
        super(elasticsearchFlowStage.m0shape());
        if (elasticsearchFlowStage == 0) {
            throw null;
        }
        this.$outer = elasticsearchFlowStage;
        InHandler.$init$(this);
        OutHandler.$init$(this);
        this.state = ElasticsearchFlowStage$Idle$.MODULE$;
        this.queue = new Queue<>();
        this.failureHandler = getAsyncCallback(th -> {
            this.handleFailure(th);
            return BoxedUnit.UNIT;
        });
        this.responseHandler = getAsyncCallback(response -> {
            this.handleResponse(response);
            return BoxedUnit.UNIT;
        });
        setHandlers(elasticsearchFlowStage.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$in(), elasticsearchFlowStage.com$github$takezoe$akka$stream$elasticsearch$ElasticsearchFlowStage$$out(), this);
    }
}
