package org.killbill.bus;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.eventbus.EventBusThatThrowsException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import javax.inject.Named;
import org.killbill.Hostname;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.clock.Clock;
import org.killbill.commons.jdbi.notification.DatabaseTransactionNotificationApi;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.DefaultQueueLifecycle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/killbill/bus/DefaultPersistentBus.class */
public class DefaultPersistentBus extends DefaultQueueLifecycle implements PersistentBus {
    private static final Logger log = LoggerFactory.getLogger(DefaultPersistentBus.class);
    private final EventBusDelegate eventBusDelegate;
    private final DBBackedQueue<BusEventModelDao> dao;
    private final Clock clock;
    final Timer dispatchTimer;
    private AtomicBoolean isStarted;

    /* loaded from: input_file:org/killbill/bus/DefaultPersistentBus$EventBusDelegate.class */
    private static final class EventBusDelegate extends EventBusThatThrowsException {
        public EventBusDelegate(String str) {
            super(str);
        }
    }

    @Inject
    public DefaultPersistentBus(@Named("Queue") IDBI idbi, Clock clock, final PersistentBusConfig persistentBusConfig, MetricRegistry metricRegistry, DatabaseTransactionNotificationApi databaseTransactionNotificationApi) {
        super("Bus", Executors.newFixedThreadPool(persistentBusConfig.getNbThreads(), new ThreadFactory() { // from class: org.killbill.bus.DefaultPersistentBus.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(new ThreadGroup(PersistentBus.EVENT_BUS_GROUP_NAME), runnable, PersistentBusConfig.this.getTableName() + "-th");
            }
        }), persistentBusConfig.getNbThreads(), persistentBusConfig);
        PersistentBusSqlDao persistentBusSqlDao = (PersistentBusSqlDao) idbi.onDemand(PersistentBusSqlDao.class);
        this.clock = clock;
        this.dao = new DBBackedQueue<>(clock, persistentBusSqlDao, persistentBusConfig, "bus-" + persistentBusConfig.getTableName(), metricRegistry, databaseTransactionNotificationApi);
        this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
        this.dispatchTimer = metricRegistry.timer(MetricRegistry.name((Class<?>) DefaultPersistentBus.class, "dispatch"));
        this.isStarted = new AtomicBoolean(false);
    }

    @Override // org.killbill.bus.api.PersistentBus
    public void start() {
        if (this.isStarted.compareAndSet(false, true)) {
            this.dao.initialize();
            startQueue();
        }
    }

    @Override // org.killbill.bus.api.PersistentBus
    public void stop() {
        if (this.isStarted.compareAndSet(true, false)) {
            stopQueue();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:25:0x00bd, code lost:
    
        if (0 != 0) goto L28;
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x00c0, code lost:
    
        r0.add(new org.killbill.bus.dao.BusEventModelDao(r0, org.killbill.Hostname.get(), r9.clock.getUTCNow(), org.killbill.queue.api.PersistentQueueEntryLifecycleState.PROCESSED));
     */
    /* JADX WARN: Code restructure failed: missing block: B:28:0x00b2, code lost:
    
        throw r21;
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x00f3, code lost:
    
        if (r0 > r9.config.getMaxFailureRetries()) goto L31;
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:0x00f6, code lost:
    
        org.killbill.bus.DefaultPersistentBus.log.info("Bus dispatch error, will attempt a retry ", (java.lang.Throwable) null);
        r9.dao.updateOnError(new org.killbill.bus.dao.BusEventModelDao(r0, org.killbill.Hostname.get(), r9.clock.getUTCNow(), org.killbill.queue.api.PersistentQueueEntryLifecycleState.AVAILABLE, java.lang.Long.valueOf(r0)));
     */
    /* JADX WARN: Code restructure failed: missing block: B:32:0x012d, code lost:
    
        org.killbill.bus.DefaultPersistentBus.log.error("Fatal Bus dispatch error, data corruption...", (java.lang.Throwable) null);
        r0.add(new org.killbill.bus.dao.BusEventModelDao(r0, org.killbill.Hostname.get(), r9.clock.getUTCNow(), org.killbill.queue.api.PersistentQueueEntryLifecycleState.FAILED));
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x00bd, code lost:
    
        if (r18 != null) goto L28;
     */
    /* JADX WARN: Code restructure failed: missing block: B:42:0x00c0, code lost:
    
        r0.add(new org.killbill.bus.dao.BusEventModelDao(r0, org.killbill.Hostname.get(), r9.clock.getUTCNow(), org.killbill.queue.api.PersistentQueueEntryLifecycleState.PROCESSED));
     */
    /* JADX WARN: Code restructure failed: missing block: B:46:0x00f3, code lost:
    
        if (r0 > r9.config.getMaxFailureRetries()) goto L31;
     */
    /* JADX WARN: Code restructure failed: missing block: B:47:0x00f6, code lost:
    
        org.killbill.bus.DefaultPersistentBus.log.info("Bus dispatch error, will attempt a retry ", r18);
        r9.dao.updateOnError(new org.killbill.bus.dao.BusEventModelDao(r0, org.killbill.Hostname.get(), r9.clock.getUTCNow(), org.killbill.queue.api.PersistentQueueEntryLifecycleState.AVAILABLE, java.lang.Long.valueOf(r0)));
     */
    /* JADX WARN: Code restructure failed: missing block: B:48:0x012d, code lost:
    
        org.killbill.bus.DefaultPersistentBus.log.error("Fatal Bus dispatch error, data corruption...", r18);
        r0.add(new org.killbill.bus.dao.BusEventModelDao(r0, org.killbill.Hostname.get(), r9.clock.getUTCNow(), org.killbill.queue.api.PersistentQueueEntryLifecycleState.FAILED));
     */
    @Override // org.killbill.queue.DefaultQueueLifecycle
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public int doProcessEvents() {
        /*
            Method dump skipped, instructions count: 363
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.killbill.bus.DefaultPersistentBus.doProcessEvents():int");
    }

    @Override // org.killbill.queue.DefaultQueueLifecycle, org.killbill.queue.api.QueueLifecycle
    public boolean isStarted() {
        return this.isStarted.get();
    }

    @Override // org.killbill.bus.api.PersistentBus
    public void register(Object obj) throws PersistentBus.EventBusException {
        if (this.isStarted.get()) {
            this.eventBusDelegate.register(obj);
        } else {
            log.warn("Attempting to register handler " + obj + " in a non initialized bus");
        }
    }

    @Override // org.killbill.bus.api.PersistentBus
    public void unregister(Object obj) throws PersistentBus.EventBusException {
        if (this.isStarted.get()) {
            this.eventBusDelegate.unregister(obj);
        } else {
            log.warn("Attempting to unregister handler " + obj + " in a non initialized bus");
        }
    }

    @Override // org.killbill.bus.api.PersistentBus
    public void post(BusEvent busEvent) throws PersistentBus.EventBusException {
        try {
            if (this.isStarted.get()) {
                this.dao.insertEntry(new BusEventModelDao(Hostname.get(), this.clock.getUTCNow(), busEvent.getClass().getName(), this.objectMapper.writeValueAsString(busEvent), busEvent.getUserToken(), busEvent.getSearchKey1(), busEvent.getSearchKey2()));
            } else {
                log.warn("Attempting to post event " + busEvent + " in a non initialized bus");
            }
        } catch (Exception e) {
            log.error("Failed to post BusEvent " + busEvent, (Throwable) e);
        }
    }

    @Override // org.killbill.bus.api.PersistentBus
    public void postFromTransaction(BusEvent busEvent, Transmogrifier transmogrifier) throws PersistentBus.EventBusException {
        try {
            PersistentBusSqlDao persistentBusSqlDao = (PersistentBusSqlDao) transmogrifier.become(PersistentBusSqlDao.class);
            if (this.isStarted.get()) {
                this.dao.insertEntryFromTransaction(persistentBusSqlDao, new BusEventModelDao(Hostname.get(), this.clock.getUTCNow(), busEvent.getClass().getName(), this.objectMapper.writeValueAsString(busEvent), busEvent.getUserToken(), busEvent.getSearchKey1(), busEvent.getSearchKey2()));
            } else {
                log.warn("Attempting to post event " + busEvent + " in a non initialized bus");
            }
        } catch (Exception e) {
            log.error("Failed to post BusEvent " + busEvent, (Throwable) e);
        }
    }
}
