package onextent.akka.eventhubs;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.SupervisorStrategy;
import akka.annotation.InternalApi;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.time.Duration;
import java.util.Iterator;
import onextent.akka.eventhubs.Connector;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.JavaConverters$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.compat.java8.FutureConverters$;
import scala.concurrent.Await$;
import scala.concurrent.duration.package;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Statics;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: AbstractPartitionReader.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Ea!B\u0007\u000f\u0003\u0003)\u0002\u0002C\u0017\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0018\t\u0011E\u0002!\u0011!Q\u0001\nIBQA\u000e\u0001\u0005\u0002]Bqa\u000f\u0001A\u0002\u0013\u0005A\bC\u0004G\u0001\u0001\u0007I\u0011A$\t\r5\u0003\u0001\u0015)\u0003>\u0011\u001dq\u0005A1A\u0005\u0002=Caa\u0015\u0001!\u0002\u0013\u0001\u0006\u0002\u0003+\u0001\u0011\u000b\u0007I\u0011A+\t\u000be\u0003A\u0011\u0001.\t\u000by\u0003A\u0011A0\t\u000be\u0004A\u0011\t>\u0003/\u0005\u00137\u000f\u001e:bGR\u0004\u0016M\u001d;ji&|gNU3bI\u0016\u0014(BA\b\u0011\u0003%)g/\u001a8uQV\u00147O\u0003\u0002\u0012%\u0005!\u0011m[6b\u0015\u0005\u0019\u0012\u0001C8oKb$XM\u001c;\u0004\u0001M!\u0001A\u0006\u000f$!\t9\"$D\u0001\u0019\u0015\u0005I\u0012!B:dC2\f\u0017BA\u000e\u0019\u0005\u0019\te.\u001f*fMB\u0011Q$I\u0007\u0002=)\u0011q\u0004I\u0001\u0006C\u000e$xN\u001d\u0006\u0002#%\u0011!E\b\u0002\u0006\u0003\u000e$xN\u001d\t\u0003I-j\u0011!\n\u0006\u0003M\u001d\nAb]2bY\u0006dwnZ4j]\u001eT!\u0001K\u0015\u0002\u0011QL\b/Z:bM\u0016T\u0011AK\u0001\u0004G>l\u0017B\u0001\u0017&\u0005-a\u0015M_=M_\u001e<\u0017N\\4\u0002\u0017A\f'\u000f^5uS>t\u0017\n\u001a\t\u0003/=J!\u0001\r\r\u0003\u0007%sG/\u0001\u0007fm\u0016tG\u000fS;c\u0007>tg\r\u0005\u00024i5\ta\"\u0003\u00026\u001d\taQI^3oi\"+(mQ8oM\u00061A(\u001b8jiz\"2\u0001O\u001d;!\t\u0019\u0004\u0001C\u0003.\u0007\u0001\u0007a\u0006C\u00032\u0007\u0001\u0007!'A\u0003ti\u0006$X-F\u0001>!\tqD)D\u0001@\u0015\ty\u0001I\u0003\u0002B\u0005\u0006)\u0011M_;sK*\u00111)K\u0001\n[&\u001c'o\\:pMRL!!R \u0003\u001b\u00153XM\u001c;Q_NLG/[8o\u0003%\u0019H/\u0019;f?\u0012*\u0017\u000f\u0006\u0002I\u0017B\u0011q#S\u0005\u0003\u0015b\u0011A!\u00168ji\"9A*BA\u0001\u0002\u0004i\u0014a\u0001=%c\u000511\u000f^1uK\u0002\n\u0001\"\u001a5DY&,g\u000e^\u000b\u0002!B\u0011a(U\u0005\u0003%~\u0012a\"\u0012<f]RDUOY\"mS\u0016tG/A\u0005fQ\u000ec\u0017.\u001a8uA\u0005A!/Z2fSZ,'/F\u0001W!\tqt+\u0003\u0002Y\u007f\t\t\u0002+\u0019:uSRLwN\u001c*fG\u0016Lg/\u001a:\u0002\u0019%t\u0017\u000e\u001e*fG\u0016Lg/\u001a:\u0016\u0003m\u00032a\u0006/I\u0013\ti\u0006DA\u0005Gk:\u001cG/[8oa\u0005!!/Z1e)\u0005\u0001\u0007cA1jY:\u0011!m\u001a\b\u0003G\u001al\u0011\u0001\u001a\u0006\u0003KR\ta\u0001\u0010:p_Rt\u0014\"A\r\n\u0005!D\u0012a\u00029bG.\fw-Z\u0005\u0003U.\u0014A\u0001T5ti*\u0011\u0001\u000e\u0007\t\u0003[Zt!A\u001c;\u000f\u0005=\u001chB\u00019s\u001d\t\u0019\u0017/C\u0001\u0014\u0013\t\t\"#\u0003\u0002\u0010!%\u0011QOD\u0001\n\u0007>tg.Z2u_JL!a\u001e=\u0003\u000b\u00153XM\u001c;\u000b\u0005Ut\u0011A\u00039sKJ+7\u000f^1siR!\u0001j_A\u0001\u0011\u0015aH\u00021\u0001~\u0003\u0019\u0011X-Y:p]B\u0011\u0011M`\u0005\u0003\u007f.\u0014\u0011\u0002\u00165s_^\f'\r\\3\t\u000f\u0005\rA\u00021\u0001\u0002\u0006\u00059Q.Z:tC\u001e,\u0007#B\f\u0002\b\u0005-\u0011bAA\u00051\t1q\n\u001d;j_:\u00042aFA\u0007\u0013\r\ty\u0001\u0007\u0002\u0004\u0003:L\b")
/* loaded from: input_file:onextent/akka/eventhubs/AbstractPartitionReader.class */
public abstract class AbstractPartitionReader implements Actor, LazyLogging {
    private PartitionReceiver receiver;
    private final int partitionId;
    private final EventHubConf eventHubConf;
    private EventPosition state;
    private final EventHubClient ehClient;
    private transient Logger logger;
    private ActorContext context;
    private ActorRef self;
    private volatile boolean bitmap$0;
    private volatile transient boolean bitmap$trans$0;

    public final ActorRef sender() {
        return Actor.sender$(this);
    }

    @InternalApi
    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    @InternalApi
    public void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    @InternalApi
    public void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    @InternalApi
    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    @InternalApi
    public void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    public SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    public void preStart() throws Exception {
        Actor.preStart$(this);
    }

    public void postStop() throws Exception {
        Actor.postStop$(this);
    }

    public void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    public void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [onextent.akka.eventhubs.AbstractPartitionReader] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public EventPosition state() {
        return this.state;
    }

    public void state_$eq(EventPosition eventPosition) {
        this.state = eventPosition;
    }

    public EventHubClient ehClient() {
        return this.ehClient;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [onextent.akka.eventhubs.AbstractPartitionReader] */
    private PartitionReceiver receiver$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.receiver = ehClient().createEpochReceiverSync(this.eventHubConf.ehConsumerGroup(), BoxesRunTime.boxToInteger(this.partitionId).toString(), state(), System.currentTimeMillis());
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.receiver;
    }

    public PartitionReceiver receiver() {
        return !this.bitmap$0 ? receiver$lzycompute() : this.receiver;
    }

    public Function0<BoxedUnit> initReceiver() {
        return () -> {
            this.receiver().setReceiveTimeout(Duration.ofSeconds(20L));
        };
    }

    public List<Connector.Event> read() {
        List list;
        List list2;
        List list3 = (List) package$.MODULE$.List().apply(Nil$.MODULE$);
        while (true) {
            List list4 = list3;
            if (!list4.isEmpty()) {
                return list4.map(eventData -> {
                    return new Connector.Event(this.self(), this.partitionId, eventData);
                });
            }
            Success success = (Try) Await$.MODULE$.ready(FutureConverters$.MODULE$.toScala(receiver().receive(this.eventHubConf.ehReceiverBatchSize())), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(60)).seconds()).value().get();
            boolean z = false;
            if (success instanceof Success) {
                z = true;
                Iterable iterable = (Iterable) success.value();
                if (iterable != null) {
                    Some apply = Option$.MODULE$.apply(iterable.iterator());
                    if (apply instanceof Some) {
                        Iterator it = (Iterator) apply.value();
                        if (it.hasNext()) {
                            List list5 = ((IterableOnceOps) JavaConverters$.MODULE$.asScalaIteratorConverter(it).asScala()).toList();
                            if (logger().underlying().isDebugEnabled()) {
                                logger().underlying().debug("read {} java future messages with read batch size of {}", new Object[]{BoxesRunTime.boxToInteger(list5.length()), BoxesRunTime.boxToInteger(this.eventHubConf.ehReceiverBatchSize())});
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                            } else {
                                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                            }
                            list2 = list5;
                            list = list2;
                            list3 = list;
                        }
                    }
                    list2 = (List) package$.MODULE$.List().apply(Nil$.MODULE$);
                    list = list2;
                    list3 = list;
                }
            }
            if (!z) {
                if (!(success instanceof Failure)) {
                    throw new MatchError(success);
                }
                Throwable exception = ((Failure) success).exception();
                if (logger().underlying().isErrorEnabled()) {
                    logger().underlying().error(new StringBuilder(19).append("read failed due to ").append(exception).toString(), exception);
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
                throw exception;
            }
            if (logger().underlying().isDebugEnabled()) {
                logger().underlying().debug("read failed - null result, non-fatal");
                BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
            }
            list = (List) package$.MODULE$.List().apply(Nil$.MODULE$);
            list3 = list;
        }
    }

    public void preRestart(Throwable th, Option<Object> option) {
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("preRestart calling close on ehClient for pid {}", BoxesRunTime.boxToInteger(this.partitionId));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        ehClient().closeSync();
        Actor.preRestart$(this, th, option);
    }

    public AbstractPartitionReader(int i, EventHubConf eventHubConf) {
        this.partitionId = i;
        this.eventHubConf = eventHubConf;
        Actor.$init$(this);
        LazyLogging.$init$(this);
        this.state = eventHubConf.defaultOffset().equals("LATEST") ? EventPosition.fromEndOfStream() : EventPosition.fromStartOfStream();
        this.ehClient = eventHubConf.createClient();
        Statics.releaseFence();
    }
}
