/*
 * Decompiled with CFR 0.152.
 */
package org.locationtech.geomesa.lambda.stream.kafka;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.io.Closeable;
import java.io.Serializable;
import java.time.Clock;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.geotools.data.DataStore;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.locationtech.geomesa.filter.package$;
import org.locationtech.geomesa.index.utils.Releasable;
import org.locationtech.geomesa.lambda.stream.OffsetManager;
import org.locationtech.geomesa.lambda.stream.kafka.DataStorePersistence$;
import org.locationtech.geomesa.lambda.stream.kafka.KafkaFeatureCache;
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties;
import org.locationtech.geomesa.utils.geotools.FeatureUtils$;
import org.locationtech.geomesa.utils.io.IsCloseable$;
import org.locationtech.geomesa.utils.io.package;
import org.locationtech.geomesa.utils.stats.MethodProfiling;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.filter.Filter;
import org.opengis.filter.Id;
import org.opengis.filter.identity.FeatureId;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;
import scala.runtime.java8.JFunction2;
import scala.util.Random$;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005ug\u0001B\r\u001b\u0001\u001dB\u0001b\u0013\u0001\u0003\u0002\u0003\u0006I\u0001\u0014\u0005\t)\u0002\u0011\t\u0011)A\u0005+\"Aq\f\u0001B\u0001B\u0003%\u0001\r\u0003\u0005e\u0001\t\u0005\t\u0015!\u0003f\u0011!Y\bA!A!\u0002\u0013a\bBCA\u0007\u0001\t\u0005\t\u0015!\u0003\u0002\u0010!Q\u0011q\u0003\u0001\u0003\u0002\u0003\u0006I!!\u0007\t\u0015\u0005}\u0001A!A!\u0002\u0017\t\t\u0003C\u0004\u0002.\u0001!\t!a\f\t\u0013\u0005\u001d\u0003A1A\u0005\n\u0005%\u0003\u0002CA&\u0001\u0001\u0006I!a\u0004\t\u0013\u00055\u0003A1A\u0005\n\u0005%\u0003\u0002CA(\u0001\u0001\u0006I!a\u0004\t\u0013\u0005E\u0003A1A\u0005\n\u0005M\u0003\u0002CA3\u0001\u0001\u0006I!!\u0016\t\u0013\u0005\u001d\u0004A1A\u0005\n\u0005%\u0004\u0002CA>\u0001\u0001\u0006I!a\u001b\t\u000f\u0005-\u0005\u0001\"\u0011\u0002\u000e\"9\u0011Q\u0013\u0001\u0005\n\u0005]\u0005bBAT\u0001\u0011\u0005\u0013QR\u0004\n\u0003SS\u0012\u0011!E\u0001\u0003W3\u0001\"\u0007\u000e\u0002\u0002#\u0005\u0011Q\u0016\u0005\b\u0003[1B\u0011AA[\u0011%\t9LFI\u0001\n\u0003\tIL\u0001\u000bECR\f7\u000b^8sKB+'o]5ti\u0016t7-\u001a\u0006\u00037q\tQa[1gW\u0006T!!\b\u0010\u0002\rM$(/Z1n\u0015\ty\u0002%\u0001\u0004mC6\u0014G-\u0019\u0006\u0003C\t\nqaZ3p[\u0016\u001c\u0018M\u0003\u0002$I\u0005aAn\\2bi&|g\u000e^3dQ*\tQ%A\u0002pe\u001e\u001c\u0001a\u0005\u0004\u0001QA\u001a\u0014(\u0011\t\u0003S9j\u0011A\u000b\u0006\u0003W1\nA\u0001\\1oO*\tQ&\u0001\u0003kCZ\f\u0017BA\u0018+\u0005\u0019y%M[3diB\u0011\u0011&M\u0005\u0003e)\u0012\u0001BU;o]\u0006\u0014G.\u001a\t\u0003i]j\u0011!\u000e\u0006\u0003m1\n!![8\n\u0005a*$!C\"m_N,\u0017M\u00197f!\tQt(D\u0001<\u0015\taT(A\u0003ti\u0006$8O\u0003\u0002?A\u0005)Q\u000f^5mg&\u0011\u0001i\u000f\u0002\u0010\u001b\u0016$\bn\u001c3Qe>4\u0017\u000e\\5oOB\u0011!)S\u0007\u0002\u0007*\u0011A)R\u0001\rg\u000e\fG.\u00197pO\u001eLgn\u001a\u0006\u0003\r\u001e\u000b\u0001\u0002^=qKN\fg-\u001a\u0006\u0002\u0011\u0006\u00191m\\7\n\u0005)\u001b%a\u0003'bufdunZ4j]\u001e\f!\u0001Z:\u0011\u00055\u0013V\"\u0001(\u000b\u0005=\u0003\u0016\u0001\u00023bi\u0006T!!\u0015\u0013\u0002\u0011\u001d,w\u000e^8pYNL!a\u0015(\u0003\u0013\u0011\u000bG/Y*u_J,\u0017aA:giB\u0011a+X\u0007\u0002/*\u0011\u0001,W\u0001\u0007g&l\u0007\u000f\\3\u000b\u0005i[\u0016a\u00024fCR,(/\u001a\u0006\u00039\u0012\nqa\u001c9f]\u001eL7/\u0003\u0002_/\n\t2+[7qY\u00164U-\u0019;ve\u0016$\u0016\u0010]3\u0002\u001b=4gm]3u\u001b\u0006t\u0017mZ3s!\t\t'-D\u0001\u001d\u0013\t\u0019GDA\u0007PM\u001a\u001cX\r^'b]\u0006<WM]\u0001\u0006G\u0006\u001c\u0007.\u001a\t\u0003Mbt!a\u001a<\u000f\u0005!,hBA5u\u001d\tQ7O\u0004\u0002le:\u0011A.\u001d\b\u0003[Bl\u0011A\u001c\u0006\u0003_\u001a\na\u0001\u0010:p_Rt\u0014\"A\u0013\n\u0005\r\"\u0013BA\u0011#\u0013\ty\u0002%\u0003\u0002\u001e=%\u00111\u0004H\u0005\u0003oj\t\u0011cS1gW\u00064U-\u0019;ve\u0016\u001c\u0015m\u00195f\u0013\tI(P\u0001\u000bFqBL'/\u001b8h\r\u0016\fG/\u001e:f\u0007\u0006\u001c\u0007.\u001a\u0006\u0003oj\tQ\u0001^8qS\u000e\u00042!`A\u0004\u001d\rq\u00181\u0001\t\u0003[~T!!!\u0001\u0002\u000bM\u001c\u0017\r\\1\n\u0007\u0005\u0015q0\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003\u0013\tYA\u0001\u0004TiJLgn\u001a\u0006\u0004\u0003\u000by\u0018\u0001D1hK>3g-T5mY&\u001c\b\u0003BA\t\u0003'i\u0011a`\u0005\u0004\u0003+y(\u0001\u0002'p]\u001e\fa\u0002]3sg&\u001cH/\u0012=qSJ,G\r\u0005\u0003\u0002\u0012\u0005m\u0011bAA\u000f\u007f\n9!i\\8mK\u0006t\u0017!B2m_\u000e\\\u0007\u0003BA\u0012\u0003Si!!!\n\u000b\u0007\u0005\u001dB&\u0001\u0003uS6,\u0017\u0002BA\u0016\u0003K\u0011Qa\u00117pG.\fa\u0001P5oSRtD\u0003EA\u0019\u0003s\tY$!\u0010\u0002@\u0005\u0005\u00131IA#)\u0011\t\u0019$a\u000e\u0011\u0007\u0005U\u0002!D\u0001\u001b\u0011%\ty\"\u0003I\u0001\u0002\b\t\t\u0003C\u0003L\u0013\u0001\u0007A\nC\u0003U\u0013\u0001\u0007Q\u000bC\u0003`\u0013\u0001\u0007\u0001\rC\u0003e\u0013\u0001\u0007Q\rC\u0003|\u0013\u0001\u0007A\u0010C\u0004\u0002\u000e%\u0001\r!a\u0004\t\u000f\u0005]\u0011\u00021\u0001\u0002\u001a\u0005IaM]3rk\u0016t7-_\u000b\u0003\u0003\u001f\t!B\u001a:fcV,gnY=!\u0003-awnY6US6,w.\u001e;\u0002\u00191|7m\u001b+j[\u0016|W\u000f\u001e\u0011\u0002\u0011\u0015DXmY;u_J,\"!!\u0016\u0011\t\u0005]\u0013\u0011M\u0007\u0003\u00033RA!a\u0017\u0002^\u0005Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u0007\u0005}C&\u0001\u0003vi&d\u0017\u0002BA2\u00033\u0012\u0001dU2iK\u0012,H.\u001a3Fq\u0016\u001cW\u000f^8s'\u0016\u0014h/[2f\u0003%)\u00070Z2vi>\u0014\b%\u0001\u0005tG\",G-\u001e7f+\t\tY\u0007\r\u0003\u0002n\u0005]\u0004CBA,\u0003_\n\u0019(\u0003\u0003\u0002r\u0005e#aD*dQ\u0016$W\u000f\\3e\rV$XO]3\u0011\t\u0005U\u0014q\u000f\u0007\u0001\t-\tI(EA\u0001\u0002\u0003\u0015\t!! \u0003\u0005}\u0002\u0014!C:dQ\u0016$W\u000f\\3!#\u0011\ty(!\"\u0011\t\u0005E\u0011\u0011Q\u0005\u0004\u0003\u0007{(a\u0002(pi\"Lgn\u001a\t\u0005\u0003#\t9)C\u0002\u0002\n~\u00141!\u00118z\u0003\r\u0011XO\u001c\u000b\u0003\u0003\u001f\u0003B!!\u0005\u0002\u0012&\u0019\u00111S@\u0003\tUs\u0017\u000e^\u0001\ba\u0016\u00148/[:u)\u0019\ty)!'\u0002$\"9\u00111T\nA\u0002\u0005u\u0015!\u00039beRLG/[8o!\u0011\t\t\"a(\n\u0007\u0005\u0005vPA\u0002J]RDq!!*\u0014\u0001\u0004\ty!\u0001\u0004fqBL'/_\u0001\u0006G2|7/Z\u0001\u0015\t\u0006$\u0018m\u0015;pe\u0016\u0004VM]:jgR,gnY3\u0011\u0007\u0005UbcE\u0002\u0017\u0003_\u0003B!!\u0005\u00022&\u0019\u00111W@\u0003\r\u0005s\u0017PU3g)\t\tY+A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H\u0005\u000f\u000b\u0011\u0003w\u000by-!5\u0002T\u0006U\u0017q[Am\u00037TC!!\t\u0002>.\u0012\u0011q\u0018\t\u0005\u0003\u0003\fY-\u0004\u0002\u0002D*!\u0011QYAd\u0003%)hn\u00195fG.,GMC\u0002\u0002J~\f!\"\u00198o_R\fG/[8o\u0013\u0011\ti-a1\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\rC\u0003L1\u0001\u0007A\nC\u0003U1\u0001\u0007Q\u000bC\u0003`1\u0001\u0007\u0001\rC\u0003e1\u0001\u0007Q\rC\u0003|1\u0001\u0007A\u0010C\u0004\u0002\u000ea\u0001\r!a\u0004\t\u000f\u0005]\u0001\u00041\u0001\u0002\u001a\u0001")
public class DataStorePersistence
implements Runnable,
Closeable,
MethodProfiling {
    private final DataStore ds;
    private final SimpleFeatureType sft;
    private final OffsetManager offsetManager;
    private final KafkaFeatureCache.ExpiringFeatureCache cache;
    private final String topic;
    private final long ageOffMillis;
    private final boolean persistExpired;
    private final Clock clock;
    private final long frequency;
    private final long lockTimeout;
    private final ScheduledExecutorService executor;
    private final ScheduledFuture<?> schedule;
    private Logger logger;
    private volatile boolean bitmap$0;

    public static Clock $lessinit$greater$default$8(DataStore dataStore, SimpleFeatureType simpleFeatureType, OffsetManager offsetManager, KafkaFeatureCache.ExpiringFeatureCache expiringFeatureCache, String string, long l, boolean bl) {
        return DataStorePersistence$.MODULE$.$lessinit$greater$default$8(dataStore, simpleFeatureType, offsetManager, expiringFeatureCache, string, l, bl);
    }

    public <R> R profile(Function1<Object, BoxedUnit> onComplete, Function0<R> code) {
        return (R)MethodProfiling.profile$((MethodProfiling)this, onComplete, code);
    }

    public <R> R profile(Function2<R, Object, BoxedUnit> onComplete, Function0<R> code) {
        return (R)MethodProfiling.profile$((MethodProfiling)this, onComplete, code);
    }

    public <R> R profile(String message, Function0<R> code) {
        return (R)MethodProfiling.profile$((MethodProfiling)this, (String)message, code);
    }

    private Logger logger$lzycompute() {
        DataStorePersistence dataStorePersistence = this;
        synchronized (dataStorePersistence) {
            if (!this.bitmap$0) {
                this.logger = LazyLogging.logger$((LazyLogging)this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$0 ? this.logger$lzycompute() : this.logger;
    }

    private long frequency() {
        return this.frequency;
    }

    private long lockTimeout() {
        return this.lockTimeout;
    }

    private ScheduledExecutorService executor() {
        return this.executor;
    }

    private ScheduledFuture<?> schedule() {
        return this.schedule;
    }

    @Override
    public void run() {
        BoxedUnit boxedUnit;
        Seq<Object> expired = this.cache.expired(this.clock.millis() - this.ageOffMillis);
        if (this.logger().underlying().isTraceEnabled()) {
            this.logger().underlying().trace(new StringBuilder(47).append("Found partition(s) with expired entries in [").append(this.topic).append("]: ").append((Object)(expired.isEmpty() ? "none" : expired.mkString(","))).toString());
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        ((IterableLike)Random$.MODULE$.shuffle(expired, Seq$.MODULE$.canBuildFrom())).foreach((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partition -> {
            BoxedUnit boxedUnit;
            if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace("Acquiring lock for [{}:{}]", new Object[]{$this.topic, BoxesRunTime.boxToInteger((int)partition)});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            Option<Releasable> option = $this.offsetManager.acquireLock($this.topic, partition, this.lockTimeout());
            if (None$.MODULE$.equals(option)) {
                BoxedUnit boxedUnit2;
                if (this.logger().underlying().isTraceEnabled()) {
                    this.logger().underlying().trace("Could not acquire lock for [{}:{}] within {}ms", new Object[]{$this.topic, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)this.lockTimeout())});
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit3 = boxedUnit2;
            } else if (option instanceof Some) {
                BoxedUnit boxedUnit4;
                BoxedUnit boxedUnit5;
                Some some = (Some)option;
                Releasable lock = (Releasable)some.value();
                try {
                    BoxedUnit boxedUnit6;
                    if (this.logger().underlying().isTraceEnabled()) {
                        this.logger().underlying().trace("Acquired lock for [{}:{}]", new Object[]{$this.topic, BoxesRunTime.boxToInteger((int)partition)});
                        boxedUnit6 = BoxedUnit.UNIT;
                    } else {
                        boxedUnit6 = BoxedUnit.UNIT;
                    }
                    this.persist(partition, $this.clock.millis() - $this.ageOffMillis);
                    boxedUnit5 = BoxedUnit.UNIT;
                }
                catch (Throwable throwable) {
                    BoxedUnit boxedUnit7;
                    lock.release();
                    if (this.logger().underlying().isTraceEnabled()) {
                        this.logger().underlying().trace("Released lock for [{}:{}]", new Object[]{$this.topic, BoxesRunTime.boxToInteger((int)partition)});
                        boxedUnit7 = BoxedUnit.UNIT;
                    } else {
                        boxedUnit7 = BoxedUnit.UNIT;
                    }
                    throw throwable;
                }
                lock.release();
                if (this.logger().underlying().isTraceEnabled()) {
                    this.logger().underlying().trace("Released lock for [{}:{}]", new Object[]{$this.topic, BoxesRunTime.boxToInteger((int)partition)});
                    boxedUnit4 = BoxedUnit.UNIT;
                } else {
                    boxedUnit4 = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit8 = boxedUnit5;
            } else {
                throw new MatchError(option);
            }
        });
    }

    private void persist(int partition, long expiry) {
        block15: {
            BoxedUnit boxedUnit;
            Object object;
            BoxedUnit boxedUnit2;
            BoxedUnit boxedUnit3;
            Tuple2<Object, Seq<Tuple2<Object, SimpleFeature>>> tuple2 = this.cache.expired(partition, expiry);
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            long nextOffset = tuple2._1$mcJ$sp();
            Seq expired = (Seq)tuple2._2();
            Tuple2 tuple22 = new Tuple2((Object)BoxesRunTime.boxToLong((long)nextOffset), (Object)expired);
            Tuple2 tuple23 = tuple22;
            long nextOffset2 = tuple23._1$mcJ$sp();
            Seq expired2 = (Seq)tuple23._2();
            if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace(new StringBuilder(32).append("Found expired entries for [").append(this.topic).append(":").append(partition).append("]:\n\t").append(((TraversableOnce)expired2.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
                    Tuple2 tuple2 = x0$1;
                    if (tuple2 == null) {
                        throw new MatchError((Object)tuple2);
                    }
                    long o = tuple2._1$mcJ$sp();
                    SimpleFeature f = (SimpleFeature)tuple2._2();
                    String string = new StringBuilder(9).append("offset ").append(o).append(": ").append(f).toString();
                    return string;
                }, Seq$.MODULE$.canBuildFrom())).mkString("\n\t")).toString());
                boxedUnit3 = BoxedUnit.UNIT;
            } else {
                boxedUnit3 = BoxedUnit.UNIT;
            }
            long lastOffset = this.offsetManager.getOffset(this.topic, partition);
            if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace("Last persisted offsets for [{}:{}]: {}", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)lastOffset)});
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            if (expired2.nonEmpty()) {
                BoxedUnit boxedUnit4;
                Map toPersist = (Map)Map$.MODULE$.apply((Seq)expired2.collect((PartialFunction)new scala.Serializable(null, lastOffset){
                    public static final long serialVersionUID = 0L;
                    private final long lastOffset$1;

                    /*
                     * Enabled aggressive block sorting
                     */
                    public final <A1 extends Tuple2<Object, SimpleFeature>, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                        Object object;
                        A1 A1 = x1;
                        if (A1 != null) {
                            long o = A1._1$mcJ$sp();
                            SimpleFeature f = (SimpleFeature)A1._2();
                            if (o > this.lastOffset$1) {
                                object = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)f.getID()), (Object)new Tuple2((Object)BoxesRunTime.boxToLong((long)o), (Object)f));
                                return (B1)object;
                            }
                        }
                        object = function1.apply(x1);
                        return (B1)object;
                    }

                    public final boolean isDefinedAt(Tuple2<Object, SimpleFeature> x1) {
                        long o;
                        Tuple2<Object, SimpleFeature> tuple2 = x1;
                        boolean bl = tuple2 != null && (o = tuple2._1$mcJ$sp()) > this.lastOffset$1;
                        return bl;
                    }
                    {
                        this.lastOffset$1 = lastOffset$1;
                    }
                }, Seq$.MODULE$.canBuildFrom()));
                if (this.logger().underlying().isTraceEnabled()) {
                    this.logger().underlying().trace("Offsets to persist for [{}:{}]: {}", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition), ((TraversableOnce)((TraversableOnce)toPersist.values().map((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToLong((long)x$4._1$mcJ$sp()), Iterable$.MODULE$.canBuildFrom())).toSeq().sorted((Ordering)Ordering.Long$.MODULE$)).mkString(",")});
                    boxedUnit4 = BoxedUnit.UNIT;
                } else {
                    boxedUnit4 = BoxedUnit.UNIT;
                }
                if (!this.persistExpired) {
                    if (this.logger().underlying().isTraceEnabled()) {
                        this.logger().underlying().trace("Persist disabled for {}", new Object[]{this.topic});
                        object = BoxedUnit.UNIT;
                    } else {
                        object = BoxedUnit.UNIT;
                    }
                } else {
                    this.profile((Function2)(JFunction2.mcVJJ.sp & Serializable & scala.Serializable)(modified, time) -> this.complete$1(modified, time), (Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> {
                        Id filter = package$.MODULE$.ff().id((FeatureId[])((TraversableOnce)toPersist.keys().map((Function1 & Serializable & scala.Serializable)x$1 -> package$.MODULE$.ff().featureId(x$1), Iterable$.MODULE$.canBuildFrom())).toSeq().toArray(ClassTag$.MODULE$.apply(FeatureId.class)));
                        return BoxesRunTime.unboxToLong((Object)package.WithClose$.MODULE$.apply((Object)$this.ds.getFeatureWriter($this.sft.getTypeName(), (Filter)filter, Transaction.AUTO_COMMIT), (Function1 & Serializable & scala.Serializable)writer -> BoxesRunTime.boxToLong((long)DataStorePersistence.$anonfun$persist$6(this, toPersist, partition, writer)), IsCloseable$.MODULE$.closeableIsCloseable()));
                    });
                    object = toPersist.nonEmpty() ? this.profile((Function2)(JFunction2.mcVJJ.sp & Serializable & scala.Serializable)(appended, time) -> this.complete$2(appended, time), (Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> BoxesRunTime.unboxToLong((Object)package.WithClose$.MODULE$.apply((Object)$this.ds.getFeatureWriterAppend($this.sft.getTypeName(), Transaction.AUTO_COMMIT), (Function1 & Serializable & scala.Serializable)writer -> BoxesRunTime.boxToLong((long)DataStorePersistence.$anonfun$persist$10(this, toPersist, partition, writer)), IsCloseable$.MODULE$.closeableIsCloseable()))) : BoxedUnit.UNIT;
                }
            } else {
                object = BoxedUnit.UNIT;
            }
            if (nextOffset2 <= lastOffset) break block15;
            if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace("Committing offset [{}:{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)nextOffset2)});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            this.offsetManager.setOffset(this.topic, partition, nextOffset2);
        }
    }

    @Override
    public void close() {
        this.schedule().cancel(true);
        this.executor().shutdownNow();
        this.executor().awaitTermination(1L, TimeUnit.SECONDS);
    }

    private final void complete$1(long modified, long time) {
        BoxedUnit boxedUnit;
        if (this.logger().underlying().isDebugEnabled()) {
            this.logger().underlying().debug("Wrote {} updated feature(s) to persistent storage in {}ms", new Object[]{BoxesRunTime.boxToLong((long)modified), BoxesRunTime.boxToLong((long)time)});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ long $anonfun$persist$6(DataStorePersistence $this, Map toPersist$1, int partition$1, FeatureWriter writer) {
        long count = 0L;
        while (writer.hasNext()) {
            SimpleFeature next = (SimpleFeature)writer.next();
            toPersist$1.get((Object)next.getID()).foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
                SimpleFeature updated;
                Tuple2 tuple2 = x0$2;
                if (tuple2 != null) {
                    BoxedUnit boxedUnit;
                    long offset = tuple2._1$mcJ$sp();
                    updated = (SimpleFeature)tuple2._2();
                    if ($this.logger().underlying().isTraceEnabled()) {
                        $this.logger().underlying().trace("Persistent store modify [{}:{}:{}] {}", new Object[]{$this.topic, BoxesRunTime.boxToInteger((int)partition$1), BoxesRunTime.boxToLong((long)offset), updated});
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        boxedUnit = BoxedUnit.UNIT;
                    }
                    FeatureUtils$.MODULE$.copyToFeature(next, updated, true);
                    try {
                        writer.write();
                    }
                    catch (Throwable throwable) {
                        Throwable throwable2 = throwable;
                        Option option = NonFatal$.MODULE$.unapply(throwable2);
                        if (!option.isEmpty()) {
                            BoxedUnit boxedUnit2;
                            Throwable e = (Throwable)option.get();
                            if ($this.logger().underlying().isErrorEnabled()) {
                                $this.logger().underlying().error(new StringBuilder(26).append("Error persisting feature: ").append(updated).toString(), e);
                                boxedUnit2 = BoxedUnit.UNIT;
                            } else {
                                boxedUnit2 = BoxedUnit.UNIT;
                            }
                            BoxedUnit boxedUnit3 = boxedUnit2;
                        }
                        throw throwable;
                    }
                } else {
                    throw new MatchError((Object)tuple2);
                }
                Option option = toPersist$1.remove((Object)updated.getID());
                return option;
            });
            ++count;
        }
        return count;
    }

    private final void complete$2(long appended, long time) {
        BoxedUnit boxedUnit;
        if (this.logger().underlying().isDebugEnabled()) {
            this.logger().underlying().debug("Wrote {} new feature(s) to persistent storage in {}ms", new Object[]{BoxesRunTime.boxToLong((long)appended), BoxesRunTime.boxToLong((long)time)});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$persist$11(DataStorePersistence $this, int partition$1, FeatureWriter writer$2, LongRef count$1, Tuple2 x0$3) {
        Tuple2 tuple2 = x0$3;
        if (tuple2 != null) {
            SimpleFeature simpleFeature;
            BoxedUnit boxedUnit;
            long offset = tuple2._1$mcJ$sp();
            SimpleFeature updated = (SimpleFeature)tuple2._2();
            if ($this.logger().underlying().isTraceEnabled()) {
                $this.logger().underlying().trace("Persistent store append [{}:{}:{}] {}", new Object[]{$this.topic, BoxesRunTime.boxToInteger((int)partition$1), BoxesRunTime.boxToLong((long)offset), updated});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            try {
                simpleFeature = FeatureUtils$.MODULE$.write(writer$2, updated, true);
            }
            catch (Throwable throwable) {
                BoxedUnit boxedUnit2;
                Throwable throwable2 = throwable;
                Option option = NonFatal$.MODULE$.unapply(throwable2);
                if (!option.isEmpty()) {
                    Throwable e = (Throwable)option.get();
                    if ($this.logger().underlying().isErrorEnabled()) {
                        $this.logger().underlying().error(new StringBuilder(26).append("Error persisting feature: ").append(updated).toString(), e);
                        boxedUnit2 = BoxedUnit.UNIT;
                    } else {
                        boxedUnit2 = BoxedUnit.UNIT;
                    }
                } else {
                    throw throwable;
                }
                BoxedUnit boxedUnit3 = boxedUnit2;
                simpleFeature = boxedUnit3;
            }
            ++count$1.elem;
        } else {
            throw new MatchError((Object)tuple2);
        }
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ long $anonfun$persist$10(DataStorePersistence $this, Map toPersist$1, int partition$1, FeatureWriter writer) {
        LongRef count = LongRef.create((long)0L);
        toPersist$1.values().foreach((Function1 & Serializable & scala.Serializable)x0$3 -> {
            DataStorePersistence.$anonfun$persist$11($this, partition$1, writer, count, x0$3);
            return BoxedUnit.UNIT;
        });
        return count.elem;
    }

    public DataStorePersistence(DataStore ds, SimpleFeatureType sft, OffsetManager offsetManager, KafkaFeatureCache.ExpiringFeatureCache cache, String topic, long ageOffMillis, boolean persistExpired, Clock clock) {
        this.ds = ds;
        this.sft = sft;
        this.offsetManager = offsetManager;
        this.cache = cache;
        this.topic = topic;
        this.ageOffMillis = ageOffMillis;
        this.persistExpired = persistExpired;
        this.clock = clock;
        LazyLogging.$init$((LazyLogging)this);
        MethodProfiling.$init$((MethodProfiling)this);
        this.frequency = BoxesRunTime.unboxToLong((Object)new GeoMesaSystemProperties.SystemProperty("geomesa.lambda.persist.interval", GeoMesaSystemProperties.SystemProperty$.MODULE$.apply$default$2()).toDuration().map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)x$1.toMillis())).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> 60000L));
        this.lockTimeout = BoxesRunTime.unboxToLong((Object)new GeoMesaSystemProperties.SystemProperty("geomesa.lambda.persist.lock.timeout", GeoMesaSystemProperties.SystemProperty$.MODULE$.apply$default$2()).toDuration().map((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToLong((long)x$2.toMillis())).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> 1000L));
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.schedule = this.executor().scheduleWithFixedDelay(this, this.frequency(), this.frequency(), TimeUnit.MILLISECONDS);
    }
}

