package com.coxautodata.waimak.azure.table;

import com.coxautodata.waimak.log.Logging;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.table.CloudTable;
import com.microsoft.azure.storage.table.DynamicTableEntity;
import com.microsoft.azure.storage.table.TableBatchOperation;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration$;
import scala.math.Numeric$IntIsIntegral$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try$;

/* compiled from: AzureTableMultiWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055d\u0001B\u0001\u0003\u00015\u0011Q#\u0011>ve\u0016$\u0016M\u00197f\u001bVdG/[,sSR,'O\u0003\u0002\u0004\t\u0005)A/\u00192mK*\u0011QAB\u0001\u0006Cj,(/\u001a\u0006\u0003\u000f!\taa^1j[\u0006\\'BA\u0005\u000b\u0003-\u0019w\u000e_1vi>$\u0017\r^1\u000b\u0003-\t1aY8n\u0007\u0001\u00192\u0001\u0001\b\u0015!\ty!#D\u0001\u0011\u0015\u0005\t\u0012!B:dC2\f\u0017BA\n\u0011\u0005\u0019\te.\u001f*fMB\u0011Q\u0003G\u0007\u0002-)\u0011qCB\u0001\u0004Y><\u0017BA\r\u0017\u0005\u001daunZ4j]\u001eD\u0001b\u0001\u0001\u0003\u0006\u0004%\taG\u000b\u00029A\u0011Q\u0004\n\b\u0003=\t\u0002\"a\b\t\u000e\u0003\u0001R!!\t\u0007\u0002\rq\u0012xn\u001c;?\u0013\t\u0019\u0003#\u0001\u0004Qe\u0016$WMZ\u0005\u0003K\u0019\u0012aa\u0015;sS:<'BA\u0012\u0011\u0011!A\u0003A!A!\u0002\u0013a\u0012A\u0002;bE2,\u0007\u0005\u0003\u0005+\u0001\t\u0015\r\u0011\"\u0001\u001c\u0003)\u0019wN\u001c8fGRLwN\u001c\u0005\tY\u0001\u0011\t\u0011)A\u00059\u0005Y1m\u001c8oK\u000e$\u0018n\u001c8!\u0011!q\u0003A!b\u0001\n\u0003y\u0013!\u0003;ie\u0016\fGMT;n+\u0005\u0001\u0004CA\b2\u0013\t\u0011\u0004CA\u0002J]RD\u0001\u0002\u000e\u0001\u0003\u0002\u0003\u0006I\u0001M\u0001\u000bi\"\u0014X-\u00193Ok6\u0004\u0003\u0002\u0003\u001c\u0001\u0005\u000b\u0007I\u0011A\u001c\u0002\u0013QLW.Z8vi6\u001bX#\u0001\u001d\u0011\u0005=I\u0014B\u0001\u001e\u0011\u0005\u0011auN\\4\t\u0011q\u0002!\u0011!Q\u0001\na\n!\u0002^5nK>,H/T:!\u0011!q\u0004A!b\u0001\n\u0003y\u0014\u0001\u0004:fiJLH)\u001a7bs6\u001bX#\u0001!\u0011\u0007\u00053\u0005H\u0004\u0002C\t:\u0011qdQ\u0005\u0002#%\u0011Q\tE\u0001\ba\u0006\u001c7.Y4f\u0013\t9\u0005JA\u0002TKFT!!\u0012\t\t\u0011)\u0003!\u0011!Q\u0001\n\u0001\u000bQB]3uef$U\r\\1z\u001bN\u0004\u0003\"\u0002'\u0001\t\u0003i\u0015A\u0002\u001fj]&$h\b\u0006\u0004O!F\u00136\u000b\u0016\t\u0003\u001f\u0002i\u0011A\u0001\u0005\u0006\u0007-\u0003\r\u0001\b\u0005\u0006U-\u0003\r\u0001\b\u0005\u0006]-\u0003\r\u0001\r\u0005\u0006m-\u0003\r\u0001\u000f\u0005\u0006}-\u0003\r\u0001\u0011\u0005\b-\u0002\u0011\r\u0011\"\u0001X\u0003\u0015\tX/Z;f+\u0005A\u0006cA-aE6\t!L\u0003\u0002\\9\u0006Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u0005us\u0016\u0001B;uS2T\u0011aX\u0001\u0005U\u00064\u0018-\u0003\u0002b5\ni!\t\\8dW&tw-U;fk\u0016\u00042!\u0011$d!\t!7.D\u0001f\u0015\t\u0019aM\u0003\u0002hQ\u000691\u000f^8sC\u001e,'BA\u0003j\u0015\tQ'\"A\u0005nS\u000e\u0014xn]8gi&\u0011A.\u001a\u0002\u0013\tft\u0017-\\5d)\u0006\u0014G.Z#oi&$\u0018\u0010\u0003\u0004o\u0001\u0001\u0006I\u0001W\u0001\u0007cV,W/\u001a\u0011\t\u000fA\u0004!\u0019!C\tc\u0006\u0019B\u000f[3sK^KG\u000e\u001c\"f\u001b>\u0014X\rR1uCV\t!\u000f\u0005\u0002tm6\tAO\u0003\u0002v5\u00061\u0011\r^8nS\u000eL!a\u001e;\u0003\u001b\u0005#x.\\5d\u0005>|G.Z1o\u0011\u0019I\b\u0001)A\u0005e\u0006!B\u000f[3sK^KG\u000e\u001c\"f\u001b>\u0014X\rR1uC\u0002Bqa\u001f\u0001C\u0002\u0013\u0005\u0011/\u0001\u0007uQJ,\u0017\r\u001a$bS2,G\r\u0003\u0004~\u0001\u0001\u0006IA]\u0001\u000ei\"\u0014X-\u00193GC&dW\r\u001a\u0011\t\u0011}\u0004!\u0019!C\u0001\u0003\u0003\t!\u0002\u001e5sK\u0006$\u0007k\\8m+\t\t\u0019\u0001E\u0002Z\u0003\u000bI1!a\u0002[\u0005=)\u00050Z2vi>\u00148+\u001a:wS\u000e,\u0007\u0002CA\u0006\u0001\u0001\u0006I!a\u0001\u0002\u0017QD'/Z1e!>|G\u000e\t\u0005\f\u0003\u001f\u0001\u0001\u0019!a\u0001\n#\t\t\"A\u0004gkR,(/Z:\u0016\u0005\u0005M\u0001\u0003B!G\u0003+\u0001R!a\u0006\u0002\u001cAj!!!\u0007\u000b\u0005m\u0003\u0012\u0002BA\u000f\u00033\u0011aAR;ukJ,\u0007bCA\u0011\u0001\u0001\u0007\t\u0019!C\t\u0003G\t1BZ;ukJ,7o\u0018\u0013fcR!\u0011QEA\u0016!\ry\u0011qE\u0005\u0004\u0003S\u0001\"\u0001B+oSRD!\"!\f\u0002 \u0005\u0005\t\u0019AA\n\u0003\rAH%\r\u0005\f\u0003c\u0001\u0001\u0019!A!B\u0013\t\u0019\"\u0001\u0005gkR,(/Z:!\u0011\u001d\t)\u0004\u0001C\u0001\u0003o\t1A];o)\t\t)\u0003C\u0004\u0002<\u0001!\t!!\u0010\u0002\u000bI,GO]=\u0016\t\u0005}\u0012Q\t\u000b\t\u0003\u0003\n9&a\u0019\u0002fA!\u00111IA#\u0019\u0001!\u0001\"a\u0012\u0002:\t\u0007\u0011\u0011\n\u0002\u0002)F!\u00111JA)!\ry\u0011QJ\u0005\u0004\u0003\u001f\u0002\"a\u0002(pi\"Lgn\u001a\t\u0004\u001f\u0005M\u0013bAA+!\t\u0019\u0011I\\=\t\u0011\u0005e\u0013\u0011\ba\u0001\u00037\n\u0011A\u001a\t\u0006\u001f\u0005u\u0013\u0011M\u0005\u0004\u0003?\u0002\"!\u0003$v]\u000e$\u0018n\u001c81!\u0019\t9\"a\u0007\u0002B!1a'!\u000fA\u0002aBq!a\u001a\u0002:\u0001\u0007\u0001)\u0001\u0005eK2\f\u0017p]'t\u0011\u0019\tY\u0007\u0001C\u0001_\u00051a-\u001b8jg\"\u0004")
/* loaded from: input_file:com/coxautodata/waimak/azure/table/AzureTableMultiWriter.class */
public class AzureTableMultiWriter implements Logging {
    private final String table;
    private final String connection;
    private final int threadNum;
    private final long timeoutMs;
    private final Seq<Object> retryDelayMs;
    private final BlockingQueue<Seq<DynamicTableEntity>> queue;
    private final AtomicBoolean thereWillBeMoreData;
    private final AtomicBoolean threadFailed;
    private final ExecutorService threadPool;
    private Seq<Future<Object>> futures;
    private final Logger com$coxautodata$waimak$log$Logging$$log;

    public String logName() {
        return Logging.logName$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public Logger com$coxautodata$waimak$log$Logging$$log() {
        return this.com$coxautodata$waimak$log$Logging$$log;
    }

    public final void com$coxautodata$waimak$log$Logging$_setter_$com$coxautodata$waimak$log$Logging$$log_$eq(Logger logger) {
        this.com$coxautodata$waimak$log$Logging$$log = logger;
    }

    public String table() {
        return this.table;
    }

    public String connection() {
        return this.connection;
    }

    public int threadNum() {
        return this.threadNum;
    }

    public long timeoutMs() {
        return this.timeoutMs;
    }

    public Seq<Object> retryDelayMs() {
        return this.retryDelayMs;
    }

    public BlockingQueue<Seq<DynamicTableEntity>> queue() {
        return this.queue;
    }

    public AtomicBoolean thereWillBeMoreData() {
        return this.thereWillBeMoreData;
    }

    public AtomicBoolean threadFailed() {
        return this.threadFailed;
    }

    public ExecutorService threadPool() {
        return this.threadPool;
    }

    public Seq<Future<Object>> futures() {
        return this.futures;
    }

    public void futures_$eq(Seq<Future<Object>> seq) {
        this.futures = seq;
    }

    public void run() {
        ExecutionContextExecutor fromExecutor = ExecutionContext$.MODULE$.fromExecutor(threadPool());
        futures_$eq((Seq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), threadNum()).map(obj -> {
            return $anonfun$run$1(this, fromExecutor, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom()));
    }

    public <T> T retry(Function0<Future<T>> function0, long j, Seq<Object> seq) {
        Success recover = Try$.MODULE$.apply(() -> {
            return Await$.MODULE$.result((Awaitable) function0.apply(), Duration$.MODULE$.apply(j, TimeUnit.MILLISECONDS));
        }).recover(new AzureTableMultiWriter$$anonfun$1(this, function0, j, seq));
        if (recover instanceof Success) {
            return (T) recover.value();
        }
        if (!(recover instanceof Failure)) {
            throw new MatchError(recover);
        }
        Throwable exception = ((Failure) recover).exception();
        threadFailed().set(true);
        throw exception;
    }

    public int finish() {
        thereWillBeMoreData().set(false);
        int unboxToInt = BoxesRunTime.unboxToInt(((TraversableOnce) futures().map(future -> {
            return BoxesRunTime.boxToInteger($anonfun$finish$1(future));
        }, Seq$.MODULE$.canBuildFrom())).sum(Numeric$IntIsIntegral$.MODULE$));
        threadPool().shutdown();
        return unboxToInt;
    }

    public static final /* synthetic */ Future $anonfun$run$1(AzureTableMultiWriter azureTableMultiWriter, ExecutionContextExecutor executionContextExecutor, int i) {
        return Future$.MODULE$.apply(() -> {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
            ExecutionContextExecutor fromExecutor = ExecutionContext$.MODULE$.fromExecutor(newFixedThreadPool);
            CloudTable tableReference = CloudStorageAccount.parse(azureTableMultiWriter.connection()).createCloudTableClient().getTableReference(azureTableMultiWriter.table());
            int i2 = 0;
            while (true) {
                if ((azureTableMultiWriter.queue().size() > 0 || azureTableMultiWriter.thereWillBeMoreData().get()) && !azureTableMultiWriter.threadFailed().get()) {
                    Seq<DynamicTableEntity> poll = azureTableMultiWriter.queue().poll(1L, TimeUnit.SECONDS);
                    if (poll != null) {
                        i2 += poll.size();
                        TableBatchOperation tableBatchOperation = (TableBatchOperation) poll.foldLeft(new TableBatchOperation(), (tableBatchOperation2, dynamicTableEntity) -> {
                            tableBatchOperation2.insertOrReplace(dynamicTableEntity);
                            return tableBatchOperation2;
                        });
                        Function0 function0 = () -> {
                            return Future$.MODULE$.apply(() -> {
                                return tableReference.execute(tableBatchOperation);
                            }, fromExecutor);
                        };
                        Failure apply = Try$.MODULE$.apply(() -> {
                            return (ArrayList) azureTableMultiWriter.retry(function0, azureTableMultiWriter.timeoutMs(), azureTableMultiWriter.retryDelayMs());
                        });
                        if (!(apply instanceof Success)) {
                            if (!(apply instanceof Failure)) {
                                throw new MatchError(apply);
                            }
                            Throwable exception = apply.exception();
                            newFixedThreadPool.shutdown();
                            throw exception;
                        }
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    }
                }
            }
            newFixedThreadPool.shutdown();
            return i2;
        }, executionContextExecutor);
    }

    public static final /* synthetic */ int $anonfun$finish$1(Future future) {
        return BoxesRunTime.unboxToInt(Await$.MODULE$.result(future, Duration$.MODULE$.Inf()));
    }

    public AzureTableMultiWriter(String str, String str2, int i, long j, Seq<Object> seq) {
        this.table = str;
        this.connection = str2;
        this.threadNum = i;
        this.timeoutMs = j;
        this.retryDelayMs = seq;
        Logging.$init$(this);
        this.queue = new LinkedBlockingQueue(i * 2);
        this.thereWillBeMoreData = new AtomicBoolean(true);
        this.threadFailed = new AtomicBoolean(false);
        this.threadPool = Executors.newFixedThreadPool(i);
    }
}
