package monix.kafka;

import monix.eval.Task;
import monix.eval.Task$;
import monix.eval.Task$AsyncBuilder$;
import monix.eval.Task$AsyncBuilder$CreatePartiallyApplied$;
import monix.execution.Ack;
import monix.execution.Ack$;
import monix.execution.Ack$AckExtensions$;
import monix.execution.Ack$Stop$;
import monix.execution.Callback;
import monix.execution.Callback$;
import monix.execution.Cancelable;
import monix.execution.Scheduler;
import monix.execution.Scheduler$;
import monix.execution.cancelables.BooleanCancelable;
import monix.execution.cancelables.BooleanCancelable$;
import monix.kafka.config.ObservableCommitType;
import monix.kafka.config.ObservableCommitType$Async$;
import monix.kafka.config.ObservableCommitType$Sync$;
import monix.reactive.Observable;
import monix.reactive.Observer$;
import monix.reactive.observers.Subscriber;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import scala.MatchError;
import scala.Option;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: KafkaConsumerObservableAutoCommit.scala */
@ScalaSignature(bytes = "\u0006\u0005i4AAD\b\u0003)!Aa\b\u0001BC\u0002\u0013Es\b\u0003\u0005D\u0001\t\u0005\t\u0015!\u0003A\u0011!y\u0002A!b\u0001\n#\"\u0005\u0002\u0003(\u0001\u0005\u0003\u0005\u000b\u0011B#\t\r=\u0003A\u0011A\bQ\u0011\u0015!\u0006\u0001\"\u0003V\u0011\u001dQ\u0006A1A\u0005\nmCaa\u0018\u0001!\u0002\u0013a\u0006b\u00021\u0001\u0005\u0004%I!\u0019\u0005\u0007K\u0002\u0001\u000b\u0011\u00022\t\u000f\u0019\u0004!\u0019!C\u0005C\"1q\r\u0001Q\u0001\n\tDQ\u0001\u001b\u0001\u0005R%\u0014\u0011eS1gW\u0006\u001cuN\\:v[\u0016\u0014xJY:feZ\f'\r\\3BkR|7i\\7nSRT!\u0001E\t\u0002\u000b-\fgm[1\u000b\u0003I\tQ!\\8oSb\u001c\u0001!F\u0002\u0016Wa\u001a2\u0001\u0001\f;!\r9\"\u0004H\u0007\u00021)\u0011\u0011$E\u0001\te\u0016\f7\r^5wK&\u00111\u0004\u0007\u0002\u000b\u001f\n\u001cXM\u001d<bE2,\u0007\u0003B\u000f(S]j\u0011A\b\u0006\u0003?\u0001\n\u0001bY8ogVlWM\u001d\u0006\u0003C\t\nqa\u00197jK:$8O\u0003\u0002\u0011G)\u0011A%J\u0001\u0007CB\f7\r[3\u000b\u0003\u0019\n1a\u001c:h\u0013\tAcD\u0001\bD_:\u001cX/\\3s%\u0016\u001cwN\u001d3\u0011\u0005)ZC\u0002\u0001\u0003\u0006Y\u0001\u0011\r!\f\u0002\u0002\u0017F\u0011a\u0006\u000e\t\u0003_Ij\u0011\u0001\r\u0006\u0002c\u0005)1oY1mC&\u00111\u0007\r\u0002\b\u001d>$\b.\u001b8h!\tyS'\u0003\u00027a\t\u0019\u0011I\\=\u0011\u0005)BD!B\u001d\u0001\u0005\u0004i#!\u0001,\u0011\u000bmb\u0014f\u000e\u000f\u000e\u0003=I!!P\b\u0003/-\u000bgm[1D_:\u001cX/\\3s\u001f\n\u001cXM\u001d<bE2,\u0017AB2p]\u001aLw-F\u0001A!\tY\u0014)\u0003\u0002C\u001f\t\u00192*\u00194lC\u000e{gn];nKJ\u001cuN\u001c4jO\u000691m\u001c8gS\u001e\u0004S#A#\u0011\u0007\u0019K5*D\u0001H\u0015\tA\u0015#\u0001\u0003fm\u0006d\u0017B\u0001&H\u0005\u0011!\u0016m]6\u0011\tua\u0015fN\u0005\u0003\u001bz\u0011QbS1gW\u0006\u001cuN\\:v[\u0016\u0014\u0018!C2p]N,X.\u001a:!\u0003\u0019a\u0014N\\5u}Q\u0019\u0011KU*\u0011\tm\u0002\u0011f\u000e\u0005\u0006}\u0015\u0001\r\u0001\u0011\u0005\u0006?\u0015\u0001\r!R\u0001\u000fG>t7/^7fe\u000e{W.\\5u)\t1\u0016\f\u0005\u00020/&\u0011\u0001\f\r\u0002\u
0005+:LG\u000fC\u0003 \r\u0001\u00071*A\tq_2dG+[7f_V$X*\u001b7mSN,\u0012\u0001\u0018\t\u0003_uK!A\u0018\u0019\u0003\t1{gnZ\u0001\u0013a>dG\u000eV5nK>,H/T5mY&\u001c\b%\u0001\ntQ>,H\u000eZ\"p[6LGOQ3g_J,W#\u00012\u0011\u0005=\u001a\u0017B\u000131\u0005\u001d\u0011un\u001c7fC:\f1c\u001d5pk2$7i\\7nSR\u0014UMZ8sK\u0002\n\u0011c\u001d5pk2$7i\\7nSR\fe\r^3s\u0003I\u0019\bn\\;mI\u000e{W.\\5u\u0003\u001a$XM\u001d\u0011\u0002\u000f\u0005\u001c7\u000eV1tWR\u0019!.\u001d:\u0011\u0007\u0019K5\u000e\u0005\u0002m_6\tQN\u0003\u0002o#\u0005IQ\r_3dkRLwN\\\u0005\u0003a6\u00141!Q2l\u0011\u0015yR\u00021\u0001L\u0011\u0015\u0019X\u00021\u0001u\u0003\ryW\u000f\u001e\t\u0004kbdR\"\u0001<\u000b\u0005]D\u0012!C8cg\u0016\u0014h/\u001a:t\u0013\tIhO\u0001\u0006Tk\n\u001c8M]5cKJ\u0004")
/* loaded from: input_file:monix/kafka/KafkaConsumerObservableAutoCommit.class */
/**
 * Observable of Kafka {@link ConsumerRecord}s that polls a {@link KafkaConsumer}
 * and, when Kafka's own auto-commit is disabled in the configuration, commits
 * offsets itself either before the polled batch is fed downstream or after the
 * downstream has acknowledged it.
 *
 * <p>All access to the consumer in this class happens inside
 * {@code synchronized (kafkaConsumer)} blocks.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public final class KafkaConsumerObservableAutoCommit<K, V> extends Observable<ConsumerRecord<K, V>> implements KafkaConsumerObservable<K, V, ConsumerRecord<K, V>> {
    private final KafkaConsumerConfig config;
    private final Task<KafkaConsumer<K, V>> consumer;
    /** Poll timeout in milliseconds, taken from {@code config.fetchMaxWaitTime()}. */
    private final long pollTimeoutMillis;
    /** True when auto-commit is disabled and the configured commit order is "before". */
    private final boolean shouldCommitBefore;
    /** True when auto-commit is disabled and the configured commit order is "after". */
    private final boolean shouldCommitAfter;

    /**
     * Subscribes using the implementation inherited from {@link KafkaConsumerObservable}.
     *
     * @param subscriber downstream subscriber
     * @return the cancelable produced by the inherited implementation
     */
    @Override // monix.kafka.KafkaConsumerObservable
    public final Cancelable unsafeSubscribeFn(Subscriber<ConsumerRecord<K, V>> subscriber) {
        // Must target the trait's default method explicitly; a plain
        // unsafeSubscribeFn(subscriber) call would re-enter this override forever.
        return KafkaConsumerObservable.super.unsafeSubscribeFn(subscriber);
    }

    @Override // monix.kafka.KafkaConsumerObservable
    public KafkaConsumerConfig config() {
        return this.config;
    }

    @Override // monix.kafka.KafkaConsumerObservable
    public Task<KafkaConsumer<K, V>> consumer() {
        return this.consumer;
    }

    /**
     * Commits the consumer's offsets using the strategy selected by
     * {@code config().observableCommitType()}.
     *
     * @param kafkaConsumer consumer whose offsets are committed
     * @throws MatchError if the configured commit type is neither Sync nor Async
     */
    private void consumerCommit(KafkaConsumer<K, V> kafkaConsumer) {
        ObservableCommitType commitType = config().observableCommitType();
        if (ObservableCommitType$Sync$.MODULE$.equals(commitType)) {
            // Synchronous commit blocks the calling thread, so it is flagged
            // as blocking in the same way the poll in ackTask is.
            package$.MODULE$.blocking(() -> {
                kafkaConsumer.commitSync();
                return BoxedUnit.UNIT;
            });
        } else if (ObservableCommitType$Async$.MODULE$.equals(commitType)) {
            kafkaConsumer.commitAsync();
        } else {
            throw new MatchError(commitType);
        }
    }

    private long pollTimeoutMillis() {
        return this.pollTimeoutMillis;
    }

    private boolean shouldCommitBefore() {
        return this.shouldCommitBefore;
    }

    private boolean shouldCommitAfter() {
        return this.shouldCommitAfter;
    }

    /**
     * Builds a task that performs one poll/feed cycle: polls the consumer,
     * optionally commits before delivery, feeds the polled records to the
     * subscriber and, once the downstream acknowledgement completes, optionally
     * commits and signals the acknowledgement to the task callback.
     *
     * @param kafkaConsumer consumer to poll; also used as the lock object
     * @param subscriber    downstream subscriber receiving the records
     * @return a cancelable task completing with the downstream {@link Ack},
     *         or with {@code Stop} when the task was cancelled beforehand
     */
    @Override // monix.kafka.KafkaConsumerObservable
    public Task<Ack> ackTask(KafkaConsumer<K, V> kafkaConsumer, Subscriber<ConsumerRecord<K, V>> subscriber) {
        return Task$AsyncBuilder$CreatePartiallyApplied$.MODULE$.apply$extension(Task$.MODULE$.create(), (scheduler, callback) -> {
            Callback forked = Callback$.MODULE$.forked(callback, scheduler);
            BooleanCancelable cancelable = BooleanCancelable$.MODULE$.apply();
            new Scheduler.Extensions(Scheduler$.MODULE$.Extensions(scheduler)).executeAsync(() -> {
                Future<Ack> ackFuture;
                try {
                    synchronized (kafkaConsumer) {
                        if (cancelable.isCanceled()) {
                            // Cancelled before we got the lock: do not touch the consumer.
                            ackFuture = Ack$Stop$.MODULE$;
                        } else {
                            ConsumerRecords<K, V> records = package$.MODULE$.blocking(() -> {
                                return kafkaConsumer.poll(this.pollTimeoutMillis());
                            });
                            if (this.shouldCommitBefore()) {
                                this.consumerCommit(kafkaConsumer);
                            }
                            Iterable<ConsumerRecord<K, V>> batch = JavaConverters$.MODULE$.iterableAsScalaIterableConverter(records).asScala();
                            ackFuture = Observer$.MODULE$.feed(subscriber, batch, subscriber.scheduler());
                        }
                    }
                } catch (Throwable th) {
                    Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
                    if (nonFatal.isEmpty()) {
                        // Fatal errors are not ours to handle.
                        throw th;
                    }
                    // Non-fatal errors travel through the future so that the
                    // completion handler below routes them to the callback.
                    ackFuture = Future$.MODULE$.failed(nonFatal.get());
                }
                Ack$AckExtensions$.MODULE$.syncOnComplete$extension(Ack$.MODULE$.AckExtensions(ackFuture), result -> {
                    $anonfun$ackTask$4(this, kafkaConsumer, cancelable, forked, scheduler, result);
                    return BoxedUnit.UNIT;
                }, scheduler);
            });
            return cancelable;
        }, Task$AsyncBuilder$.MODULE$.forCancelable());
    }

    /**
     * Completion handler for the downstream acknowledgement of one batch.
     *
     * <p>On failure the error is forwarded to {@code callback}. On success the
     * optional "commit after" step runs under the consumer lock and the
     * acknowledgement (or {@code Stop} if cancelled) is signalled.
     *
     * @param kafkaConsumerObservableAutoCommit the owning observable
     * @param kafkaConsumer     consumer to commit on; also the lock object
     * @param booleanCancelable cancellation flag of the running task
     * @param callback          task callback to signal exactly once
     * @param scheduler         receives errors that can no longer be signalled via {@code callback}
     * @param r9                result of the downstream acknowledgement
     * @throws MatchError if {@code r9} is neither a {@link Success} nor a {@link Failure}
     */
    public static final /* synthetic */ void $anonfun$ackTask$4(KafkaConsumerObservableAutoCommit kafkaConsumerObservableAutoCommit, KafkaConsumer kafkaConsumer, BooleanCancelable booleanCancelable, Callback callback, Scheduler scheduler, Try r9) {
        if (r9 instanceof Failure) {
            callback.onError(((Failure) r9).exception());
            return;
        }
        if (!(r9 instanceof Success)) {
            throw new MatchError(r9);
        }
        Ack ack = (Ack) ((Success) r9).value();
        // While true, the callback has not been signalled yet and an error may
        // still be delivered through it; flipped right before onSuccess so the
        // callback is never signalled twice.
        boolean streamErrors = true;
        try {
            synchronized (kafkaConsumer) {
                if (booleanCancelable.isCanceled()) {
                    streamErrors = false;
                    callback.onSuccess(Ack$Stop$.MODULE$);
                } else {
                    if (kafkaConsumerObservableAutoCommit.shouldCommitAfter()) {
                        kafkaConsumerObservableAutoCommit.consumerCommit(kafkaConsumer);
                    }
                    streamErrors = false;
                    callback.onSuccess(ack);
                }
            }
        } catch (Throwable th) {
            Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
            if (nonFatal.isEmpty()) {
                throw th;
            }
            Throwable error = nonFatal.get();
            if (streamErrors) {
                callback.onError(error);
            } else {
                scheduler.reportFailure(error);
            }
        }
    }

    /**
     * Creates the observable and derives the poll timeout and commit flags
     * from the configuration.
     *
     * @param kafkaConsumerConfig consumer configuration
     * @param task                task producing the Kafka consumer to read from
     */
    public KafkaConsumerObservableAutoCommit(KafkaConsumerConfig kafkaConsumerConfig, Task<KafkaConsumer<K, V>> task) {
        this.config = kafkaConsumerConfig;
        this.consumer = task;
        KafkaConsumerObservable.$init$(this);
        this.pollTimeoutMillis = kafkaConsumerConfig.fetchMaxWaitTime().toMillis();
        // Manual commits only apply when Kafka's own auto-commit is switched off.
        boolean manualCommit = !kafkaConsumerConfig.enableAutoCommit();
        this.shouldCommitBefore = manualCommit && kafkaConsumerConfig.observableCommitOrder().isBefore();
        this.shouldCommitAfter = manualCommit && kafkaConsumerConfig.observableCommitOrder().isAfter();
    }
}
