package org.locationtech.geomesa.lambda.stream.kafka;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.io.Closeable;
import java.time.Clock;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.geotools.data.DataStore;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.locationtech.geomesa.filter.package$;
import org.locationtech.geomesa.index.utils.Releasable;
import org.locationtech.geomesa.lambda.stream.OffsetManager;
import org.locationtech.geomesa.lambda.stream.kafka.KafkaFeatureCache;
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties;
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties$SystemProperty$;
import org.locationtech.geomesa.utils.geotools.FeatureUtils$;
import org.locationtech.geomesa.utils.io.IsCloseable$;
import org.locationtech.geomesa.utils.io.package$WithClose$;
import org.locationtech.geomesa.utils.stats.MethodProfiling;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.filter.identity.FeatureId;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.math.Ordering$Long$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.util.Random$;
import scala.util.control.NonFatal$;

/* compiled from: DataStorePersistence.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ug\u0001B\r\u001b\u0001\u001dB\u0001b\u0013\u0001\u0003\u0002\u0003\u0006I\u0001\u0014\u0005\t)\u0002\u0011\t\u0011)A\u0005+\"Aq\f\u0001B\u0001B\u0003%\u0001\r\u0003\u0005e\u0001\t\u0005\t\u0015!\u0003f\u0011!Y\bA!A!\u0002\u0013a\bBCA\u0007\u0001\t\u0005\t\u0015!\u0003\u0002\u0010!Q\u0011q\u0003\u0001\u0003\u0002\u0003\u0006I!!\u0007\t\u0015\u0005}\u0001A!A!\u0002\u0017\t\t\u0003C\u0004\u0002.\u0001!\t!a\f\t\u0013\u0005\u001d\u0003A1A\u0005\n\u0005%\u0003\u0002CA&\u0001\u0001\u0006I!a\u0004\t\u0013\u00055\u0003A1A\u0005\n\u0005%\u0003\u0002CA(\u0001\u0001\u0006I!a\u0004\t\u0013\u0005E\u0003A1A\u0005\n\u0005M\u0003\u0002CA3\u0001\u0001\u0006I!!\u0016\t\u0013\u0005\u001d\u0004A1A\u0005\n\u0005%\u0004\u0002CA>\u0001\u0001\u0006I!a\u001b\t\u000f\u0005-\u0005\u0001\"\u0011\u0002\u000e\"9\u0011Q\u0013\u0001\u0005\n\u0005]\u0005bBAT\u0001\u0011\u0005\u0013QR\u0004\n\u0003SS\u0012\u0011!E\u0001\u0003W3\u0001\"\u0007\u000e\u0002\u0002#\u0005\u0011Q\u0016\u0005\b\u0003[1B\u0011AA[\u0011%\t9LFI\u0001\n\u0003\tIL\u0001\u000bECR\f7\u000b^8sKB+'o]5ti\u0016t7-\u001a\u0006\u00037q\tQa[1gW\u0006T!!\b\u0010\u0002\rM$(/Z1n\u0015\ty\u0002%\u0001\u0004mC6\u0014G-\u0019\u0006\u0003C\t\nqaZ3p[\u0016\u001c\u0018M\u0003\u0002$I\u0005aAn\\2bi&|g\u000e^3dQ*\tQ%A\u0002pe\u001e\u001c\u0001a\u0005\u0004\u0001QA\u001a\u0014(\u0011\t\u0003S9j\u0011A\u000b\u0006\u0003W1\nA\u0001\\1oO*\tQ&\u0001\u0003kCZ\f\u0017BA\u0018+\u0005\u0019y%M[3diB\u0011\u0011&M\u0005\u0003e)\u0012\u0001BU;o]\u0006\u0014G.\u001a\t\u0003i]j\u0011!\u000e\u0006\u0003m1\n!![8\n\u0005a*$!C\"m_N,\u0017M\u00197f!\tQt(D\u0001<\u0015\taT(A\u0003ti\u0006$8O\u0003\u0002?A\u0005)Q\u000f^5mg&\u0011\u0001i\u000f\u0002\u0010\u001b\u0016$\bn\u001c3Qe>4\u0017\u000e\\5oOB\u0011!)S\u0007\u0002\u0007*\u0011A)R\u0001\rg\u000e\fG.\u00197pO\u001eLgn\u001a\u0006\u0003\r\u001e\u000b\u0001\u0002^=qKN\fg-\u001a\u0006\u0002\u0011\u0006\u00191m\\7\n\u0005)\u001b%a\u0003'bufdunZ4j]\u001e\f!\u0001Z:\u0011\u00055\u0013V\"\u0001(\u000b\u0005=\u0003\u0016\u0001\u00023bi\u0006T!!\u0015\u0013\u0002\u0011\u001d,w\u000e^8pYNL!a\u0015(\u0003\u0013\u0011\u000bG/Y*u_J,\u0017aA:giB\u0011a+X\u0007\u0002/*\u0011\u0001,W\u0001\u0007g&l\u0007\u000f\\3\u000b\u0005i[\u0016a\u00024fCR,(/\u001a\u0006\u00039\u0012\nqa\u001c9f]\u001eL7/\u0003\u0002_/\n\t2+[7qY\u00164U-\u0019;ve\u0016$\u0016\u0010]3\u0002\u001b=4gm]3u\u001b\u0006t\u0017mZ3s!\t\t'-D\u0001\u001d\u0013\t\u0019GDA\u0007PM\u001a\u001cX\r^'b]\u0006<WM]\u0001\u0006G\u0006\u001c\u0007.\u001a\t\u0003Mbt!a\u001a<\u000f\u0005!,hBA5u\u001d\tQ7O\u0004\u0002le:\u0011A.\u001d\b\u0003[Bl\u0011A\u001c\u0006\u0003_\u001a\na\u0001\u0010:p_Rt\u0014\"A\u0013\n\u0005\r\"\u0013BA\u0011#\u0013\ty\u0002%\u0003\u0002\u001e=%\u00111\u0004H\u0005\u0003oj\t\u0011cS1gW\u00064U-\u0019;ve\u0016\u001c\u0015m\u00195f\u0013\tI(P\u0001\u000bFqBL'/\u001b8h\r\u0016\fG/\u001e:f\u0007\u0006\u001c\u0007.\u001a\u0006\u0003oj\tQ\u0001^8qS\u000e\u00042!`A\u0004\u001d\rq\u00181\u0001\t\u0003[~T!!!\u0001\u0002\u000bM\u001c\u0017\r\\1\n\u0007\u0005\u0015q0\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003\u0013\tYA\u0001\u0004TiJLgn\u001a\u0006\u0004\u0003\u000by\u0018\u0001D1hK>3g-T5mY&\u001c\b\u0003BA\t\u0003'i\u0011a`\u0005\u0004\u0003+y(\u0001\u0002'p]\u001e\fa\u0002]3sg&\u001cH/\u0012=qSJ,G\r\u0005\u0003\u0002\u0012\u0005m\u0011bAA\u000f\u007f\n9!i\\8mK\u0006t\u0017!B2m_\u000e\\\u0007\u0003BA\u0012\u0003Si!!!\n\u000b\u0007\u0005\u001dB&\u0001\u0003uS6,\u0017\u0002BA\u0016\u0003K\u0011Qa\u00117pG.\fa\u0001P5oSRtD\u0003EA\u0019\u0003s\tY$!\u0010\u0002@\u0005\u0005\u00131IA#)\u0011\t\u0019$a\u000e\u0011\u0007\u0005U\u0002!D\u0001\u001b\u0011%\ty\"\u0003I\u0001\u0002\b\t\t\u0003C\u0003L\u0013\u0001\u0007A\nC\u0003U\u0013\u0001\u0007Q\u000bC\u0003`\u0013\u0001\u0007\u0001\rC\u0003e\u0013\u0001\u0007Q\rC\u0003|\u0013\u0001\u0007A\u0010C\u0004\u0002\u000e%\u0001\r!a\u0004\t\u000f\u0005]\u0011\u00021\u0001\u0002\u001a\u0005IaM]3rk\u0016t7-_\u000b\u0003\u0003\u001f\t!B\u001a:fcV,gnY=!\u0003-awnY6US6,w.\u001e;\u0002\u00191|7m\u001b+j[\u0016|W\u000f\u001e\u0011\u0002\u0011\u0015DXmY;u_J,\"!!\u0016\u0011\t\u0005]\u0013\u0011M\u0007\u0003\u00033RA!a\u0017\u0002^\u0005Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u0007\u0005}C&\u0001\u0003vi&d\u0017\u0002BA2\u00033\u0012\u0001dU2iK\u0012,H.\u001a3Fq\u0016\u001cW\u000f^8s'\u0016\u0014h/[2f\u0003%)\u00070Z2vi>\u0014\b%\u0001\u0005tG\",G-\u001e7f+\t\tY\u0007\r\u0003\u0002n\u0005]\u0004CBA,\u0003_\n\u0019(\u0003\u0003\u0002r\u0005e#aD*dQ\u0016$W\u000f\\3e\rV$XO]3\u0011\t\u0005U\u0014q\u000f\u0007\u0001\t-\tI(EA\u0001\u0002\u0003\u0015\t!! \u0003\u0005}\u0002\u0014!C:dQ\u0016$W\u000f\\3!#\u0011\ty(!\"\u0011\t\u0005E\u0011\u0011Q\u0005\u0004\u0003\u0007{(a\u0002(pi\"Lgn\u001a\t\u0005\u0003#\t9)C\u0002\u0002\n~\u00141!\u00118z\u0003\r\u0011XO\u001c\u000b\u0003\u0003\u001f\u0003B!!\u0005\u0002\u0012&\u0019\u00111S@\u0003\tUs\u0017\u000e^\u0001\ba\u0016\u00148/[:u)\u0019\ty)!'\u0002$\"9\u00111T\nA\u0002\u0005u\u0015!\u00039beRLG/[8o!\u0011\t\t\"a(\n\u0007\u0005\u0005vPA\u0002J]RDq!!*\u0014\u0001\u0004\ty!\u0001\u0004fqBL'/_\u0001\u0006G2|7/Z\u0001\u0015\t\u0006$\u0018m\u0015;pe\u0016\u0004VM]:jgR,gnY3\u0011\u0007\u0005UbcE\u0002\u0017\u0003_\u0003B!!\u0005\u00022&\u0019\u00111W@\u0003\r\u0005s\u0017PU3g)\t\tY+A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H\u0005\u000f\u000b\u0011\u0003w\u000by-!5\u0002T\u0006U\u0017q[Am\u00037TC!!\t\u0002>.\u0012\u0011q\u0018\t\u0005\u0003\u0003\fY-\u0004\u0002\u0002D*!\u0011QYAd\u0003%)hn\u00195fG.,GMC\u0002\u0002J~\f!\"\u00198o_R\fG/[8o\u0013\u0011\ti-a1\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\rC\u0003L1\u0001\u0007A\nC\u0003U1\u0001\u0007Q\u000bC\u0003`1\u0001\u0007\u0001\rC\u0003e1\u0001\u0007Q\rC\u0003|1\u0001\u0007A\u0010C\u0004\u0002\u000ea\u0001\r!a\u0004\t\u000f\u0005]\u0001\u00041\u0001\u0002\u001a\u0001")
/* loaded from: input_file:org/locationtech/geomesa/lambda/stream/kafka/DataStorePersistence.class */
public class DataStorePersistence implements Runnable, Closeable, MethodProfiling {
    private final DataStore ds;
    private final SimpleFeatureType sft;
    private final OffsetManager offsetManager;
    private final KafkaFeatureCache.ExpiringFeatureCache cache;
    private final String topic;
    private final long ageOffMillis;
    private final boolean persistExpired;
    private final Clock clock;
    private final long frequency;
    private final long lockTimeout;
    private final ScheduledExecutorService executor;
    private final ScheduledFuture<?> schedule;
    private Logger logger;
    private volatile boolean bitmap$0;

    public <R> R profile(Function1<Object, BoxedUnit> function1, Function0<R> function0) {
        return (R) MethodProfiling.profile$(this, function1, function0);
    }

    public <R> R profile(Function2<R, Object, BoxedUnit> function2, Function0<R> function0) {
        return (R) MethodProfiling.profile$(this, function2, function0);
    }

    public <R> R profile(String str, Function0<R> function0) {
        return (R) MethodProfiling.profile$(this, str, function0);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.locationtech.geomesa.lambda.stream.kafka.DataStorePersistence] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$0 ? logger$lzycompute() : this.logger;
    }

    private long frequency() {
        return this.frequency;
    }

    private long lockTimeout() {
        return this.lockTimeout;
    }

    private ScheduledExecutorService executor() {
        return this.executor;
    }

    private ScheduledFuture<?> schedule() {
        return this.schedule;
    }

    @Override // java.lang.Runnable
    public void run() {
        Seq<Object> expired = this.cache.expired(this.clock.millis() - this.ageOffMillis);
        if (logger().underlying().isTraceEnabled()) {
            logger().underlying().trace(new StringBuilder(47).append("Found partition(s) with expired entries in [").append(this.topic).append("]: ").append((Object) (expired.isEmpty() ? "none" : expired.mkString(","))).toString());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        Random$.MODULE$.shuffle(expired, Seq$.MODULE$.canBuildFrom()).foreach(i -> {
            BoxedUnit boxedUnit3;
            if (this.logger().underlying().isTraceEnabled()) {
                this.logger().underlying().trace("Acquiring lock for [{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger(i)});
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
            }
            Some acquireLock = this.offsetManager.acquireLock(this.topic, i, this.lockTimeout());
            if (None$.MODULE$.equals(acquireLock)) {
                if (this.logger().underlying().isTraceEnabled()) {
                    this.logger().underlying().trace("Could not acquire lock for [{}:{}] within {}ms", new Object[]{this.topic, BoxesRunTime.boxToInteger(i), BoxesRunTime.boxToLong(this.lockTimeout())});
                    boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    boxedUnit3 = BoxedUnit.UNIT;
                }
                return;
            }
            if (!(acquireLock instanceof Some)) {
                throw new MatchError(acquireLock);
            }
            Releasable releasable = (Releasable) acquireLock.value();
            try {
                if (this.logger().underlying().isTraceEnabled()) {
                    this.logger().underlying().trace("Acquired lock for [{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger(i)});
                    BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
                }
                this.persist(i, this.clock.millis() - this.ageOffMillis);
                BoxedUnit boxedUnit8 = BoxedUnit.UNIT;
                releasable.release();
                if (this.logger().underlying().isTraceEnabled()) {
                    this.logger().underlying().trace("Released lock for [{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger(i)});
                    BoxedUnit boxedUnit9 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit10 = BoxedUnit.UNIT;
                }
            } catch (Throwable th) {
                releasable.release();
                if (this.logger().underlying().isTraceEnabled()) {
                    this.logger().underlying().trace("Released lock for [{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger(i)});
                    BoxedUnit boxedUnit11 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit12 = BoxedUnit.UNIT;
                }
                throw th;
            }
        });
    }

    private void persist(int i, long j) {
        KafkaFeatureCache.ExpiredFeatures expired = this.cache.expired(i, j);
        if (expired == null) {
            throw new MatchError(expired);
        }
        Tuple2 tuple2 = new Tuple2(BoxesRunTime.boxToLong(expired.maxOffset()), expired.features());
        long _1$mcJ$sp = tuple2._1$mcJ$sp();
        Seq seq = (Seq) tuple2._2();
        if (logger().underlying().isTraceEnabled()) {
            logger().underlying().trace(new StringBuilder(33).append("Found ").append(seq.size()).append(" expired entries for [").append(this.topic).append(":").append(i).append("]:\n\t").append(((TraversableOnce) seq.map(offsetFeature -> {
                return new StringBuilder(9).append("offset ").append(offsetFeature.offset()).append(": ").append(offsetFeature.feature()).toString();
            }, Seq$.MODULE$.canBuildFrom())).mkString("\n\t")).toString());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        long offset = this.offsetManager.getOffset(this.topic, i);
        if (logger().underlying().isTraceEnabled()) {
            logger().underlying().trace("Last persisted offsets for [{}:{}]: {}", new Object[]{this.topic, BoxesRunTime.boxToInteger(i), BoxesRunTime.boxToLong(offset)});
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        if (!seq.nonEmpty()) {
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        } else if (this.persistExpired) {
            Map empty = Map$.MODULE$.empty();
            seq.foreach(offsetFeature2 -> {
                return offsetFeature2.offset() > offset ? empty.$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(offsetFeature2.feature().getID()), offsetFeature2)) : BoxedUnit.UNIT;
            });
            if (logger().underlying().isTraceEnabled()) {
                logger().underlying().trace("Offsets to persist for [{}:{}]: {}", new Object[]{this.topic, BoxesRunTime.boxToInteger(i), ((TraversableOnce) ((TraversableOnce) empty.values().map(offsetFeature3 -> {
                    return BoxesRunTime.boxToLong(offsetFeature3.offset());
                }, Iterable$.MODULE$.canBuildFrom())).toSeq().sorted(Ordering$Long$.MODULE$)).mkString(",")});
                BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
            }
            profile((Function2) (j2, j3) -> {
                this.complete$1(j2, j3);
            }, (Function0) () -> {
                return BoxesRunTime.unboxToLong(package$WithClose$.MODULE$.apply(this.ds.getFeatureWriter(this.sft.getTypeName(), package$.MODULE$.ff().id((FeatureId[]) ((TraversableOnce) empty.keys().map(str -> {
                    return package$.MODULE$.ff().featureId(str);
                }, Iterable$.MODULE$.canBuildFrom())).toSeq().toArray(ClassTag$.MODULE$.apply(FeatureId.class))), Transaction.AUTO_COMMIT), featureWriter -> {
                    return BoxesRunTime.boxToLong($anonfun$persist$7(this, empty, i, featureWriter));
                }, IsCloseable$.MODULE$.closeableIsCloseable()));
            });
            if (empty.nonEmpty()) {
                profile((Function2) (j4, j5) -> {
                    this.complete$2(j4, j5);
                }, (Function0) () -> {
                    return BoxesRunTime.unboxToLong(package$WithClose$.MODULE$.apply(this.ds.getFeatureWriterAppend(this.sft.getTypeName(), Transaction.AUTO_COMMIT), featureWriter -> {
                        return BoxesRunTime.boxToLong($anonfun$persist$11(this, empty, i, featureWriter));
                    }, IsCloseable$.MODULE$.closeableIsCloseable()));
                });
            } else {
                BoxedUnit boxedUnit8 = BoxedUnit.UNIT;
            }
        } else if (logger().underlying().isTraceEnabled()) {
            logger().underlying().trace("Persist disabled for {}", new Object[]{this.topic});
            BoxedUnit boxedUnit9 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit10 = BoxedUnit.UNIT;
        }
        if (_1$mcJ$sp > offset) {
            if (logger().underlying().isTraceEnabled()) {
                logger().underlying().trace("Committing offset [{}:{}:{}]", new Object[]{this.topic, BoxesRunTime.boxToInteger(i), BoxesRunTime.boxToLong(_1$mcJ$sp)});
                BoxedUnit boxedUnit11 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit12 = BoxedUnit.UNIT;
            }
            this.offsetManager.setOffset(this.topic, i, _1$mcJ$sp);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        schedule().cancel(true);
        executor().shutdownNow();
        executor().awaitTermination(1L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void complete$1(long j, long j2) {
        if (!logger().underlying().isDebugEnabled()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            logger().underlying().debug("Wrote {} updated feature(s) to persistent storage in {}ms", new Object[]{BoxesRunTime.boxToLong(j), BoxesRunTime.boxToLong(j2)});
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ long $anonfun$persist$7(DataStorePersistence dataStorePersistence, Map map, int i, FeatureWriter featureWriter) {
        long j = 0;
        while (true) {
            long j2 = j;
            if (!featureWriter.hasNext()) {
                return j2;
            }
            SimpleFeature next = featureWriter.next();
            map.get(next.getID()).foreach(offsetFeature -> {
                BoxedUnit boxedUnit;
                if (dataStorePersistence.logger().underlying().isTraceEnabled()) {
                    dataStorePersistence.logger().underlying().trace("Persistent store modify [{}:{}:{}] {}", new Object[]{dataStorePersistence.topic, BoxesRunTime.boxToInteger(i), BoxesRunTime.boxToLong(offsetFeature.offset()), offsetFeature.feature()});
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                }
                FeatureUtils$.MODULE$.copyToFeature(next, offsetFeature.feature(), true);
                try {
                    featureWriter.write();
                } catch (Throwable th) {
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (unapply.isEmpty()) {
                        throw th;
                    }
                    Throwable th2 = (Throwable) unapply.get();
                    if (dataStorePersistence.logger().underlying().isErrorEnabled()) {
                        dataStorePersistence.logger().underlying().error(new StringBuilder(26).append("Error persisting feature: ").append(offsetFeature.feature()).toString(), th2);
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        boxedUnit = BoxedUnit.UNIT;
                    }
                }
                return map.remove(offsetFeature.feature().getID());
            });
            j = j2 + 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void complete$2(long j, long j2) {
        if (!logger().underlying().isDebugEnabled()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            logger().underlying().debug("Wrote {} new feature(s) to persistent storage in {}ms", new Object[]{BoxesRunTime.boxToLong(j), BoxesRunTime.boxToLong(j2)});
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$persist$12(DataStorePersistence dataStorePersistence, int i, FeatureWriter featureWriter, LongRef longRef, KafkaFeatureCache.OffsetFeature offsetFeature) {
        BoxedUnit boxedUnit;
        if (dataStorePersistence.logger().underlying().isTraceEnabled()) {
            dataStorePersistence.logger().underlying().trace("Persistent store append [{}:{}:{}] {}", new Object[]{dataStorePersistence.topic, BoxesRunTime.boxToInteger(i), BoxesRunTime.boxToLong(offsetFeature.offset()), offsetFeature.feature()});
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
        try {
            FeatureUtils$.MODULE$.write(featureWriter, offsetFeature.feature(), true);
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            Throwable th2 = (Throwable) unapply.get();
            if (dataStorePersistence.logger().underlying().isErrorEnabled()) {
                dataStorePersistence.logger().underlying().error(new StringBuilder(26).append("Error persisting feature: ").append(offsetFeature.feature()).toString(), th2);
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        }
        longRef.elem++;
    }

    public static final /* synthetic */ long $anonfun$persist$11(DataStorePersistence dataStorePersistence, Map map, int i, FeatureWriter featureWriter) {
        LongRef create = LongRef.create(0L);
        map.values().foreach(offsetFeature -> {
            $anonfun$persist$12(dataStorePersistence, i, featureWriter, create, offsetFeature);
            return BoxedUnit.UNIT;
        });
        return create.elem;
    }

    public DataStorePersistence(DataStore dataStore, SimpleFeatureType simpleFeatureType, OffsetManager offsetManager, KafkaFeatureCache.ExpiringFeatureCache expiringFeatureCache, String str, long j, boolean z, Clock clock) {
        this.ds = dataStore;
        this.sft = simpleFeatureType;
        this.offsetManager = offsetManager;
        this.cache = expiringFeatureCache;
        this.topic = str;
        this.ageOffMillis = j;
        this.persistExpired = z;
        this.clock = clock;
        LazyLogging.$init$(this);
        MethodProfiling.$init$(this);
        this.frequency = BoxesRunTime.unboxToLong(new GeoMesaSystemProperties.SystemProperty("geomesa.lambda.persist.interval", GeoMesaSystemProperties$SystemProperty$.MODULE$.apply$default$2()).toDuration().map(duration -> {
            return BoxesRunTime.boxToLong(duration.toMillis());
        }).getOrElse(() -> {
            return 60000L;
        }));
        this.lockTimeout = BoxesRunTime.unboxToLong(new GeoMesaSystemProperties.SystemProperty("geomesa.lambda.persist.lock.timeout", GeoMesaSystemProperties$SystemProperty$.MODULE$.apply$default$2()).toDuration().map(duration2 -> {
            return BoxesRunTime.boxToLong(duration2.toMillis());
        }).getOrElse(() -> {
            return 1000L;
        }));
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.schedule = executor().scheduleWithFixedDelay(this, frequency(), frequency(), TimeUnit.MILLISECONDS);
    }
}
