package io.fluo.core.impl;

import io.fluo.accumulo.iterators.NotificationSampleIterator;
import io.fluo.accumulo.util.ColumnConstants;
import io.fluo.api.client.TransactionBase;
import io.fluo.api.config.ObserverConfiguration;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.data.RowColumn;
import io.fluo.api.data.Span;
import io.fluo.api.exceptions.CommitException;
import io.fluo.api.observer.Observer;
import io.fluo.core.exceptions.AlreadyAcknowledgedException;
import io.fluo.core.impl.RandomTabletChooser;
import io.fluo.core.util.ByteUtil;
import io.fluo.core.util.SpanUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluo/core/impl/Worker.class */
/**
 * Scans the notification column family of a randomly chosen tablet and runs the
 * matching {@link Observer} for each notification found, one transaction per
 * notification.
 */
public class Worker {
    /** Threshold in milliseconds (5 minutes) past which an empty tablet's back-off stops growing. */
    private static final long MAX_SLEEP_TIME = 300000;
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    private final Environment env;
    private final Random rand = new Random();
    private final RandomTabletChooser tabletChooser;

    /**
     * @param environment supplies the connector, table, authorizations and observer configuration
     * @param randomTabletChooser source of the tablets to scan for notifications
     */
    public Worker(Environment environment, RandomTabletChooser randomTabletChooser) throws Exception {
        this.env = environment;
        this.tabletChooser = randomTabletChooser;
    }

    /**
     * Samples notification rows between {@code startRow} and {@code endRow} and picks one at random.
     *
     * @param scanner scanner to reconfigure; its iterators, columns and range are overwritten
     * @param startRow start row of the range, passed to {@link Span} with the inclusive flag {@code false}
     * @param endRow end row of the range, passed to {@link Span} with the inclusive flag {@code true}
     * @return a span from the randomly chosen row through {@code endRow}, or {@code null} when the
     *         scan returned no notifications
     */
    private Span pickRandomRow(Scanner scanner, Bytes startRow, Bytes endRow) {
        scanner.clearScanIterators();
        scanner.clearColumns();
        scanner.addScanIterator(new IteratorSetting(20, "ver", VersioningIterator.class));
        IteratorSetting sampler = new IteratorSetting(100, NotificationSampleIterator.class);
        NotificationSampleIterator.setSampleSize(sampler, 256);
        scanner.addScanIterator(sampler);
        scanner.fetchColumnFamily(ByteUtil.toText(ColumnConstants.NOTIFY_CF));
        scanner.setRange(SpanUtil.toRange(new Span(startRow, false, endRow, true)));

        List<Bytes> sampledRows = new ArrayList<Bytes>();
        for (Map.Entry<Key, ?> entry : scanner) {
            sampledRows.add(ByteUtil.toBytes(entry.getKey().getRow()));
        }
        if (sampledRows.isEmpty()) {
            return null;
        }
        Bytes chosenRow = sampledRows.get(this.rand.nextInt(sampledRows.size()));
        return new Span(chosenRow, true, endRow, true);
    }

    /**
     * Chooses a random tablet and, unless it is still backing off, a random notification row in it.
     *
     * @param scanner scanner handed to {@link #pickRandomRow}
     * @return the span to process, or {@code null} when no tablet was available, the tablet's
     *         retry time has not been reached, or the tablet had no notifications
     */
    private Span pickRandomStartPoint(Scanner scanner) throws Exception {
        RandomTabletChooser.TabletInfo tablet = this.tabletChooser.getRandomTablet();
        if (tablet == null) {
            return null;
        }
        // NOTE(review): this method only ever unlocks, so getRandomTablet() presumably returns
        // the tablet with its lock already held -- confirm against RandomTabletChooser.
        try {
            if (tablet.retryTime > System.currentTimeMillis()) {
                return null;
            }
            Bytes startRow = tablet.start == null ? Bytes.EMPTY : ByteUtil.toBytes(tablet.start);
            Bytes endRow = tablet.end == null ? Bytes.EMPTY : ByteUtil.toBytes(tablet.end);
            Span span = pickRandomRow(scanner, startRow, endRow);
            if (span == null) {
                // Empty tablet: skip it until retryTime, growing the delay by a random factor.
                // NOTE(review): growth is multiplicative and sleepTime is reset to 0 on success,
                // so it never grows from 0 unless TabletInfo re-seeds it elsewhere -- confirm.
                tablet.retryTime = tablet.sleepTime + System.currentTimeMillis();
                if (tablet.sleepTime < MAX_SLEEP_TIME) {
                    tablet.sleepTime += (long) (tablet.sleepTime * Math.random());
                }
            } else {
                tablet.retryTime = 0L;
                tablet.sleepTime = 0L;
            }
            return span;
        } finally {
            // Single release point; unlocking a second time would throw
            // IllegalMonitorStateException on a reentrant lock.
            tablet.lock.unlock();
        }
    }

    /**
     * Processes notifications starting at a randomly chosen row of a randomly chosen tablet.
     *
     * @param map cache of observer instances keyed by observed column; missing observers are
     *        created from the environment's configuration and added to it
     * @return the number of notifications whose transactions committed during this call;
     *         {@code 0} when no start point could be chosen
     * @throws IllegalStateException if a notification's column has no configured observer
     * @throws Exception if an observer fails while its notification is still present
     */
    public long processUpdates(Map<Column, Observer> map) throws Exception {
        Scanner scanner = this.env.getConnector().createScanner(this.env.getTable(), this.env.getAuthorizations());
        Span startPoint = pickRandomStartPoint(scanner);
        if (startPoint == null) {
            return 0L;
        }
        scanner.clearColumns();
        scanner.clearScanIterators();
        scanner.addScanIterator(new IteratorSetting(20, "ver", VersioningIterator.class));
        scanner.fetchColumnFamily(ByteUtil.toText(ColumnConstants.NOTIFY_CF));
        scanner.setRange(SpanUtil.toRange(startPoint));

        long processed = 0;
        boolean loggedFirstRow = false;
        for (Map.Entry<Key, ?> entry : scanner) {
            Key key = entry.getKey();
            // The column qualifier holds the notified column as two split parts.
            List<Bytes> parts = Bytes.split(Bytes.wrap(key.getColumnQualifierData().toArray()));
            Column column = new Column(parts.get(0), parts.get(1));
            column.setVisibility(ByteUtil.toBytes(key.getColumnVisibilityData()));

            Observer observer = getObserver(map, column);
            if (observer == null) {
                throw new IllegalStateException("No observer configured for column " + column);
            }
            Bytes row = ByteUtil.toBytes(key.getRowData());
            if (!loggedFirstRow) {
                log.debug("thread id: {}  row :{}", Thread.currentThread().getId(), row);
                loggedFirstRow = true;
            }

            if (!processNotification(scanner, key, observer, row, column)) {
                return processed;
            }
            processed++;
        }
        return processed;
    }

    /**
     * Runs one observer against one notification, retrying on {@link CommitException}.
     * Every attempt's transaction is logged (when transaction logging is enabled) and closed.
     *
     * @return {@code true} if the transaction committed; {@code false} if the caller should stop
     *         scanning because the notification was already acknowledged or has disappeared
     * @throws Exception the observer's failure, when the notification is still present
     */
    private boolean processNotification(Scanner scanner, Key notificationKey, Observer observer, Bytes row, Column column) throws Exception {
        String observerName = observer.getClass().getSimpleName();
        while (true) {
            TransactionImpl tx = null;
            String status = "FAILED";
            try {
                tx = new TransactionImpl(this.env, row, column);
                TransactionBase observerTx = tx;
                if (TracingTransaction.isTracingEnabled()) {
                    observerTx = new TracingTransaction(observerTx);
                }
                observer.process(observerTx, row, column);
                tx.commit();
                status = "COMMITTED";
                return true;
            } catch (AlreadyAcknowledgedException e) {
                status = "AACKED";
                return false;
            } catch (CommitException e) {
                // Commit failed: fall through to the finally block, then retry with a fresh transaction.
            } catch (Exception e) {
                // Re-check the notification: if it is gone, presumably another worker handled it
                // concurrently and the failure is benign; if it is still there, it is a real error.
                RowColumn rowColumn = SpanUtil.toRowColumn(notificationKey);
                scanner.setRange(SpanUtil.toRange(new Span(rowColumn, true, rowColumn, true)));
                if (scanner.iterator().hasNext()) {
                    throw e;
                }
                log.debug("Failure processing notification concurrently ", e);
                return false;
            } finally {
                if (tx != null) {
                    try {
                        if (TxLogger.isLoggingEnabled()) {
                            TxLogger.logTx(status, observerName, tx.getStats(), row + ":" + column);
                        }
                    } finally {
                        tx.close();
                    }
                }
            }
        }
    }

    /**
     * Returns the observer for {@code column}, creating, initializing and caching it on first use.
     * Strong observer configuration is consulted before weak observer configuration.
     *
     * @param observers cache of already created observers; updated when a new one is created
     * @param column the notified column
     * @return the observer, or {@code null} when no observer is configured for the column
     * @throws IllegalStateException if the created observer reports a different observed column
     */
    private Observer getObserver(Map<Column, Observer> observers, Column column) throws Exception {
        Observer cached = observers.get(column);
        if (cached != null) {
            return cached;
        }
        ObserverConfiguration config = this.env.getObservers().get(column);
        if (config == null) {
            config = this.env.getWeakObservers().get(column);
        }
        if (config == null) {
            return null;
        }
        Observer created = (Observer) Class.forName(config.getClassName()).asSubclass(Observer.class).newInstance();
        created.init(config.getParameters());
        if (!created.getObservedColumn().getColumn().equals(column)) {
            throw new IllegalStateException("Mismatch between configured column and class column " + config.getClassName() + " " + column + " " + created.getObservedColumn().getColumn());
        }
        observers.put(column, created);
        return created;
    }
}
