/*
 * Decompiled with CFR 0.152.
 */
package org.locationtech.geomesa.lambda.stream.kafka;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.io.Closeable;
import java.io.Serializable;
import java.time.Clock;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.geotools.api.data.DataStore;
import org.geotools.api.data.FeatureWriter;
import org.geotools.api.data.Transaction;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.filter.Filter;
import org.geotools.api.filter.Id;
import org.geotools.api.filter.identity.FeatureId;
import org.locationtech.geomesa.filter.package$;
import org.locationtech.geomesa.lambda.stream.OffsetManager;
import org.locationtech.geomesa.lambda.stream.kafka.DataStorePersistence$;
import org.locationtech.geomesa.lambda.stream.kafka.KafkaFeatureCache;
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties;
import org.locationtech.geomesa.utils.geotools.FeatureUtils$;
import org.locationtech.geomesa.utils.io.IsCloseable$;
import org.locationtech.geomesa.utils.io.package;
import org.locationtech.geomesa.utils.stats.MethodProfiling;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;
import scala.runtime.java8.JFunction2;
import scala.util.Random$;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005ug\u0001B\r\u001b\u0001\u001dB\u0001b\u0013\u0001\u0003\u0002\u0003\u0006I\u0001\u0014\u0005\t-\u0002\u0011\t\u0011)A\u0005/\"Aq\f\u0001B\u0001B\u0003%\u0001\r\u0003\u0005e\u0001\t\u0005\t\u0015!\u0003f\u0011!Y\bA!A!\u0002\u0013a\bBCA\u0007\u0001\t\u0005\t\u0015!\u0003\u0002\u0010!Q\u0011q\u0003\u0001\u0003\u0002\u0003\u0006I!!\u0007\t\u0015\u0005}\u0001A!A!\u0002\u0017\t\t\u0003C\u0004\u0002.\u0001!\t!a\f\t\u0013\u0005\u001d\u0003A1A\u0005\n\u0005%\u0003\u0002CA&\u0001\u0001\u0006I!a\u0004\t\u0013\u00055\u0003A1A\u0005\n\u0005%\u0003\u0002CA(\u0001\u0001\u0006I!a\u0004\t\u0013\u0005E\u0003A1A\u0005\n\u0005M\u0003\u0002CA3\u0001\u0001\u0006I!!\u0016\t\u0013\u0005\u001d\u0004A1A\u0005\n\u0005%\u0004\u0002CA>\u0001\u0001\u0006I!a\u001b\t\u000f\u0005-\u0005\u0001\"\u0011\u0002\u000e\"9\u0011Q\u0013\u0001\u0005\n\u0005]\u0005bBAT\u0001\u0011\u0005\u0013QR\u0004\n\u0003SS\u0012\u0011!E\u0001\u0003W3\u0001\"\u0007\u000e\u0002\u0002#\u0005\u0011Q\u0016\u0005\b\u0003[1B\u0011AA[\u0011%\t9LFI\u0001\n\u0003\tIL\u0001\u000bECR\f7\u000b^8sKB+'o]5ti\u0016t7-\u001a\u0006\u00037q\tQa[1gW\u0006T!!\b\u0010\u0002\rM$(/Z1n\u0015\ty\u0002%\u0001\u0004mC6\u0014G-\u0019\u0006\u0003C\t\nqaZ3p[\u0016\u001c\u0018M\u0003\u0002$I\u0005aAn\\2bi&|g\u000e^3dQ*\tQ%A\u0002pe\u001e\u001c\u0001a\u0005\u0004\u0001QA\u001a\u0014(\u0011\t\u0003S9j\u0011A\u000b\u0006\u0003W1\nA\u0001\\1oO*\tQ&\u0001\u0003kCZ\f\u0017BA\u0018+\u0005\u0019y%M[3diB\u0011\u0011&M\u0005\u0003e)\u0012\u0001BU;o]\u0006\u0014G.\u001a\t\u0003i]j\u0011!\u000e\u0006\u0003m1\n!![8\n\u0005a*$!C\"m_N,\u0017M\u00197f!\tQt(D\u0001<\u0015\taT(A\u0003ti\u0006$8O\u0003\u0002?A\u0005)Q\u000f^5mg&\u0011\u0001i\u000f\u0002\u0010\u001b\u0016$\bn\u001c3Qe>4\u0017\u000e\\5oOB\u0011!)S\u0007\u0002\u0007*\u0011A)R\u0001\rg\u000e\fG.\u00197pO\u001eLgn\u001a\u0006\u0003\r\u001e\u000b\u0001\u0002^=qKN\fg-\u001a\u0006\u0002\u0011\u0006\u00191m\\7\n\u0005)\u001b%a\u0003'bufdunZ4j]\u001e\f!\u0001Z:\u0011\u00055#V
\"\u0001(\u000b\u0005=\u0003\u0016\u0001\u00023bi\u0006T!!\u0015*\u0002\u0007\u0005\u0004\u0018N\u0003\u0002TI\u0005Aq-Z8u_>d7/\u0003\u0002V\u001d\nIA)\u0019;b'R|'/Z\u0001\u0004g\u001a$\bC\u0001-^\u001b\u0005I&B\u0001.\\\u0003\u0019\u0019\u0018.\u001c9mK*\u0011A\fU\u0001\bM\u0016\fG/\u001e:f\u0013\tq\u0016LA\tTS6\u0004H.\u001a$fCR,(/\u001a+za\u0016\fQb\u001c4gg\u0016$X*\u00198bO\u0016\u0014\bCA1c\u001b\u0005a\u0012BA2\u001d\u00055yeMZ:fi6\u000bg.Y4fe\u0006)1-Y2iKB\u0011a\r\u001f\b\u0003OZt!\u0001[;\u000f\u0005%$hB\u00016t\u001d\tY'O\u0004\u0002mc:\u0011Q\u000e]\u0007\u0002]*\u0011qNJ\u0001\u0007yI|w\u000e\u001e \n\u0003\u0015J!a\t\u0013\n\u0005\u0005\u0012\u0013BA\u0010!\u0013\tib$\u0003\u0002\u001c9%\u0011qOG\u0001\u0012\u0017\u000647.\u0019$fCR,(/Z\"bG\",\u0017BA={\u0005Q)\u0005\u0010]5sS:<g)Z1ukJ,7)Y2iK*\u0011qOG\u0001\u0006i>\u0004\u0018n\u0019\t\u0004{\u0006\u001dab\u0001@\u0002\u0004A\u0011Qn \u0006\u0003\u0003\u0003\tQa]2bY\u0006L1!!\u0002\u0000\u0003\u0019\u0001&/\u001a3fM&!\u0011\u0011BA\u0006\u0005\u0019\u0019FO]5oO*\u0019\u0011QA@\u0002\u0019\u0005<Wm\u00144g\u001b&dG.[:\u0011\t\u0005E\u00111C\u0007\u0002\u007f&\u0019\u0011QC@\u0003\t1{gnZ\u0001\u000fa\u0016\u00148/[:u\u000bb\u0004\u0018N]3e!\u0011\t\t\"a\u0007\n\u0007\u0005uqPA\u0004C_>dW-\u00198\u0002\u000b\rdwnY6\u0011\t\u0005\r\u0012\u0011F\u0007\u0003\u0003KQ1!a\n-\u0003\u0011!\u0018.\\3\n\t\u0005-\u0012Q\u0005\u0002\u0006\u00072|7m[\u0001\u0007y%t\u0017\u000e\u001e 
\u0015!\u0005E\u0012\u0011HA\u001e\u0003{\ty$!\u0011\u0002D\u0005\u0015C\u0003BA\u001a\u0003o\u00012!!\u000e\u0001\u001b\u0005Q\u0002\"CA\u0010\u0013A\u0005\t9AA\u0011\u0011\u0015Y\u0015\u00021\u0001M\u0011\u00151\u0016\u00021\u0001X\u0011\u0015y\u0016\u00021\u0001a\u0011\u0015!\u0017\u00021\u0001f\u0011\u0015Y\u0018\u00021\u0001}\u0011\u001d\ti!\u0003a\u0001\u0003\u001fAq!a\u0006\n\u0001\u0004\tI\"A\u0005ge\u0016\fX/\u001a8dsV\u0011\u0011qB\u0001\u000bMJ,\u0017/^3oGf\u0004\u0013a\u00037pG.$\u0016.\\3pkR\fA\u0002\\8dWRKW.Z8vi\u0002\n\u0001\"\u001a=fGV$xN]\u000b\u0003\u0003+\u0002B!a\u0016\u0002b5\u0011\u0011\u0011\f\u0006\u0005\u00037\ni&\u0001\u0006d_:\u001cWO\u001d:f]RT1!a\u0018-\u0003\u0011)H/\u001b7\n\t\u0005\r\u0014\u0011\f\u0002\u0019'\u000eDW\rZ;mK\u0012,\u00050Z2vi>\u00148+\u001a:wS\u000e,\u0017!C3yK\u000e,Ho\u001c:!\u0003!\u00198\r[3ek2,WCAA6a\u0011\ti'a\u001e\u0011\r\u0005]\u0013qNA:\u0013\u0011\t\t(!\u0017\u0003\u001fM\u001b\u0007.\u001a3vY\u0016$g)\u001e;ve\u0016\u0004B!!\u001e\u0002x1\u0001AaCA=#\u0005\u0005\t\u0011!B\u0001\u0003{\u0012!a\u0010\u0019\u0002\u0013M\u001c\u0007.\u001a3vY\u0016\u0004\u0013\u0003BA@\u0003\u000b\u0003B!!\u0005\u0002\u0002&\u0019\u00111Q@\u0003\u000f9{G\u000f[5oOB!\u0011\u0011CAD\u0013\r\tIi 
\u0002\u0004\u0003:L\u0018a\u0001:v]R\u0011\u0011q\u0012\t\u0005\u0003#\t\t*C\u0002\u0002\u0014~\u0014A!\u00168ji\u00069\u0001/\u001a:tSN$HCBAH\u00033\u000b\u0019\u000bC\u0004\u0002\u001cN\u0001\r!!(\u0002\u0013A\f'\u000f^5uS>t\u0007\u0003BA\t\u0003?K1!!)\u0000\u0005\rIe\u000e\u001e\u0005\b\u0003K\u001b\u0002\u0019AA\b\u0003\u0019)\u0007\u0010]5ss\u0006)1\r\\8tK\u0006!B)\u0019;b'R|'/\u001a)feNL7\u000f^3oG\u0016\u00042!!\u000e\u0017'\r1\u0012q\u0016\t\u0005\u0003#\t\t,C\u0002\u00024~\u0014a!\u00118z%\u00164GCAAV\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%qQ\u0001\u00121XAh\u0003#\f\u0019.!6\u0002X\u0006e\u00171\u001c\u0016\u0005\u0003C\til\u000b\u0002\u0002@B!\u0011\u0011YAf\u001b\t\t\u0019M\u0003\u0003\u0002F\u0006\u001d\u0017!C;oG\",7m[3e\u0015\r\tIm`\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BAg\u0003\u0007\u0014\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0011\u0015Y\u0005\u00041\u0001M\u0011\u00151\u0006\u00041\u0001X\u0011\u0015y\u0006\u00041\u0001a\u0011\u0015!\u0007\u00041\u0001f\u0011\u0015Y\b\u00041\u0001}\u0011\u001d\ti\u0001\u0007a\u0001\u0003\u001fAq!a\u0006\u0019\u0001\u0004\tI\u0002")
public class DataStorePersistence
implements Runnable,
Closeable,
MethodProfiling {
    // Data store that expired features are written to (via feature writers obtained in persist).
    private final DataStore ds;
    // Feature type whose type name is used when requesting feature writers from the data store.
    private final SimpleFeatureType sft;
    // Supplies per-partition locks and reads/writes the last persisted offset for each topic partition.
    private final OffsetManager offsetManager;
    // Source of expired partitions and of the expired features (with offsets) for a partition.
    private final KafkaFeatureCache.ExpiringFeatureCache cache;
    // Topic name used for locks, offsets and log messages.
    private final String topic;
    // Subtracted from the clock's current millis to compute the expiry cut-off passed to the cache.
    private final long ageOffMillis;
    // When false, expired features are not written to the data store; offsets are still committed.
    private final boolean persistExpired;
    // Time source used to compute the expiry cut-off.
    private final Clock clock;
    // Scheduling delay in milliseconds; read from "geomesa.lambda.persist.interval", falling back to 60000.
    private final long frequency;
    // Lock acquisition timeout passed to the offset manager; read from
    // "geomesa.lambda.persist.lock.timeout", falling back to 1000.
    private final long lockTimeout;
    // Single-thread scheduled executor that runs this instance.
    private final ScheduledExecutorService executor;
    // Handle of the fixed-delay task registered in the constructor; cancelled in close().
    private final ScheduledFuture<?> schedule;
    // Lazily initialised logger; see logger() / logger$lzycompute().
    private transient Logger logger;
    // Set to true once the logger field has been initialised.
    private volatile transient boolean bitmap$trans$0;

    /**
     * Static forwarder that obtains the default value for the eighth constructor
     * argument (the {@link Clock}) from the companion object.
     *
     * @return the clock supplied by {@code DataStorePersistence$}
     */
    public static Clock $lessinit$greater$default$8(DataStore dataStore, SimpleFeatureType simpleFeatureType, OffsetManager offsetManager, KafkaFeatureCache.ExpiringFeatureCache expiringFeatureCache, String string, long l, boolean bl) {
        DataStorePersistence$ companion = DataStorePersistence$.MODULE$;
        return companion.$lessinit$greater$default$8(dataStore, simpleFeatureType, offsetManager, expiringFeatureCache, string, l, bl);
    }

    /**
     * Forwards to the {@link MethodProfiling} trait implementation, which evaluates
     * {@code code} and invokes {@code onComplete}.
     *
     * @param onComplete single-argument callback (presumably receives the elapsed time; confirm against MethodProfiling)
     * @param code the computation to run
     * @return whatever the trait implementation returns for {@code code}
     */
    public <R> R profile(Function1<Object, BoxedUnit> onComplete, Function0<R> code) {
        Object outcome = MethodProfiling.profile$((MethodProfiling)this, onComplete, code);
        return (R)outcome;
    }

    /**
     * Forwards to the {@link MethodProfiling} trait implementation, which evaluates
     * {@code code} and invokes the two-argument {@code onComplete} callback.
     *
     * @param onComplete callback taking the result and a second numeric value (used in this class as a time in ms)
     * @param code the computation to run
     * @return whatever the trait implementation returns for {@code code}
     */
    public <R> R profile(Function2<R, Object, BoxedUnit> onComplete, Function0<R> code) {
        Object outcome = MethodProfiling.profile$((MethodProfiling)this, onComplete, code);
        return (R)outcome;
    }

    /**
     * Forwards to the {@link MethodProfiling} trait implementation that takes a
     * message string instead of a callback.
     *
     * @param message text handed to the trait implementation
     * @param code the computation to run
     * @return whatever the trait implementation returns for {@code code}
     */
    public <R> R profile(String message, Function0<R> code) {
        Object outcome = MethodProfiling.profile$((MethodProfiling)this, (String)message, code);
        return (R)outcome;
    }

    /**
     * Slow path of the lazy logger: while holding this instance's monitor, creates
     * the logger if the initialised flag is not yet set, then sets the flag.
     *
     * @return the logger field
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            boolean alreadyInitialised = this.bitmap$trans$0;
            if (!alreadyInitialised) {
                this.logger = LazyLogging.logger$((LazyLogging)this);
                this.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    /**
     * Returns the logger, initialising it through {@code logger$lzycompute()} when
     * the initialised flag has not been set yet.
     */
    public Logger logger() {
        return this.bitmap$trans$0 ? this.logger : this.logger$lzycompute();
    }

    /** Accessor for the scheduling delay, in milliseconds. */
    private long frequency() {
        return frequency;
    }

    /** Accessor for the lock acquisition timeout handed to the offset manager. */
    private long lockTimeout() {
        return lockTimeout;
    }

    /** Accessor for the scheduled executor that runs this instance. */
    private ScheduledExecutorService executor() {
        return executor;
    }

    /** Accessor for the handle of the fixed-delay task. */
    private ScheduledFuture<?> schedule() {
        return schedule;
    }

    /**
     * Asks the cache for partitions with entries older than {@code now - ageOffMillis},
     * then visits those partitions in random order and persists each one under its lock.
     *
     * <p>NOTE(review): this instance is registered with {@code scheduleWithFixedDelay} in the
     * constructor; per the {@code ScheduledExecutorService} contract an exception escaping
     * this method suppresses all subsequent scheduled executions.
     */
    @Override
    public void run() {
        Seq<Object> expired = this.cache.expired(this.clock.millis() - this.ageOffMillis);
        if (this.logger().underlying().isTraceEnabled()) {
            Object partitions = expired.isEmpty() ? "none" : expired.mkString(",");
            this.logger().underlying().trace(new StringBuilder(47).append("Found partition(s) with expired entries in [").append(this.topic).append("]: ").append(partitions).toString());
        }
        IterableLike shuffled = (IterableLike)Random$.MODULE$.shuffle(expired, Seq$.MODULE$.canBuildFrom());
        shuffled.foreach((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partition -> this.persistUnderLock(partition));
    }

    /**
     * Tries to take the offset-manager lock for one partition and, if obtained, persists
     * that partition's expired entries; the lock is always closed afterwards.
     * When the lock is not obtained within the timeout the partition is skipped.
     */
    private void persistUnderLock(int partition) {
        if (this.logger().underlying().isTraceEnabled()) {
            this.logger().underlying().trace("Acquiring lock for [{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition)});
        }
        Option<Closeable> acquired = this.offsetManager.acquireLock(this.topic, partition, this.lockTimeout());
        if (None$.MODULE$.equals(acquired)) {
            if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace("Could not acquire lock for [{}:{}] within {}ms", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)this.lockTimeout())});
            }
            return;
        }
        if (!(acquired instanceof Some)) {
            // neither None nor Some: mirrors the exhaustive-match failure of the original
            throw new MatchError(acquired);
        }
        Closeable lock = (Closeable)((Some)acquired).value();
        try {
            if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace("Acquired lock for [{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition)});
            }
            this.persist(partition, this.clock.millis() - this.ageOffMillis);
        }
        finally {
            lock.close();
            if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace("Released lock for [{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition)});
            }
        }
    }

    /**
     * Persists the expired entries of one partition and then advances the stored offset.
     *
     * <p>Steps, in order: fetch the expired features and max offset from the cache; read the
     * last persisted offset; if persistence is enabled, collect entries whose offset is greater
     * than the last persisted one (keyed by feature ID), run a modifying writer over features
     * matching those IDs, and append whatever is still left in the map; finally store the new
     * offset when it is greater than the last persisted one.
     *
     * @param partition topic partition to process
     * @param expiry cut-off passed to the cache
     */
    private void persist(int partition, long expiry) {
        KafkaFeatureCache.ExpiredFeatures fromCache = this.cache.expired(partition, expiry);
        if (fromCache == null) {
            throw new MatchError((Object)fromCache);
        }
        long nextOffset = fromCache.maxOffset();
        Seq expired = fromCache.features();

        if (this.logger().underlying().isTraceEnabled()) {
            int found = expired.size();
            String listing = ((TraversableOnce)expired.map((Function1 & Serializable & scala.Serializable)entry -> new StringBuilder(9).append("offset ").append(entry.offset()).append(": ").append(entry.feature()).toString(), Seq$.MODULE$.canBuildFrom())).mkString("\n\t");
            this.logger().underlying().trace(new StringBuilder(33).append("Found ").append(found).append(" expired entries for [").append(this.topic).append(":").append(partition).append("]:\n\t").append(listing).toString());
        }

        long lastOffset = this.offsetManager.getOffset(this.topic, partition);
        if (this.logger().underlying().isTraceEnabled()) {
            this.logger().underlying().trace("Last persisted offsets for [{}:{}]: {}", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)lastOffset)});
        }

        if (expired.nonEmpty()) {
            if (this.persistExpired) {
                // only entries newer than the last persisted offset are candidates
                Map toPersist = Map$.MODULE$.empty();
                expired.foreach((Function1 & Serializable & scala.Serializable)entry -> {
                    if (entry.offset() > lastOffset) {
                        return toPersist.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)entry.feature().getID()), entry));
                    }
                    return BoxedUnit.UNIT;
                });
                if (this.logger().underlying().isTraceEnabled()) {
                    String offsets = ((TraversableOnce)((TraversableOnce)toPersist.values().map((Function1 & Serializable & scala.Serializable)candidate -> BoxesRunTime.boxToLong((long)candidate.offset()), Iterable$.MODULE$.canBuildFrom())).toSeq().sorted((Ordering)Ordering.Long$.MODULE$)).mkString(",");
                    this.logger().underlying().trace("Offsets to persist for [{}:{}]: {}", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition), offsets});
                }

                // update pass: modify features that already exist in the store
                Function2 onUpdated = (Function2)(JFunction2.mcVJJ.sp & Serializable & scala.Serializable)(modified, time) -> this.complete$1(modified, time);
                Function0 updatePass = (Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> {
                    Id filter = package$.MODULE$.ff().id((FeatureId[])((TraversableOnce)toPersist.keys().map((Function1 & Serializable & scala.Serializable)id -> package$.MODULE$.ff().featureId(id), Iterable$.MODULE$.canBuildFrom())).toSeq().toArray(ClassTag$.MODULE$.apply(FeatureId.class)));
                    return BoxesRunTime.unboxToLong((Object)package.WithClose$.MODULE$.apply((Object)this.ds.getFeatureWriter(this.sft.getTypeName(), (Filter)filter, Transaction.AUTO_COMMIT), (Function1 & Serializable & scala.Serializable)writer -> BoxesRunTime.boxToLong((long)DataStorePersistence.$anonfun$persist$7(this, toPersist, partition, writer)), IsCloseable$.MODULE$.closeableIsCloseable()));
                };
                this.profile(onUpdated, updatePass);

                // append pass: anything still in the map was not handled by the update pass
                if (toPersist.nonEmpty()) {
                    Function2 onAppended = (Function2)(JFunction2.mcVJJ.sp & Serializable & scala.Serializable)(appended, time) -> this.complete$2(appended, time);
                    Function0 appendPass = (Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> BoxesRunTime.unboxToLong((Object)package.WithClose$.MODULE$.apply((Object)this.ds.getFeatureWriterAppend(this.sft.getTypeName(), Transaction.AUTO_COMMIT), (Function1 & Serializable & scala.Serializable)writer -> BoxesRunTime.boxToLong((long)DataStorePersistence.$anonfun$persist$11(this, toPersist, partition, writer)), IsCloseable$.MODULE$.closeableIsCloseable()));
                    this.profile(onAppended, appendPass);
                }
            } else if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace("Persist disabled for {}", new Object[]{this.topic});
            }
        }

        if (nextOffset > lastOffset) {
            if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace("Committing offset [{}:{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)nextOffset)});
            }
            this.offsetManager.setOffset(this.topic, partition, nextOffset);
        }
    }

    /**
     * Stops the periodic persistence task: cancels the scheduled future (interrupting it if
     * running), shuts the executor down and waits up to one second for it to terminate.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * thread's interrupt status is restored so callers can still observe the interruption.
     */
    @Override
    public void close() {
        this.schedule().cancel(true);
        this.executor().shutdownNow();
        try {
            this.executor().awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // close() cannot declare InterruptedException; preserve the interrupt for the caller
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Completion callback for the update pass: logs, at debug level, how many features were
     * modified and the reported time.
     */
    private final void complete$1(long modified, long time) {
        if (!this.logger().underlying().isDebugEnabled()) {
            return;
        }
        Object[] args = new Object[]{BoxesRunTime.boxToLong((long)modified), BoxesRunTime.boxToLong((long)time)};
        this.logger().underlying().debug("Wrote {} updated feature(s) to persistent storage in {}ms", args);
    }

    /**
     * Update pass body: walks every feature returned by the modifying writer and, when the
     * pending map holds an entry for that feature ID, copies the pending feature over it,
     * writes it, and removes the entry from the map.
     *
     * <p>A non-fatal write failure (as classified by {@code NonFatal}) is logged and the pass
     * continues with the next feature; the entry is still removed from the map, so it is not
     * re-attempted by the append pass. Fatal throwables are rethrown unchanged. Previously
     * every throwable was rethrown after logging, which aborted the whole batch.
     *
     * @param $this owning instance (used for logging and the topic name)
     * @param toPersist$1 mutable map of feature ID to pending entry
     * @param partition$1 partition being persisted (logging only)
     * @param writer modifying feature writer
     * @return the number of features iterated from the writer
     */
    public static final /* synthetic */ long $anonfun$persist$7(DataStorePersistence $this, Map toPersist$1, int partition$1, FeatureWriter writer) {
        long count = 0L;
        while (writer.hasNext()) {
            SimpleFeature next = (SimpleFeature)writer.next();
            toPersist$1.get((Object)next.getID()).foreach((Function1 & Serializable & scala.Serializable)p -> {
                if ($this.logger().underlying().isTraceEnabled()) {
                    $this.logger().underlying().trace("Persistent store modify [{}:{}:{}] {}", new Object[]{$this.topic, BoxesRunTime.boxToInteger((int)partition$1), BoxesRunTime.boxToLong((long)p.offset()), p.feature()});
                }
                FeatureUtils$.MODULE$.copyToFeature(next, p.feature(), true);
                try {
                    writer.write();
                }
                catch (Throwable throwable) {
                    Option option = NonFatal$.MODULE$.unapply(throwable);
                    if (option.isEmpty()) {
                        // fatal error: never swallow
                        throw throwable;
                    }
                    Throwable e = (Throwable)option.get();
                    if ($this.logger().underlying().isErrorEnabled()) {
                        $this.logger().underlying().error(new StringBuilder(26).append("Error persisting feature: ").append(p.feature()).toString(), e);
                    }
                }
                return toPersist$1.remove((Object)p.feature().getID());
            });
            ++count;
        }
        return count;
    }

    /**
     * Completion callback for the append pass: logs, at debug level, how many features were
     * appended and the reported time.
     */
    private final void complete$2(long appended, long time) {
        if (!this.logger().underlying().isDebugEnabled()) {
            return;
        }
        Object[] args = new Object[]{BoxesRunTime.boxToLong((long)appended), BoxesRunTime.boxToLong((long)time)};
        this.logger().underlying().debug("Wrote {} new feature(s) to persistent storage in {}ms", args);
    }

    /**
     * Append pass body for a single pending entry: writes the entry's feature through the
     * appending writer and increments the shared counter.
     *
     * <p>A non-fatal write failure (as classified by {@code NonFatal}) is logged and the
     * counter is still incremented, so the remaining entries are processed. Fatal throwables
     * are rethrown unchanged. Previously every throwable was rethrown after logging, which
     * aborted the whole batch and made the fall-through assignments dead code.
     *
     * @param $this owning instance (used for logging and the topic name)
     * @param partition$1 partition being persisted (logging only)
     * @param writer$2 appending feature writer
     * @param count$1 running count of processed entries
     * @param p pending entry to append
     */
    public static final /* synthetic */ void $anonfun$persist$12(DataStorePersistence $this, int partition$1, FeatureWriter writer$2, LongRef count$1, KafkaFeatureCache.OffsetFeature p) {
        if ($this.logger().underlying().isTraceEnabled()) {
            $this.logger().underlying().trace("Persistent store append [{}:{}:{}] {}", new Object[]{$this.topic, BoxesRunTime.boxToInteger((int)partition$1), BoxesRunTime.boxToLong((long)p.offset()), p.feature()});
        }
        try {
            FeatureUtils$.MODULE$.write(writer$2, p.feature(), true);
        }
        catch (Throwable throwable) {
            Option option = NonFatal$.MODULE$.unapply(throwable);
            if (option.isEmpty()) {
                // fatal error: never swallow
                throw throwable;
            }
            Throwable e = (Throwable)option.get();
            if ($this.logger().underlying().isErrorEnabled()) {
                $this.logger().underlying().error(new StringBuilder(26).append("Error persisting feature: ").append(p.feature()).toString(), e);
            }
        }
        ++count$1.elem;
    }

    /**
     * Append pass: hands every value of the pending map to {@code $anonfun$persist$12}
     * together with a shared counter, and returns the counter's final value.
     */
    public static final /* synthetic */ long $anonfun$persist$11(DataStorePersistence $this, Map toPersist$1, int partition$1, FeatureWriter writer) {
        LongRef appended = LongRef.create((long)0L);
        toPersist$1.values().foreach((Function1 & Serializable & scala.Serializable)entry -> {
            DataStorePersistence.$anonfun$persist$12($this, partition$1, writer, appended, (KafkaFeatureCache.OffsetFeature)entry);
            return BoxedUnit.UNIT;
        });
        return appended.elem;
    }

    public DataStorePersistence(DataStore ds, SimpleFeatureType sft, OffsetManager offsetManager, KafkaFeatureCache.ExpiringFeatureCache cache, String topic, long ageOffMillis, boolean persistExpired, Clock clock) {
        this.ds = ds;
        this.sft = sft;
        this.offsetManager = offsetManager;
        this.cache = cache;
        this.topic = topic;
        this.ageOffMillis = ageOffMillis;
        this.persistExpired = persistExpired;
        this.clock = clock;
        LazyLogging.$init$((LazyLogging)this);
        MethodProfiling.$init$((MethodProfiling)this);
        this.frequency = BoxesRunTime.unboxToLong((Object)new GeoMesaSystemProperties.SystemProperty("geomesa.lambda.persist.interval", GeoMesaSystemProperties.SystemProperty$.MODULE$.apply$default$2()).toDuration().map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)x$1.toMillis())).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> 60000L));
        this.lockTimeout = BoxesRunTime.unboxToLong((Object)new GeoMesaSystemProperties.SystemProperty("geomesa.lambda.persist.lock.timeout", GeoMesaSystemProperties.SystemProperty$.MODULE$.apply$default$2()).toDuration().map((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToLong((long)x$2.toMillis())).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> 1000L));
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.schedule = this.executor().scheduleWithFixedDelay(this, this.frequency(), this.frequency(), TimeUnit.MILLISECONDS);
    }
}

