package onextent.akka.eventhubs;

import akka.actor.ActorRef;
import akka.pattern.AskableActorRef$;
import akka.pattern.package$;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventPosition;
import java.util.concurrent.TimeoutException;
import onextent.akka.eventhubs.Connector;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.concurrent.Await$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: Eventhubs.scala */
/* loaded from: input_file:onextent/akka/eventhubs/Eventhubs$$anon$1.class */
public final class Eventhubs$$anon$1 extends GraphStageLogic {
    private final /* synthetic */ Eventhubs $outer;

    public /* synthetic */ Eventhubs onextent$akka$eventhubs$Eventhubs$$anon$$$outer() {
        return this.$outer;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public Eventhubs$$anon$1(Eventhubs eventhubs) {
        super(eventhubs.m11shape());
        if (eventhubs == null) {
            throw null;
        }
        this.$outer = eventhubs;
        setHandler(eventhubs.out(), new OutHandler(this) { // from class: onextent.akka.eventhubs.Eventhubs$$anon$1$$anon$2
            private long count;
            private final /* synthetic */ Eventhubs$$anon$1 $outer;

            public void onDownstreamFinish() throws Exception {
                OutHandler.class.onDownstreamFinish(this);
            }

            public long count() {
                return this.count;
            }

            public void count_$eq(long j) {
                this.count = j;
            }

            public void onPull() {
                BoxedUnit boxedUnit;
                BoxedUnit boxedUnit2;
                try {
                    if (this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().isDebugEnabled()) {
                        this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().debug("Pull");
                        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                    }
                    ActorRef ask = package$.MODULE$.ask(this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().connector());
                    Connector.Pull pull = new Connector.Pull();
                    Object result = Await$.MODULE$.result(AskableActorRef$.MODULE$.ask$extension1(ask, pull, this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().onextent$akka$eventhubs$Eventhubs$$eventHubConf.requestTimeout(), AskableActorRef$.MODULE$.ask$default$3$extension(ask, pull)), this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().onextent$akka$eventhubs$Eventhubs$$eventHubConf.requestDuration());
                    if (result instanceof Throwable) {
                        Throwable th = (Throwable) result;
                        if (this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().isErrorEnabled()) {
                            this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"pull request error for partition ", ". aborting..."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().onextent$akka$eventhubs$Eventhubs$$partitionId)})), th);
                            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
                        } else {
                            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
                        }
                        this.$outer.completeStage();
                        BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
                        return;
                    }
                    if (!(result instanceof Connector.Event)) {
                        if (this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().isErrorEnabled()) {
                            this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().error("I don't know how to handle success {}", new Object[]{result});
                            boxedUnit = BoxedUnit.UNIT;
                        } else {
                            boxedUnit = BoxedUnit.UNIT;
                        }
                        return;
                    }
                    Connector.Event event = (Connector.Event) result;
                    ActorRef from = event.from();
                    int partitionId = event.partitionId();
                    EventData eventData = event.eventData();
                    String str = new String(eventData.getBytes());
                    String partitionKey = eventData.getSystemProperties().getPartitionKey();
                    eventData.getSystemProperties().getPartitionKey();
                    this.$outer.push(this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().out(), new Tuple2(str, new Connector.AckableOffset(new Connector.Ack(partitionId, EventPosition.fromOffset(eventData.getSystemProperties().getOffset()), (Map) ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(eventData.getProperties()).asScala()).map(new Eventhubs$$anon$1$$anon$2$$anonfun$1(this), Map$.MODULE$.canBuildFrom()), partitionKey), from)));
                    count_$eq(count() + 1);
                    if (this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().isDebugEnabled()) {
                        this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().debug("key {} read from partition {}, count = {}", new Object[]{partitionKey, BoxesRunTime.boxToInteger(this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().onextent$akka$eventhubs$Eventhubs$$partitionId), BoxesRunTime.boxToLong(count())});
                        boxedUnit2 = BoxedUnit.UNIT;
                    } else {
                        boxedUnit2 = BoxedUnit.UNIT;
                    }
                } catch (TimeoutException e) {
                    if (this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().isErrorEnabled()) {
                        this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"pull request timeout for partition ", ". aborting..."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().onextent$akka$eventhubs$Eventhubs$$partitionId)})), e);
                        BoxedUnit boxedUnit8 = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit9 = BoxedUnit.UNIT;
                    }
                    this.$outer.completeStage();
                    Object orElse = scala.sys.package$.MODULE$.env().getOrElse("AKKA_EH_DIE_ON_ERROR", new Eventhubs$$anon$1$$anon$2$$anonfun$onPull$1(this));
                    if (orElse == null) {
                        if ("YES" != 0) {
                            return;
                        }
                    } else if (!orElse.equals("YES")) {
                        return;
                    }
                    System.exit(1);
                } catch (Throwable th2) {
                    if (this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().isErrorEnabled()) {
                        this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().logger().underlying().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"pull request exception '", "' for partition ", ". restarting..."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{th2.getMessage(), BoxesRunTime.boxToInteger(this.$outer.onextent$akka$eventhubs$Eventhubs$$anon$$$outer().onextent$akka$eventhubs$Eventhubs$$partitionId)})), th2);
                        BoxedUnit boxedUnit10 = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit11 = BoxedUnit.UNIT;
                    }
                    this.$outer.completeStage();
                    Object orElse2 = scala.sys.package$.MODULE$.env().getOrElse("AKKA_EH_DIE_ON_ERROR", new Eventhubs$$anon$1$$anon$2$$anonfun$onPull$2(this));
                    if (orElse2 == null) {
                        if ("YES" != 0) {
                            return;
                        }
                    } else if (!orElse2.equals("YES")) {
                        return;
                    }
                    System.exit(1);
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.class.$init$(this);
                this.count = 0L;
            }
        });
    }
}
