package monix.kafka;

import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import monix.eval.Coeval;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.Ack;
import monix.execution.Ack$Continue$;
import monix.execution.Ack$Stop$;
import monix.execution.Callback;
import monix.execution.Scheduler;
import monix.execution.cancelables.AssignableCancelable;
import monix.execution.cancelables.AssignableCancelable$;
import monix.reactive.Consumer;
import monix.reactive.observers.Subscriber;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List$;
import scala.concurrent.Future;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: KafkaProducerSink.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%f\u0001\u0002\u0007\u000e\u0005IA\u0001b\u000b\u0001\u0003\u0002\u0003\u0006I\u0001\u0016\u0005\t=\u0002\u0011\t\u0011)A\u0005?\"A!\r\u0001B\u0001B\u0003%1\rC\u0003g\u0001\u0011%q\rC\u0003m\u0001\u0011\u0005QnB\u0004\u0002H5A\t!!\u0013\u0007\r1i\u0001\u0012AA&\u0011\u00191w\u0001\"\u0001\u0002N!9\u0011qJ\u0004\u0005\u0002\u0005E\u0003bBA(\u000f\u0011\u0005\u0011q\u0010\u0005\n\u0003+;\u0011\u0011!C\u0005\u0003/\u0013\u0011cS1gW\u0006\u0004&o\u001c3vG\u0016\u00148+\u001b8l\u0015\tqq\"A\u0003lC\u001a\\\u0017MC\u0001\u0011\u0003\u0015iwN\\5y\u0007\u0001)2aE\u001cC'\u0011\u0001AcR)\u0011\tUA\"\u0004R\u0007\u0002-)\u0011qcD\u0001\te\u0016\f7\r^5wK&\u0011\u0011D\u0006\u0002\t\u0007>t7/^7feB\u00191$\n\u0015\u000f\u0005q\u0011cBA\u000f!\u001b\u0005q\"BA\u0010\u0012\u0003\u0019a$o\\8u}%\t\u0011%A\u0003tG\u0006d\u0017-\u0003\u0002$I\u00059\u0001/Y2lC\u001e,'\"A\u0011\n\u0005\u0019:#aA*fc*\u00111\u0005\n\t\u0005SM*\u0014)D\u0001+\u0015\tYC&\u0001\u0005qe>$WoY3s\u0015\tic&A\u0004dY&,g\u000e^:\u000b\u00059y#B\u0001\u00192\u0003\u0019\t\u0007/Y2iK*\t!'A\u0002pe\u001eL!\u0001\u000e\u0016\u0003\u001dA\u0013x\u000eZ;dKJ\u0014VmY8sIB\u0011ag\u000e\u0007\u0001\t\u0015A\u0004A1\u0001:\u0005\u0005Y\u0015C\u0001\u001e?!\tYD(D\u0001%\u0013\tiDEA\u0004O_RD\u0017N\\4\u0011\u0005mz\u0014B\u0001!%\u0005\r\te.\u001f\t\u0003m\t#Qa\u0011\u0001C\u0002e\u0012\u0011A\u0016\t\u0003w\u0015K!A\u0012\u0013\u0003\tUs\u0017\u000e\u001e\t\u0003\u0011>k\u0011!\u0013\u0006\u0003\u0015.\u000bAb]2bY\u0006dwnZ4j]\u001eT!\u0001T'\u0002\u0011QL\b/Z:bM\u0016T\u0011AT\u0001\u0004G>l\u0017B\u0001)J\u00055\u0019FO]5di2{wmZ5oOB\u00111HU\u0005\u0003'\u0012\u0012AbU3sS\u0006d\u0017N_1cY\u0016\u00042!\u0016-[\u001b\u00051&BA,\u0010\u0003\u0011)g/\u00197\n\u0005e3&AB\"pKZ\fG\u000e\u0005\u0003\\9V\nU\"A\u0007\n\u0005uk!!D&bM.\f\u0007K]8ek\u000e,'/A\btQ>,H\u000e\u001a+fe6Lg.\u0019;f!\tY\u0004-\u0003\u0002bI\t9!i\\8mK\u0006t\u0017a\u00039be\u0006dG.\u001a7jg6\u0004\"a\u000f3\n\u00
05\u0015$#aA%oi\u00061A(\u001b8jiz\"B\u0001[5kWB!1\fA\u001bB\u0011\u0015YC\u00011\u0001U\u0011\u0015qF\u00011\u0001`\u0011\u0015\u0011G\u00011\u0001d\u0003A\u0019'/Z1uKN+(m]2sS\n,'\u000fF\u0003o\u0003k\t\u0019\u0005E\u0003<_F\f\t#\u0003\u0002qI\t1A+\u001e9mKJ\u00122A\u001d<z\r\u0011\u0019H\u000fA9\u0003\u0019q\u0012XMZ5oK6,g\u000e\u001e \t\tU,\u0001!]\u0001\u0004_V$\bCA\u001ex\u0013\tAHE\u0001\u0004B]f\u0014VM\u001a\t\u0004uvTR\"A>\u000b\u0005q4\u0012!C8cg\u0016\u0014h/\u001a:t\u0013\tq8P\u0001\u0006Tk\n\u001c8M]5cKJD\u0011\"!\u0001s\u0005\u0004%\u0019!a\u0001\u0002\u0013M\u001c\u0007.\u001a3vY\u0016\u0014XCAA\u0003!\u0011\t9!!\u0004\u000e\u0005\u0005%!bAA\u0006\u001f\u0005IQ\r_3dkRLwN\\\u0005\u0005\u0003\u001f\tIAA\u0005TG\",G-\u001e7fe\"9\u00111\u0003:\u0005\u0002\u0005U\u0011!\u0003;fe6Lg.\u0019;f)\r!\u0015q\u0003\u0005\n\u00033\t\t\u0002\"a\u0001\u00037\t!a\u00192\u0011\tm\ni\u0002R\u0005\u0004\u0003?!#\u0001\u0003\u001fcs:\fW.\u001a \u0011\t\u0005\r\u0012q\u0006\b\u0005\u0003K\tY#\u0004\u0002\u0002()!\u0011\u0011FA\u0005\u0003-\u0019\u0017M\\2fY\u0006\u0014G.Z:\n\t\u00055\u0012qE\u0001\u0015\u0003N\u001c\u0018n\u001a8bE2,7)\u00198dK2\f'\r\\3\n\t\u0005E\u00121\u0007\u0002\u0006\u001bVdG/\u001b\u0006\u0005\u0003[\t9\u0003C\u0004\u0002\u001a\u0015\u0001\r!a\u000e\u0011\u000f\u0005\u001d\u0011\u0011HA\u001f\t&!\u00111HA\u0005\u0005!\u0019\u0015\r\u001c7cC\u000e\\\u0007cA\u000e\u0002@%\u0019\u0011\u0011I\u0014\u0003\u0013QC'o\\<bE2,\u0007bBA#\u000b\u0001\u0007\u0011QA\u0001\u0002g\u0006\t2*\u00194lCB\u0013x\u000eZ;dKJ\u001c\u0016N\\6\u0011\u0005m;1cA\u0004w#R\u0011\u0011\u0011J\u0001\u0006CB\u0004H._\u000b\u0007\u0003'\nY&a\u0018\u0015\r\u0005U\u0013\u0011OA>)\u0019\t9&!\u0019\u0002lA11\fAA-\u0003;\u00022ANA.\t\u0015A\u0014B1\u0001:!\r1\u0014q\f\u0003\u0006\u0007&\u0011\r!\u000f\u0005\b\u0003GJ\u00019AA3\u0003\u0005Y\u0005#B.\u0002h\u0005e\u0013bAA5\u001b\tQ1+\u001a:jC2L'0\u001a:\t\u000f\u00055\u0014\u0002q\u0001\u0002p\u0005\ta\u000bE\u0003\\\u0003O\ni\u0006C\u0004
\u0002t%\u0001\r!!\u001e\u0002\r\r|gNZ5h!\rY\u0016qO\u0005\u0004\u0003sj!aE&bM.\f\u0007K]8ek\u000e,'oQ8oM&<\u0007bBA?\u0013\u0001\u0007\u0011QA\u0001\u0003g\u000e,b!!!\u0002\b\u0006-ECBAB\u0003\u001b\u000b\u0019\n\u0005\u0004\\\u0001\u0005\u0015\u0015\u0011\u0012\t\u0004m\u0005\u001dE!\u0002\u001d\u000b\u0005\u0004I\u0004c\u0001\u001c\u0002\f\u0012)1I\u0003b\u0001s!11F\u0003a\u0001\u0003\u001f\u0003B!\u0016-\u0002\u0012B11\fXAC\u0003\u0013CQA\u0019\u0006A\u0002\r\f1B]3bIJ+7o\u001c7wKR\u0011\u0011\u0011\u0014\t\u0005\u00037\u000b)+\u0004\u0002\u0002\u001e*!\u0011qTAQ\u0003\u0011a\u0017M\\4\u000b\u0005\u0005\r\u0016\u0001\u00026bm\u0006LA!a*\u0002\u001e\n1qJ\u00196fGR\u0004")
/* loaded from: input_file:monix/kafka/KafkaProducerSink.class */
/*
 * A Consumer that receives batches (Seq) of Kafka ProducerRecords and forwards every
 * record to a KafkaProducer via send(...), completing with BoxedUnit (Scala's Unit).
 *
 * Type parameters: K and V are the key and value types of the ProducerRecords consumed.
 *
 * NOTE(review): this is decompiler output for a Scala class (KafkaProducerSink.scala).
 * Names such as monix$kafka$KafkaProducerSink$$producer, $outer and MODULE$ look like
 * Scala compiler artifacts rather than hand-written Java; treat the Scala source as the
 * authority when in doubt.
 */
public final class KafkaProducerSink<K, V> extends Consumer<Seq<ProducerRecord<K, V>>, BoxedUnit> implements StrictLogging {
    // Lazily evaluated handle to the producer; each subscriber memoizes it (see createSubscriber).
    public final Coeval<KafkaProducer<K, V>> monix$kafka$KafkaProducerSink$$producer;
    // When true, the producer is closed once the stream terminates (see terminate below).
    public final boolean monix$kafka$KafkaProducerSink$$shouldTerminate;
    // Batch slice size used by onNext; the constructor requires it to be >= 1.
    public final int monix$kafka$KafkaProducerSink$$parallelism;
    // Logger backing the StrictLogging trait; assigned through the trait setter below.
    private final Logger logger;

    /** Returns the logger installed by the StrictLogging initialiser. */
    public Logger logger() {
        return this.logger;
    }

    /**
     * Setter used to install the logger field.
     *
     * NOTE(review): presumably invoked by StrictLogging.$init$(this) from the constructor — confirm.
     * The field is declared final yet assigned here, which looks like a decompiler artifact.
     *
     * @param logger the logger instance to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    /**
     * Builds the subscriber that performs the actual sending, paired with a dummy cancelable.
     *
     * @param callback  notified exactly when the subscriber terminates: onSuccess(UNIT) after
     *                  onComplete, onError(th) after onError
     * @param scheduler scheduler exposed by the subscriber and used to run the send/close tasks
     * @return a Tuple2 of the new subscriber and AssignableCancelable$.MODULE$.dummy()
     */
    public Tuple2<Subscriber<Seq<ProducerRecord<K, V>>>, AssignableCancelable.Multi> createSubscriber(final Callback<Throwable, BoxedUnit> callback, final Scheduler scheduler) {
        return new Tuple2<>(new Subscriber<Seq<ProducerRecord<K, V>>>(this, scheduler, callback) { // from class: monix.kafka.KafkaProducerSink$$anon$1
            // Scheduler handed to createSubscriber.
            private final Scheduler scheduler;
            // Memoized view of the outer sink's producer Coeval.
            private final Coeval<KafkaProducer<K, V>> p;
            // Cleared by the first terminate(); once false, onNext answers Stop.
            private boolean isActive;
            // Reference to the enclosing KafkaProducerSink.
            private final /* synthetic */ KafkaProducerSink $outer;
            // Completion callback handed to createSubscriber.
            private final Callback cb$2;

            /** Returns the scheduler this subscriber was created with. */
            public Scheduler scheduler() {
                return this.scheduler;
            }

            /**
             * Turns each record into the Task returned by KafkaProducer.send(record).
             * An exception thrown while creating the task is converted into a failed Task
             * when NonFatal matches it; otherwise it is rethrown.
             *
             * @param seq the records to send
             * @return one Task per input record
             */
            private Seq<Task<Option<RecordMetadata>>> sendAll(Seq<ProducerRecord<K, V>> seq) {
                return (Seq) seq.map(producerRecord -> {
                    try {
                        return ((KafkaProducer) this.p.value()).send(producerRecord);
                    } catch (Throwable th) {
                        Option unapply = NonFatal$.MODULE$.unapply(th);
                        // An empty result means NonFatal did not match: propagate the throwable as-is.
                        if (unapply.isEmpty()) {
                            throw th;
                        }
                        return Task$.MODULE$.raiseError((Throwable) unapply.get());
                    }
                }, Seq$.MODULE$.canBuildFrom());
            }

            /**
             * Sends one batch of records and reports back-pressure to the upstream.
             *
             * Returns Stop immediately if the subscriber has already terminated. Otherwise the
             * resulting future yields Continue both on success and on failure; a failure is only
             * logged at error level, so send errors never stop the stream.
             *
             * @param seq the batch of records to send
             * @return Stop when inactive, otherwise a future that yields Continue
             */
            public synchronized Future<Ack> onNext(Seq<ProducerRecord<K, V>> seq) {
                Task map;
                if (!this.isActive) {
                    return Ack$Stop$.MODULE$;
                }
                if (this.$outer.monix$kafka$KafkaProducerSink$$parallelism == 1) {
                    // Single-slot case: all send tasks of the batch are combined with Task.sequence.
                    map = Task$.MODULE$.sequence(sendAll(seq), Seq$.MODULE$.canBuildFrom());
                } else {
                    // Cut the batch into windows of `parallelism` records (window size == step, so
                    // the windows do not overlap), combine each window with Task.gather, combine
                    // the windows with Task.sequence, then flatten the per-window results.
                    // NOTE(review): presumably gather runs a window's sends in parallel while
                    // sequence runs the windows one after another — confirm against Monix docs.
                    map = Task$.MODULE$.sequence(seq.sliding(this.$outer.monix$kafka$KafkaProducerSink$$parallelism, this.$outer.monix$kafka$KafkaProducerSink$$parallelism).map(seq2 -> {
                        return Task$.MODULE$.gather(this.sendAll(seq2), Seq$.MODULE$.canBuildFrom());
                    }).toList(), List$.MODULE$.canBuildFrom()).map(list -> {
                        return list.flatten(Predef$.MODULE$.$conforms());
                    });
                }
                return map.map(seq3 -> {
                    return Ack$Continue$.MODULE$;
                }).onErrorHandle(th -> {
                    // Best-effort: log the failure (if error logging is enabled) and keep consuming.
                    if (this.$outer.logger().underlying().isErrorEnabled()) {
                        this.$outer.logger().underlying().error("Unexpected error in KafkaProducerSink", th);
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    }
                    return Ack$Continue$.MODULE$;
                }).runToFuture(scheduler());
            }

            /**
             * Shuts the subscriber down once and then runs the supplied completion action.
             *
             * Only the first call has any effect: if the subscriber is already inactive the
             * action is not invoked. When shouldTerminate is set, the producer is closed first
             * and the action runs after the close attempt finishes (successfully or not);
             * otherwise the action runs right away.
             *
             * @param function0 completion action (signals the callback)
             */
            public synchronized void terminate(Function0<BoxedUnit> function0) {
                if (this.isActive) {
                    this.isActive = false;
                    if (this.$outer.monix$kafka$KafkaProducerSink$$shouldTerminate) {
                        // close() is wrapped in a Task and flattened, then materialized so that the
                        // outcome arrives as a Try handled by $anonfun$terminate$2.
                        Task$.MODULE$.apply(() -> {
                            return ((KafkaProducer) this.p.value()).close();
                        }).flatten(Predef$.MODULE$.$conforms()).materialize().foreach(r6 -> {
                            $anonfun$terminate$2(this, function0, r6);
                            return BoxedUnit.UNIT;
                        }, scheduler());
                    } else {
                        function0.apply$mcV$sp();
                    }
                }
            }

            /** Upstream failure: terminate, then forward the error to the callback. */
            public void onError(Throwable th) {
                terminate(() -> {
                    this.cb$2.onError(th);
                });
            }

            /** Upstream completion: terminate, then signal success to the callback. */
            public void onComplete() {
                terminate(() -> {
                    this.cb$2.onSuccess(BoxedUnit.UNIT);
                });
            }

            /*
             * Handles the result of closing the producer (the body of the foreach in terminate).
             * Success: run the completion action. Failure: log the exception at error level and
             * still run the completion action. Any other Try subtype raises MatchError.
             */
            public static final /* synthetic */ void $anonfun$terminate$2(KafkaProducerSink$$anon$1 kafkaProducerSink$$anon$1, Function0 function0, Try r6) {
                if (r6 instanceof Success) {
                    function0.apply$mcV$sp();
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    if (!(r6 instanceof Failure)) {
                        throw new MatchError(r6);
                    }
                    Throwable exception = ((Failure) r6).exception();
                    if (kafkaProducerSink$$anon$1.$outer.logger().underlying().isErrorEnabled()) {
                        kafkaProducerSink$$anon$1.$outer.logger().underlying().error("Unexpected error in KafkaProducerSink", exception);
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    }
                    // The close failure is not propagated: the completion action runs regardless.
                    function0.apply$mcV$sp();
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
            }

            // Instance initialiser: captures the outer sink, callback and scheduler, memoizes the
            // producer Coeval and marks the subscriber active.
            // NOTE(review): the `this == null` guard and `$outer = this` look like a decompiler
            // rendering of the outer-instance null check — confirm against the Scala source.
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.cb$2 = callback;
                this.scheduler = scheduler;
                this.p = this.monix$kafka$KafkaProducerSink$$producer.memoize();
                this.isActive = true;
            }
        }, AssignableCancelable$.MODULE$.dummy());
    }

    /**
     * Creates the sink.
     *
     * @param coeval lazily evaluated producer used for sending (and closing, if requested)
     * @param z      shouldTerminate flag: whether to close the producer when the stream ends
     * @param i      parallelism: slice size used by onNext; must be at least 1
     */
    public KafkaProducerSink(Coeval<KafkaProducer<K, V>> coeval, boolean z, int i) {
        this.monix$kafka$KafkaProducerSink$$producer = coeval;
        this.monix$kafka$KafkaProducerSink$$shouldTerminate = z;
        this.monix$kafka$KafkaProducerSink$$parallelism = i;
        StrictLogging.$init$(this);
        // Validation happens after the fields are assigned.
        // NOTE(review): presumably Predef.require throws IllegalArgumentException when the
        // condition is false — confirm against the Scala standard library.
        Predef$.MODULE$.require(i >= 1, () -> {
            return "parallelism >= 1";
        });
    }
}
