package onextent.akka.eventhubs;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.SupervisorStrategy;
import akka.annotation.InternalApi;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import onextent.akka.eventhubs.Connector;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.JavaConverters$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Statics;

/* compiled from: AbstractPartitionReader.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005=b!B\b\u0011\u0003\u00039\u0002\u0002C\u0018\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0019\t\u0011M\u0002!\u0011!Q\u0001\nQBQ\u0001\u000f\u0001\u0005\u0002eBq!\u0010\u0001A\u0002\u0013\u0005a\bC\u0004I\u0001\u0001\u0007I\u0011A%\t\r=\u0003\u0001\u0015)\u0003@\u0011\u001d\u0001\u0006A1A\u0005\u0002ECa\u0001\u0018\u0001!\u0002\u0013\u0011\u0006bB/\u0001\u0005\u0004%\tA\u0018\u0005\u0007E\u0002\u0001\u000b\u0011B0\t\u0011\r\u0004\u0001R1A\u0005\u0002\u0011DQ\u0001\u001b\u0001\u0005\u0002%DQ!\u001c\u0001\u0005\u00029Dq!!\u0005\u0001\t\u0003\n\u0019BA\fBEN$(/Y2u!\u0006\u0014H/\u001b;j_:\u0014V-\u00193fe*\u0011\u0011CE\u0001\nKZ,g\u000e\u001e5vENT!a\u0005\u000b\u0002\t\u0005\\7.\u0019\u0006\u0002+\u0005AqN\\3yi\u0016tGo\u0001\u0001\u0014\t\u0001Ab$\n\t\u00033qi\u0011A\u0007\u0006\u00027\u0005)1oY1mC&\u0011QD\u0007\u0002\u0007\u0003:L(+\u001a4\u0011\u0005}\u0019S\"\u0001\u0011\u000b\u0005\u0005\u0012\u0013!B1di>\u0014(\"A\n\n\u0005\u0011\u0002#!B!di>\u0014\bC\u0001\u0014.\u001b\u00059#B\u0001\u0015*\u00031\u00198-\u00197bY><w-\u001b8h\u0015\tQ3&\u0001\u0005usB,7/\u00194f\u0015\u0005a\u0013aA2p[&\u0011af\n\u0002\f\u0019\u0006T\u0018\u0010T8hO&tw-A\u0006qCJ$\u0018\u000e^5p]&#\u0007CA\r2\u0013\t\u0011$DA\u0002J]R\fA\"\u001a<f]RDUOY\"p]\u001a\u0004\"!\u000e\u001c\u000e\u0003AI!a\u000e\t\u0003\u0019\u00153XM\u001c;Ik\n\u001cuN\u001c4\u0002\rqJg.\u001b;?)\rQ4\b\u0010\t\u0003k\u0001AQaL\u0002A\u0002ABQaM\u0002A\u0002Q\nQa\u001d;bi\u0016,\u0012a\u0010\t\u0003\u0001\u001ak\u0011!\u0011\u0006\u0003#\tS!a\u0011#\u0002\u000b\u0005TXO]3\u000b\u0005\u0015[\u0013!C7jGJ|7o\u001c4u\u0013\t9\u0015IA\u0007Fm\u0016tG\u000fU8tSRLwN\\\u0001\ngR\fG/Z0%KF$\"AS'\u0011\u0005eY\u0015B\u0001'\u001b\u0005\u0011)f.\u001b;\t\u000f9+\u0011\u0011!a\u0001\u007f\u0005\u0019\u0001\u0010J\u0019\u0002\rM$\u0018\r^3!\u0003=)\u00070Z2vi>\u00148+\u001a:wS\u000e,W#\u0001*\u0011\u0005MSV\"\u0001+\u000b\u0005U3\u0016AC2p]\u000e,(O]3oi*\u0011q\u000bW\u0001\u0005kRLGNC\u0001Z\u0003\u0011Q\u0017M^1\n\u0005m#&\u0001G*dQ\u0016$W\u000f\\3e\u000bb,7-\u001e;peN+'O^5dK\u0006\u0001R\r_3dkR|'oU3sm&\u001cW\rI\u0001\tK\"\u001cE.[3oiV\tq\f\u0005\u0002AA&\u0011\u0011-\u0011\u0002\u000f\u000bZ,g\u000e\u001e%vE\u000ec\u0017.\u001a8u\u0003%)\u0007n\u00117jK:$\b%\u0001\u0005sK\u000e,\u0017N^3s+\u0005)\u0007C\u0001!g\u0013\t9\u0017IA\tQCJ$\u0018\u000e^5p]J+7-Z5wKJ\fA\"\u001b8jiJ+7-Z5wKJ,\u0012A\u001b\t\u00043-T\u0015B\u00017\u001b\u0005%1UO\\2uS>t\u0007'\u0001\u0003sK\u0006$G#A8\u0011\u0007AD8P\u0004\u0002rm:\u0011!/^\u0007\u0002g*\u0011AOF\u0001\u0007yI|w\u000e\u001e \n\u0003mI!a\u001e\u000e\u0002\u000fA\f7m[1hK&\u0011\u0011P\u001f\u0002\u0005\u0019&\u001cHO\u0003\u0002x5A\u0019A0a\u0003\u000f\u0007u\f9AD\u0002\u007f\u0003\u000bq1a`A\u0002\u001d\r\u0011\u0018\u0011A\u0005\u0002+%\u00111\u0003F\u0005\u0003#II1!!\u0003\u0011\u0003%\u0019uN\u001c8fGR|'/\u0003\u0003\u0002\u000e\u0005=!!B#wK:$(bAA\u0005!\u0005Q\u0001O]3SKN$\u0018M\u001d;\u0015\u000b)\u000b)\"a\b\t\u000f\u0005]a\u00021\u0001\u0002\u001a\u00051!/Z1t_:\u00042\u0001]A\u000e\u0013\r\tiB\u001f\u0002\n)\"\u0014xn^1cY\u0016Dq!!\t\u000f\u0001\u0004\t\u0019#A\u0004nKN\u001c\u0018mZ3\u0011\u000be\t)#!\u000b\n\u0007\u0005\u001d\"D\u0001\u0004PaRLwN\u001c\t\u00043\u0005-\u0012bAA\u00175\t\u0019\u0011I\\=")
/* loaded from: input_file:onextent/akka/eventhubs/AbstractPartitionReader.class */
public abstract class AbstractPartitionReader implements Actor, LazyLogging {
    private PartitionReceiver receiver;
    private final int partitionId;
    private final EventHubConf eventHubConf;
    private EventPosition state;
    private final ScheduledExecutorService executorService;
    private final EventHubClient ehClient;
    private transient Logger logger;
    private ActorContext context;
    private ActorRef self;
    private volatile boolean bitmap$0;
    private volatile transient boolean bitmap$trans$0;

    public final ActorRef sender() {
        return Actor.sender$(this);
    }

    @InternalApi
    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    @InternalApi
    public void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    @InternalApi
    public void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    @InternalApi
    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    @InternalApi
    public void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    public SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    public void preStart() throws Exception {
        Actor.preStart$(this);
    }

    public void postStop() throws Exception {
        Actor.postStop$(this);
    }

    public void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    public void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [onextent.akka.eventhubs.AbstractPartitionReader] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public EventPosition state() {
        return this.state;
    }

    public void state_$eq(EventPosition eventPosition) {
        this.state = eventPosition;
    }

    public ScheduledExecutorService executorService() {
        return this.executorService;
    }

    public EventHubClient ehClient() {
        return this.ehClient;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [onextent.akka.eventhubs.AbstractPartitionReader] */
    private PartitionReceiver receiver$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.receiver = ehClient().createEpochReceiverSync(this.eventHubConf.ehConsumerGroup(), BoxesRunTime.boxToInteger(this.partitionId).toString(), state(), System.currentTimeMillis());
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.receiver;
    }

    public PartitionReceiver receiver() {
        return !this.bitmap$0 ? receiver$lzycompute() : this.receiver;
    }

    public Function0<BoxedUnit> initReceiver() {
        return () -> {
            this.receiver().setReceiveTimeout(Duration.ofSeconds(20L));
        };
    }

    public List<Connector.Event> read() {
        List list;
        List list2;
        List list3 = Nil$.MODULE$;
        while (true) {
            List list4 = list3;
            if (!list4.isEmpty()) {
                return list4.map(eventData -> {
                    return new Connector.Event(this.self(), this.partitionId, eventData);
                });
            }
            Some apply = Option$.MODULE$.apply(receiver().receiveSync(this.eventHubConf.ehRecieverBatchSize()));
            if (apply instanceof Some) {
                Some apply2 = Option$.MODULE$.apply(((Iterable) apply.value()).iterator());
                if (apply2 instanceof Some) {
                    Iterator it = (Iterator) apply2.value();
                    if (it.hasNext()) {
                        List list5 = ((IterableOnceOps) JavaConverters$.MODULE$.asScalaIteratorConverter(it).asScala()).toList();
                        if (logger().underlying().isDebugEnabled()) {
                            logger().underlying().debug("read {} messages with read batch size of {}", new Object[]{BoxesRunTime.boxToInteger(list5.length()), BoxesRunTime.boxToInteger(this.eventHubConf.ehRecieverBatchSize())});
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        } else {
                            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                        }
                        list2 = list5;
                        list = list2;
                    }
                }
                list2 = Nil$.MODULE$;
                list = list2;
            } else {
                list = Nil$.MODULE$;
            }
            list3 = list;
        }
    }

    public void preRestart(Throwable th, Option<Object> option) {
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("preRestart calling close on ehClient for pid {}", BoxesRunTime.boxToInteger(this.partitionId));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        ehClient().closeSync();
        Actor.preRestart$(this, th, option);
    }

    public AbstractPartitionReader(int i, EventHubConf eventHubConf) {
        this.partitionId = i;
        this.eventHubConf = eventHubConf;
        Actor.$init$(this);
        LazyLogging.$init$(this);
        this.state = eventHubConf.defaultOffset().equals("LATEST") ? EventPosition.fromEndOfStream() : EventPosition.fromStartOfStream();
        this.executorService = Executors.newScheduledThreadPool(eventHubConf.threads());
        this.ehClient = EventHubClient.createSync(eventHubConf.connStr(), executorService());
        Statics.releaseFence();
    }
}
