package org.killbill.queue;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.clock.Clock;
import org.killbill.commons.profiling.Profiling;
import org.killbill.commons.profiling.ProfilingFeature;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dao.QueueSqlDao;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/killbill/queue/DBBackedQueue.class */
public abstract class DBBackedQueue<T extends EventEntryModelDao> {
    protected static final Logger log = LoggerFactory.getLogger((Class<?>) DBBackedQueue.class);
    protected final String DB_QUEUE_LOG_ID;
    protected final IDBI dbi;
    protected final Class<? extends QueueSqlDao<T>> sqlDaoClass;
    protected final QueueSqlDao<T> sqlDao;
    protected final Clock clock;
    protected final PersistentQueueConfig config;
    protected final Timer rawGetEntriesTime;
    protected final Timer rawInsertEntryTime;
    protected final Timer rawClaimEntriesTime;
    protected final Timer rawClaimEntryTime;
    protected final Timer rawDeleteEntriesTime;
    protected final Timer rawDeleteEntryTime;
    protected final Profiling<Long, RuntimeException> prof = new Profiling<>();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/killbill/queue/DBBackedQueue$Query.class */
    public interface Query<U, QueueSqlDao> {
        U execute(QueueSqlDao queuesqldao);
    }

    /* loaded from: input_file:org/killbill/queue/DBBackedQueue$ReadyEntriesWithMetrics.class */
    public static class ReadyEntriesWithMetrics<T extends EventEntryModelDao> {
        private final List<T> entries;
        private final long time;

        public ReadyEntriesWithMetrics(List<T> list, long j) {
            this.entries = list;
            this.time = j;
        }

        public List<T> getEntries() {
            return this.entries;
        }

        public long getTime() {
            return this.time;
        }
    }

    public DBBackedQueue(Clock clock, IDBI idbi, Class<? extends QueueSqlDao<T>> cls, PersistentQueueConfig persistentQueueConfig, String str, MetricRegistry metricRegistry) {
        this.dbi = idbi;
        this.sqlDaoClass = cls;
        this.sqlDao = (QueueSqlDao) idbi.onDemand(cls);
        this.config = persistentQueueConfig;
        this.clock = clock;
        this.rawGetEntriesTime = metricRegistry.timer(MetricRegistry.name((Class<?>) DBBackedQueue.class, "rawGetEntriesTime"));
        this.rawInsertEntryTime = metricRegistry.timer(MetricRegistry.name((Class<?>) DBBackedQueue.class, "rawInsertEntryTime"));
        this.rawClaimEntriesTime = metricRegistry.timer(MetricRegistry.name((Class<?>) DBBackedQueue.class, "rawClaimEntriesTime"));
        this.rawClaimEntryTime = metricRegistry.timer(MetricRegistry.name((Class<?>) DBBackedQueue.class, "rawClaimEntryTime"));
        this.rawDeleteEntriesTime = metricRegistry.timer(MetricRegistry.name((Class<?>) DBBackedQueue.class, "rawDeleteEntriesTime"));
        this.rawDeleteEntryTime = metricRegistry.timer(MetricRegistry.name((Class<?>) DBBackedQueue.class, "rawDeleteEntryTime"));
        this.DB_QUEUE_LOG_ID = "DBBackedQueue-" + str;
    }

    public abstract void initialize();

    public abstract void close();

    public abstract ReadyEntriesWithMetrics<T> getReadyEntries();

    public abstract void insertEntryFromTransaction(QueueSqlDao<T> queueSqlDao, T t);

    public abstract void updateOnError(T t);

    protected abstract void insertReapedEntriesFromTransaction(QueueSqlDao<T> queueSqlDao, List<T> list, DateTime dateTime);

    public void insertEntry(final T t) {
        executeTransaction(new Transaction<Void, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueue.1
            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.skife.jdbi.v2.Transaction
            public Void inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus transactionStatus) {
                DBBackedQueue.this.insertEntryFromTransaction(queueSqlDao, t);
                return null;
            }
        });
    }

    public void moveEntryToHistory(final T t) {
        executeTransaction(new Transaction<Void, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueue.2
            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.skife.jdbi.v2.Transaction
            public Void inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus transactionStatus) throws Exception {
                DBBackedQueue.this.moveEntryToHistoryFromTransaction(queueSqlDao, t);
                return null;
            }
        });
    }

    public void moveEntryToHistoryFromTransaction(QueueSqlDao<T> queueSqlDao, T t) {
        try {
            switch (t.getProcessingState()) {
                case FAILED:
                case PROCESSED:
                case REMOVED:
                case REAPED:
                    break;
                default:
                    log.warn("{} Unexpected terminal event state={} for record_id={}", this.DB_QUEUE_LOG_ID, t.getProcessingState(), t.getRecordId());
                    break;
            }
            log.debug("{} Moving entry into history: recordId={}, className={}, json={}", this.DB_QUEUE_LOG_ID, t.getRecordId(), t.getClassName(), t.getEventJson());
            long nanoTime = System.nanoTime();
            queueSqlDao.insertEntry(t, this.config.getHistoryTableName());
            queueSqlDao.removeEntry(t.getRecordId(), this.config.getTableName());
            this.rawDeleteEntryTime.update(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
        } catch (Exception e) {
            log.warn("{} Failed to move entry into history: {}", this.DB_QUEUE_LOG_ID, t, e);
        }
    }

    public void moveEntriesToHistory(final Iterable<T> iterable) {
        try {
            executeTransaction(new Transaction<Void, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueue.3
                @Override // org.skife.jdbi.v2.Transaction
                public Void inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus transactionStatus) throws Exception {
                    DBBackedQueue.this.moveEntriesToHistoryFromTransaction(queueSqlDao, iterable);
                    return null;
                }
            });
        } catch (Exception e) {
            log.warn("{} Failed to move entries into history: {}", this.DB_QUEUE_LOG_ID, iterable, e);
        }
    }

    public void moveEntriesToHistoryFromTransaction(QueueSqlDao<T> queueSqlDao, Iterable<T> iterable) {
        if (iterable.iterator().hasNext()) {
            for (T t : iterable) {
                switch (t.getProcessingState()) {
                    case FAILED:
                    case PROCESSED:
                    case REMOVED:
                    case REAPED:
                        break;
                    default:
                        log.warn("{} Unexpected terminal event state={} for record_id={}", this.DB_QUEUE_LOG_ID, t.getProcessingState(), t.getRecordId());
                        break;
                }
                log.debug("{} Moving entry into history: recordId={}, className={}, json={}", this.DB_QUEUE_LOG_ID, t.getRecordId(), t.getClassName(), t.getEventJson());
            }
            Iterable transform = Iterables.transform(iterable, new Function<T, Long>() { // from class: org.killbill.queue.DBBackedQueue.4
                @Override // com.google.common.base.Function, java.util.function.Function
                public Long apply(T t2) {
                    return t2.getRecordId();
                }
            });
            long nanoTime = System.nanoTime();
            queueSqlDao.insertEntries(iterable, this.config.getHistoryTableName());
            queueSqlDao.removeEntries(ImmutableList.copyOf(transform), this.config.getTableName());
            this.rawDeleteEntriesTime.update(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
        }
    }

    protected long getNbReadyEntries() {
        return getNbReadyEntries(this.clock.getUTCNow().toDate());
    }

    public long getNbReadyEntries(final Date date) {
        final String str = this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.POLLING ? null : CreatorName.get();
        return ((Long) executeQuery(new Query<Long, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueue.5
            @Override // org.killbill.queue.DBBackedQueue.Query
            public Long execute(QueueSqlDao<T> queueSqlDao) {
                return Long.valueOf(queueSqlDao.getNbReadyEntries(date, str, DBBackedQueue.this.config.getTableName()));
            }
        })).longValue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Long safeInsertEntry(final QueueSqlDao<T> queueSqlDao, final T t) {
        return this.prof.executeWithProfiling(ProfilingFeature.ProfilingFeatureType.DAO, "QueueSqlDao:insert", new Profiling.WithProfilingCallback<Long, RuntimeException>() { // from class: org.killbill.queue.DBBackedQueue.6
            /* JADX WARN: Can't rename method to resolve collision */
            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.killbill.commons.profiling.Profiling.WithProfilingCallback
            public Long execute() throws RuntimeException {
                long nanoTime = System.nanoTime();
                queueSqlDao.resetLastInsertId();
                queueSqlDao.insertEntry(t, DBBackedQueue.this.config.getTableName());
                Long lastInsertId = queueSqlDao.getLastInsertId();
                DBBackedQueue.log.debug("{} Inserting entry: lastInsertId={}, entry={}", DBBackedQueue.this.DB_QUEUE_LOG_ID, lastInsertId, t);
                DBBackedQueue.this.rawInsertEntryTime.update(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
                return lastInsertId;
            }
        });
    }

    public void reapEntries(final Date date) {
        executeTransaction(new Transaction<Void, QueueSqlDao<T>>() { // from class: org.killbill.queue.DBBackedQueue.7
            @Override // org.skife.jdbi.v2.Transaction
            public Void inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus transactionStatus) throws Exception {
                DateTime uTCNow = DBBackedQueue.this.clock.getUTCNow();
                List<T> entriesLeftBehind = queueSqlDao.getEntriesLeftBehind(DBBackedQueue.this.config.getMaxReDispatchCount(), uTCNow.toDate(), date, DBBackedQueue.this.config.getTableName());
                if (entriesLeftBehind.size() <= 0) {
                    return null;
                }
                DBBackedQueue.this.moveEntriesToHistoryFromTransaction(queueSqlDao, Iterables.transform(entriesLeftBehind, new Function<T, T>() { // from class: org.killbill.queue.DBBackedQueue.7.1
                    @Override // com.google.common.base.Function, java.util.function.Function
                    @Nullable
                    public T apply(@Nullable T t) {
                        t.setProcessingState(PersistentQueueEntryLifecycleState.REAPED);
                        return t;
                    }
                }));
                DBBackedQueue.this.insertReapedEntriesFromTransaction(queueSqlDao, entriesLeftBehind, uTCNow);
                DBBackedQueue.log.warn("{} {} entries were reaped by {}", DBBackedQueue.this.DB_QUEUE_LOG_ID, Integer.valueOf(entriesLeftBehind.size()), CreatorName.get());
                return null;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <U> U executeQuery(final Query<U, QueueSqlDao<T>> query) {
        return (U) this.dbi.withHandle(new HandleCallback<U>() { // from class: org.killbill.queue.DBBackedQueue.8
            @Override // org.skife.jdbi.v2.tweak.HandleCallback
            public U withHandle(Handle handle) throws Exception {
                U u = (U) query.execute(handle.attach(DBBackedQueue.this.sqlDaoClass));
                DBBackedQueue.this.printSQLWarnings(handle);
                return u;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <U> U executeTransaction(final Transaction<U, QueueSqlDao<T>> transaction) {
        return (U) this.dbi.inTransaction(new TransactionCallback<U>() { // from class: org.killbill.queue.DBBackedQueue.9
            @Override // org.skife.jdbi.v2.TransactionCallback
            public U inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                U u = (U) transaction.inTransaction(handle.attach(DBBackedQueue.this.sqlDaoClass), transactionStatus);
                DBBackedQueue.this.printSQLWarnings(handle);
                return u;
            }
        });
    }

    protected void printSQLWarnings(Handle handle) {
        try {
            for (SQLWarning warnings = handle.getConnection().getWarnings(); warnings != null; warnings = warnings.getNextWarning()) {
                log.debug("[SQL WARNING] {}", (Throwable) warnings);
            }
            handle.getConnection().clearWarnings();
        } catch (SQLException e) {
            log.debug("Error whilst retrieving SQL warnings", (Throwable) e);
        }
    }

    public QueueSqlDao<T> getSqlDao() {
        return this.sqlDao;
    }
}
