package monix.kafka;

import monix.eval.Callback;
import monix.eval.Callback$;
import monix.eval.Callback$Extensions$;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.Ack;
import monix.execution.Ack$;
import monix.execution.Ack$AckExtensions$;
import monix.execution.Ack$Continue$;
import monix.execution.Ack$Stop$;
import monix.execution.Cancelable;
import monix.execution.Cancelable$;
import monix.execution.Scheduler;
import monix.execution.Scheduler$;
import monix.kafka.config.ObservableCommitType;
import monix.kafka.config.ObservableCommitType$Async$;
import monix.kafka.config.ObservableCommitType$Sync$;
import monix.reactive.Observable;
import monix.reactive.Observer$;
import monix.reactive.observers.Subscriber;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import scala.MatchError;
import scala.Option;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.concurrent.Future$;
import scala.concurrent.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: KafkaConsumerObservable.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]d\u0001B\u0001\u0003\u0005\u001d\u0011qcS1gW\u0006\u001cuN\\:v[\u0016\u0014xJY:feZ\f'\r\\3\u000b\u0005\r!\u0011!B6bM.\f'\"A\u0003\u0002\u000b5|g.\u001b=\u0004\u0001U\u0019\u0001BH\u0016\u0014\u0005\u0001I\u0001c\u0001\u0006\u000e\u001f5\t1B\u0003\u0002\r\t\u0005A!/Z1di&4X-\u0003\u0002\u000f\u0017\tQqJY:feZ\f'\r\\3\u0011\tAQBDK\u0007\u0002#)\u0011!cE\u0001\tG>t7/^7fe*\u0011A#F\u0001\bG2LWM\u001c;t\u0015\t\u0019aC\u0003\u0002\u00181\u00051\u0011\r]1dQ\u0016T\u0011!G\u0001\u0004_J<\u0017BA\u000e\u0012\u00059\u0019uN\\:v[\u0016\u0014(+Z2pe\u0012\u0004\"!\b\u0010\r\u0001\u0011)q\u0004\u0001b\u0001A\t\t1*\u0005\u0002\"OA\u0011!%J\u0007\u0002G)\tA%A\u0003tG\u0006d\u0017-\u0003\u0002'G\t9aj\u001c;iS:<\u0007C\u0001\u0012)\u0013\tI3EA\u0002B]f\u0004\"!H\u0016\u0005\u000b1\u0002!\u0019\u0001\u0011\u0003\u0003YC\u0001B\f\u0001\u0003\u0002\u0003\u0006IaL\u0001\u0007G>tg-[4\u0011\u0005A\nT\"\u0001\u0002\n\u0005I\u0012!aE&bM.\f7i\u001c8tk6,'oQ8oM&<\u0007\u0002\u0003\n\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001b\u0011\u0007UB$(D\u00017\u0015\t9D!\u0001\u0003fm\u0006d\u0017BA\u001d7\u0005\u0011!\u0016m]6\u0011\tAYDDK\u0005\u0003yE\u0011QbS1gW\u0006\u001cuN\\:v[\u0016\u0014\b\"\u0002 
\u0001\t\u0013y\u0014A\u0002\u001fj]&$h\bF\u0002A\u0003\n\u0003B\u0001\r\u0001\u001dU!)a&\u0010a\u0001_!)!#\u0010a\u0001i!)A\t\u0001C\u0001\u000b\u0006\tRO\\:bM\u0016\u001cVOY:de&\u0014WM\u00128\u0015\u0005\u0019c\u0005CA$K\u001b\u0005A%BA%\u0005\u0003%)\u00070Z2vi&|g.\u0003\u0002L\u0011\nQ1)\u00198dK2\f'\r\\3\t\u000b5\u001b\u0005\u0019\u0001(\u0002\u0007=,H\u000fE\u0002P%>i\u0011\u0001\u0015\u0006\u0003#.\t\u0011b\u001c2tKJ4XM]:\n\u0005M\u0003&AC*vEN\u001c'/\u001b2fe\")Q\u000b\u0001C\u0005-\u0006Aa-Z3e)\u0006\u001c8\u000e\u0006\u0002X7B\u0019Q\u0007\u000f-\u0011\u0005\tJ\u0016B\u0001.$\u0005\u0011)f.\u001b;\t\u000b5#\u0006\u0019\u0001(\b\u000bu\u0013\u0001\u0012\u00010\u0002/-\u000bgm[1D_:\u001cX/\\3s\u001f\n\u001cXM\u001d<bE2,\u0007C\u0001\u0019`\r\u0015\t!\u0001#\u0001a'\ry\u0016\r\u001a\t\u0003E\tL!aY\u0012\u0003\r\u0005s\u0017PU3g!\t\u0011S-\u0003\u0002gG\ta1+\u001a:jC2L'0\u00192mK\")ah\u0018C\u0001QR\ta\fC\u0003k?\u0012\u00051.A\u0003baBd\u00170F\u0002m_F$2!\u001c:u!\u0011\u0001\u0004A\u001c9\u0011\u0005uyG!B\u0010j\u0005\u0004\u0001\u0003CA\u000fr\t\u0015a\u0013N1\u0001!\u0011\u0015\u0019\u0018\u000e1\u00010\u0003\r\u0019gm\u001a\u0005\u0006%%\u0004\r!\u001e\t\u0004ka2\b\u0003\u0002\t<]BDQA[0\u0005\u0002a,2!_?��)\u0015Q\u0018\u0011CA\n)\u0015Y\u0018\u0011AA\u0006!\u0011\u0001\u0004\u0001 
@\u0011\u0005uiH!B\u0010x\u0005\u0004\u0001\u0003CA\u000f��\t\u0015asO1\u0001!\u0011\u001d\t\u0019a\u001ea\u0002\u0003\u000b\t\u0011a\u0013\t\u0005a\u0005\u001dA0C\u0002\u0002\n\t\u0011A\u0002R3tKJL\u0017\r\\5{KJDq!!\u0004x\u0001\b\ty!A\u0001W!\u0011\u0001\u0014q\u0001@\t\u000bM<\b\u0019A\u0018\t\u000f\u0005Uq\u000f1\u0001\u0002\u0018\u00051Ao\u001c9jGN\u0004b!!\u0007\u0002*\u0005=b\u0002BA\u000e\u0003KqA!!\b\u0002$5\u0011\u0011q\u0004\u0006\u0004\u0003C1\u0011A\u0002\u001fs_>$h(C\u0001%\u0013\r\t9cI\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tY#!\f\u0003\t1K7\u000f\u001e\u0006\u0004\u0003O\u0019\u0003\u0003BA\u0019\u0003sqA!a\r\u00026A\u0019\u0011QD\u0012\n\u0007\u0005]2%\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003w\tiD\u0001\u0004TiJLgn\u001a\u0006\u0004\u0003o\u0019\u0003bBA!?\u0012\u0005\u00111I\u0001\u000fGJ,\u0017\r^3D_:\u001cX/\\3s+\u0019\t)%a\u0014\u0002TQ1\u0011qIA/\u0003?\"b!!\u0013\u0002V\u0005e\u0003\u0003B\u001b9\u0003\u0017\u0002b\u0001E\u001e\u0002N\u0005E\u0003cA\u000f\u0002P\u00111q$a\u0010C\u0002\u0001\u00022!HA*\t\u0019a\u0013q\bb\u0001A!A\u00111AA \u0001\b\t9\u0006E\u00031\u0003\u000f\ti\u0005\u0003\u0005\u0002\u000e\u0005}\u00029AA.!\u0015\u0001\u0014qAA)\u0011\u0019q\u0013q\ba\u0001_!A\u0011QCA \u0001\u0004\t9\u0002C\u0005\u0002d}\u000b\t\u0011\"\u0003\u0002f\u0005Y!/Z1e%\u0016\u001cx\u000e\u001c<f)\t\t9\u0007\u0005\u0003\u0002j\u0005MTBAA6\u0015\u0011\ti'a\u001c\u0002\t1\fgn\u001a\u0006\u0003\u0003c\nAA[1wC&!\u0011QOA6\u0005\u0019y%M[3di\u0002")
/* loaded from: input_file:monix/kafka/KafkaConsumerObservable.class */
public final class KafkaConsumerObservable<K, V> extends Observable<ConsumerRecord<K, V>> {
    // Consumer settings. Within this class they are read for the poll timeout
    // (fetchMaxWaitTime), the auto-commit flag, the commit order/type, and the
    // seek-to-end-on-start flag.
    private final KafkaConsumerConfig config;
    // Task yielding the KafkaConsumer to poll; it is flat-mapped into the
    // poll loop each time the feed task is started.
    private final Task<KafkaConsumer<K, V>> consumer;

    /**
     * Static forwarder: delegates to {@code createConsumer} on the
     * {@code KafkaConsumerObservable$} companion singleton.
     *
     * @param kafkaConsumerConfig consumer configuration, passed through unchanged
     * @param list                list of strings passed through unchanged
     *                            (presumably the topic names; confirm against the companion)
     * @param deserializer        deserializer for {@code K}
     * @param deserializer2       deserializer for {@code V}
     * @return whatever the companion's {@code createConsumer} returns
     */
    public static <K, V> Task<KafkaConsumer<K, V>> createConsumer(KafkaConsumerConfig kafkaConsumerConfig, List<String> list, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        final KafkaConsumerObservable$ companion = KafkaConsumerObservable$.MODULE$;
        return companion.createConsumer(kafkaConsumerConfig, list, deserializer, deserializer2);
    }

    /**
     * Static forwarder: delegates to the four-argument {@code apply} on the
     * {@code KafkaConsumerObservable$} companion singleton.
     *
     * @param kafkaConsumerConfig consumer configuration, passed through unchanged
     * @param list                list of strings passed through unchanged
     *                            (presumably the topic names; confirm against the companion)
     * @param deserializer        deserializer for {@code K}
     * @param deserializer2       deserializer for {@code V}
     * @return the observable produced by the companion
     */
    public static <K, V> KafkaConsumerObservable<K, V> apply(KafkaConsumerConfig kafkaConsumerConfig, List<String> list, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        final KafkaConsumerObservable$ companion = KafkaConsumerObservable$.MODULE$;
        return companion.apply(kafkaConsumerConfig, list, deserializer, deserializer2);
    }

    /**
     * Static forwarder: delegates to the two-argument {@code apply} on the
     * {@code KafkaConsumerObservable$} companion singleton.
     *
     * @param kafkaConsumerConfig consumer configuration, passed through unchanged
     * @param task                task yielding the Kafka consumer, passed through unchanged
     * @return the observable produced by the companion
     */
    public static <K, V> KafkaConsumerObservable<K, V> apply(KafkaConsumerConfig kafkaConsumerConfig, Task<KafkaConsumer<K, V>> task) {
        final KafkaConsumerObservable$ companion = KafkaConsumerObservable$.MODULE$;
        return companion.apply(kafkaConsumerConfig, task);
    }

    /**
     * Subscribes {@code subscriber} to this observable.
     *
     * <p>Runs {@link #feedTask} on the subscriber's scheduler and forwards the
     * task's outcome to the subscriber: {@code onComplete()} when the task
     * succeeds, {@code onError(th)} when it fails.
     *
     * @param subscriber the downstream subscriber that receives the records
     * @return the {@code Cancelable} returned by {@code runAsync}
     */
    public Cancelable unsafeSubscribeFn(final Subscriber<ConsumerRecord<K, V>> subscriber) {
        // The anonymous callback captures `subscriber` directly. No outer-instance
        // placeholder or constructor arguments are passed to Callback.
        final Callback<BoxedUnit> completion = new Callback<BoxedUnit>() { // from class: monix.kafka.KafkaConsumerObservable$$anon$1
            @Override
            public void onSuccess(BoxedUnit boxedUnit) {
                subscriber.onComplete();
            }

            @Override
            public void onError(Throwable th) {
                subscriber.onError(th);
            }
        };
        return feedTask(subscriber).runAsync(completion, subscriber.scheduler());
    }

    /**
     * Builds the task that feeds {@code subscriber}.
     *
     * <p>Reads the poll timeout and the two manual-commit flags from the
     * configuration once, then wraps {@code $anonfun$feedTask$10} in a task
     * created via {@code Task.unsafeCreate}.
     *
     * @param subscriber the downstream subscriber to feed
     * @return the task wrapping the consumer start-up and poll loop
     */
    private Task<BoxedUnit> feedTask(Subscriber<ConsumerRecord<K, V>> subscriber) {
        final long pollTimeoutMillis = this.config.fetchMaxWaitTime().toMillis();
        // Both flags are false whenever auto-commit is enabled; otherwise they
        // follow the configured commit order.
        final boolean commitBeforeFeed = this.config.enableAutoCommit()
                ? false
                : this.config.observableCommitOrder().isBefore();
        final boolean commitAfterAck = this.config.enableAutoCommit()
                ? false
                : this.config.observableCommitOrder().isAfter();
        return Task$.MODULE$.unsafeCreate((ctx, cb) -> {
            $anonfun$feedTask$10(this, subscriber, pollTimeoutMillis, commitBeforeFeed, commitAfterAck, ctx, cb);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Commits the consumer's offsets according to the configured
     * {@code observableCommitType}.
     *
     * <p>{@code Sync} issues {@code commitSync()}, wrapped in {@code blocking}
     * in the same way the poll and close calls in this class are wrapped.
     * {@code Async} issues {@code commitAsync()}.
     *
     * @param kafkaConsumer the consumer whose offsets are committed; both call
     *                      sites in this class invoke this while holding the
     *                      consumer's monitor
     * @throws MatchError if the configured commit type is neither Sync nor Async
     */
    private final void consumerCommit$1(KafkaConsumer kafkaConsumer) {
        ObservableCommitType commitType = this.config.observableCommitType();
        if (ObservableCommitType$Sync$.MODULE$.equals(commitType)) {
            // Synchronous commit blocks the calling thread until it completes.
            package$.MODULE$.blocking(() -> {
                kafkaConsumer.commitSync();
                return BoxedUnit.UNIT;
            });
        } else if (ObservableCommitType$Async$.MODULE$.equals(commitType)) {
            kafkaConsumer.commitAsync();
        } else {
            throw new MatchError(commitType);
        }
    }

    /**
     * Completion handler for the acknowledgement of one fed batch.
     *
     * <p>On {@code Failure} the error is forwarded to {@code callback}. On
     * {@code Success}, while holding the consumer's monitor, it either signals
     * {@code Stop} (when the task's connection was cancelled) or optionally
     * commits offsets and signals the received acknowledgement.
     *
     * @param kafkaConsumerObservable the observable whose commit routine is used
     * @param z                       whether to commit offsets before signalling the ack
     * @param kafkaConsumer           the consumer, used as the lock and for committing
     * @param scheduler               scheduler used for the asynchronous callback signals
     * @param context                 task context, consulted for cancellation
     * @param callback                callback receiving the ack or the error
     * @param r11                     the completed result of the downstream acknowledgement
     * @throws MatchError if {@code r11} is neither a Success nor a Failure
     */
    public static final /* synthetic */ void $anonfun$feedTask$6(KafkaConsumerObservable kafkaConsumerObservable, boolean z, KafkaConsumer kafkaConsumer, Scheduler scheduler, Task.Context context, Callback callback, Try r11) {
        if (r11 instanceof Failure) {
            Callback$Extensions$.MODULE$.asyncOnError$extension(Callback$.MODULE$.Extensions(callback), ((Failure) r11).exception(), scheduler);
            return;
        }
        if (!(r11 instanceof Success)) {
            throw new MatchError(r11);
        }
        Ack ack = (Ack) ((Success) r11).value();
        // True until the callback is about to be signalled. It keeps the callback
        // from being signalled twice if signalling itself throws.
        boolean streamErrors = true;
        try {
            synchronized (kafkaConsumer) {
                if (context.connection().isCanceled()) {
                    // Cancelled: stop without committing.
                    streamErrors = false;
                    Callback$Extensions$.MODULE$.asyncOnSuccess$extension(Callback$.MODULE$.Extensions(callback), Ack$Stop$.MODULE$, scheduler);
                } else {
                    if (z) {
                        kafkaConsumerObservable.consumerCommit$1(kafkaConsumer);
                    }
                    streamErrors = false;
                    Callback$Extensions$.MODULE$.asyncOnSuccess$extension(Callback$.MODULE$.Extensions(callback), ack, scheduler);
                }
            }
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                // Fatal errors are rethrown untouched.
                throw th;
            }
            Throwable cause = (Throwable) unapply.get();
            if (streamErrors) {
                // Nothing was signalled yet (e.g. the commit failed): report via the callback.
                Callback$Extensions$.MODULE$.asyncOnError$extension(Callback$.MODULE$.Extensions(callback), cause, scheduler);
            } else {
                // The callback was already signalled, so the error can only be reported.
                scheduler.reportFailure(cause);
            }
        }
    }

    /**
     * One iteration of the poll loop, submitted to the context's scheduler.
     *
     * <p>While holding the consumer's monitor it polls for records (unless the
     * task's connection was cancelled), optionally commits offsets, and feeds
     * the records to {@code subscriber}. When the resulting acknowledgement
     * completes, {@code $anonfun$feedTask$6} is invoked with it.
     *
     * @param kafkaConsumerObservable the observable whose commit routine is used
     * @param subscriber              the downstream subscriber receiving the records
     * @param j                       poll timeout passed to {@code KafkaConsumer.poll}
     * @param z                       whether to commit right after polling, before feeding
     * @param z2                      passed to {@code $anonfun$feedTask$6} as its commit flag
     * @param kafkaConsumer           the consumer to poll, also used as the lock
     * @param context                 task context (scheduler, cancellation, frame reference)
     * @param callback                callback that eventually receives the ack or an error
     */
    public static final /* synthetic */ void $anonfun$feedTask$3(KafkaConsumerObservable kafkaConsumerObservable, Subscriber subscriber, long j, boolean z, boolean z2, KafkaConsumer kafkaConsumer, Task.Context context, Callback callback) {
        Scheduler scheduler = context.scheduler();
        new Scheduler.Extensions(Scheduler$.MODULE$.Extensions(scheduler)).executeAsync(() -> {
            // Typed as a future of Ack: it holds Stop, the result of feeding the
            // subscriber, or a failed future.
            scala.concurrent.Future<Ack> ackFuture;
            context.frameRef().reset();
            try {
                synchronized (kafkaConsumer) {
                    if (context.connection().isCanceled()) {
                        // Cancelled: skip polling.
                        ackFuture = Ack$Stop$.MODULE$;
                    } else {
                        ConsumerRecords consumerRecords = (ConsumerRecords) package$.MODULE$.blocking(() -> {
                            return kafkaConsumer.poll(j);
                        });
                        if (z) {
                            kafkaConsumerObservable.consumerCommit$1(kafkaConsumer);
                        }
                        ackFuture = Observer$.MODULE$.feed(subscriber, (Iterable) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(consumerRecords).asScala(), subscriber.scheduler());
                    }
                }
            } catch (Throwable th) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (unapply.isEmpty()) {
                    // Fatal errors are rethrown untouched.
                    throw th;
                }
                // Non-fatal errors become a failed acknowledgement and take the
                // Failure path in the completion handler below.
                ackFuture = Future$.MODULE$.failed((Throwable) unapply.get());
            }
            Ack$AckExtensions$.MODULE$.syncOnComplete$extension(Ack$.MODULE$.AckExtensions(ackFuture), result -> {
                $anonfun$feedTask$6(kafkaConsumerObservable, z2, kafkaConsumer, scheduler, context, callback, result);
                return BoxedUnit.UNIT;
            }, scheduler);
        });
    }

    /**
     * Builds the poll loop as a task that re-creates itself.
     *
     * <p>Each step runs {@code $anonfun$feedTask$3}. When that step yields
     * {@code Stop} the loop ends with the unit task. When it yields
     * {@code Continue} a further step is built with the same arguments.
     *
     * @param kafkaConsumer the consumer to poll
     * @param subscriber    the downstream subscriber
     * @param j             poll timeout forwarded to each step
     * @param z             first commit flag forwarded to each step
     * @param z2            second commit flag forwarded to each step
     * @return the looping task
     * @throws MatchError (when the task runs) if a step yields neither Stop nor Continue
     */
    private final Task runLoop$1(KafkaConsumer kafkaConsumer, Subscriber subscriber, long j, boolean z, boolean z2) {
        return Task$.MODULE$.unsafeCreate((ctx, cb) -> {
            $anonfun$feedTask$3(this, subscriber, j, z, z2, kafkaConsumer, ctx, cb);
            return BoxedUnit.UNIT;
        }).flatMap(signal -> {
            final Task next;
            if (Ack$Stop$.MODULE$.equals(signal)) {
                next = Task$.MODULE$.unit();
            } else if (Ack$Continue$.MODULE$.equals(signal)) {
                next = this.runLoop$1(kafkaConsumer, subscriber, j, z, z2);
            } else {
                throw new MatchError(signal);
            }
            return next;
        });
    }

    /**
     * Builds a memoized task that closes {@code kafkaConsumer}.
     *
     * <p>The close call runs while holding the consumer's monitor and is
     * wrapped in {@code blocking}.
     *
     * @param kafkaConsumer the consumer to close
     * @return the memoized closing task
     */
    private static final Task cancelTask$1(KafkaConsumer kafkaConsumer) {
        final Task closeConsumer = Task$.MODULE$.apply(() -> {
            synchronized (kafkaConsumer) {
                package$.MODULE$.blocking(() -> {
                    kafkaConsumer.close();
                });
            }
        });
        return closeConsumer.memoize();
    }

    /**
     * Body of the task created by {@code feedTask}.
     *
     * <p>Once the consumer task yields a consumer, this optionally seeks to
     * the end (when {@code observableSeekToEndOnStart} is set), registers a
     * cancelable on the task's connection that runs the closing task, and
     * starts the poll loop. The closing task is also chained via
     * {@code doOnFinish}.
     *
     * @param kafkaConsumerObservable the observable supplying the consumer task and config
     * @param subscriber              the downstream subscriber
     * @param j                       poll timeout forwarded to the loop
     * @param z                       first commit flag forwarded to the loop
     * @param z2                      second commit flag forwarded to the loop
     * @param context                 context the composed task is started with
     * @param callback                callback the composed task is started with
     */
    public static final /* synthetic */ void $anonfun$feedTask$10(KafkaConsumerObservable kafkaConsumerObservable, Subscriber subscriber, long j, boolean z, boolean z2, Task.Context context, Callback callback) {
        Scheduler cancelScheduler = context.scheduler();
        Task startAndLoop = kafkaConsumerObservable.consumer.flatMap(kafkaConsumer -> {
            if (kafkaConsumerObservable.config.observableSeekToEndOnStart()) {
                // An empty partition collection is passed to seekToEnd.
                kafkaConsumer.seekToEnd(JavaConverters$.MODULE$.asJavaCollectionConverter(Nil$.MODULE$).asJavaCollection());
            }
            Task closeConsumer = cancelTask$1(kafkaConsumer);
            // On cancellation the closing task is run on the context's scheduler.
            context.connection().push(Cancelable$.MODULE$.apply(() -> {
                closeConsumer.runAsync(cancelScheduler);
            }));
            Task loop = kafkaConsumerObservable.runLoop$1(kafkaConsumer, subscriber, j, z, z2);
            return loop.doOnFinish(outcome -> {
                return closeConsumer;
            });
        });
        Task$.MODULE$.unsafeStartNow(startAndLoop, context, callback);
    }

    /**
     * Creates an observable from a configuration and a consumer-producing task.
     *
     * @param kafkaConsumerConfig configuration stored in {@code config}
     * @param task                task yielding the Kafka consumer, stored in {@code consumer}
     */
    public KafkaConsumerObservable(KafkaConsumerConfig kafkaConsumerConfig, Task<KafkaConsumer<K, V>> task) {
        this.consumer = task;
        this.config = kafkaConsumerConfig;
    }
}
