/*
 * Decompiled with CFR 0.152.
 */
package kafkareactive.sink.batch;

import com.typesafe.config.Config;
import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import kafkareactive.sink.Put;
import kafkareactive.sink.SinkEvent;
import kafkareactive.sink.SinkPipe;
import kafkareactive.sink.SinkProcess;
import kafkareactive.sink.batch.Append;
import kafkareactive.sink.batch.BatchEvent;
import kafkareactive.sink.batch.BatchEventHandler;
import kafkareactive.sink.batch.FlushBatchIfNonEmpty$;
import kafkareactive.sink.batch.TopicBatch;
import kafkareactive.sink.batch.TopicBatchSettings;
import kafkareactive.sink.batch.TopicBatchSinkProcess$;
import monix.execution.Cancelable;
import monix.execution.CancelableFuture;
import monix.reactive.Observable;
import monix.reactive.Observable$;
import monix.reactive.Observer;
import monix.reactive.subjects.ConcurrentSubject;
import monix.reactive.subjects.ConcurrentSubject$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction1;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

@ScalaSignature(bytes="\u0006\u0001\u0005%d\u0001B\u0001\u0003\u0005%\u0011Q\u0003V8qS\u000e\u0014\u0015\r^2i'&t7\u000e\u0015:pG\u0016\u001c8O\u0003\u0002\u0004\t\u0005)!-\u0019;dQ*\u0011QAB\u0001\u0005g&t7NC\u0001\b\u00035Y\u0017MZ6be\u0016\f7\r^5wK\u000e\u00011c\u0001\u0001\u000b!A\u00111BD\u0007\u0002\u0019)\tQ\"A\u0003tG\u0006d\u0017-\u0003\u0002\u0010\u0019\t1\u0011I\\=SK\u001a\u0004\"!\u0005\n\u000e\u0003\u0011I!a\u0005\u0003\u0003\u0017MKgn\u001b)s_\u000e,7o\u001d\u0005\u0006+\u0001!\tAF\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003]\u0001\"\u0001\u0007\u0001\u000e\u0003\t)AA\u0007\u0001!7\t1!+\u001a;WC2\u00042\u0001H\u0011$\u001b\u0005i\"B\u0001\u0010 \u0003%)\u00070Z2vi&|gNC\u0001!\u0003\u0015iwN\\5y\u0013\t\u0011SD\u0001\tDC:\u001cW\r\\1cY\u00164U\u000f^;sKB\u00111\u0002J\u0005\u0003K1\u0011A!\u00168ji\")q\u0005\u0001C!Q\u0005)1\u000f^1siR\u00191$\u000b\u001b\t\u000b)2\u0003\u0019A\u0016\u0002\r\r|gNZ5h!\ta#'D\u0001.\u0015\tQcF\u0003\u00020a\u0005AA/\u001f9fg\u00064WMC\u00012\u0003\r\u0019w.\\\u0005\u0003g5\u0012aaQ8oM&<\u0007\"B\u001b'\u0001\u00041\u0014!B6bM.\f\u0007CA\t8\u0013\tADA\u0001\u0005TS:\\\u0007+\u001b9f\u000f\u0015Q$\u0001#\u0001<\u0003U!v\u000e]5d\u0005\u0006$8\r[*j].\u0004&o\\2fgN\u0004\"\u0001\u0007\u001f\u0007\u000b\u0005\u0011\u0001\u0012A\u001f\u0014\u0007qRa\b\u0005\u0002@\u00056\t\u0001I\u0003\u0002B]\u0005a1oY1mC2|wmZ5oO&\u00111\t\u0011\u0002\u000e'R\u0014\u0018n\u0019;M_\u001e<\u0017N\\4\t\u000bUaD\u0011A#\u0015\u0003mBQa\n\u001f\u0005\u0002\u001d#2a\u0007%J\u0011\u0015Qc\t1\u0001,\u0011\u0015)d\t1\u00017\u0011\u0015YE\b\"\u0001M\u0003\u0015\t\u0007\u000f\u001d7z)\u0015i\u0015QMA4!\tqu*D\u0001=\r\u0011\u0001F\bA)\u0003\u0015\t\u000bGo\u00195GY><8o\u0005\u0002P\u0015!A1k\u0014BC\u0002\u0013\u0005A+\u0001\u0005tKR$\u0018N\\4t+\u0005)\u0006C\u0001\rW\u0013\t9&A\u0001\nU_BL7MQ1uG\"\u001cV\r\u001e;j]\u001e\u001c\b\u0002C-P\u0005\u0003\u0005\u000b\u0011B+\u0002\u0013M,G\u000f^5oON\u0004\u0003\"B\u000bP\t\u0003YFCA']\u0011\u0015\u0019&\f1\u
0001V\u0011\u001dqvJ1A\u0005\u0002}\u000bqA]3d_J$7/F\u0001a!\r\tGMZ\u0007\u0002E*\u00111mH\u0001\te\u0016\f7\r^5wK&\u0011QM\u0019\u0002\u000b\u001f\n\u001cXM\u001d<bE2,\u0007CA4q\u001b\u0005A'BA\u0003j\u0015\tQ7.A\u0004d_:tWm\u0019;\u000b\u0005Ub'BA7o\u0003\u0019\t\u0007/Y2iK*\tq.A\u0002pe\u001eL!!\u001d5\u0003\u0015MKgn\u001b*fG>\u0014H\r\u0003\u0004t\u001f\u0002\u0006I\u0001Y\u0001\te\u0016\u001cwN\u001d3tA!AQo\u0014EC\u0002\u0013\u0005a/A\u0004m_\u001e\u0014\u0016\r^3\u0016\u0003mA\u0001\u0002_(\t\u0006\u0004%\t!_\u0001\u0015e\u0016\u001cwN\u001d3t\u0003N\u0014\u0015\r^2i\u000bZ,g\u000e^:\u0016\u0003i\u00042!\u00193|!\tAB0\u0003\u0002~\u0005\tQ!)\u0019;dQ\u00163XM\u001c;\t\u0011}|%\u0019!C\u0005\u0003\u0003\t!CY1uG\",e/\u001a8ugN+(M[3diV\u0011\u00111\u0001\t\u0007\u0003\u000b\tYa_>\u000e\u0005\u0005\u001d!bAA\u0005E\u0006A1/\u001e2kK\u000e$8/\u0003\u0003\u0002\u000e\u0005\u001d!!E\"p]\u000e,(O]3oiN+(M[3di\"A\u0011\u0011C(!\u0002\u0013\t\u0019!A\ncCR\u001c\u0007.\u0012<f]R\u001c8+\u001e2kK\u000e$\b\u0005\u0003\u0005\u0002\u0016=\u0013\r\u0011\"\u0005z\u0003A\u0011\u0017\r^2i\u000bZ,g\u000e^(viB,H\u000fC\u0004\u0002\u001a=\u0003\u000b\u0011\u0002>\u0002#\t\fGo\u00195Fm\u0016tGoT;uaV$\b\u0005C\u0005\u0002\u001e=\u0013\r\u0011\"\u0005\u0002 
\u0005y!-\u0019;dQ\u00163XM\u001c;J]B,H/\u0006\u0002\u0002\"A!\u0011-a\t|\u0013\r\t)C\u0019\u0002\t\u001f\n\u001cXM\u001d<fe\"A\u0011\u0011F(!\u0002\u0013\t\t#\u0001\tcCR\u001c\u0007.\u0012<f]RLe\u000e];uA!Q\u0011QF(\t\u0006\u0004%\t!a\f\u0002\u000f\t\fGo\u00195fgV\u0011\u0011\u0011\u0007\t\u0005C\u0012\f\u0019\u0004\r\u0003\u00026\u0005}\u0002#\u0002\r\u00028\u0005m\u0012bAA\u001d\u0005\tQAk\u001c9jG\n\u000bGo\u00195\u0011\t\u0005u\u0012q\b\u0007\u0001\t1\t\t%a\u000b\u0002\u0002\u0003\u0005)\u0011AA\"\u0005\ryF%M\t\u0005\u0003\u000b\nY\u0005E\u0002\f\u0003\u000fJ1!!\u0013\r\u0005\u001dqu\u000e\u001e5j]\u001e\u00042aCA'\u0013\r\ty\u0005\u0004\u0002\u0004\u0003:L\b\"CA*\u001f\"\u0015\r\u0011\"\u0001z\u0003\u001d1G.^:iKND!\"a\u0016P\u0011\u000b\u0007I\u0011AA-\u0003E1G.^:i'V\u00147o\u0019:jaRLwN\\\u000b\u0003\u00037\u00022\u0001HA/\u0013\r\ty&\b\u0002\u000b\u0007\u0006t7-\u001a7bE2,\u0007\"CA2\u001f\"\u0015\r\u0011\"\u0001w\u0003-\u0011\u0017\r^2i\rV$XO]3\t\u000b)R\u0005\u0019A\u0016\t\u000bUR\u0005\u0019\u0001\u001c")
/*
 * Sink process whose work is delegated to the Scala companion singleton
 * TopicBatchSinkProcess$.MODULE$; the nested BatchFlows class wires the reactive
 * streams that turn sink records into batch events and topic batches.
 *
 * NOTE(review): this source is CFR decompiler output of a Scala class. Raw types,
 * intersection casts on lambdas and the bitmap-guarded "$lzycompute" methods are
 * the decompiled form of Scala lambdas and lazy vals.
 */
public final class TopicBatchSinkProcess
implements SinkProcess {
    /**
     * Returns the {@link BatchFlows} produced by the companion object's {@code apply}.
     *
     * @param config   configuration passed through to the companion
     * @param sinkPipe sink pipe passed through to the companion
     * @return whatever the companion's {@code apply} returns
     */
    public static BatchFlows apply(Config config, SinkPipe sinkPipe) {
        return TopicBatchSinkProcess$.MODULE$.apply(config, sinkPipe);
    }

    /**
     * Returns the logger owned by the companion object.
     */
    public static Logger logger() {
        return TopicBatchSinkProcess$.MODULE$.logger();
    }

    /**
     * Starts the process by forwarding to the companion object's {@code start}.
     *
     * @param config configuration passed through to the companion
     * @param kafka  sink pipe passed through to the companion
     * @return the cancelable future returned by the companion's {@code start}
     */
    public CancelableFuture<BoxedUnit> start(Config config, SinkPipe kafka) {
        return TopicBatchSinkProcess$.MODULE$.start(config, kafka);
    }

    /**
     * Holds the streams derived from {@code settings.kafka().events()}.
     *
     * <p>The six mutable fields are lazily initialised: each public accessor tests its
     * bit in {@code bitmap$0} and, if unset, runs the matching {@code $lzycompute}
     * method, which initialises the field inside {@code synchronized (this)} and then
     * sets the bit.
     */
    public static class BatchFlows {
        // Lazily initialised values; see bitmap$0 for the guard bit of each one.
        private CancelableFuture<BoxedUnit> logRate;
        private Observable<BatchEvent> recordsAsBatchEvents;
        private Observable<TopicBatch<?>> batches;
        private Observable<BatchEvent> flushes;
        private Cancelable flushSubscription;
        private CancelableFuture<BoxedUnit> batchFuture;
        // Eagerly initialised in the constructor.
        private final TopicBatchSettings settings;
        private final Observable<SinkRecord> records;
        private final ConcurrentSubject<BatchEvent, BatchEvent> batchEventsSubject;
        private final Observable<BatchEvent> batchEventOutput;
        private final Observer<BatchEvent> batchEventInput;
        // Initialisation flags: 0x01 logRate, 0x02 recordsAsBatchEvents, 0x04 batches,
        // 0x08 flushes, 0x10 flushSubscription, 0x20 batchFuture.
        private volatile byte bitmap$0;

        /** Returns the settings this instance was constructed with. */
        public TopicBatchSettings settings() {
            return this.settings;
        }

        /** Returns the shared stream of individual sink records built in the constructor. */
        public Observable<SinkRecord> records() {
            return this.records;
        }

        /**
         * Initialises {@code logRate}: counts the records seen in each one-second buffer
         * and logs the count at info level, running on {@code settings().kafka().scheduler()}.
         */
        private CancelableFuture<BoxedUnit> logRate$lzycompute() {
            synchronized (this) {
                if ((byte)(this.bitmap$0 & 1) == 0) {
                    this.logRate = this.records().map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToInteger((int)BatchFlows.$anonfun$logRate$1(x$1))).bufferTimed(new package.DurationInt(package$.MODULE$.DurationInt(1)).second()).map((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToInteger((int)x$2.size())).foreach((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)eventsPerSecond -> {
                        if (TopicBatchSinkProcess$.MODULE$.logger().underlying().isInfoEnabled()) {
                            TopicBatchSinkProcess$.MODULE$.logger().underlying().info("Handled: {} per second", new Object[]{BoxesRunTime.boxToInteger((int)eventsPerSecond)});
                        }
                    }, this.settings().kafka().scheduler());
                    this.bitmap$0 = (byte)(this.bitmap$0 | 1);
                }
            }
            return this.logRate;
        }

        /** Returns the rate-logging future, starting it on first access. */
        public CancelableFuture<BoxedUnit> logRate() {
            return (byte)(this.bitmap$0 & 1) == 0 ? this.logRate$lzycompute() : this.logRate;
        }

        /**
         * Initialises {@code recordsAsBatchEvents}: maps every record to an {@link Append}
         * carrying its topic/partition, its offset and the bytes produced by
         * {@code settings().formatter()}.
         */
        private Observable<BatchEvent> recordsAsBatchEvents$lzycompute() {
            synchronized (this) {
                if ((byte)(this.bitmap$0 & 2) == 0) {
                    this.recordsAsBatchEvents = this.records().map((Function1 & Serializable & scala.Serializable)record -> {
                        byte[] bytes = this.settings().formatter().bytes(record);
                        TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition().intValue());
                        return new Append(topicPartition, record.kafkaOffset(), bytes);
                    });
                    this.bitmap$0 = (byte)(this.bitmap$0 | 2);
                }
            }
            return this.recordsAsBatchEvents;
        }

        /** Returns the records mapped to {@link Append} events, building the stream on first access. */
        public Observable<BatchEvent> recordsAsBatchEvents() {
            return (byte)(this.bitmap$0 & 2) == 0 ? this.recordsAsBatchEvents$lzycompute() : this.recordsAsBatchEvents;
        }

        /** Returns the subject that backs both {@link #batchEventInput()} and {@link #batchEventOutput()}. */
        private ConcurrentSubject<BatchEvent, BatchEvent> batchEventsSubject() {
            return this.batchEventsSubject;
        }

        /** Returns the shared observable view of the batch-event subject. */
        public Observable<BatchEvent> batchEventOutput() {
            return this.batchEventOutput;
        }

        /** Returns the observer side of the batch-event subject. */
        public Observer<BatchEvent> batchEventInput() {
            return this.batchEventInput;
        }

        /**
         * Initialises {@code batches}: folds {@link #batchEventOutput()} through
         * {@code BuffersByTopicPartition.update}, then emits the {@code entry()} of every
         * {@code UpdatedStateWithZip} update result, shared on the kafka scheduler.
         */
        private Observable<TopicBatch<?>> batches$lzycompute() {
            synchronized (this) {
                if ((byte)(this.bitmap$0 & 4) == 0) {
                    // Seed: an empty buffer map paired with an empty sequence of update results.
                    Observable batchState = this.batchEventOutput().scan((Function0 & Serializable & scala.Serializable)() -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicBatch.BuffersByTopicPartition((Map<TopicPartition, BatchEventHandler>)Predef$.MODULE$.Map().empty(), this.settings().handlerForPartition())), (Object)Seq$.MODULE$.empty()), (Function2 & Serializable & scala.Serializable)(x0$2, x1$1) -> {
                        Tuple2 stateAndEvent = new Tuple2(x0$2, x1$1);
                        Tuple2 previous = (Tuple2)stateAndEvent._1();
                        BatchEvent event = (BatchEvent)stateAndEvent._2();
                        if (previous == null) {
                            throw new MatchError((Object)stateAndEvent);
                        }
                        // Only the buffers carry over; the previous step's results are dropped.
                        TopicBatch.BuffersByTopicPartition oldState = (TopicBatch.BuffersByTopicPartition)previous._1();
                        return oldState.update(event);
                    });
                    this.batches = batchState.flatMap((Function1 & Serializable & scala.Serializable)x0$3 -> {
                        Tuple2 tuple2 = x0$3;
                        if (tuple2 == null) {
                            throw new MatchError((Object)tuple2);
                        }
                        Seq updateResults = (Seq)tuple2._2();
                        // NOTE(review): the anonymous class below is the decompiled body of a Scala
                        // partial function; its supertype and the type variable A were not recovered
                        // by the decompiler and are left exactly as emitted.
                        Seq batches = (Seq)updateResults.collect((PartialFunction)new scala.Serializable(null){
                            public static final long serialVersionUID = 0L;

                            public final <A1 extends BatchEventHandler.UpdateResult, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                                TopicBatch<A> topicBatch;
                                BatchEventHandler.UpdatedStateWithZip updatedStateWithZip;
                                TopicBatch<A> batch;
                                A1 A1 = x1;
                                TopicBatch<A> topicBatch2 = A1 instanceof BatchEventHandler.UpdatedStateWithZip && (batch = (updatedStateWithZip = (BatchEventHandler.UpdatedStateWithZip)A1).entry()) != null ? (topicBatch = batch) : function1.apply(x1);
                                return (B1)topicBatch2;
                            }

                            public final boolean isDefinedAt(BatchEventHandler.UpdateResult x1) {
                                BatchEventHandler.UpdatedStateWithZip updatedStateWithZip;
                                TopicBatch<A> batch;
                                BatchEventHandler.UpdateResult updateResult = x1;
                                boolean bl = updateResult instanceof BatchEventHandler.UpdatedStateWithZip && (batch = (updatedStateWithZip = (BatchEventHandler.UpdatedStateWithZip)updateResult).entry()) != null;
                                return bl;
                            }
                        }, Seq$.MODULE$.canBuildFrom());
                        return Observable$.MODULE$.fromIterable((Iterable)batches);
                    }).share(this.settings().kafka().scheduler());
                    this.bitmap$0 = (byte)(this.bitmap$0 | 4);
                }
            }
            return this.batches;
        }

        /** Returns the stream of completed topic batches, building it on first access. */
        public Observable<TopicBatch<?>> batches() {
            return (byte)(this.bitmap$0 & 4) == 0 ? this.batches$lzycompute() : this.batches;
        }

        /**
         * Initialises {@code flushes}: a stream of {@code FlushBatchIfNonEmpty} with one
         * element prepended and one more per emitted batch, shared on the kafka scheduler
         * and debounced by {@code settings().flushFrequency()}.
         */
        private Observable<BatchEvent> flushes$lzycompute() {
            synchronized (this) {
                if ((byte)(this.bitmap$0 & 8) == 0) {
                    FlushBatchIfNonEmpty$ flushBatchIfNonEmpty$ = FlushBatchIfNonEmpty$.MODULE$;
                    Observable flushStream = (Observable)Predef$.MODULE$.locally((Object)this.batches().map((Function1 & Serializable & scala.Serializable)x$3 -> FlushBatchIfNonEmpty$.MODULE$).$plus$colon((Object)flushBatchIfNonEmpty$));
                    this.flushes = flushStream.share(this.settings().kafka().scheduler()).debounce(this.settings().flushFrequency());
                    this.bitmap$0 = (byte)(this.bitmap$0 | 8);
                }
            }
            return this.flushes;
        }

        /** Returns the debounced flush-event stream, building it on first access. */
        public Observable<BatchEvent> flushes() {
            return (byte)(this.bitmap$0 & 8) == 0 ? this.flushes$lzycompute() : this.flushes;
        }

        /**
         * Initialises {@code flushSubscription} by subscribing {@link #batchEventInput()}
         * to {@link #flushes()}, so flush events are fed back into the batch-event subject.
         */
        private Cancelable flushSubscription$lzycompute() {
            synchronized (this) {
                if ((byte)(this.bitmap$0 & 0x10) == 0) {
                    this.flushSubscription = this.flushes().subscribe(this.batchEventInput(), this.settings().kafka().scheduler());
                    this.bitmap$0 = (byte)(this.bitmap$0 | 0x10);
                }
            }
            return this.flushSubscription;
        }

        /** Returns the handle for the flush feedback subscription, subscribing on first access. */
        public Cancelable flushSubscription() {
            return (byte)(this.bitmap$0 & 0x10) == 0 ? this.flushSubscription$lzycompute() : this.flushSubscription;
        }

        /**
         * Initialises {@code batchFuture}: consumes {@link #batches()}, logging every batch,
         * and registers a completion callback that logs the outcome.
         */
        private CancelableFuture<BoxedUnit> batchFuture$lzycompute() {
            synchronized (this) {
                if ((byte)(this.bitmap$0 & 0x20) == 0) {
                    CancelableFuture future = this.batches().foreach((Function1 & Serializable & scala.Serializable)batchedZip -> {
                        BatchFlows.$anonfun$batchFuture$1(batchedZip);
                        return BoxedUnit.UNIT;
                    }, this.settings().kafka().scheduler());
                    future.onComplete((Function1 & Serializable & scala.Serializable)x0$4 -> {
                        BatchFlows.$anonfun$batchFuture$2(x0$4);
                        return BoxedUnit.UNIT;
                    }, (ExecutionContext)this.settings().kafka().scheduler());
                    // Store the foreach future itself; the decompiler had emitted an
                    // undeclared "void" placeholder variable here instead of this local.
                    this.batchFuture = future;
                    this.bitmap$0 = (byte)(this.bitmap$0 | 0x20);
                }
            }
            return this.batchFuture;
        }

        /** Returns the batch-consuming future, starting consumption on first access. */
        public CancelableFuture<BoxedUnit> batchFuture() {
            return (byte)(this.bitmap$0 & 0x20) == 0 ? this.batchFuture$lzycompute() : this.batchFuture;
        }

        /** Lambda body used by {@code logRate}: maps any record to the constant 1. */
        public static final /* synthetic */ int $anonfun$logRate$1(SinkRecord x$1) {
            return 1;
        }

        /** Lambda body used by {@code batchFuture}: logs the given batch at info level. */
        public static final /* synthetic */ void $anonfun$batchFuture$1(TopicBatch batchedZip) {
            if (TopicBatchSinkProcess$.MODULE$.logger().underlying().isInfoEnabled()) {
                TopicBatchSinkProcess$.MODULE$.logger().underlying().info("Created {}", new Object[]{batchedZip});
            }
        }

        /**
         * Completion callback used by {@code batchFuture}: logs a {@link Success} at info
         * level and a {@link Failure} at error level.
         *
         * @throws MatchError if the argument is neither a {@code Success} nor a {@code Failure}
         */
        public static final /* synthetic */ void $anonfun$batchFuture$2(Try x0$4) {
            if (x0$4 instanceof Success) {
                BoxedUnit done = (BoxedUnit)((Success)x0$4).value();
                if (TopicBatchSinkProcess$.MODULE$.logger().underlying().isInfoEnabled()) {
                    TopicBatchSinkProcess$.MODULE$.logger().underlying().info("Completed with {}", new Object[]{done});
                }
            } else if (x0$4 instanceof Failure) {
                Throwable err = ((Failure)x0$4).exception();
                if (TopicBatchSinkProcess$.MODULE$.logger().underlying().isErrorEnabled()) {
                    TopicBatchSinkProcess$.MODULE$.logger().underlying().error("Completed with {}", err);
                }
            } else {
                throw new MatchError((Object)x0$4);
            }
        }

        /**
         * Wires the eager part of the pipeline.
         *
         * <p>{@code records} flattens every {@link Put} event into its individual records
         * (other events contribute nothing). A publish subject is then created and
         * subscribed to {@link #recordsAsBatchEvents()}; it serves as both the input
         * observer and, shared, the output observable for batch events.
         *
         * @param settings batch settings supplying the event source, scheduler and formatter
         */
        public BatchFlows(TopicBatchSettings settings) {
            this.settings = settings;
            this.records = settings.kafka().events().flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
                Observable observable;
                SinkEvent sinkEvent = x0$1;
                if (sinkEvent instanceof Put) {
                    Put put = (Put)sinkEvent;
                    Seq<SinkRecord> records = put.records();
                    observable = Observable$.MODULE$.fromIterable(records);
                } else {
                    observable = Observable$.MODULE$.empty();
                }
                return observable;
            }).share(settings.kafka().scheduler());
            ConcurrentSubject shared = ConcurrentSubject$.MODULE$.publish(settings.kafka().scheduler());
            this.recordsAsBatchEvents().subscribe((Observer)shared, settings.kafka().scheduler());
            // Keep the subject that was just subscribed; the decompiler had emitted an
            // undeclared "void" placeholder variable here instead of this local.
            this.batchEventsSubject = shared;
            this.batchEventOutput = this.batchEventsSubject().share(settings.kafka().scheduler());
            this.batchEventInput = this.batchEventsSubject();
        }
    }
}

