package com.blacklocus.jres;

import com.blacklocus.jres.request.JresBulkable;
import com.blacklocus.jres.request.bulk.JresBulk;
import com.blacklocus.jres.response.bulk.JresBulkItemResult;
import com.blacklocus.jres.util.DaemonThreadFactory;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/blacklocus/jres/JresBulkRequestor.class */
public class JresBulkRequestor implements Runnable, Closeable {
    private static final Logger LOG;
    public static final int POLLING_LENIENCY_SECONDS = 5;
    public static final int CLOSING_LENIENCY_SECONDS = 60;
    private final int batchSize;
    private final int sleepIntervalMs;
    private final String targetIndex;
    private final String targetType;
    private final Jres jres;
    private final Integer numThreads;
    private final ExecutorService executorService;
    private final List<Future<?>> indexerWorkerFutures;
    private final BlockingQueue<FuturedDocument> q;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/blacklocus/jres/JresBulkRequestor$FuturedDocument.class */
    public static class FuturedDocument {
        final SettableFuture<JresBulkItemResult> future = SettableFuture.create();
        final JresBulkable bulkable;

        FuturedDocument(JresBulkable jresBulkable) {
            this.bulkable = jresBulkable;
        }
    }

    public JresBulkRequestor(int i, int i2, int i3, Jres jres) {
        this(i, i2, i3, null, null, jres);
    }

    public JresBulkRequestor(int i, int i2, int i3, @Nullable String str, Jres jres) {
        this(i, i2, i3, str, null, jres);
    }

    public JresBulkRequestor(int i, int i2, int i3, @Nullable String str, @Nullable String str2, Jres jres) {
        this.indexerWorkerFutures = new ArrayList();
        this.batchSize = i;
        this.sleepIntervalMs = i2;
        this.numThreads = Integer.valueOf(i3);
        this.targetIndex = str;
        this.targetType = str2;
        this.jres = jres;
        this.executorService = new ThreadPoolExecutor(2, 2, 1L, TimeUnit.SECONDS, new SynchronousQueue(), new DaemonThreadFactory());
        this.q = new SynchronousQueue(true);
    }

    public JresBulkRequestor start() {
        for (int i = 0; i < this.numThreads.intValue(); i++) {
            this.indexerWorkerFutures.add(this.executorService.submit(this));
        }
        return this;
    }

    public Future<?> put(JresBulkable jresBulkable) {
        FuturedDocument futuredDocument = new FuturedDocument(jresBulkable);
        try {
            this.q.put(futuredDocument);
            return futuredDocument.future;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public Future<?> offer(JresBulkable jresBulkable, long j, TimeUnit timeUnit) {
        FuturedDocument futuredDocument = new FuturedDocument(jresBulkable);
        try {
            if (this.q.offer(futuredDocument, j, timeUnit)) {
                return futuredDocument.future;
            }
            return null;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                indexNextBatch();
            } catch (InterruptedException e) {
                LOG.error("Worker interrupted ungracefully. Shutting worker down.", 3);
                Thread.currentThread().interrupt();
            } catch (Exception e2) {
                LOG.error("error in indexer worker, last batch may have been lost", e2);
            }
        }
        LOG.debug("Worker interrupted. Shutting worker down.");
    }

    private void indexNextBatch() throws InterruptedException {
        FuturedDocument poll;
        ArrayList arrayList = new ArrayList(this.batchSize);
        ArrayList arrayList2 = new ArrayList(this.batchSize);
        int i = 0;
        while (i < this.batchSize && null != (poll = poll())) {
            arrayList2.add(poll.future);
            arrayList.add(poll.bulkable);
            i++;
        }
        if (arrayList.size() <= 0) {
            Thread.sleep(this.sleepIntervalMs);
            return;
        }
        if (!$assertionsDisabled && i <= 0) {
            throw new AssertionError();
        }
        LOG.info("Submitting bulk index of " + i + " products.");
        if (LOG.isDebugEnabled()) {
            LOG.debug(arrayList.toString());
        }
        ArrayList newArrayList = Lists.newArrayList(this.jres.quest(new JresBulk(this.targetIndex, this.targetType, arrayList)).getResults());
        if (!$assertionsDisabled && arrayList2.size() != newArrayList.size()) {
            throw new AssertionError();
        }
        for (int i2 = 0; i2 < newArrayList.size(); i2++) {
            SettableFuture settableFuture = (SettableFuture) arrayList2.get(i2);
            JresBulkItemResult jresBulkItemResult = (JresBulkItemResult) newArrayList.get(i2);
            if (jresBulkItemResult.getResult().hasError()) {
                settableFuture.setException(new RuntimeException(jresBulkItemResult.getResult().getError()));
            } else {
                settableFuture.set(jresBulkItemResult);
            }
        }
    }

    private FuturedDocument poll() {
        try {
            return this.q.poll(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.debug("Poll interrupted. Absorbing this exception. Main loop in this thread should be checking interrupted status and stop the worker in a moment.", e);
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        LOG.info("Shutting down BulkIndexers");
        while (!this.q.isEmpty()) {
            LOG.info("Waiting for work q to empty.");
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while shutting down. You interrupted the interrupter! Ignoring.", e);
            }
        }
        try {
            Iterator<Future<?>> it = this.indexerWorkerFutures.iterator();
            while (it.hasNext()) {
                it.next().cancel(true);
            }
            this.executorService.shutdown();
            this.executorService.awaitTermination(60L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            LOG.warn("Failed to stop worker threads. Pool may fail to shutdown.", e2);
        }
        LOG.info("IndexerWorkers shut down.");
    }

    static {
        $assertionsDisabled = !JresBulkRequestor.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(JresBulkRequestor.class);
    }
}
