package org.killbill.bus;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.EventBusException;
import com.google.common.eventbus.EventBusThatThrowsException;
import java.sql.Connection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import javax.sql.DataSource;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.BusEventWithMetadata;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.bus.dispatching.BusCallableCallback;
import org.killbill.clock.Clock;
import org.killbill.clock.DefaultClock;
import org.killbill.commons.jdbi.notification.DatabaseTransactionNotificationApi;
import org.killbill.commons.profiling.Profiling;
import org.killbill.commons.profiling.ProfilingFeature;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.DBBackedQueueWithInflightQueue;
import org.killbill.queue.DBBackedQueueWithPolling;
import org.killbill.queue.DefaultQueueLifecycle;
import org.killbill.queue.InTransaction;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.QueueEvent;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dispatching.BlockingRejectionExecutionHandler;
import org.killbill.queue.dispatching.CallableCallbackBase;
import org.killbill.queue.dispatching.Dispatcher;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PersistentBus} implementation that writes posted events to a {@link DBBackedQueue}
 * and delivers the entries read back from that queue to the handlers registered on an
 * in-memory event bus delegate.
 *
 * <p>Lifecycle state is tracked with two flags: {@code isInitialized} (set by
 * {@link #initQueue()}) and {@code isStarted} (set by {@link #startQueue()}).
 * {@link #register(Object)}, {@link #unregister(Object)}, {@link #post(BusEvent)} and
 * {@link #postFromTransaction(BusEvent, Connection)} only log a warning and do nothing else
 * while the bus is not initialized.
 */
public class DefaultPersistentBus extends DefaultQueueLifecycle implements PersistentBus {
    private static final Logger log = LoggerFactory.getLogger(DefaultPersistentBus.class);

    private final DBI dbi;
    private final EventBusThatThrowsException eventBusDelegate;
    private final DBBackedQueue<BusEventModelDao> dao;
    private final Clock clock;
    private final PersistentBusConfig config;
    private final Profiling<Iterable<BusEventModelDao>, RuntimeException> prof;
    private final BusReaper reaper;
    private final Dispatcher<BusEvent, BusEventModelDao> dispatcher;
    private final Timer busHandlersProcessingTime;
    private final AtomicBoolean isInitialized;
    private final AtomicBoolean isStarted;
    private final String dbBackedQId;
    private final BusCallableCallback busCallableCallback;

    /** Private concrete subclass used as the in-memory delegate that handlers are registered on. */
    private static final class EventBusDelegate extends EventBusThatThrowsException {
        public EventBusDelegate(String str) {
            super(str);
        }
    }

    /**
     * Builds the bus and all of its collaborators; nothing is started here.
     *
     * @param idbi database handle; it is cast to {@link DBI}, so any other {@link IDBI}
     *        implementation fails with a {@link ClassCastException}
     * @param clock clock used to timestamp posted events
     * @param persistentBusConfig bus configuration; its table name is also used as the queue id
     * @param metricRegistry registry in which the handler processing timer is created
     * @param databaseTransactionNotificationApi only passed to the queue used in
     *        {@code STICKY_EVENTS} mode
     */
    @Inject
    public DefaultPersistentBus(@Named("Queue") IDBI idbi, Clock clock, final PersistentBusConfig persistentBusConfig, MetricRegistry metricRegistry, DatabaseTransactionNotificationApi databaseTransactionNotificationApi) {
        super(persistentBusConfig.getTableName(), persistentBusConfig, metricRegistry);
        this.dbi = (DBI) idbi;
        this.clock = clock;
        this.config = persistentBusConfig;
        this.dbBackedQId = persistentBusConfig.getTableName();

        // The queue implementation is chosen once, from the configured mode.
        if (persistentBusConfig.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS) {
            this.dao = new DBBackedQueueWithInflightQueue<>(clock, idbi, PersistentBusSqlDao.class, persistentBusConfig, this.dbBackedQId, metricRegistry, databaseTransactionNotificationApi);
        } else {
            this.dao = new DBBackedQueueWithPolling<>(clock, idbi, PersistentBusSqlDao.class, persistentBusConfig, this.dbBackedQId, metricRegistry);
        }
        this.prof = new Profiling<>();

        final ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                // NOTE(review): a fresh ThreadGroup is allocated for every thread. Sharing a
                // single group looks preferable, but is left as-is here because a shared group
                // could be auto-destroyed if its parent is a daemon group - confirm before changing.
                return new Thread(new ThreadGroup(PersistentBus.EVENT_BUS_GROUP_NAME), runnable, persistentBusConfig.getTableName() + "-th");
            }
        };
        this.busHandlersProcessingTime = metricRegistry.timer(MetricRegistry.name(DefaultPersistentBus.class, this.dbBackedQId, "busHandlersProcessingTime"));
        this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
        this.isInitialized = new AtomicBoolean(false);
        this.isStarted = new AtomicBoolean(false);
        this.reaper = new BusReaper(this.dao, persistentBusConfig, clock);
        this.busCallableCallback = new BusCallableCallback(this);
        this.dispatcher = new Dispatcher<>(1, persistentBusConfig, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(persistentBusConfig.getEventQueueCapacity()), threadFactory, new BlockingRejectionExecutionHandler(), clock, this.busCallableCallback, this);
    }

    /**
     * Convenience constructor for use without injection: builds the DBI from the data source,
     * the configuration from the properties (with {@code instanceName} replaced by
     * {@code "main"}), and uses a new default clock, metric registry and notification API.
     *
     * @param dataSource data source the DBI is built from
     * @param properties properties the {@link PersistentBusConfig} is built from
     */
    public DefaultPersistentBus(DataSource dataSource, Properties properties) {
        this(InTransaction.buildDDBI(dataSource), new DefaultClock(), (PersistentBusConfig) new ConfigurationObjectFactory(properties).buildWithReplacements(PersistentBusConfig.class, ImmutableMap.of("instanceName", "main")), new MetricRegistry(), new DatabaseTransactionNotificationApi());
    }

    /**
     * Initializes the underlying queue and starts the dispatcher.
     *
     * @return {@code true} if this call performed the initialization; {@code false} if
     *         processing is turned off in the configuration or the bus was already initialized
     */
    @Override
    public boolean initQueue() {
        if (this.config.isProcessingOff()) {
            log.warn("PersistentBus processing is off, cannot be initialized");
            return false;
        }
        if (!this.isInitialized.compareAndSet(false, true)) {
            return false;
        }
        this.dao.initialize();
        this.dispatcher.start();
        return true;
    }

    /**
     * Starts the bus, initializing it first when {@link #initQueue()} has not been called yet.
     *
     * @return {@code true} if this call started the bus; {@code false} if processing is turned
     *         off in the configuration or the bus was already started
     */
    @Override
    public boolean startQueue() {
        if (this.config.isProcessingOff()) {
            log.warn("PersistentBus processing is off, cannot be started");
            return false;
        }
        if (!this.isInitialized.get()) {
            initQueue();
        }
        if (!this.isStarted.compareAndSet(false, true)) {
            return false;
        }
        this.reaper.start();
        super.startQueue();
        return true;
    }

    /**
     * Stops the bus if it is currently started: clears the initialized flag, then stops the
     * reaper, the parent lifecycle and the dispatcher, and finally closes the queue.
     *
     * <p>Nothing happens when the bus is not started, including when it was initialized through
     * {@link #initQueue()} but never started.
     */
    @Override
    public void stopQueue() {
        if (this.isStarted.compareAndSet(true, false)) {
            this.isInitialized.set(false);
            this.reaper.stop();
            super.stopQueue();
            this.dispatcher.stop();
            this.dao.close();
        }
    }

    /**
     * Fetches the ready entries from the queue and hands each of them to the dispatcher.
     *
     * @return metrics holding the number of entries handed over and the elapsed time, computed
     *         as the time reported for the fetch plus the time spent in the dispatch loop
     */
    @Override
    public DefaultQueueLifecycle.DispatchResultMetrics doDispatchEvents() {
        final DBBackedQueue.ReadyEntriesWithMetrics<BusEventModelDao> readyEntries = this.dao.getReadyEntries();
        final List<BusEventModelDao> entries = readyEntries.getEntries();
        if (entries.isEmpty()) {
            return new DefaultQueueLifecycle.DispatchResultMetrics(0, readyEntries.getTime());
        }
        log.debug("Bus events from {} to process: {}", this.config.getTableName(), entries);
        final long dispatchStart = System.nanoTime();
        for (final BusEventModelDao entry : entries) {
            this.dispatcher.dispatch(entry);
        }
        return new DefaultQueueLifecycle.DispatchResultMetrics(entries.size(), (System.nanoTime() - dispatchStart) + readyEntries.getTime());
    }

    /** Forwards the completed entries to the bus callback in a single call. */
    @Override
    public void doProcessCompletedEvents(Iterable<? extends EventEntryModelDao> iterable) {
        this.busCallableCallback.moveCompletedOrFailedEvents(iterable);
    }

    /**
     * Forwards the retried entries to the bus callback one at a time. Every element is cast to
     * {@link BusEventModelDao}; any other entry type fails with a {@link ClassCastException}.
     */
    @Override
    public void doProcessRetriedEvents(Iterable<? extends EventEntryModelDao> iterable) {
        for (final EventEntryModelDao retried : iterable) {
            this.busCallableCallback.updateRetriedEvents((BusEventModelDao) retried);
        }
    }

    /** @return {@code true} between a successful {@link #startQueue()} and the next {@link #stopQueue()} */
    @Override
    public boolean isStarted() {
        return this.isStarted.get();
    }

    /**
     * Registers a handler on the in-memory delegate. When the bus is not initialized the handler
     * is NOT registered and only a warning is logged.
     */
    @Override
    public void register(Object obj) throws PersistentBus.EventBusException {
        if (this.isInitialized.get()) {
            this.eventBusDelegate.register(obj);
        } else {
            log.warn("Attempting to register handler {} in a non initialized bus", obj);
        }
    }

    /**
     * Unregisters a handler from the in-memory delegate. When the bus is not initialized nothing
     * is unregistered and only a warning is logged.
     */
    @Override
    public void unregister(Object obj) throws PersistentBus.EventBusException {
        if (this.isInitialized.get()) {
            this.eventBusDelegate.unregister(obj);
        } else {
            log.warn("Attempting to unregister handler {} in a non initialized bus", obj);
        }
    }

    /**
     * Serializes the event and inserts it into the queue.
     *
     * <p>This method is best-effort: any exception raised while serializing or inserting is
     * logged at error level and not rethrown, and posting to a non initialized bus only logs a
     * warning. Callers therefore get no signal that the event was not stored.
     */
    @Override
    public void post(BusEvent busEvent) throws PersistentBus.EventBusException {
        try {
            if (this.isInitialized.get()) {
                this.dao.insertEntry(toModelDao(busEvent));
            } else {
                log.warn("Attempting to post event {} in a non initialized bus", busEvent);
            }
        } catch (Exception e) {
            log.error("Failed to post BusEvent {}", busEvent, e);
        }
    }

    /**
     * Serializes the event and inserts it into the queue using the caller's connection.
     *
     * <p>Unlike {@link #post(BusEvent)}, only serialization failures are caught (and logged as a
     * warning); any other exception propagates to the caller. Posting to a non initialized bus
     * only logs a warning.
     *
     * @param busEvent event to store
     * @param connection connection handed to {@link InTransaction#execute} for the insert
     */
    @Override
    public void postFromTransaction(BusEvent busEvent, Connection connection) throws PersistentBus.EventBusException {
        if (!this.isInitialized.get()) {
            log.warn("Attempting to post event {} in a non initialized bus", busEvent);
            return;
        }
        try {
            final BusEventModelDao entry = toModelDao(busEvent);
            InTransaction.execute(this.dbi, connection, new InTransaction.InTransactionHandler<PersistentBusSqlDao, Void>() {
                @Override
                public Void withSqlDao(PersistentBusSqlDao persistentBusSqlDao) {
                    DefaultPersistentBus.this.dao.insertEntryFromTransaction(persistentBusSqlDao, entry);
                    return null;
                }
            }, PersistentBusSqlDao.class);
        } catch (JsonProcessingException e) {
            log.warn("Unable to serialize event {}", busEvent, e);
        }
    }

    /** Builds the queue entry for an event: creator name, current UTC time, event class name, JSON body and keys. */
    private BusEventModelDao toModelDao(BusEvent busEvent) throws JsonProcessingException {
        return new BusEventModelDao(CreatorName.get(), this.clock.getUTCNow(), busEvent.getClass().getName(), this.objectMapper.writeValueAsString(busEvent), busEvent.getUserToken(), busEvent.getSearchKey1(), busEvent.getSearchKey2());
    }

    /** Looks up ready events by both search keys using the queue's own SQL DAO. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsForSearchKeys(Long l, Long l2) {
        return getAvailableBusEventsForSearchKeysInternal((PersistentBusSqlDao) this.dao.getSqlDao(), null, l, l2);
    }

    /** Same lookup as {@link #getAvailableBusEventsForSearchKeys(Long, Long)}, run through the given connection. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsFromTransactionForSearchKeys(final Long l, final Long l2, Connection connection) {
        return InTransaction.execute(this.dbi, connection, new InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>>() {
            @Override
            public Iterable<BusEventWithMetadata<T>> withSqlDao(PersistentBusSqlDao persistentBusSqlDao) {
                return DefaultPersistentBus.this.getAvailableBusEventsForSearchKeysInternal(persistentBusSqlDao, null, l, l2);
            }
        }, PersistentBusSqlDao.class);
    }

    /** Looks up ready events by date and second search key using the queue's own SQL DAO. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsForSearchKey2(DateTime dateTime, Long l) {
        return getAvailableBusEventsForSearchKeysInternal((PersistentBusSqlDao) this.dao.getSqlDao(), dateTime, null, l);
    }

    /** Same lookup as {@link #getAvailableBusEventsForSearchKey2(DateTime, Long)}, run through the given connection. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsFromTransactionForSearchKey2(final DateTime dateTime, final Long l, Connection connection) {
        return InTransaction.execute(this.dbi, connection, new InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>>() {
            @Override
            public Iterable<BusEventWithMetadata<T>> withSqlDao(PersistentBusSqlDao persistentBusSqlDao) {
                return DefaultPersistentBus.this.getAvailableBusEventsForSearchKeysInternal(persistentBusSqlDao, dateTime, null, l);
            }
        }, PersistentBusSqlDao.class);
    }

    /** Returns the entries the SQL DAO reports as in processing for the configured table. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getInProcessingBusEvents() {
        return toBusEventWithMetadata(this.dao.getSqlDao().getInProcessingEntries(this.config.getTableName()));
    }

    /** Looks up ready or in-processing events by both search keys using the queue's own SQL DAO. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsForSearchKeys(Long l, Long l2) {
        return getAvailableOrInProcessingBusEventsForSearchKeysInternal((PersistentBusSqlDao) this.dao.getSqlDao(), null, l, l2);
    }

    /** Same lookup as {@link #getAvailableOrInProcessingBusEventsForSearchKeys(Long, Long)}, run through the given connection. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsFromTransactionForSearchKeys(final Long l, final Long l2, Connection connection) {
        return InTransaction.execute(this.dbi, connection, new InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>>() {
            @Override
            public Iterable<BusEventWithMetadata<T>> withSqlDao(PersistentBusSqlDao persistentBusSqlDao) {
                return DefaultPersistentBus.this.getAvailableOrInProcessingBusEventsForSearchKeysInternal(persistentBusSqlDao, null, l, l2);
            }
        }, PersistentBusSqlDao.class);
    }

    /** Looks up ready or in-processing events by date and second search key using the queue's own SQL DAO. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsForSearchKey2(DateTime dateTime, Long l) {
        return getAvailableOrInProcessingBusEventsForSearchKeysInternal((PersistentBusSqlDao) this.dao.getSqlDao(), dateTime, null, l);
    }

    /** Same lookup as {@link #getAvailableOrInProcessingBusEventsForSearchKey2(DateTime, Long)}, run through the given connection. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsFromTransactionForSearchKey2(final DateTime dateTime, final Long l, Connection connection) {
        return InTransaction.execute(this.dbi, connection, new InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>>() {
            @Override
            public Iterable<BusEventWithMetadata<T>> withSqlDao(PersistentBusSqlDao persistentBusSqlDao) {
                return DefaultPersistentBus.this.getAvailableOrInProcessingBusEventsForSearchKeysInternal(persistentBusSqlDao, dateTime, null, l);
            }
        }, PersistentBusSqlDao.class);
    }

    /** Looks up events in the configured history table by both search keys. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getHistoricalBusEventsForSearchKeys(Long l, Long l2) {
        return getHistoricalBusEventsForSearchKeysInternal((PersistentBusSqlDao) this.dao.getSqlDao(), null, l, l2);
    }

    /** Looks up events in the configured history table by date and second search key. */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getHistoricalBusEventsForSearchKey2(DateTime dateTime, Long l) {
        return getHistoricalBusEventsForSearchKeysInternal((PersistentBusSqlDao) this.dao.getSqlDao(), dateTime, null, l);
    }

    /**
     * Delegates to the queue's ready-entry count, passing the date converted with
     * {@link DateTime#toDate()}.
     *
     * @param dateTime must not be {@code null}
     */
    @Override
    public long getNbReadyEntries(DateTime dateTime) {
        return this.dao.getNbReadyEntries(dateTime.toDate());
    }

    @Override
    public String toString() {
        return "DefaultPersistentBus{dbBackedQId='" + this.dbBackedQId + "'}";
    }

    /**
     * Posts the event to the in-memory delegate and records the elapsed time in the
     * {@code busHandlersProcessingTime} timer, whether or not the delegate throws.
     *
     * @throws EventBusException propagated from the delegate
     */
    public void dispatchBusEventWithMetrics(QueueEvent queueEvent) throws EventBusException {
        final Timer.Context timerContext = this.busHandlersProcessingTime.time();
        try {
            this.eventBusDelegate.postWithException(queueEvent);
        } finally {
            timerContext.stop();
        }
    }

    /**
     * Shared implementation of the "available" lookups. Kept public because anonymous handler
     * classes in this file call it.
     *
     * @param l first search key; when {@code null} the date-based query is used instead
     */
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsForSearchKeysInternal(PersistentBusSqlDao persistentBusSqlDao, @Nullable DateTime dateTime, @Nullable Long l, Long l2) {
        return toBusEventWithMetadata(getReadyQueueEntriesForSearchKeysWithProfiling(persistentBusSqlDao, dateTime, l, l2));
    }

    /**
     * Shared implementation of the "available or in processing" lookups. Kept public because
     * anonymous handler classes in this file call it.
     *
     * @param l first search key; when {@code null} the date-based query is used instead
     */
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsForSearchKeysInternal(PersistentBusSqlDao persistentBusSqlDao, @Nullable DateTime dateTime, @Nullable Long l, Long l2) {
        return toBusEventWithMetadata(getReadyOrInProcessingQueueEntriesForSearchKeysWithProfiling(persistentBusSqlDao, dateTime, l, l2));
    }

    /** Shared implementation of the historical lookups; a {@code null} first key selects the date-based query. */
    private <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getHistoricalBusEventsForSearchKeysInternal(PersistentBusSqlDao persistentBusSqlDao, @Nullable DateTime dateTime, @Nullable Long l, Long l2) {
        return toBusEventWithMetadata(getHistoricalQueueEntriesForSearchKeysWithProfiling(persistentBusSqlDao, dateTime, l, l2));
    }

    /**
     * Wraps the ready-entries query in a profiled call. The returned {@link Iterable} runs the
     * DAO query each time {@code iterator()} is called on it, not when this method returns.
     */
    private Iterable<BusEventModelDao> getReadyQueueEntriesForSearchKeysWithProfiling(final PersistentBusSqlDao persistentBusSqlDao, @Nullable final DateTime dateTime, @Nullable final Long l, final Long l2) {
        return this.prof.executeWithProfiling(ProfilingFeature.ProfilingFeatureType.DAO, "DAO:PersistentBusSqlDao:getReadyQueueEntriesForSearchKeys", new Profiling.WithProfilingCallback<Iterable<BusEventModelDao>, RuntimeException>() {
            @Override
            public Iterable<BusEventModelDao> execute() throws RuntimeException {
                return new Iterable<BusEventModelDao>() {
                    @Override
                    public Iterator<BusEventModelDao> iterator() {
                        final String tableName = DefaultPersistentBus.this.config.getTableName();
                        if (l != null) {
                            return persistentBusSqlDao.getReadyQueueEntriesForSearchKeys(l, l2, tableName);
                        }
                        return persistentBusSqlDao.getReadyQueueEntriesForSearchKey2(dateTime, l2, tableName);
                    }
                };
            }
        });
    }

    /**
     * Wraps the ready-or-in-processing query in a profiled call. The returned {@link Iterable}
     * runs the DAO query each time {@code iterator()} is called on it.
     */
    private Iterable<BusEventModelDao> getReadyOrInProcessingQueueEntriesForSearchKeysWithProfiling(final PersistentBusSqlDao persistentBusSqlDao, @Nullable final DateTime dateTime, @Nullable final Long l, final Long l2) {
        return this.prof.executeWithProfiling(ProfilingFeature.ProfilingFeatureType.DAO, "DAO:PersistentBusSqlDao:getReadyOrInProcessingQueueEntriesForSearchKeys", new Profiling.WithProfilingCallback<Iterable<BusEventModelDao>, RuntimeException>() {
            @Override
            public Iterable<BusEventModelDao> execute() throws RuntimeException {
                return new Iterable<BusEventModelDao>() {
                    @Override
                    public Iterator<BusEventModelDao> iterator() {
                        final String tableName = DefaultPersistentBus.this.config.getTableName();
                        if (l != null) {
                            return persistentBusSqlDao.getReadyOrInProcessingQueueEntriesForSearchKeys(l, l2, tableName);
                        }
                        return persistentBusSqlDao.getReadyOrInProcessingQueueEntriesForSearchKey2(dateTime, l2, tableName);
                    }
                };
            }
        });
    }

    /**
     * Wraps the historical query in a profiled call; it reads from the configured history table
     * rather than the live table. The returned {@link Iterable} runs the DAO query each time
     * {@code iterator()} is called on it.
     */
    private Iterable<BusEventModelDao> getHistoricalQueueEntriesForSearchKeysWithProfiling(final PersistentBusSqlDao persistentBusSqlDao, @Nullable final DateTime dateTime, @Nullable final Long l, final Long l2) {
        return this.prof.executeWithProfiling(ProfilingFeature.ProfilingFeatureType.DAO, "DAO:PersistentBusSqlDao:getHistoricalQueueEntriesForSearchKeys", new Profiling.WithProfilingCallback<Iterable<BusEventModelDao>, RuntimeException>() {
            @Override
            public Iterable<BusEventModelDao> execute() throws RuntimeException {
                return new Iterable<BusEventModelDao>() {
                    @Override
                    public Iterator<BusEventModelDao> iterator() {
                        final String historyTableName = DefaultPersistentBus.this.config.getHistoryTableName();
                        if (l != null) {
                            return persistentBusSqlDao.getHistoricalQueueEntriesForSearchKeys(l, l2, historyTableName);
                        }
                        return persistentBusSqlDao.getHistoricalQueueEntriesForSearchKey2(dateTime, l2, historyTableName);
                    }
                };
            }
        });
    }

    /** Converts every entry through {@link #toBusEventWithMetadata(BusEventModelDao)} using {@code Iterables.transform}. */
    private <T extends BusEvent> Iterable<BusEventWithMetadata<T>> toBusEventWithMetadata(Iterable<BusEventModelDao> iterable) {
        return Iterables.transform(iterable, new Function<BusEventModelDao, BusEventWithMetadata<T>>() {
            @Override
            public BusEventWithMetadata<T> apply(BusEventModelDao busEventModelDao) {
                return DefaultPersistentBus.this.toBusEventWithMetadata(busEventModelDao);
            }
        });
    }

    /**
     * Converts one queue entry into its API representation by deserializing the stored event
     * and copying the record id, user token, creation date and search keys.
     *
     * <p>Kept public because an anonymous class in this file calls it.
     */
    public <T extends BusEvent> BusEventWithMetadata<T> toBusEventWithMetadata(BusEventModelDao busEventModelDao) {
        // The concrete event type is only known to the caller, so the cast cannot be checked
        // here; a mismatch surfaces as a ClassCastException where the caller uses the event.
        @SuppressWarnings("unchecked")
        final T event = (T) CallableCallbackBase.deserializeEvent(busEventModelDao, this.objectMapper);
        return new BusEventWithMetadata<>(busEventModelDao.getRecordId(), busEventModelDao.getUserToken(), busEventModelDao.getCreatedDate(), busEventModelDao.getSearchKey1(), busEventModelDao.getSearchKey2(), event);
    }

    /** @return the queue this bus stores its events in */
    public DBBackedQueue<BusEventModelDao> getDao() {
        return this.dao;
    }

    /** @return the clock supplied at construction */
    public Clock getClock() {
        return this.clock;
    }

    /** @return the configuration supplied at construction */
    public PersistentBusConfig getConfig() {
        return this.config;
    }
}
