package kafkareactive.sink.batch;

import com.typesafe.config.Config;
import com.typesafe.scalalogging.Logger;
import kafkareactive.sink.Put;
import kafkareactive.sink.SinkPipe;
import kafkareactive.sink.SinkProcess;
import kafkareactive.sink.batch.TopicBatch;
import monix.execution.Cancelable;
import monix.execution.CancelableFuture;
import monix.reactive.Observable;
import monix.reactive.Observable$;
import monix.reactive.Observer;
import monix.reactive.subjects.ConcurrentSubject;
import monix.reactive.subjects.ConcurrentSubject$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: TopicBatchSinkProcess.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%d\u0001B\u0001\u0003\u0005%\u0011Q\u0003V8qS\u000e\u0014\u0015\r^2i'&t7\u000e\u0015:pG\u0016\u001c8O\u0003\u0002\u0004\t\u0005)!-\u0019;dQ*\u0011QAB\u0001\u0005g&t7NC\u0001\b\u00035Y\u0017MZ6be\u0016\f7\r^5wK\u000e\u00011c\u0001\u0001\u000b!A\u00111BD\u0007\u0002\u0019)\tQ\"A\u0003tG\u0006d\u0017-\u0003\u0002\u0010\u0019\t1\u0011I\\=SK\u001a\u0004\"!\u0005\n\u000e\u0003\u0011I!a\u0005\u0003\u0003\u0017MKgn\u001b)s_\u000e,7o\u001d\u0005\u0006+\u0001!\tAF\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003]\u0001\"\u0001\u0007\u0001\u000e\u0003\t)AA\u0007\u0001!7\t1!+\u001a;WC2\u00042\u0001H\u0011$\u001b\u0005i\"B\u0001\u0010 \u0003%)\u00070Z2vi&|gNC\u0001!\u0003\u0015iwN\\5y\u0013\t\u0011SD\u0001\tDC:\u001cW\r\\1cY\u00164U\u000f^;sKB\u00111\u0002J\u0005\u0003K1\u0011A!\u00168ji\")q\u0005\u0001C!Q\u0005)1\u000f^1siR\u00191$\u000b\u001b\t\u000b)2\u0003\u0019A\u0016\u0002\r\r|gNZ5h!\ta#'D\u0001.\u0015\tQcF\u0003\u00020a\u0005AA/\u001f9fg\u00064WMC\u00012\u0003\r\u0019w.\\\u0005\u0003g5\u0012aaQ8oM&<\u0007\"B\u001b'\u0001\u00041\u0014!B6bM.\f\u0007CA\t8\u0013\tADA\u0001\u0005TS:\\\u0007+\u001b9f\u000f\u0015Q$\u0001#\u0001<\u0003U!v\u000e]5d\u0005\u0006$8\r[*j].\u0004&o\\2fgN\u0004\"\u0001\u0007\u001f\u0007\u000b\u0005\u0011\u0001\u0012A\u001f\u0014\u0007qRa\b\u0005\u0002@\u00056\t\u0001I\u0003\u0002B]\u0005a1oY1mC2|wmZ5oO&\u00111\t\u0011\u0002\u000e'R\u0014\u0018n\u0019;M_\u001e<\u0017N\\4\t\u000bUaD\u0011A#\u0015\u0003mBQa\n\u001f\u0005\u0002\u001d#2a\u0007%J\u0011\u0015Qc\t1\u0001,\u0011\u0015)d\t1\u00017\u0011\u0015YE\b\"\u0001M\u0003\u0015\t\u0007\u000f\u001d7z)\u0015i\u0015QMA4!\tqu*D\u0001=\r\u0011\u0001F\bA)\u0003\u0015\t\u000bGo\u00195GY><8o\u0005\u0002P\u0015!A1k\u0014BC\u0002\u0013\u0005A+\u0001\u0005tKR$\u0018N\\4t+\u0005)\u0006C\u0001\rW\u0013\t9&A\u0001\nU_BL7MQ1uG\"\u001cV\r\u001e;j]\u001e\u001c\b\u0002C-P\u0005\u0003\u0005\u000b\u0011B+\u0002\u0013M,G\u000f^5oON\u0004\u0003\"B\u000bP\t\u0003YFCA']\u0011\u0015\u0019&\f1\u0001V\u0011\u001dqvJ1A\u0005\u0002}\u000bqA]3d_J$7/F\u0001a!\r\tGMZ\u0007\u0002E*\u00111mH\u0001\te\u0016\f7\r^5wK&\u0011QM\u0019\u0002\u000b\u001f\n\u001cXM\u001d<bE2,\u0007CA4q\u001b\u0005A'BA\u0003j\u0015\tQ7.A\u0004d_:tWm\u0019;\u000b\u0005Ub'BA7o\u0003\u0019\t\u0007/Y2iK*\tq.A\u0002pe\u001eL!!\u001d5\u0003\u0015MKgn\u001b*fG>\u0014H\r\u0003\u0004t\u001f\u0002\u0006I\u0001Y\u0001\te\u0016\u001cwN\u001d3tA!AQo\u0014EC\u0002\u0013\u0005a/A\u0004m_\u001e\u0014\u0016\r^3\u0016\u0003mA\u0001\u0002_(\t\u0006\u0004%\t!_\u0001\u0015e\u0016\u001cwN\u001d3t\u0003N\u0014\u0015\r^2i\u000bZ,g\u000e^:\u0016\u0003i\u00042!\u00193|!\tAB0\u0003\u0002~\u0005\tQ!)\u0019;dQ\u00163XM\u001c;\t\u0011}|%\u0019!C\u0005\u0003\u0003\t!CY1uG\",e/\u001a8ugN+(M[3diV\u0011\u00111\u0001\t\u0007\u0003\u000b\tYa_>\u000e\u0005\u0005\u001d!bAA\u0005E\u0006A1/\u001e2kK\u000e$8/\u0003\u0003\u0002\u000e\u0005\u001d!!E\"p]\u000e,(O]3oiN+(M[3di\"A\u0011\u0011C(!\u0002\u0013\t\u0019!A\ncCR\u001c\u0007.\u0012<f]R\u001c8+\u001e2kK\u000e$\b\u0005\u0003\u0005\u0002\u0016=\u0013\r\u0011\"\u0005z\u0003A\u0011\u0017\r^2i\u000bZ,g\u000e^(viB,H\u000fC\u0004\u0002\u001a=\u0003\u000b\u0011\u0002>\u0002#\t\fGo\u00195Fm\u0016tGoT;uaV$\b\u0005C\u0005\u0002\u001e=\u0013\r\u0011\"\u0005\u0002 \u0005y!-\u0019;dQ\u00163XM\u001c;J]B,H/\u0006\u0002\u0002\"A!\u0011-a\t|\u0013\r\t)C\u0019\u0002\t\u001f\n\u001cXM\u001d<fe\"A\u0011\u0011F(!\u0002\u0013\t\t#\u0001\tcCR\u001c\u0007.\u0012<f]RLe\u000e];uA!Q\u0011QF(\t\u0006\u0004%\t!a\f\u0002\u000f\t\fGo\u00195fgV\u0011\u0011\u0011\u0007\t\u0005C\u0012\f\u0019\u0004\r\u0003\u00026\u0005}\u0002#\u0002\r\u00028\u0005m\u0012bAA\u001d\u0005\tQAk\u001c9jG\n\u000bGo\u00195\u0011\t\u0005u\u0012q\b\u0007\u0001\t1\t\t%a\u000b\u0002\u0002\u0003\u0005)\u0011AA\"\u0005\ryF%M\t\u0005\u0003\u000b\nY\u0005E\u0002\f\u0003\u000fJ1!!\u0013\r\u0005\u001dqu\u000e\u001e5j]\u001e\u00042aCA'\u0013\r\ty\u0005\u0004\u0002\u0004\u0003:L\b\"CA*\u001f\"\u0015\r\u0011\"\u0001z\u0003\u001d1G.^:iKND!\"a\u0016P\u0011\u000b\u0007I\u0011AA-\u0003E1G.^:i'V\u00147o\u0019:jaRLwN\\\u000b\u0003\u00037\u00022\u0001HA/\u0013\r\ty&\b\u0002\u000b\u0007\u0006t7-\u001a7bE2,\u0007\"CA2\u001f\"\u0015\r\u0011\"\u0001w\u0003-\u0011\u0017\r^2i\rV$XO]3\t\u000b)R\u0005\u0019A\u0016\t\u000bUR\u0005\u0019\u0001\u001c")
/* loaded from: input_file:kafkareactive/sink/batch/TopicBatchSinkProcess.class */
public final class TopicBatchSinkProcess implements SinkProcess {

    /* compiled from: TopicBatchSinkProcess.scala */
    /* loaded from: input_file:kafkareactive/sink/batch/TopicBatchSinkProcess$BatchFlows.class */
    public static class BatchFlows {
        private CancelableFuture<BoxedUnit> logRate;
        private Observable<BatchEvent> recordsAsBatchEvents;
        private Observable<TopicBatch<?>> batches;
        private Observable<BatchEvent> flushes;
        private Cancelable flushSubscription;
        private CancelableFuture<BoxedUnit> batchFuture;
        private final TopicBatchSettings settings;
        private final Observable<SinkRecord> records;
        private final ConcurrentSubject<BatchEvent, BatchEvent> batchEventsSubject;
        private final Observable<BatchEvent> batchEventOutput;
        private final Observer<BatchEvent> batchEventInput;
        private volatile byte bitmap$0;

        public TopicBatchSettings settings() {
            return this.settings;
        }

        public Observable<SinkRecord> records() {
            return this.records;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v10, types: [kafkareactive.sink.batch.TopicBatchSinkProcess$BatchFlows] */
        private CancelableFuture<BoxedUnit> logRate$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (((byte) (this.bitmap$0 & 1)) == 0) {
                    this.logRate = records().map(sinkRecord -> {
                        return BoxesRunTime.boxToInteger($anonfun$logRate$1(sinkRecord));
                    }).bufferTimed(new package.DurationInt(package$.MODULE$.DurationInt(1)).second()).map(seq -> {
                        return BoxesRunTime.boxToInteger(seq.size());
                    }).foreach(i -> {
                        if (!TopicBatchSinkProcess$.MODULE$.logger().underlying().isInfoEnabled()) {
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        } else {
                            TopicBatchSinkProcess$.MODULE$.logger().underlying().info("Handled: {} per second", new Object[]{BoxesRunTime.boxToInteger(i)});
                            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                        }
                    }, settings().kafka().scheduler());
                    r0 = this;
                    r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
                }
            }
            return this.logRate;
        }

        public CancelableFuture<BoxedUnit> logRate() {
            return ((byte) (this.bitmap$0 & 1)) == 0 ? logRate$lzycompute() : this.logRate;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v10, types: [kafkareactive.sink.batch.TopicBatchSinkProcess$BatchFlows] */
        private Observable<BatchEvent> recordsAsBatchEvents$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (((byte) (this.bitmap$0 & 2)) == 0) {
                    this.recordsAsBatchEvents = records().map(sinkRecord -> {
                        return new Append(new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue()), sinkRecord.kafkaOffset(), this.settings().formatter().bytes(sinkRecord));
                    });
                    r0 = this;
                    r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
                }
            }
            return this.recordsAsBatchEvents;
        }

        public Observable<BatchEvent> recordsAsBatchEvents() {
            return ((byte) (this.bitmap$0 & 2)) == 0 ? recordsAsBatchEvents$lzycompute() : this.recordsAsBatchEvents;
        }

        private ConcurrentSubject<BatchEvent, BatchEvent> batchEventsSubject() {
            return this.batchEventsSubject;
        }

        public Observable<BatchEvent> batchEventOutput() {
            return this.batchEventOutput;
        }

        public Observer<BatchEvent> batchEventInput() {
            return this.batchEventInput;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v10, types: [kafkareactive.sink.batch.TopicBatchSinkProcess$BatchFlows] */
        private Observable<TopicBatch<?>> batches$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (((byte) (this.bitmap$0 & 4)) == 0) {
                    this.batches = batchEventOutput().scan(() -> {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new TopicBatch.BuffersByTopicPartition(Predef$.MODULE$.Map().empty(), this.settings().handlerForPartition())), Seq$.MODULE$.empty());
                    }, (tuple2, batchEvent) -> {
                        Tuple2 tuple2 = new Tuple2(tuple2, batchEvent);
                        if (tuple2 != null) {
                            Tuple2 tuple22 = (Tuple2) tuple2._1();
                            BatchEvent batchEvent = (BatchEvent) tuple2._2();
                            if (tuple22 != null) {
                                return ((TopicBatch.BuffersByTopicPartition) tuple22._1()).update(batchEvent);
                            }
                        }
                        throw new MatchError(tuple2);
                    }).flatMap(tuple22 -> {
                        if (tuple22 == null) {
                            throw new MatchError(tuple22);
                        }
                        return Observable$.MODULE$.fromIterable((Seq) ((Seq) tuple22._2()).collect(new TopicBatchSinkProcess$BatchFlows$$anonfun$1(null), Seq$.MODULE$.canBuildFrom()));
                    }).share(settings().kafka().scheduler());
                    r0 = this;
                    r0.bitmap$0 = (byte) (this.bitmap$0 | 4);
                }
            }
            return this.batches;
        }

        public Observable<TopicBatch<?>> batches() {
            return ((byte) (this.bitmap$0 & 4)) == 0 ? batches$lzycompute() : this.batches;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v10, types: [kafkareactive.sink.batch.TopicBatchSinkProcess$BatchFlows] */
        private Observable<BatchEvent> flushes$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (((byte) (this.bitmap$0 & 8)) == 0) {
                    this.flushes = ((Observable) Predef$.MODULE$.locally(batches().map(topicBatch -> {
                        return FlushBatchIfNonEmpty$.MODULE$;
                    }).$plus$colon(FlushBatchIfNonEmpty$.MODULE$))).share(settings().kafka().scheduler()).debounce(settings().flushFrequency());
                    r0 = this;
                    r0.bitmap$0 = (byte) (this.bitmap$0 | 8);
                }
            }
            return this.flushes;
        }

        public Observable<BatchEvent> flushes() {
            return ((byte) (this.bitmap$0 & 8)) == 0 ? flushes$lzycompute() : this.flushes;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v10, types: [kafkareactive.sink.batch.TopicBatchSinkProcess$BatchFlows] */
        private Cancelable flushSubscription$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (((byte) (this.bitmap$0 & 16)) == 0) {
                    this.flushSubscription = flushes().subscribe(batchEventInput(), settings().kafka().scheduler());
                    r0 = this;
                    r0.bitmap$0 = (byte) (this.bitmap$0 | 16);
                }
            }
            return this.flushSubscription;
        }

        public Cancelable flushSubscription() {
            return ((byte) (this.bitmap$0 & 16)) == 0 ? flushSubscription$lzycompute() : this.flushSubscription;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v10, types: [kafkareactive.sink.batch.TopicBatchSinkProcess$BatchFlows] */
        private CancelableFuture<BoxedUnit> batchFuture$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (((byte) (this.bitmap$0 & 32)) == 0) {
                    CancelableFuture<BoxedUnit> foreach = batches().foreach(topicBatch -> {
                        $anonfun$batchFuture$1(topicBatch);
                        return BoxedUnit.UNIT;
                    }, settings().kafka().scheduler());
                    foreach.onComplete(r2 -> {
                        $anonfun$batchFuture$2(r2);
                        return BoxedUnit.UNIT;
                    }, settings().kafka().scheduler());
                    this.batchFuture = foreach;
                    r0 = this;
                    r0.bitmap$0 = (byte) (this.bitmap$0 | 32);
                }
            }
            return this.batchFuture;
        }

        public CancelableFuture<BoxedUnit> batchFuture() {
            return ((byte) (this.bitmap$0 & 32)) == 0 ? batchFuture$lzycompute() : this.batchFuture;
        }

        public static final /* synthetic */ int $anonfun$logRate$1(SinkRecord sinkRecord) {
            return 1;
        }

        public static final /* synthetic */ void $anonfun$batchFuture$1(TopicBatch topicBatch) {
            if (!TopicBatchSinkProcess$.MODULE$.logger().underlying().isInfoEnabled()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                TopicBatchSinkProcess$.MODULE$.logger().underlying().info("Created {}", new Object[]{topicBatch});
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        }

        public static final /* synthetic */ void $anonfun$batchFuture$2(Try r7) {
            BoxedUnit boxedUnit;
            BoxedUnit boxedUnit2;
            if (r7 instanceof Success) {
                BoxedUnit boxedUnit3 = (BoxedUnit) ((Success) r7).value();
                if (TopicBatchSinkProcess$.MODULE$.logger().underlying().isInfoEnabled()) {
                    TopicBatchSinkProcess$.MODULE$.logger().underlying().info("Completed with {}", new Object[]{boxedUnit3});
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                return;
            }
            if (!(r7 instanceof Failure)) {
                throw new MatchError(r7);
            }
            Throwable exception = ((Failure) r7).exception();
            if (TopicBatchSinkProcess$.MODULE$.logger().underlying().isErrorEnabled()) {
                TopicBatchSinkProcess$.MODULE$.logger().underlying().error("Completed with {}", exception);
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        }

        public BatchFlows(TopicBatchSettings topicBatchSettings) {
            this.settings = topicBatchSettings;
            this.records = topicBatchSettings.kafka().events().flatMap(sinkEvent -> {
                Observable empty;
                if (sinkEvent instanceof Put) {
                    empty = Observable$.MODULE$.fromIterable(((Put) sinkEvent).records());
                } else {
                    empty = Observable$.MODULE$.empty();
                }
                return empty;
            }).share(topicBatchSettings.kafka().scheduler());
            ConcurrentSubject<BatchEvent, BatchEvent> publish = ConcurrentSubject$.MODULE$.publish(topicBatchSettings.kafka().scheduler());
            recordsAsBatchEvents().subscribe(publish, topicBatchSettings.kafka().scheduler());
            this.batchEventsSubject = publish;
            this.batchEventOutput = batchEventsSubject().share(topicBatchSettings.kafka().scheduler());
            this.batchEventInput = batchEventsSubject();
        }
    }

    public static BatchFlows apply(Config config, SinkPipe sinkPipe) {
        return TopicBatchSinkProcess$.MODULE$.apply(config, sinkPipe);
    }

    public static Logger logger() {
        return TopicBatchSinkProcess$.MODULE$.logger();
    }

    @Override // kafkareactive.sink.SinkProcess
    public CancelableFuture<BoxedUnit> start(Config config, SinkPipe sinkPipe) {
        return TopicBatchSinkProcess$.MODULE$.start(config, sinkPipe);
    }
}
