package org.fcrepo.persistence.ocfl.impl;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.fcrepo.common.db.DbTransactionExecutor;
import org.fcrepo.config.OcflPropsConfig;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/fcrepo/persistence/ocfl/impl/ReindexManager.class */
public class ReindexManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReindexManager.class);
    private static final long REPORTING_INTERVAL_SECS = 300;
    private final Iterator<String> ocflIter;
    private final Stream<String> ocflStream;
    private final ReindexService reindexService;
    private final long batchSize;
    private final boolean failOnError;
    private TransactionManager txManager;
    private DbTransactionExecutor dbTransactionExecutor;
    private Transaction transaction = null;
    private final List<ReindexWorker> workers = new ArrayList();
    private final AtomicInteger completedCount = new AtomicInteger(0);
    private final AtomicInteger errorCount = new AtomicInteger(0);

    public ReindexManager(Stream<String> stream, ReindexService reindexService, OcflPropsConfig ocflPropsConfig, TransactionManager transactionManager, DbTransactionExecutor dbTransactionExecutor) {
        this.ocflStream = stream;
        this.ocflIter = this.ocflStream.iterator();
        this.reindexService = reindexService;
        this.batchSize = ocflPropsConfig.getReindexBatchSize();
        this.failOnError = ocflPropsConfig.isReindexFailOnError();
        this.txManager = transactionManager;
        this.dbTransactionExecutor = dbTransactionExecutor;
        long reindexingThreads = ocflPropsConfig.getReindexingThreads();
        if (reindexingThreads < 1) {
            throw new IllegalStateException(String.format("Reindexing requires at least 1 thread. Found: %s", Long.valueOf(reindexingThreads)));
        }
        for (int i = 0; i < reindexingThreads; i++) {
            this.workers.add(new ReindexWorker("ReindexWorker-" + i, this, this.reindexService, this.txManager, this.dbTransactionExecutor, this.failOnError));
        }
    }

    public void start() throws InterruptedException {
        Thread startReporter = startReporter();
        try {
            try {
                this.workers.forEach((v0) -> {
                    v0.start();
                });
                Iterator<ReindexWorker> it = this.workers.iterator();
                while (it.hasNext()) {
                    it.next().join();
                }
                if (!this.failOnError || this.errorCount.get() == 0) {
                    indexMembership();
                } else {
                    LOGGER.error("Reindex did not complete successfully");
                }
            } catch (Exception e) {
                LOGGER.error("Error while rebuilding index", e);
                stop();
                throw e;
            }
        } finally {
            startReporter.interrupt();
        }
    }

    public void stop() {
        LOGGER.debug("Stop worker threads");
        this.workers.forEach((v0) -> {
            v0.stopThread();
        });
    }

    public synchronized List<String> getIds() {
        ArrayList arrayList = new ArrayList((int) this.batchSize);
        for (int i = 0; this.ocflIter.hasNext() && i < this.batchSize; i++) {
            arrayList.add(this.ocflIter.next());
        }
        return arrayList;
    }

    public void updateComplete(int i, int i2) {
        this.completedCount.addAndGet(i);
        this.errorCount.addAndGet(i2);
    }

    public int getCompletedCount() {
        return this.completedCount.get();
    }

    public int getErrorCount() {
        return this.errorCount.get();
    }

    private void indexMembership() {
        Transaction transaction = transaction();
        LOGGER.info("Starting membership indexing");
        this.reindexService.indexMembership(transaction);
        transaction.commit();
        LOGGER.debug("Completed membership indexing");
    }

    public void shutdown() {
        this.ocflStream.close();
    }

    private Thread startReporter() {
        Thread thread = new Thread(() -> {
            Instant now = Instant.now();
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(REPORTING_INTERVAL_SECS);
                    int i = this.completedCount.get();
                    int i2 = this.errorCount.get();
                    Duration between = Duration.between(now, Instant.now());
                    LOGGER.info("Index rebuild progress: Complete: {}; Errored: {}; Time: {}; Rate: {}/s", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), getDurationMessage(between), Long.valueOf((i + i2) / between.getSeconds())});
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        thread.start();
        return thread;
    }

    private String getDurationMessage(Duration duration) {
        String format = String.format("%d secs", Integer.valueOf(duration.toSecondsPart()));
        if (duration.getSeconds() > 60) {
            format = String.format("%d mins, ", Integer.valueOf(duration.toMinutesPart())) + format;
        }
        if (duration.getSeconds() > 3600) {
            format = String.format("%d hours, ", Long.valueOf(duration.getSeconds() / 3600)) + format;
        }
        return format;
    }

    private Transaction transaction() {
        if (this.transaction == null) {
            this.transaction = this.txManager.create();
            this.transaction.setShortLived(true);
        }
        return this.transaction;
    }
}
