package org.killbill.queue;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.clock.Clock;
import org.killbill.commons.jdbi.notification.DatabaseTransactionEvent;
import org.killbill.commons.jdbi.notification.DatabaseTransactionEventType;
import org.killbill.commons.jdbi.notification.DatabaseTransactionNotificationApi;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dao.QueueSqlDao;
import org.osgi.service.repository.ContentNamespace;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/killbill/queue/DBBackedQueueWithInflightQueue.class */
public class DBBackedQueueWithInflightQueue<T extends EventEntryModelDao> extends DBBackedQueue<T> {
    private static final int MAX_FETCHED_RECORDS_ID = 1000;
    private static final long INFLIGHT_POLLING_TIMEOUT_MSEC = 100;
    private final LinkedBlockingQueue<Long> inflightEvents;
    private final DatabaseTransactionNotificationApi databaseTransactionNotificationApi;
    private final int queueId;
    private final TransientInflightQRowIdCache transientInflightQRowIdCache;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DBBackedQueueWithInflightQueue.class);
    private static final AtomicInteger QUEUE_ID_CNT = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/killbill/queue/DBBackedQueueWithInflightQueue$TransientInflightQRowIdCache.class */
    public static class TransientInflightQRowIdCache {
        private final ThreadLocal<RowRef> rowRefThreadLocal;
        private final int queueId;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/killbill/queue/DBBackedQueueWithInflightQueue$TransientInflightQRowIdCache$RowRef.class */
        public final class RowRef {
            private final int queueId;
            private final List<Long> rowIds = new ArrayList();

            public RowRef(int i) {
                this.queueId = i;
            }

            public void addRowId(long j) {
                this.rowIds.add(Long.valueOf(j));
            }

            public Iterator<Long> iterator() {
                return this.rowIds.iterator();
            }
        }

        private TransientInflightQRowIdCache(int i) {
            this.rowRefThreadLocal = new ThreadLocal<>();
            this.queueId = i;
        }

        public boolean isValid() {
            RowRef rowRef = this.rowRefThreadLocal.get();
            return rowRef != null && rowRef.queueId == this.queueId;
        }

        public void addRowId(Long l) {
            RowRef rowRef = this.rowRefThreadLocal.get();
            if (rowRef == null) {
                rowRef = new RowRef(this.queueId);
                this.rowRefThreadLocal.set(rowRef);
            }
            rowRef.addRowId(l.longValue());
        }

        public void reset() {
            this.rowRefThreadLocal.remove();
        }

        public Iterator<Long> iterator() {
            RowRef rowRef = this.rowRefThreadLocal.get();
            Preconditions.checkNotNull(rowRef);
            return rowRef.iterator();
        }
    }

    public DBBackedQueueWithInflightQueue(Clock clock, IDBI idbi, Class<? extends QueueSqlDao<T>> cls, PersistentQueueConfig persistentQueueConfig, String str, MetricRegistry metricRegistry, DatabaseTransactionNotificationApi databaseTransactionNotificationApi) {
        super(clock, idbi, cls, persistentQueueConfig, str, metricRegistry);
        Preconditions.checkArgument(persistentQueueConfig.getMinInFlightEntries() <= persistentQueueConfig.getMaxInFlightEntries());
        this.queueId = QUEUE_ID_CNT.incrementAndGet();
        this.inflightEvents = new LinkedBlockingQueue<>();
        this.databaseTransactionNotificationApi = databaseTransactionNotificationApi;
        databaseTransactionNotificationApi.registerForNotification(this);
        metricRegistry.register(MetricRegistry.name((Class<?>) DBBackedQueueWithInflightQueue.class, str, "inflightQ", ContentNamespace.CAPABILITY_SIZE_ATTRIBUTE), new Gauge<Integer>() { // from class: org.killbill.queue.DBBackedQueueWithInflightQueue.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.codahale.metrics.Gauge
            public Integer getValue() {
                return Integer.valueOf(DBBackedQueueWithInflightQueue.this.inflightEvents.size());
            }
        });
        this.transientInflightQRowIdCache = new TransientInflightQRowIdCache(this.queueId);
    }

    @Override // org.killbill.queue.DBBackedQueue
    public void initialize() {
        initializeInflightQueue();
        log.info("{} Initialized with queueId={}, mode={}", this.DB_QUEUE_LOG_ID, Integer.valueOf(this.queueId), this.config.getPersistentQueueMode());
    }

    @Override // org.killbill.queue.DBBackedQueue
    public void close() {
        this.databaseTransactionNotificationApi.unregisterForNotification(this);
    }

    @Override // org.killbill.queue.DBBackedQueue
    public void insertEntryFromTransaction(QueueSqlDao<T> queueSqlDao, T t) {
        Long safeInsertEntry = safeInsertEntry(queueSqlDao, t);
        if (safeInsertEntry.longValue() == 0) {
            log.warn("{} Failed to insert entry, lastInsertedId={}", this.DB_QUEUE_LOG_ID, safeInsertEntry);
        } else {
            this.transientInflightQRowIdCache.addRowId(safeInsertEntry);
        }
    }

    private long pollEntriesFromInflightQ(List<Long> list) {
        long j = 0;
        this.inflightEvents.drainTo(list, this.config.getMaxInFlightEntries());
        if (list.isEmpty()) {
            try {
                long nanoTime = System.nanoTime();
                Long poll = this.inflightEvents.poll(100L, TimeUnit.MILLISECONDS);
                j = System.nanoTime() - nanoTime;
                if (poll != null) {
                    list.add(poll);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("{} Got interrupted", this.DB_QUEUE_LOG_ID);
                return 0L;
            }
        }
        return j;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v14, types: [java.util.List] */
    @Override // org.killbill.queue.DBBackedQueue
    public DBBackedQueue.ReadyEntriesWithMetrics<T> getReadyEntries() {
        long nanoTime = System.nanoTime();
        long j = 0;
        final ArrayList arrayList = new ArrayList(this.config.getMaxInFlightEntries());
        do {
            j += pollEntriesFromInflightQ(arrayList);
            if (arrayList.size() >= this.config.getMinInFlightEntries()) {
                break;
            }
        } while (j < 100);
        ImmutableList of = ImmutableList.of();
        if (!arrayList.isEmpty()) {
            log.debug("{} fetchReadyEntriesFromIds: {}", this.DB_QUEUE_LOG_ID, arrayList);
            of = (List) executeQuery(new DBBackedQueue.Query<List<T>, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueueWithInflightQueue.2
                @Override // org.killbill.queue.DBBackedQueue.Query
                public List<T> execute(QueueSqlDao<T> queueSqlDao) {
                    long nanoTime2 = System.nanoTime();
                    List<T> entriesFromIds = queueSqlDao.getEntriesFromIds(arrayList, DBBackedQueueWithInflightQueue.this.config.getTableName());
                    DBBackedQueueWithInflightQueue.this.rawGetEntriesTime.update(System.nanoTime() - nanoTime2, TimeUnit.NANOSECONDS);
                    return entriesFromIds;
                }
            });
        }
        return new DBBackedQueue.ReadyEntriesWithMetrics<>(of, (System.nanoTime() - nanoTime) - j);
    }

    @Override // org.killbill.queue.DBBackedQueue
    public void updateOnError(final T t) {
        executeTransaction(new Transaction<Void, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueueWithInflightQueue.3
            @Override // org.skife.jdbi.v2.Transaction
            public Void inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus transactionStatus) throws Exception {
                queueSqlDao.updateOnError(t.getRecordId(), DBBackedQueueWithInflightQueue.this.clock.getUTCNow().toDate(), t.getErrorCount(), DBBackedQueueWithInflightQueue.this.config.getTableName());
                DBBackedQueueWithInflightQueue.this.transientInflightQRowIdCache.addRowId(t.getRecordId());
                return null;
            }
        });
    }

    @Override // org.killbill.queue.DBBackedQueue
    protected void insertReapedEntriesFromTransaction(QueueSqlDao<T> queueSqlDao, List<T> list, DateTime dateTime) {
        for (T t : list) {
            t.setCreatedDate(dateTime);
            t.setProcessingState(PersistentQueueEntryLifecycleState.AVAILABLE);
            t.setCreatingOwner(CreatorName.get());
            t.setProcessingOwner(null);
            insertEntryFromTransaction(queueSqlDao, t);
        }
    }

    @AllowConcurrentEvents
    @Subscribe
    public void handleDatabaseTransactionEvent(DatabaseTransactionEvent databaseTransactionEvent) {
        if (this.transientInflightQRowIdCache == null || !this.transientInflightQRowIdCache.isValid()) {
            return;
        }
        if (databaseTransactionEvent.getType() == DatabaseTransactionEventType.ROLLBACK) {
            this.transientInflightQRowIdCache.reset();
            return;
        }
        try {
            Iterator<Long> it = this.transientInflightQRowIdCache.iterator();
            while (it.hasNext()) {
                Long next = it.next();
                if (this.inflightEvents.offer(next)) {
                    log.debug("{} Inserting entry {} into inflightQ", this.DB_QUEUE_LOG_ID, next);
                } else {
                    log.warn("{} Inflight Q overflowed....", this.DB_QUEUE_LOG_ID, next);
                }
            }
        } finally {
            this.transientInflightQRowIdCache.reset();
        }
    }

    @VisibleForTesting
    public int getInflightQSize() {
        return this.inflightEvents.size();
    }

    private void initializeInflightQueue() {
        this.inflightEvents.clear();
        int i = 0;
        long j = -1;
        while (true) {
            List<Long> readyEntryIds = ((PersistentBusSqlDao) this.sqlDao).getReadyEntryIds(this.clock.getUTCNow().toDate(), j, 1000, CreatorName.get(), this.config.getTableName());
            if (readyEntryIds.isEmpty()) {
                break;
            }
            this.inflightEvents.addAll(readyEntryIds);
            i += readyEntryIds.size();
            if (readyEntryIds.size() < 1000) {
                break;
            } else {
                j = readyEntryIds.get(readyEntryIds.size() - 1).longValue() + 1;
            }
        }
        log.info("{} Inserting {} entries into inflightQ during initialization", this.DB_QUEUE_LOG_ID, Integer.valueOf(i));
    }
}
