package kafka4m.consumer;

import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import monix.execution.ExecutionModel$SynchronousExecution$;
import monix.execution.Scheduler;
import monix.execution.Scheduler$;
import monix.execution.schedulers.SchedulerService;
import monix.reactive.Observable;
import monix.reactive.Observable$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.runtime.ModuleSerializationProxy;

/* compiled from: AckableRecord.scala */
/* loaded from: input_file:kafka4m/consumer/AckableRecord$.class */
public final class AckableRecord$ implements Serializable {
    public static final AckableRecord$ MODULE$ = new AckableRecord$();

    /**
     * Builds an observable of {@link AckableRecord}s from a lazily created consumer.
     *
     * <p>The (consumer, record) pairs emitted by {@link #singleThreadObservable} are run through
     * {@link #withOffsets}, so each element arrives paired with a {@link PartitionOffsetState}.
     * Every such element is then repackaged as an {@code AckableRecord} holding the consumer,
     * that offset state and the record.
     *
     * @param z         flag forwarded unchanged to {@link #singleThreadObservable}
     * @param scheduler scheduler forwarded unchanged to {@link #singleThreadObservable}
     * @param function0 factory producing the consumer to read from
     * @return an observable of ackable records
     * @throws MatchError (raised inside the stream) if an element, or the (consumer, record)
     *                    pair nested inside it, is {@code null}
     */
    public <K, V> Observable<AckableRecord<ConsumerRecord<K, V>>> apply(boolean z, Scheduler scheduler, Function0<RichKafkaConsumer<K, V>> function0) {
        return withOffsets(singleThreadObservable(z, scheduler, function0), HasRecord$.MODULE$.fromTuple2()).map(stateAndPair -> {
            if (stateAndPair != null) {
                PartitionOffsetState offsetState = (PartitionOffsetState) stateAndPair._1();
                // Distinct name: a lambda body may not redeclare its own parameter.
                // Raw types and casts are kept because the static types come from Scala signatures.
                Tuple2 consumerAndRecord = (Tuple2) stateAndPair._2();
                if (consumerAndRecord != null) {
                    return new AckableRecord((RichKafkaConsumer) consumerAndRecord._1(), offsetState, (ConsumerRecord) consumerAndRecord._2());
                }
            }
            // Report the whole unmatched element, not just its nested pair.
            throw new MatchError(stateAndPair);
        });
    }

    /**
     * Supplies the default value for the second ({@code scheduler}) parameter of {@code apply}.
     *
     * @return the result of {@link #singleThreadedScheduler(String)} called with its own
     *         default thread name
     */
    public <K, V> Scheduler apply$default$2() {
        final String defaultThreadName = singleThreadedScheduler$default$1();
        return singleThreadedScheduler(defaultThreadName);
    }

    /**
     * Creates the consumer lazily and emits every record it reads, paired with that consumer.
     *
     * @param z         flag forwarded unchanged to {@code RichKafkaConsumer.asObservable}
     * @param scheduler scheduler used both for {@code executeOn} on the consumer creation and
     *                  as the argument to {@code asObservable}
     * @param function0 factory producing the consumer; wrapped with {@code Observable.delay}
     * @return an observable of (consumer, record) tuples
     */
    public <K, V> Observable<Tuple2<RichKafkaConsumer<K, V>, ConsumerRecord<K, V>>> singleThreadObservable(boolean z, Scheduler scheduler, Function0<RichKafkaConsumer<K, V>> function0) {
        return Observable$.MODULE$.delay(function0)
                .executeOn(scheduler, true)
                .flatMap(consumer -> consumer.asObservable(z, scheduler)
                        .map(record -> new Tuple2(consumer, record)));
    }

    /**
     * Supplies the default value for the second ({@code scheduler}) parameter of
     * {@code singleThreadObservable}.
     *
     * @return the result of {@link #singleThreadedScheduler(String)} called with its own
     *         default thread name
     */
    public <K, V> Scheduler singleThreadObservable$default$2() {
        final String defaultThreadName = singleThreadedScheduler$default$1();
        return singleThreadedScheduler(defaultThreadName);
    }

    /**
     * Pairs the elements of {@code observable} with a running {@link PartitionOffsetState}.
     *
     * <p>A {@code scan} starts from a state built from an empty sequence plus an empty
     * {@code Option}. For each element, the state is updated with the record obtained via
     * {@code hasRecord.recordFor(element)} and the element is stored in the {@code Option}.
     * The scanned values are then passed through a partial function defined outside this method.
     * NOTE(review): that partial function presumably unwraps the {@code Option} into the element
     * itself — confirm against the original Scala source.
     *
     * @param observable source elements
     * @param hasRecord  type-class instance used to obtain the record for an element
     * @return an observable of (offset state, element) tuples
     * @throws MatchError (raised inside the stream) if the accumulated scan state is {@code null}
     */
    public <A> Observable<Tuple2<PartitionOffsetState, A>> withOffsets(Observable<A> observable, HasRecord<A> hasRecord) {
        // Scan seed: offsets built from an empty sequence, and no element seen yet.
        Tuple2 seed = new Tuple2(PartitionOffsetState$.MODULE$.apply((Seq<Tuple2<String, Map<Object, Object>>>) Nil$.MODULE$), Option$.MODULE$.empty());
        return observable.scan(() -> {
            return seed;
        }, (accumulated, next) -> {
            // Distinct name: a lambda body may not redeclare its own parameter.
            // The raw cast is kept because the static type comes from Scala signatures.
            Tuple2 previous = (Tuple2) accumulated;
            if (previous != null) {
                return new Tuple2(((PartitionOffsetState) previous._1()).update(HasRecord$.MODULE$.apply(hasRecord).recordFor(next)), Option$.MODULE$.apply(next));
            }
            // Same payload as the original pattern match: the (state, element) pair.
            throw new MatchError(new Tuple2(accumulated, next));
        }).collect(new AckableRecord$$anonfun$withOffsets$3());
    }

    /**
     * Creates a scheduler backed by the executor from {@link #singleThreadedExecutor(String)},
     * using the {@code SynchronousExecution} execution model.
     *
     * @param str name passed to {@link #singleThreadedExecutor(String)}
     * @return the scheduler service wrapping that executor
     */
    public SchedulerService singleThreadedScheduler(String str) {
        final ExecutorService executor = singleThreadedExecutor(str);
        return Scheduler$.MODULE$.apply(executor, ExecutionModel$SynchronousExecution$.MODULE$);
    }

    /**
     * Supplies the default value for the first parameter of {@code singleThreadedScheduler}.
     *
     * @return the constant {@code "SingleThreadForKafkaRead"}
     */
    public String singleThreadedScheduler$default$1() {
        final String defaultThreadName = "SingleThreadForKafkaRead";
        return defaultThreadName;
    }

    /**
     * Creates a single-thread executor whose thread is named {@code str} and marked as a daemon.
     *
     * @param str name given to the executor's thread
     * @return the executor built by {@link #singleThreadedExecutor(Function1)}
     */
    public ExecutorService singleThreadedExecutor(String str) {
        final Function1<Thread, Thread> prepare = worker -> {
            worker.setName(str);
            worker.setDaemon(true);
            return worker;
        };
        return singleThreadedExecutor(prepare);
    }

    /**
     * Creates a single-thread executor whose threads are customised by {@code function1}.
     *
     * <p>Each time the executor needs a thread, a new {@link Thread} is created for the runnable
     * and passed through {@code function1}; whatever it returns is the thread the executor uses.
     *
     * @param function1 callback applied to every newly created thread before it is handed to
     *                  the executor
     * @return a new executor from {@link Executors#newSingleThreadExecutor(ThreadFactory)}
     */
    public ExecutorService singleThreadedExecutor(final Function1<Thread, Thread> function1) {
        // ThreadFactory is an interface, so an anonymous subclass cannot take constructor
        // arguments; the lambda captures function1 directly instead.
        final ThreadFactory factory = runnable -> function1.apply(new Thread(runnable));
        return Executors.newSingleThreadExecutor(factory);
    }

    /**
     * Factory for {@link AckableRecord}: wraps the given consumer, offset state and record.
     *
     * @param richKafkaConsumer    consumer stored in the result
     * @param partitionOffsetState offset state stored in the result
     * @param a                    record stored in the result
     * @return a new {@code AckableRecord} holding the three arguments
     */
    public <A> AckableRecord<A> apply(RichKafkaConsumer<?, ?> richKafkaConsumer, PartitionOffsetState partitionOffsetState, A a) {
        final AckableRecord<A> wrapped = new AckableRecord<>(richKafkaConsumer, partitionOffsetState, a);
        return wrapped;
    }

    /**
     * Extractor for {@link AckableRecord}: splits it into its consumer, offset and record.
     *
     * @param ackableRecord value to take apart; may be {@code null}
     * @return {@code None} when {@code ackableRecord} is {@code null}, otherwise a {@code Some}
     *         holding a tuple of {@code consumer()}, {@code offset()} and {@code record()}
     */
    public <A> Option<Tuple3<RichKafkaConsumer<?, ?>, PartitionOffsetState, A>> unapply(AckableRecord<A> ackableRecord) {
        if (ackableRecord == null) {
            // Raw cast: None$ is an Option of Scala's bottom type, which Java cannot widen
            // to the declared element type on its own.
            return (Option) None$.MODULE$;
        }
        return new Some(new Tuple3(ackableRecord.consumer(), ackableRecord.offset(), ackableRecord.record()));
    }

    /**
     * Serialization hook: substitutes a {@link ModuleSerializationProxy} for this singleton.
     *
     * @return a proxy constructed from this class's {@code Class} object
     */
    private Object writeReplace() {
        final Class<?> moduleClass = AckableRecord$.class;
        return new ModuleSerializationProxy(moduleClass);
    }

    /** Private constructor: the only instance is the one assigned to {@code MODULE$}. */
    private AckableRecord$() {
        super();
    }
}
