package org.killbill.queue.dispatching;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.lang.reflect.InvocationTargetException;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.killbill.clock.Clock;
import org.killbill.commons.concurrent.DynamicThreadPoolExecutorWithLoggingOnExceptions;
import org.killbill.queue.DefaultQueueLifecycle;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.api.QueueEvent;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.retry.RetryableInternalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:org/killbill/queue/dispatching/Dispatcher.class */
public class Dispatcher<E extends QueueEvent, M extends EventEntryModelDao> {
    private static final Logger log = LoggerFactory.getLogger(Dispatcher.class);
    private final int corePoolSize;
    private final int maximumPoolSize;
    private final long keepAliveTime;
    private final TimeUnit keepAliveTimeUnit;
    private final BlockingQueue<Runnable> workQueue;
    private final ThreadFactory threadFactory;
    private final RejectedExecutionHandler rejectionHandler;
    private final int maxFailureRetries;
    private final CallableCallback<E, M> handlerCallback;
    private final DefaultQueueLifecycle parentLifeCycle;
    private final Clock clock;
    private ExecutorService handlerExecutor;

    /* loaded from: input_file:org/killbill/queue/dispatching/Dispatcher$CallableQueueHandler.class */
    public static class CallableQueueHandler<E extends QueueEvent, M extends EventEntryModelDao> implements Callable<E> {
        private static final String MDC_KB_USER_TOKEN = "kb.userToken";
        private static final Logger log = LoggerFactory.getLogger(CallableQueueHandler.class);
        private final M entry;
        private final CallableCallback<E, M> callback;
        private final DefaultQueueLifecycle parentLifeCycle;
        private final int maxFailureRetries;
        private final Clock clock;

        public CallableQueueHandler(M m, CallableCallback<E, M> callableCallback, DefaultQueueLifecycle defaultQueueLifecycle, Clock clock, int i) {
            this.entry = m;
            this.callback = callableCallback;
            this.parentLifeCycle = defaultQueueLifecycle;
            this.clock = clock;
            this.maxFailureRetries = i;
        }

        @Override // java.util.concurrent.Callable
        public E call() throws Exception {
            try {
                UUID userToken = this.entry.getUserToken();
                MDC.put(MDC_KB_USER_TOKEN, userToken != null ? userToken.toString() : null);
                log.debug("Starting processing entry {}", this.entry);
                E deserialize = this.callback.deserialize(this.entry);
                if (deserialize != null) {
                    Object obj = null;
                    long longValue = this.entry.getErrorCount().longValue();
                    try {
                        try {
                            this.callback.dispatch(deserialize, this.entry);
                            if (this.parentLifeCycle != null) {
                                if (0 == 0) {
                                    this.parentLifeCycle.dispatchCompletedOrFailedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED, this.entry.getErrorCount().longValue()));
                                    log.debug("Done handling notification {}, key = {}", this.entry.getRecordId(), this.entry.getEventJson());
                                } else if (obj instanceof RetryableInternalException) {
                                    this.parentLifeCycle.dispatchCompletedOrFailedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.FAILED, this.entry.getErrorCount().longValue()));
                                } else if (longValue <= this.maxFailureRetries) {
                                    log.info("Dispatch error, will attempt a retry ", (Throwable) null);
                                    this.parentLifeCycle.dispatchRetriedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.AVAILABLE, longValue));
                                } else {
                                    log.error("Fatal NotificationQ dispatch error, data corruption...", (Throwable) null);
                                    this.parentLifeCycle.dispatchCompletedOrFailedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.FAILED, this.entry.getErrorCount().longValue()));
                                }
                            }
                        } catch (Throwable th) {
                            if (this.parentLifeCycle != null) {
                                if (0 == 0) {
                                    this.parentLifeCycle.dispatchCompletedOrFailedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED, this.entry.getErrorCount().longValue()));
                                    log.debug("Done handling notification {}, key = {}", this.entry.getRecordId(), this.entry.getEventJson());
                                } else if (obj instanceof RetryableInternalException) {
                                    this.parentLifeCycle.dispatchCompletedOrFailedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.FAILED, this.entry.getErrorCount().longValue()));
                                } else if (longValue <= this.maxFailureRetries) {
                                    log.info("Dispatch error, will attempt a retry ", (Throwable) null);
                                    this.parentLifeCycle.dispatchRetriedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.AVAILABLE, longValue));
                                } else {
                                    log.error("Fatal NotificationQ dispatch error, data corruption...", (Throwable) null);
                                    this.parentLifeCycle.dispatchCompletedOrFailedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.FAILED, this.entry.getErrorCount().longValue()));
                                }
                            }
                            throw th;
                        }
                    } catch (Exception e) {
                        Throwable cause = (e.getCause() == null || !(e.getCause() instanceof InvocationTargetException)) ? (e.getCause() == null || !(e.getCause() instanceof RetryableInternalException)) ? e : e.getCause() : e.getCause().getCause();
                        long j = longValue + 1;
                        if (this.parentLifeCycle != null) {
                            if (cause == null) {
                                this.parentLifeCycle.dispatchCompletedOrFailedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED, this.entry.getErrorCount().longValue()));
                                log.debug("Done handling notification {}, key = {}", this.entry.getRecordId(), this.entry.getEventJson());
                            } else if (cause instanceof RetryableInternalException) {
                                this.parentLifeCycle.dispatchCompletedOrFailedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.FAILED, this.entry.getErrorCount().longValue()));
                            } else if (j <= this.maxFailureRetries) {
                                log.info("Dispatch error, will attempt a retry ", cause);
                                this.parentLifeCycle.dispatchRetriedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.AVAILABLE, j));
                            } else {
                                log.error("Fatal NotificationQ dispatch error, data corruption...", cause);
                                this.parentLifeCycle.dispatchCompletedOrFailedEvents(this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.FAILED, this.entry.getErrorCount().longValue()));
                            }
                        }
                    }
                }
                return deserialize;
            } finally {
                MDC.remove(MDC_KB_USER_TOKEN);
            }
        }
    }

    public Dispatcher(int i, PersistentQueueConfig persistentQueueConfig, long j, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler, Clock clock, CallableCallback<E, M> callableCallback, DefaultQueueLifecycle defaultQueueLifecycle) {
        this.corePoolSize = i;
        this.maximumPoolSize = persistentQueueConfig.geMaxDispatchThreads();
        this.keepAliveTime = j;
        this.keepAliveTimeUnit = timeUnit;
        this.workQueue = blockingQueue;
        this.threadFactory = threadFactory;
        this.rejectionHandler = rejectedExecutionHandler;
        this.clock = clock;
        this.maxFailureRetries = persistentQueueConfig.getMaxFailureRetries();
        this.handlerCallback = callableCallback;
        this.parentLifeCycle = defaultQueueLifecycle;
    }

    public void start() {
        this.handlerExecutor = new DynamicThreadPoolExecutorWithLoggingOnExceptions(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, this.keepAliveTimeUnit, this.workQueue, this.threadFactory, this.rejectionHandler);
    }

    public void stop() {
        this.handlerExecutor.shutdown();
        try {
            this.handlerExecutor.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Stop sequence, handlerExecutor has been interrupted");
        }
    }

    @SuppressFBWarnings({"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
    public void dispatch(M m) {
        log.debug("Dispatching entry {}", m);
        this.handlerExecutor.submit(new CallableQueueHandler(m, this.handlerCallback, this.parentLifeCycle, this.clock, this.maxFailureRetries));
    }
}
