package org.killbill.notificationq;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.joda.time.DateTime;
import org.killbill.Hostname;
import org.killbill.clock.Clock;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationQueue;
import org.killbill.notificationq.api.NotificationQueueConfig;
import org.killbill.notificationq.api.NotificationQueueService;
import org.killbill.notificationq.dao.NotificationEventModelDao;
import org.killbill.notificationq.dao.NotificationSqlDao;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.DefaultQueueLifecycle;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/killbill/notificationq/NotificationQueueDispatcher.class */
public class NotificationQueueDispatcher extends DefaultQueueLifecycle {
    protected static final Logger log = LoggerFactory.getLogger(NotificationQueueDispatcher.class);
    public static final int CLAIM_TIME_MS = 300000;
    private static final String NOTIFICATION_THREAD_NAME = "Notification-queue-dispatch";
    private final AtomicLong nbProcessedEvents;
    protected final NotificationQueueConfig config;
    protected final Clock clock;
    protected final Map<String, NotificationQueue> queues;
    protected final DBBackedQueue<NotificationEventModelDao> dao;
    protected final MetricRegistry metricRegistry;
    private final Gauge pendingNotifications;
    private final Counter processedNotificationsSinceStart;
    private final Map<String, Histogram> perQueueProcessingTime;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NotificationQueueDispatcher(Clock clock, NotificationQueueConfig notificationQueueConfig, IDBI idbi, MetricRegistry metricRegistry) {
        super("NotificationQ", Executors.newFixedThreadPool(1, new ThreadFactory() { // from class: org.killbill.notificationq.NotificationQueueDispatcher.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName(NotificationQueueDispatcher.NOTIFICATION_THREAD_NAME);
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.killbill.notificationq.NotificationQueueDispatcher.1.1
                    @Override // java.lang.Thread.UncaughtExceptionHandler
                    public void uncaughtException(Thread thread2, Throwable th) {
                        NotificationQueueDispatcher.log.error("Uncaught exception for thread " + thread2.getName(), th);
                    }
                });
                return thread;
            }
        }), notificationQueueConfig.getNbThreads(), notificationQueueConfig);
        this.clock = clock;
        this.config = notificationQueueConfig;
        this.nbProcessedEvents = new AtomicLong();
        this.dao = new DBBackedQueue<>(clock, idbi != null ? (NotificationSqlDao) idbi.onDemand(NotificationSqlDao.class) : null, notificationQueueConfig, "notif-" + notificationQueueConfig.getTableName(), metricRegistry);
        this.queues = new TreeMap();
        this.metricRegistry = metricRegistry;
        this.pendingNotifications = (Gauge) metricRegistry.register(MetricRegistry.name((Class<?>) NotificationQueueDispatcher.class, "pending-notifications"), new Gauge<Integer>() { // from class: org.killbill.notificationq.NotificationQueueDispatcher.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.codahale.metrics.Gauge
            public Integer getValue() {
                return 0;
            }
        });
        this.processedNotificationsSinceStart = metricRegistry.counter(MetricRegistry.name((Class<?>) NotificationQueueDispatcher.class, "processed-notifications-since-start"));
        this.perQueueProcessingTime = new HashMap();
    }

    @Override // org.killbill.queue.DefaultQueueLifecycle, org.killbill.queue.api.QueueLifecycle
    public void stopQueue() {
        if (this.config.isProcessingOff() || !isStarted()) {
            return;
        }
        int i = 0;
        synchronized (this.queues) {
            Iterator it = this.queues.values().iterator();
            while (it.hasNext()) {
                if (((NotificationQueue) it.next()).isStarted()) {
                    i++;
                }
            }
        }
        if (i == 0) {
            super.stopQueue();
        }
    }

    public AtomicLong getNbProcessedEvents() {
        return this.nbProcessedEvents;
    }

    public Clock getClock() {
        return this.clock;
    }

    protected NotificationQueueService.NotificationQueueHandler getHandlerForActiveQueue(String str) {
        synchronized (this.queues) {
            NotificationQueue notificationQueue = (NotificationQueue) this.queues.get(str);
            if (notificationQueue == null || !notificationQueue.isStarted()) {
                return null;
            }
            return notificationQueue.getHandler();
        }
    }

    @Override // org.killbill.queue.DefaultQueueLifecycle
    public int doProcessEvents() {
        return doProcessEventsWithLimit(-1);
    }

    /* JADX WARN: Code restructure failed: missing block: B:36:0x011a, code lost:
    
        if (r20 != null) goto L30;
     */
    /* JADX WARN: Code restructure failed: missing block: B:37:0x011d, code lost:
    
        r12 = r12 + 1;
        clearNotification(r0);
        logDebug("done handling notification %s, key = %s for time %s", r0.getRecordId(), r0.getEventJson(), r0.getEffectiveDate());
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x0156, code lost:
    
        if (r0 > r9.config.getMaxFailureRetries()) goto L33;
     */
    /* JADX WARN: Code restructure failed: missing block: B:42:0x0159, code lost:
    
        org.killbill.notificationq.NotificationQueueDispatcher.log.info("NotificationQ dispatch error, will attempt a retry ", (java.lang.Throwable) r20);
        r9.dao.updateOnError(new org.killbill.notificationq.dao.NotificationEventModelDao(r0, org.killbill.Hostname.get(), r9.clock.getUTCNow(), org.killbill.queue.api.PersistentQueueEntryLifecycleState.AVAILABLE, java.lang.Long.valueOf(r0)));
     */
    /* JADX WARN: Code restructure failed: missing block: B:43:0x0190, code lost:
    
        org.killbill.notificationq.NotificationQueueDispatcher.log.error("Fatal NotificationQ dispatch error, data corruption...", (java.lang.Throwable) r20);
        clearFailedNotification(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:46:0x011a, code lost:
    
        if (0 != 0) goto L30;
     */
    /* JADX WARN: Code restructure failed: missing block: B:47:0x011d, code lost:
    
        r12 = r12 + 1;
        clearNotification(r0);
        logDebug("done handling notification %s, key = %s for time %s", r0.getRecordId(), r0.getEventJson(), r0.getEffectiveDate());
     */
    /* JADX WARN: Code restructure failed: missing block: B:49:0x0115, code lost:
    
        throw r21;
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x0156, code lost:
    
        if (r0 > r9.config.getMaxFailureRetries()) goto L33;
     */
    /* JADX WARN: Code restructure failed: missing block: B:52:0x0159, code lost:
    
        org.killbill.notificationq.NotificationQueueDispatcher.log.info("NotificationQ dispatch error, will attempt a retry ", (java.lang.Throwable) null);
        r9.dao.updateOnError(new org.killbill.notificationq.dao.NotificationEventModelDao(r0, org.killbill.Hostname.get(), r9.clock.getUTCNow(), org.killbill.queue.api.PersistentQueueEntryLifecycleState.AVAILABLE, java.lang.Long.valueOf(r0)));
     */
    /* JADX WARN: Code restructure failed: missing block: B:53:0x0190, code lost:
    
        org.killbill.notificationq.NotificationQueueDispatcher.log.error("Fatal NotificationQ dispatch error, data corruption...", (java.lang.Throwable) null);
        clearFailedNotification(r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    protected int doProcessEventsWithLimit(int r10) {
        /*
            Method dump skipped, instructions count: 425
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.killbill.notificationq.NotificationQueueDispatcher.doProcessEventsWithLimit(int):int");
    }

    private void handleNotificationWithMetrics(NotificationQueueService.NotificationQueueHandler notificationQueueHandler, NotificationEventModelDao notificationEventModelDao, NotificationEvent notificationEvent) throws NotificationQueueException {
        Histogram histogram;
        String[] split = notificationEventModelDao.getQueueName().split(":");
        String str = split[0].substring(0, 3) + "-" + split[1] + "-process-time";
        synchronized (this.perQueueProcessingTime) {
            if (!this.perQueueProcessingTime.containsKey(notificationEventModelDao.getQueueName())) {
                this.perQueueProcessingTime.put(notificationEventModelDao.getQueueName(), this.metricRegistry.histogram(MetricRegistry.name((Class<?>) NotificationQueueDispatcher.class, str)));
            }
            histogram = (Histogram) this.perQueueProcessingTime.get(notificationEventModelDao.getQueueName());
        }
        DateTime uTCNow = this.clock.getUTCNow();
        try {
            try {
                notificationQueueHandler.handleReadyNotification(notificationEvent, notificationEventModelDao.getEffectiveDate(), notificationEventModelDao.getFutureUserToken(), notificationEventModelDao.getSearchKey1(), notificationEventModelDao.getSearchKey2());
            } catch (RuntimeException e) {
                throw new NotificationQueueException(e);
            }
        } finally {
            histogram.update(this.clock.getUTCNow().getMillis() - uTCNow.getMillis());
            this.processedNotificationsSinceStart.inc();
        }
    }

    private void clearNotification(NotificationEventModelDao notificationEventModelDao) {
        this.dao.moveEntryToHistory(new NotificationEventModelDao(notificationEventModelDao, Hostname.get(), this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED));
    }

    private void clearFailedNotification(NotificationEventModelDao notificationEventModelDao) {
        this.dao.moveEntryToHistory(new NotificationEventModelDao(notificationEventModelDao, Hostname.get(), this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.FAILED));
    }

    private List<NotificationEventModelDao> getReadyNotifications() {
        List<NotificationEventModelDao> readyEntries = this.dao.getReadyEntries();
        ArrayList arrayList = new ArrayList();
        for (NotificationEventModelDao notificationEventModelDao : readyEntries) {
            NotificationQueue notificationQueue = (NotificationQueue) this.queues.get(notificationEventModelDao.getQueueName());
            if (notificationQueue != null && notificationQueue.isStarted()) {
                arrayList.add(notificationEventModelDao);
            }
        }
        return arrayList;
    }

    private void logDebug(String str, Object... objArr) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Thread %d  %s", new Object[]{Long.valueOf(Thread.currentThread().getId()), String.format(str, objArr)}));
        }
    }

    public static String getCompositeName(String str, String str2) {
        return str + ":" + str2;
    }
}
