package monix.kafka;

import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import monix.eval.Coeval;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.Ack;
import monix.execution.Ack$Continue$;
import monix.execution.Ack$Stop$;
import monix.execution.Callback;
import monix.execution.Scheduler;
import monix.execution.cancelables.AssignableCancelable;
import monix.execution.cancelables.AssignableCancelable$;
import monix.reactive.Consumer;
import monix.reactive.observers.Subscriber;
import org.apache.kafka.clients.producer.ProducerRecord;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.BuildFrom$;
import scala.collection.immutable.Seq;
import scala.concurrent.Future;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.Statics;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: KafkaProducerSink.scala */
@ScalaSignature(bytes = "\u0006\u0005\t\u0015a\u0001\u0002\t\u0012\u0005YA\u0001b\f\u0001\u0003\u0002\u0003\u0006I\u0001\u0017\u0005\tE\u0002\u0011\t\u0011)A\u0005G\"Aa\r\u0001B\u0001B\u0003%q\r\u0003\u0005k\u0001\t\u0005\t\u0015!\u0003l\u0011\u0015Q\b\u0001\"\u0003|\u0011\u001d\t\u0019\u0001\u0001C\u0001\u0003\u000b9q!!\u0011\u0012\u0011\u0003\t\u0019E\u0002\u0004\u0011#!\u0005\u0011Q\t\u0005\u0007u\"!\t!a\u0017\t\u0011\u0005u\u0003\u0002)A\u0005\u0003?Bq!!\u001f\t\t\u0003\tY\bC\u0004\u0002z!!\t!!+\t\u000f\u0005e\u0004\u0002\"\u0001\u0002H\"9\u0011\u0011\u0010\u0005\u0005\u0002\u0005u\u0007\"CA{\u0011\u0005\u0005I\u0011BA|\u0005EY\u0015MZ6b!J|G-^2feNKgn\u001b\u0006\u0003%M\tQa[1gW\u0006T\u0011\u0001F\u0001\u0006[>t\u0017\u000e_\u0002\u0001+\r92HR\n\u0005\u0001aYU\u000b\u0005\u0003\u001a9yAU\"\u0001\u000e\u000b\u0005m\u0019\u0012\u0001\u0003:fC\u000e$\u0018N^3\n\u0005uQ\"\u0001C\"p]N,X.\u001a:\u0011\u0007}ICF\u0004\u0002!M9\u0011\u0011\u0005J\u0007\u0002E)\u00111%F\u0001\u0007yI|w\u000e\u001e \n\u0003\u0015\nQa]2bY\u0006L!a\n\u0015\u0002\u000fA\f7m[1hK*\tQ%\u0003\u0002+W\t\u00191+Z9\u000b\u0005\u001dB\u0003\u0003B\u00178s\u0015k\u0011A\f\u0006\u0003_A\n\u0001\u0002\u001d:pIV\u001cWM\u001d\u0006\u0003cI\nqa\u00197jK:$8O\u0003\u0002\u0013g)\u0011A'N\u0001\u0007CB\f7\r[3\u000b\u0003Y\n1a\u001c:h\u0013\tAdF\u0001\bQe>$WoY3s%\u0016\u001cwN\u001d3\u0011\u0005iZD\u0002\u0001\u0003\u0006y\u0001\u0011\r!\u0010\u0002\u0002\u0017F\u0011aH\u0011\t\u0003\u007f\u0001k\u0011\u0001K\u0005\u0003\u0003\"\u0012qAT8uQ&tw\r\u0005\u0002@\u0007&\u0011A\t\u000b\u0002\u0004\u0003:L\bC\u0001\u001eG\t\u00159\u0005A1\u0001>\u0005\u00051\u0006CA J\u0013\tQ\u0005F\u0001\u0003V]&$\bC\u0001'T\u001b\u0005i%B\u0001(P\u00031\u00198-\u00197bY><w-\u001b8h\u0015\t\u0001\u0016+\u0001\u0005usB,7/\u00194f\u0015\u0005\u0011\u0016aA2p[&\u0011A+\u0014\u0002\u000e'R\u0014\u0018n\u0019;M_\u001e<\u0017N\\4\u0011\u0005}1\u0016BA,,\u00051\u0019VM]5bY&T\u0018M\u00197f!\rIFLX\u0007\u00025*\u00111lE\u0001\u0005KZ\fG.\u0003\u0002^5\n11i\\3wC2\u0004Ba\u00181:\u000b6\t\u0011#\u0003\u0002b#\ti1*\u00194lCB\u0013x\u000eZ;dKJ\fqb\u001d5pk2$G+\u001a:nS:\fG/\u001a\t\u0003\u007f\u0011L!!\u001a\u0015\u0003\u000f\t{w\u000e\\3b]\u0006Y\u0001/\u0019:bY2,G.[:n!\ty\u0004.\u0003\u0002jQ\t\u0019\u0011J\u001c;\u0002\u0017=t7+\u001a8e\u000bJ\u0014xN\u001d\t\u0005\u007f1t\u0017/\u0003\u0002nQ\tIa)\u001e8di&|g.\r\t\u0003?=L!\u0001]\u0016\u0003\u0013QC'o\\<bE2,\u0007cA-si&\u00111O\u0017\u0002\u0005)\u0006\u001c8\u000e\u0005\u0002vq6\taO\u0003\u0002x'\u0005IQ\r_3dkRLwN\\\u0005\u0003sZ\u00141!Q2l\u0003\u0019a\u0014N\\5u}Q1A0 @��\u0003\u0003\u0001Ba\u0018\u0001:\u000b\")q&\u0002a\u00011\")!-\u0002a\u0001G\")a-\u0002a\u0001O\")!.\u0002a\u0001W\u0006\u00012M]3bi\u0016\u001cVOY:de&\u0014WM\u001d\u000b\u0007\u0003\u000f\ti#a\u000e\u0011\u000f}\nI!!\u0004\u0002\u001a%\u0019\u00111\u0002\u0015\u0003\rQ+\b\u000f\\33!\u0015\ty!!\u0006\u001f\u001b\t\t\tBC\u0002\u0002\u0014i\t\u0011b\u001c2tKJ4XM]:\n\t\u0005]\u0011\u0011\u0003\u0002\u000b'V\u00147o\u0019:jE\u0016\u0014\b\u0003BA\u000e\u0003OqA!!\b\u0002$5\u0011\u0011q\u0004\u0006\u0004\u0003C1\u0018aC2b]\u000e,G.\u00192mKNLA!!\n\u0002 \u0005!\u0012i]:jO:\f'\r\\3DC:\u001cW\r\\1cY\u0016LA!!\u000b\u0002,\t)Q*\u001e7uS*!\u0011QEA\u0010\u0011\u001d\tyC\u0002a\u0001\u0003c\t!a\u00192\u0011\u000bU\f\u0019D\u001c%\n\u0007\u0005UbO\u0001\u0005DC2d'-Y2l\u0011\u001d\tID\u0002a\u0001\u0003w\t\u0011a\u001d\t\u0004k\u0006u\u0012bAA m\nI1k\u00195fIVdWM]\u0001\u0012\u0017\u000647.\u0019)s_\u0012,8-\u001a:TS:\\\u0007CA0\t'\u0019A\u0011qI&\u0002NA\u0019q(!\u0013\n\u0007\u0005-\u0003F\u0001\u0004B]f\u0014VM\u001a\t\u0005\u0003\u001f\nI&\u0004\u0002\u0002R)!\u00111KA+\u0003\tIwN\u0003\u0002\u0002X\u0005!!.\u0019<b\u0013\r9\u0016\u0011\u000b\u000b\u0003\u0003\u0007\n!c\u001c8TK:$WI\u001d:pe\u0012+g-Y;miB)q\b\u001c8\u0002bA!\u0011L]A2\u001d\u0011\t)'a\u001d\u000f\t\u0005\u001d\u0014q\u000e\b\u0005\u0003S\niGD\u0002\"\u0003WJ\u0011\u0001F\u0005\u0003oNI1!!\u001dw\u0003\r\t5m[\u0005\u0005\u0003k\n9(\u0001\u0005D_:$\u0018N\\;f\u0015\r\t\tH^\u0001\u0006CB\u0004H._\u000b\u0007\u0003{\n))!#\u0015\r\u0005}\u00141TAS)\u0019\t\t)a#\u0002\u0016B1q\fAAB\u0003\u000f\u00032AOAC\t\u0015a4B1\u0001>!\rQ\u0014\u0011\u0012\u0003\u0006\u000f.\u0011\r!\u0010\u0005\b\u0003\u001b[\u00019AAH\u0003\u0005Y\u0005#B0\u0002\u0012\u0006\r\u0015bAAJ#\tQ1+\u001a:jC2L'0\u001a:\t\u000f\u0005]5\u0002q\u0001\u0002\u001a\u0006\ta\u000bE\u0003`\u0003#\u000b9\tC\u0004\u0002\u001e.\u0001\r!a(\u0002\r\r|gNZ5h!\ry\u0016\u0011U\u0005\u0004\u0003G\u000b\"aE&bM.\f\u0007K]8ek\u000e,'oQ8oM&<\u0007bBAT\u0017\u0001\u0007\u00111H\u0001\u0003g\u000e,b!a+\u00024\u0006]F\u0003CAW\u0003\u0003\f\u0019-!2\u0015\r\u0005=\u0016\u0011XA_!\u0019y\u0006!!-\u00026B\u0019!(a-\u0005\u000bqb!\u0019A\u001f\u0011\u0007i\n9\fB\u0003H\u0019\t\u0007Q\bC\u0004\u0002\u000e2\u0001\u001d!a/\u0011\u000b}\u000b\t*!-\t\u000f\u0005]E\u0002q\u0001\u0002@B)q,!%\u00026\"9\u0011Q\u0014\u0007A\u0002\u0005}\u0005bBAT\u0019\u0001\u0007\u00111\b\u0005\u0006U2\u0001\ra[\u000b\u0007\u0003\u0013\fy-a5\u0015\r\u0005-\u0017Q[An!\u0019y\u0006!!4\u0002RB\u0019!(a4\u0005\u000bqj!\u0019A\u001f\u0011\u0007i\n\u0019\u000eB\u0003H\u001b\t\u0007Q\b\u0003\u00040\u001b\u0001\u0007\u0011q\u001b\t\u00053r\u000bI\u000e\u0005\u0004`A\u00065\u0017\u0011\u001b\u0005\u0006M6\u0001\raZ\u000b\u0007\u0003?\f)/!;\u0015\u0011\u0005\u0005\u00181^Ay\u0003g\u0004ba\u0018\u0001\u0002d\u0006\u001d\bc\u0001\u001e\u0002f\u0012)AH\u0004b\u0001{A\u0019!(!;\u0005\u000b\u001ds!\u0019A\u001f\t\r=r\u0001\u0019AAw!\u0011IF,a<\u0011\r}\u0003\u00171]At\u0011\u00151g\u00021\u0001h\u0011\u0015Qg\u00021\u0001l\u000319(/\u001b;f%\u0016\u0004H.Y2f)\t\tI\u0010\u0005\u0003\u0002|\n\u0005QBAA\u007f\u0015\u0011\ty0!\u0016\u0002\t1\fgnZ\u0005\u0005\u0005\u0007\tiP\u0001\u0004PE*,7\r\u001e")
/* loaded from: input_file:monix/kafka/KafkaProducerSink.class */
public final class KafkaProducerSink<K, V> extends Consumer<Seq<ProducerRecord<K, V>>, BoxedUnit> implements StrictLogging {
    public final Coeval<KafkaProducer<K, V>> monix$kafka$KafkaProducerSink$$producer;
    public final boolean monix$kafka$KafkaProducerSink$$shouldTerminate;
    public final int monix$kafka$KafkaProducerSink$$parallelism;
    public final Function1<Throwable, Task<Ack>> monix$kafka$KafkaProducerSink$$onSendError;
    private Logger logger;

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    public Tuple2<Subscriber<Seq<ProducerRecord<K, V>>>, AssignableCancelable.Multi> createSubscriber(final Callback<Throwable, BoxedUnit> callback, final Scheduler scheduler) {
        return new Tuple2<>(new Subscriber<Seq<ProducerRecord<K, V>>>(this, scheduler, callback) { // from class: monix.kafka.KafkaProducerSink$$anon$1
            private final Scheduler scheduler;
            private final Coeval<KafkaProducer<K, V>> p;
            private boolean isActive;
            private final /* synthetic */ KafkaProducerSink $outer;
            private final Callback cb$2;

            public Scheduler scheduler() {
                return this.scheduler;
            }

            public synchronized Future<Ack> onNext(Seq<ProducerRecord<K, V>> seq) {
                if (this.isActive) {
                    return (this.$outer.monix$kafka$KafkaProducerSink$$parallelism == 1 ? Task$.MODULE$.traverse(seq, producerRecord -> {
                        return ((KafkaProducer) this.p.value()).send(producerRecord);
                    }, BuildFrom$.MODULE$.buildFromIterableOps()) : Task$.MODULE$.wanderN(this.$outer.monix$kafka$KafkaProducerSink$$parallelism, seq, producerRecord2 -> {
                        return ((KafkaProducer) this.p.value()).send(producerRecord2);
                    })).redeemWith(th -> {
                        return (Task) this.$outer.monix$kafka$KafkaProducerSink$$onSendError.apply(th);
                    }, seq2 -> {
                        return Task$.MODULE$.pure(Ack$Continue$.MODULE$);
                    }).runToFuture(scheduler());
                }
                return Ack$Stop$.MODULE$;
            }

            public synchronized void terminate(Function0<BoxedUnit> function0) {
                if (this.isActive) {
                    this.isActive = false;
                    if (this.$outer.monix$kafka$KafkaProducerSink$$shouldTerminate) {
                        Task$.MODULE$.apply(() -> {
                            return ((KafkaProducer) this.p.value()).close();
                        }).flatten($less$colon$less$.MODULE$.refl()).materialize().foreach(r6 -> {
                            $anonfun$terminate$2(this, function0, r6);
                            return BoxedUnit.UNIT;
                        }, scheduler());
                    } else {
                        function0.apply$mcV$sp();
                    }
                }
            }

            public void onError(Throwable th) {
                terminate(() -> {
                    this.cb$2.onError(th);
                });
            }

            public void onComplete() {
                terminate(() -> {
                    this.cb$2.onSuccess(BoxedUnit.UNIT);
                });
            }

            public static final /* synthetic */ void $anonfun$terminate$2(KafkaProducerSink$$anon$1 kafkaProducerSink$$anon$1, Function0 function0, Try r6) {
                if (r6 instanceof Success) {
                    function0.apply$mcV$sp();
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    if (!(r6 instanceof Failure)) {
                        throw new MatchError(r6);
                    }
                    Throwable exception = ((Failure) r6).exception();
                    if (kafkaProducerSink$$anon$1.$outer.logger().underlying().isErrorEnabled()) {
                        kafkaProducerSink$$anon$1.$outer.logger().underlying().error("Unexpected error in KafkaProducerSink", exception);
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    }
                    function0.apply$mcV$sp();
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.cb$2 = callback;
                this.scheduler = scheduler;
                this.p = this.monix$kafka$KafkaProducerSink$$producer.memoize();
                this.isActive = true;
            }
        }, AssignableCancelable$.MODULE$.dummy());
    }

    public KafkaProducerSink(Coeval<KafkaProducer<K, V>> coeval, boolean z, int i, Function1<Throwable, Task<Ack>> function1) {
        this.monix$kafka$KafkaProducerSink$$producer = coeval;
        this.monix$kafka$KafkaProducerSink$$shouldTerminate = z;
        this.monix$kafka$KafkaProducerSink$$parallelism = i;
        this.monix$kafka$KafkaProducerSink$$onSendError = function1;
        StrictLogging.$init$(this);
        Predef$.MODULE$.require(i >= 1, () -> {
            return "parallelism >= 1";
        });
        Statics.releaseFence();
    }
}
