package org.killbill.queue;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.killbill.CreatorName;
import org.killbill.clock.Clock;
import org.killbill.commons.jdbi.notification.DatabaseTransactionEvent;
import org.killbill.commons.jdbi.notification.DatabaseTransactionEventType;
import org.killbill.commons.jdbi.notification.DatabaseTransactionNotificationApi;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dao.QueueSqlDao;
import org.osgi.service.repository.ContentNamespace;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/killbill/queue/DBBackedQueue.class */
public class DBBackedQueue<T extends EventEntryModelDao> implements Observer {
    private static final int RATIO_INFLIGHT_SIZE_TO_REOPEN_Q_FOR_WRITE = 10;
    private static final long INFLIGHT_POLLING_TIMEOUT_MSEC = 50;
    private static final long POLLING_ORPHANS_MSEC = 300000;
    private final String DB_QUEUE_LOG_ID;
    private final QueueSqlDao<T> sqlDao;
    private final Clock clock;
    private final PersistentQueueConfig config;
    private final boolean useInflightQueue;
    private final LinkedBlockingQueue<Long> inflightEvents;
    private final AtomicBoolean isQueueOpenForWrite;
    private final AtomicBoolean isQueueOpenForRead;
    private final int thresholdToReopenQForWrite;
    private final Counter totalInflightInsert;
    private final Counter totalInflightFetched;
    private final Counter totalInsert;
    private final Counter totalFetched;
    private final Counter totalClaimed;
    private final Counter totalProcessedFirstFailures;
    private final Counter totalProcessedSuccess;
    private final Counter totalProcessedAborted;
    private final AtomicLong lastPollingOrphanTime;
    private final AtomicBoolean isRunningOrphanQuery;
    private final AtomicLong lowestOrphanEntry;
    private final int queueId = QUEUE_ID_CNT.incrementAndGet();
    private final TransientInflightQRowIdCache transientInflightQRowIdCache;
    private static final Logger log = LoggerFactory.getLogger(DBBackedQueue.class);
    private static final AtomicInteger QUEUE_ID_CNT = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/killbill/queue/DBBackedQueue$TransientInflightQRowIdCache.class */
    public static class TransientInflightQRowIdCache {
        private final ThreadLocal<RowRef> rowRefThreadLocal;
        private final int queueId;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/killbill/queue/DBBackedQueue$TransientInflightQRowIdCache$RowRef.class */
        public final class RowRef {
            private final int queueId;
            private final List<Long> rowIds = new ArrayList();

            public RowRef(int i) {
                this.queueId = i;
            }

            public void addRowId(long j) {
                this.rowIds.add(Long.valueOf(j));
            }

            public Iterator<Long> iterator() {
                return this.rowIds.iterator();
            }
        }

        private TransientInflightQRowIdCache(int i) {
            this.rowRefThreadLocal = new ThreadLocal<>();
            this.queueId = i;
        }

        public boolean isValid() {
            RowRef rowRef = this.rowRefThreadLocal.get();
            return rowRef != null && rowRef.queueId == this.queueId;
        }

        public void addRowId(Long l) {
            RowRef rowRef = this.rowRefThreadLocal.get();
            if (rowRef == null) {
                rowRef = new RowRef(this.queueId);
                this.rowRefThreadLocal.set(rowRef);
            }
            rowRef.addRowId(l.longValue());
        }

        public void reset() {
            this.rowRefThreadLocal.remove();
        }

        public Iterator<Long> iterator() {
            RowRef rowRef = this.rowRefThreadLocal.get();
            Preconditions.checkNotNull(rowRef);
            return rowRef.iterator();
        }
    }

    public DBBackedQueue(Clock clock, QueueSqlDao<T> queueSqlDao, PersistentQueueConfig persistentQueueConfig, String str, MetricRegistry metricRegistry, @Nullable DatabaseTransactionNotificationApi databaseTransactionNotificationApi) {
        this.useInflightQueue = persistentQueueConfig.isUsingInflightQueue();
        this.sqlDao = queueSqlDao;
        this.config = persistentQueueConfig;
        this.inflightEvents = this.useInflightQueue ? new LinkedBlockingQueue<>(persistentQueueConfig.getQueueCapacity()) : null;
        this.isQueueOpenForWrite = new AtomicBoolean(false);
        this.isQueueOpenForRead = new AtomicBoolean(false);
        this.clock = clock;
        if (this.useInflightQueue && databaseTransactionNotificationApi != null) {
            databaseTransactionNotificationApi.registerForNotification(this);
        }
        this.totalInflightInsert = metricRegistry.counter(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "totalInflightInsert"));
        this.totalInflightFetched = metricRegistry.counter(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "totalInflightFetched"));
        this.totalInsert = metricRegistry.counter(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "totalInsert"));
        this.totalFetched = metricRegistry.counter(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "totalFetched"));
        this.totalClaimed = metricRegistry.counter(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "totalClaimed"));
        this.totalProcessedSuccess = metricRegistry.counter(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "totalProcessedSuccess"));
        this.totalProcessedFirstFailures = metricRegistry.counter(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "totalProcessedFirstFailures"));
        this.totalProcessedAborted = metricRegistry.counter(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "totalProcessedAborted"));
        metricRegistry.register(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "inflightQ", ContentNamespace.CAPABILITY_SIZE_ATTRIBUTE), new Gauge<Integer>() { // from class: org.killbill.queue.DBBackedQueue.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.codahale.metrics.Gauge
            public Integer getValue() {
                return Integer.valueOf(DBBackedQueue.this.useInflightQueue ? DBBackedQueue.this.inflightEvents.size() : 0);
            }
        });
        metricRegistry.register(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "inflightQ", "isOpenForRead"), new Gauge<Boolean>() { // from class: org.killbill.queue.DBBackedQueue.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.codahale.metrics.Gauge
            public Boolean getValue() {
                return Boolean.valueOf(DBBackedQueue.this.isQueueOpenForRead.get());
            }
        });
        metricRegistry.register(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "inflightQ", "isOpenForWrite"), new Gauge<Boolean>() { // from class: org.killbill.queue.DBBackedQueue.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.codahale.metrics.Gauge
            public Boolean getValue() {
                return Boolean.valueOf(DBBackedQueue.this.isQueueOpenForWrite.get());
            }
        });
        metricRegistry.register(MetricRegistry.name((Class<?>) DBBackedQueue.class, str, "inflightQ", "lowestOrphanEntry"), new Gauge<Long>() { // from class: org.killbill.queue.DBBackedQueue.4
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.codahale.metrics.Gauge
            public Long getValue() {
                return Long.valueOf(DBBackedQueue.this.lowestOrphanEntry.get());
            }
        });
        this.thresholdToReopenQForWrite = persistentQueueConfig.getQueueCapacity() / 10;
        this.lastPollingOrphanTime = new AtomicLong(clock.getUTCNow().getMillis());
        this.isRunningOrphanQuery = new AtomicBoolean(false);
        this.lowestOrphanEntry = new AtomicLong(-1L);
        this.transientInflightQRowIdCache = this.useInflightQueue ? new TransientInflightQRowIdCache(this.queueId) : null;
        this.DB_QUEUE_LOG_ID = "DBBackedQueue-" + str + ": ";
    }

    public void initialize() {
        if (this.useInflightQueue) {
            this.inflightEvents.clear();
            List<T> fetchReadyEntries = fetchReadyEntries(this.thresholdToReopenQForWrite);
            if (fetchReadyEntries.size() == 0) {
                this.isQueueOpenForRead.set(true);
                this.isQueueOpenForWrite.set(true);
            } else {
                this.isQueueOpenForRead.set(false);
                this.isQueueOpenForWrite.set(fetchReadyEntries.size() < this.thresholdToReopenQForWrite);
            }
        } else {
            this.isQueueOpenForRead.set(false);
            this.isQueueOpenForWrite.set(false);
        }
        this.totalInflightFetched.dec(this.totalInflightFetched.getCount());
        this.totalFetched.dec(this.totalFetched.getCount());
        this.totalInflightInsert.dec(this.totalInflightInsert.getCount());
        this.totalInsert.dec(this.totalInsert.getCount());
        this.totalClaimed.dec(this.totalClaimed.getCount());
        this.totalProcessedSuccess.dec(this.totalProcessedSuccess.getCount());
        this.totalProcessedFirstFailures.dec(this.totalProcessedFirstFailures.getCount());
        this.totalProcessedAborted.dec(this.totalProcessedAborted.getCount());
        log.info(this.DB_QUEUE_LOG_ID + "Initialized with useInflightQueue = " + this.useInflightQueue + ", queueId = " + this.queueId + ", isSticky = " + this.config.isSticky() + ", isQueueOpenForWrite = " + this.isQueueOpenForWrite.get() + ", isQueueOpenForRead = " + this.isQueueOpenForRead.get());
    }

    public void insertEntry(final T t) {
        this.sqlDao.inTransaction(new Transaction<Void, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueue.5
            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.skife.jdbi.v2.Transaction
            public Void inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus transactionStatus) throws Exception {
                DBBackedQueue.this.insertEntryFromTransaction(queueSqlDao, t);
                return null;
            }
        });
    }

    public void insertEntryFromTransaction(QueueSqlDao<T> queueSqlDao, T t) {
        Long safeInsertEntry = safeInsertEntry(queueSqlDao, t);
        if (safeInsertEntry.longValue() == 0) {
            log.warn(this.DB_QUEUE_LOG_ID + "Failed to insert entry, lastInsertedId " + safeInsertEntry);
            return;
        }
        if (this.useInflightQueue && this.isQueueOpenForWrite.get()) {
            this.transientInflightQRowIdCache.addRowId(safeInsertEntry);
        }
        this.totalInsert.inc();
    }

    public synchronized List<T> getReadyEntries() {
        List<T> of = ImmutableList.of();
        if (!this.useInflightQueue) {
            List<T> fetchReadyEntries = fetchReadyEntries(this.config.getMaxEntriesClaimed());
            this.totalFetched.inc(fetchReadyEntries.size());
            if (fetchReadyEntries.size() > 0) {
                of = claimEntries(fetchReadyEntries);
            }
            return of;
        }
        if (this.isQueueOpenForRead.get()) {
            checkForOrphanEntries();
            List<T> fetchReadyEntriesFromIds = fetchReadyEntriesFromIds();
            if (fetchReadyEntriesFromIds.size() > 0) {
                this.totalInflightFetched.inc(fetchReadyEntriesFromIds.size());
                this.totalFetched.inc(fetchReadyEntriesFromIds.size());
                return claimEntries(fetchReadyEntriesFromIds);
            }
            if (!this.isQueueOpenForWrite.get() && this.isQueueOpenForRead.compareAndSet(true, false)) {
                log.info(this.DB_QUEUE_LOG_ID + " Closing Q for read");
            }
        }
        if (this.isQueueOpenForRead.get()) {
            return ImmutableList.of();
        }
        List<T> fetchReadyEntries2 = fetchReadyEntries(this.thresholdToReopenQForWrite > this.config.getMaxEntriesClaimed() ? this.thresholdToReopenQForWrite : this.config.getMaxEntriesClaimed());
        if (fetchReadyEntries2.size() < this.thresholdToReopenQForWrite && this.isQueueOpenForWrite.compareAndSet(false, true)) {
            log.info(this.DB_QUEUE_LOG_ID + " Opening Q for write");
        }
        if (fetchReadyEntries2.size() > this.config.getMaxEntriesClaimed()) {
            fetchReadyEntries2 = fetchReadyEntries2.subList(0, this.config.getMaxEntriesClaimed());
        }
        if (removeInflightEventsWhenSwitchingToQueueOpenForRead(fetchReadyEntries2) && this.isQueueOpenForRead.compareAndSet(false, true)) {
            log.info(this.DB_QUEUE_LOG_ID + " Opening Q for read");
        }
        this.totalFetched.inc(fetchReadyEntries2.size());
        return claimEntries(fetchReadyEntries2);
    }

    private void checkForOrphanEntries() {
        if (this.clock.getUTCNow().getMillis() <= this.lastPollingOrphanTime.get() + POLLING_ORPHANS_MSEC || !this.isRunningOrphanQuery.compareAndSet(false, true)) {
            return;
        }
        List<T> fetchReadyEntries = fetchReadyEntries(1);
        Long valueOf = Long.valueOf(this.lowestOrphanEntry.getAndSet(fetchReadyEntries.size() == 0 ? -1L : fetchReadyEntries.get(0).getRecordId().longValue()));
        this.lastPollingOrphanTime.set(this.clock.getUTCNow().getMillis());
        if (valueOf.longValue() > 0 && valueOf.longValue() == this.lowestOrphanEntry.get()) {
            log.warn(this.DB_QUEUE_LOG_ID + "Detected unprocessed bus event {}, may need to restart server...", valueOf);
        }
        this.isRunningOrphanQuery.set(false);
    }

    private boolean removeInflightEventsWhenSwitchingToQueueOpenForRead(List<T> list) {
        if (list.size() == 0) {
            return true;
        }
        boolean z = false;
        Iterator<T> it = list.iterator();
        while (it.hasNext()) {
            z = this.inflightEvents.remove(it.next().getRecordId());
        }
        return z;
    }

    public void updateOnError(final T t) {
        this.sqlDao.inTransaction(new Transaction<Void, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueue.6
            @Override // org.skife.jdbi.v2.Transaction
            public Void inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus transactionStatus) throws Exception {
                queueSqlDao.updateOnError(t.getRecordId(), DBBackedQueue.this.clock.getUTCNow().toDate(), t.getErrorCount(), DBBackedQueue.this.config.getTableName());
                if (t.getErrorCount().longValue() == 1) {
                    DBBackedQueue.this.totalProcessedFirstFailures.inc();
                }
                if (!DBBackedQueue.this.useInflightQueue) {
                    return null;
                }
                DBBackedQueue.this.transientInflightQRowIdCache.addRowId(t.getRecordId());
                return null;
            }
        });
    }

    public void moveEntryToHistory(final T t) {
        this.sqlDao.inTransaction(new Transaction<Void, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueue.7
            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.skife.jdbi.v2.Transaction
            public Void inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus transactionStatus) throws Exception {
                DBBackedQueue.this.moveEntryToHistoryFromTransaction(queueSqlDao, t);
                return null;
            }
        });
    }

    public void moveEntryToHistoryFromTransaction(QueueSqlDao<T> queueSqlDao, T t) {
        try {
            switch (t.getProcessingState()) {
                case FAILED:
                    this.totalProcessedAborted.inc();
                    break;
                case PROCESSED:
                    this.totalProcessedSuccess.inc();
                    break;
                case REMOVED:
                    break;
                default:
                    log.warn(this.DB_QUEUE_LOG_ID + "Unexpected terminal event state " + t.getProcessingState() + " for record_id = " + t.getRecordId());
                    break;
            }
            if (log.isDebugEnabled()) {
                log.debug(this.DB_QUEUE_LOG_ID + "Moving entry " + t.getRecordId() + " into history ");
            }
            queueSqlDao.insertEntry(t, this.config.getHistoryTableName());
            queueSqlDao.removeEntry(t.getRecordId(), this.config.getTableName());
        } catch (Exception e) {
            log.warn(this.DB_QUEUE_LOG_ID + "Failed to move entries [" + t.getRecordId() + "] into history ", (Throwable) e);
        }
    }

    public void moveEntriesToHistory(final Iterable<T> iterable) {
        try {
            this.sqlDao.inTransaction(new Transaction<Void, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueue.8
                @Override // org.skife.jdbi.v2.Transaction
                public Void inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus transactionStatus) throws Exception {
                    DBBackedQueue.this.moveEntriesToHistoryFromTransaction(queueSqlDao, iterable);
                    return null;
                }
            });
        } catch (Exception e) {
            log.warn(this.DB_QUEUE_LOG_ID + "Failed to move entries [" + Joiner.on(", ").join(Iterables.transform(iterable, new Function<T, Long>() { // from class: org.killbill.queue.DBBackedQueue.9
                @Override // com.google.common.base.Function
                @Nullable
                public Long apply(@Nullable T t) {
                    return t.getRecordId();
                }
            })) + "] into history ", (Throwable) e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Failed to find 'out' block for switch in B:9:0x0038. Please report as an issue. */
    public void moveEntriesToHistoryFromTransaction(QueueSqlDao<T> queueSqlDao, Iterable<T> iterable) {
        if (iterable.iterator().hasNext()) {
            for (T t : iterable) {
                switch (t.getProcessingState()) {
                    case FAILED:
                        this.totalProcessedAborted.inc();
                        break;
                    case PROCESSED:
                        this.totalProcessedSuccess.inc();
                        break;
                    case REMOVED:
                        break;
                    default:
                        log.warn(this.DB_QUEUE_LOG_ID + "Unexpected terminal event state " + t.getProcessingState() + " for record_id = " + t.getRecordId());
                        break;
                }
                if (log.isDebugEnabled()) {
                    log.debug(this.DB_QUEUE_LOG_ID + "Moving entry " + t.getRecordId() + " into history ");
                }
            }
            Iterable transform = Iterables.transform(iterable, new Function<T, Object>() { // from class: org.killbill.queue.DBBackedQueue.10
                @Override // com.google.common.base.Function
                public Object apply(T t2) {
                    return t2.getRecordId();
                }
            });
            queueSqlDao.insertEntries(iterable, this.config.getHistoryTableName());
            queueSqlDao.removeEntries(ImmutableList.copyOf(transform), this.config.getTableName());
        }
    }

    private List<T> fetchReadyEntriesFromIds() {
        Long poll;
        int maxInflightQEntriesClaimed = this.config.getMaxInflightQEntriesClaimed() < this.inflightEvents.size() ? this.config.getMaxInflightQEntriesClaimed() : this.inflightEvents.size();
        int i = maxInflightQEntriesClaimed == 0 ? 1 : maxInflightQEntriesClaimed;
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            if (maxInflightQEntriesClaimed == 0) {
                try {
                    poll = this.inflightEvents.poll(INFLIGHT_POLLING_TIMEOUT_MSEC, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.warn(this.DB_QUEUE_LOG_ID + "Got interrupted ");
                    return ImmutableList.of();
                }
            } else {
                poll = this.inflightEvents.poll();
            }
            Long l = poll;
            if (l == null) {
                break;
            }
            arrayList.add(l);
        }
        ImmutableList of = ImmutableList.of();
        if (arrayList.size() > 0) {
            if (log.isDebugEnabled()) {
                log.debug(this.DB_QUEUE_LOG_ID + "fetchReadyEntriesFromIds, size = " + i + ", ids = " + Joiner.on(", ").join(arrayList));
            }
            of = ImmutableList.copyOf(Collections2.filter(this.sqlDao.getEntriesFromIds(arrayList, this.config.getTableName()), new Predicate<T>() { // from class: org.killbill.queue.DBBackedQueue.11
                @Override // com.google.common.base.Predicate
                public boolean apply(T t) {
                    return t.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE;
                }
            }));
        }
        return of;
    }

    private List<T> fetchReadyEntries(int i) {
        return this.sqlDao.getReadyEntries(this.clock.getUTCNow().toDate(), i, this.config.isSticky() ? CreatorName.get() : null, this.config.getTableName());
    }

    private List<T> claimEntries(List<T> list) {
        return this.config.isSticky() ? batchClaimEntries(list) : sequentialClaimEntries(list);
    }

    private List<T> batchClaimEntries(List<T> list) {
        if (list.size() == 0) {
            return ImmutableList.of();
        }
        Date date = this.clock.getUTCNow().plus(this.config.getClaimedTime().getMillis()).toDate();
        Collection<Long> transform = Collections2.transform(list, new Function<T, Long>() { // from class: org.killbill.queue.DBBackedQueue.12
            @Override // com.google.common.base.Function
            public Long apply(T t) {
                return t.getRecordId();
            }
        });
        int claimEntries = this.sqlDao.claimEntries(transform, this.clock.getUTCNow().toDate(), CreatorName.get(), date, this.config.getTableName());
        if (claimEntries == list.size()) {
            this.totalClaimed.inc(claimEntries);
            return list;
        }
        if (claimEntries == 0) {
            return ImmutableList.of();
        }
        ImmutableList copyOf = ImmutableList.copyOf(Iterables.filter(this.sqlDao.getEntriesFromIds(ImmutableList.copyOf((Collection) transform), this.config.getTableName()), new Predicate<T>() { // from class: org.killbill.queue.DBBackedQueue.13
            @Override // com.google.common.base.Predicate
            public boolean apply(T t) {
                return t.getProcessingState() == PersistentQueueEntryLifecycleState.IN_PROCESSING && t.getProcessingOwner().equals(CreatorName.get());
            }
        }));
        this.totalClaimed.inc(copyOf.size());
        return copyOf;
    }

    private List<T> sequentialClaimEntries(List<T> list) {
        return ImmutableList.copyOf(Collections2.filter(list, new Predicate<T>() { // from class: org.killbill.queue.DBBackedQueue.14
            @Override // com.google.common.base.Predicate
            public boolean apply(T t) {
                return DBBackedQueue.this.claimEntry(t);
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean claimEntry(T t) {
        boolean z = this.sqlDao.claimEntry(t.getRecordId(), this.clock.getUTCNow().toDate(), CreatorName.get(), this.clock.getUTCNow().plus(this.config.getClaimedTime().getMillis()).toDate(), this.config.getTableName()) == 1;
        if (z) {
            this.totalClaimed.inc();
            if (log.isDebugEnabled()) {
                log.debug(this.DB_QUEUE_LOG_ID + "Claiming entry " + t.getRecordId());
            }
        }
        return z;
    }

    public QueueSqlDao<T> getSqlDao() {
        return this.sqlDao;
    }

    public boolean isQueueOpenForWrite() {
        return this.isQueueOpenForWrite.get();
    }

    public boolean isQueueOpenForRead() {
        return this.isQueueOpenForRead.get();
    }

    public long getTotalInflightFetched() {
        return this.totalInflightFetched.getCount();
    }

    public long getTotalFetched() {
        return this.totalFetched.getCount();
    }

    public long getTotalInflightInsert() {
        return this.totalInflightInsert.getCount();
    }

    public long getTotalInsert() {
        return this.totalInsert.getCount();
    }

    @Override // java.util.Observer
    public void update(Observable observable, Object obj) {
        DatabaseTransactionEvent databaseTransactionEvent = (DatabaseTransactionEvent) obj;
        if (this.transientInflightQRowIdCache == null || !this.transientInflightQRowIdCache.isValid()) {
            return;
        }
        if (databaseTransactionEvent.getType() == DatabaseTransactionEventType.ROLLBACK) {
            return;
        }
        try {
            Iterator<Long> it = this.transientInflightQRowIdCache.iterator();
            while (it.hasNext()) {
                Long next = it.next();
                boolean offer = this.inflightEvents.offer(next);
                if (offer) {
                    if (log.isDebugEnabled()) {
                        log.debug(this.DB_QUEUE_LOG_ID + "Inserting entry " + next + (offer ? " into inflightQ" : " into disk"));
                    }
                    this.totalInflightInsert.inc();
                } else if (this.isQueueOpenForWrite.compareAndSet(true, false)) {
                    log.info(this.DB_QUEUE_LOG_ID + "Closing Q for write: Overflowed with recordId = " + next);
                }
            }
        } finally {
            this.transientInflightQRowIdCache.reset();
        }
    }

    private Long safeInsertEntry(QueueSqlDao<T> queueSqlDao, T t) {
        queueSqlDao.resetLastInsertId();
        queueSqlDao.insertEntry(t, this.config.getTableName());
        return queueSqlDao.getLastInsertId();
    }
}
