package org.axonframework.eventhandling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonNonTransientException;
import org.axonframework.common.io.IOUtils;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.GenericTrackedDomainEventMessage;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.interceptors.TransactionManagingInterceptor;
import org.axonframework.messaging.unitofwork.BatchingUnitOfWork;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor.class */
public class TrackingEventProcessor extends AbstractEventProcessor {
    // Logger shared by the processor and its nested worker classes.
    private static final Logger logger = LoggerFactory.getLogger(TrackingEventProcessor.class);
    // Back-off time in milliseconds. NOTE(review): not referenced by name in this file; the 5000 ms
    // waits further down are written as literals (possibly inlined by the compiler) - confirm.
    public static final int DEFAULT_BACKOFF_TIME_MILLIS = 5000;
    // Source from which the tracked event streams are opened (see doOpenStream).
    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource;
    // Store used to fetch, store, claim and release the tracking token of each segment.
    private final TokenStore tokenStore;
    // Wraps token store access and stream opening in transactions.
    private final TransactionManager transactionManager;
    // Upper bound on the number of handled events collected into one batch (see processBatch).
    private final int batchSize;
    // Number of segments to initialize when the token store reports none for this processor.
    private final int segmentsSize;
    // Creates the launcher/worker threads and counts how many of them are currently running.
    private final ActivityCountingThreadFactory threadFactory;
    // Current lifecycle state of this processor.
    private final AtomicReference<State> state;
    // Status of every segment this instance is currently working on, keyed by segment id.
    private final ConcurrentMap<Integer, TrackerStatus> activeSegments;
    // Limit applied both to the number of active segments and to the number of running threads.
    private final int maxThreadCount;
    // Unit of work resource key under which the id of the segment being processed is stored.
    private final String segmentIdResourceKey;
    // Unit of work resource key under which the token to store on commit is kept.
    private final String lastTokenResourceKey;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor$ActivityCountingThreadFactory.class */
    /**
     * {@link ThreadFactory} decorator that keeps track of how many of the runnables it handed to the
     * wrapped factory are executing at this moment.
     */
    public static class ActivityCountingThreadFactory implements ThreadFactory {
        private final ThreadFactory delegate;
        private final AtomicInteger runningThreads;

        /**
         * @param threadFactory the factory that actually creates the threads
         */
        public ActivityCountingThreadFactory(ThreadFactory threadFactory) {
            this.runningThreads = new AtomicInteger(0);
            this.delegate = threadFactory;
        }

        /**
         * Wraps the given runnable in a {@link CountingRunnable} bound to this factory's counter and lets
         * the delegate create the thread for it.
         */
        @Override
        public Thread newThread(Runnable runnable) {
            Runnable counted = new CountingRunnable(runnable, this.runningThreads);
            return this.delegate.newThread(counted);
        }

        /**
         * @return the number of wrapped runnables that are currently inside their {@code run()} method
         */
        public int activeThreads() {
            return this.runningThreads.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor$CountingRunnable.class */
    /**
     * Runnable wrapper that increments a shared counter for as long as the wrapped task is running.
     */
    public static class CountingRunnable implements Runnable {
        private final Runnable task;
        private final AtomicInteger activeCount;

        /**
         * @param runnable      the task to execute
         * @param atomicInteger the counter to increment while the task runs
         */
        public CountingRunnable(Runnable runnable, AtomicInteger atomicInteger) {
            this.task = runnable;
            this.activeCount = atomicInteger;
        }

        /**
         * Increments the counter, runs the task and decrements the counter again, also when the task
         * ends with an exception.
         */
        @Override
        public void run() {
            this.activeCount.incrementAndGet();
            try {
                this.task.run();
            } finally {
                this.activeCount.decrementAndGet();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor$ReplayingMessageStream.class */
    /**
     * Stream wrapper used when a segment's token is a {@link ReplayToken}. Messages read through
     * {@link #nextAvailable()} get their tracking token replaced by the result of advancing the last
     * seen replay token; all other operations are forwarded to the wrapped stream unchanged.
     */
    public class ReplayingMessageStream implements MessageStream<TrackedEventMessage<?>> {
        private final MessageStream<TrackedEventMessage<?>> delegate;
        private ReplayToken lastToken;

        /**
         * @param replayToken   the replay token to start from
         * @param messageStream the stream to read messages from
         */
        public ReplayingMessageStream(ReplayToken replayToken, MessageStream<TrackedEventMessage<?>> messageStream) {
            this.lastToken = replayToken;
            this.delegate = messageStream;
        }

        /** Forwards to the wrapped stream; the peeked message's token is not altered. */
        @Override
        public Optional<TrackedEventMessage<?>> peek() {
            return this.delegate.peek();
        }

        @Override
        public boolean hasNextAvailable(int i, TimeUnit timeUnit) throws InterruptedException {
            return this.delegate.hasNextAvailable(i, timeUnit);
        }

        /**
         * Reads the next message from the wrapped stream and alters its token. The resulting token is
         * remembered for the next call when it is still a {@link ReplayToken}; otherwise the remembered
         * token is cleared and later messages pass through untouched.
         */
        @Override
        public TrackedEventMessage<?> nextAvailable() throws InterruptedException {
            TrackedEventMessage<?> altered = alterToken(this.delegate.nextAvailable());
            TrackingToken resultingToken = altered.trackingToken();
            if (resultingToken instanceof ReplayToken) {
                this.lastToken = (ReplayToken) resultingToken;
            } else {
                this.lastToken = null;
            }
            return altered;
        }

        @Override
        public void close() {
            this.delegate.close();
        }

        @Override
        public boolean hasNextAvailable() {
            return this.delegate.hasNextAvailable();
        }

        /**
         * Returns the given message unchanged when no replay token is remembered. Otherwise returns a
         * copy of the message carrying the remembered token advanced to the message's own token,
         * using the domain-event variant when the message is a {@link DomainEventMessage}.
         */
        public <T> TrackedEventMessage<T> alterToken(TrackedEventMessage<T> trackedEventMessage) {
            if (this.lastToken == null) {
                return trackedEventMessage;
            }
            if (trackedEventMessage instanceof DomainEventMessage) {
                return new GenericTrackedDomainEventMessage(this.lastToken.advancedTo(trackedEventMessage.trackingToken()), (DomainEventMessage) trackedEventMessage);
            }
            return new GenericTrackedEventMessage(this.lastToken.advancedTo(trackedEventMessage.trackingToken()), trackedEventMessage);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor$State.class */
    /**
     * Lifecycle states of the processor. Only {@link #STARTED} allows processing.
     */
    public enum State {
        NOT_STARTED(false),
        STARTED(true),
        PAUSED(false),
        SHUT_DOWN(false),
        PAUSED_ERROR(false);

        /** Whether workers may process events while the processor is in this state. */
        private final boolean running;

        State(boolean running) {
            this.running = running;
        }

        /**
         * @return {@code true} when processing is allowed in this state
         */
        boolean isRunning() {
            return running;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor$TrackerStatus.class */
    /**
     * Immutable snapshot of the processing status of one segment. The "modifying" methods return a
     * new instance, or this instance when nothing would change.
     */
    public static final class TrackerStatus implements EventTrackerStatus {
        private final Segment segment;
        private final boolean caughtUp;
        private final TrackingToken trackingToken;

        /** Creates a status that is not yet caught up. */
        private TrackerStatus(Segment segment, TrackingToken trackingToken) {
            this(segment, false, trackingToken);
        }

        private TrackerStatus(Segment segment, boolean z, TrackingToken trackingToken) {
            this.segment = segment;
            this.caughtUp = z;
            this.trackingToken = trackingToken;
        }

        /**
         * @return a status flagged as caught up, with the same segment and token
         */
        public TrackerStatus caughtUp() {
            if (this.caughtUp) {
                return this;
            }
            return new TrackerStatus(this.segment, true, this.trackingToken);
        }

        /**
         * @param trackingToken the token reached by the segment
         * @return a status carrying the given token, keeping the segment and caught-up flag
         */
        public TrackerStatus advancedTo(TrackingToken trackingToken) {
            if (Objects.equals(this.trackingToken, trackingToken)) {
                return this;
            }
            return new TrackerStatus(this.segment, this.caughtUp, trackingToken);
        }

        @Override
        public Segment getSegment() {
            return this.segment;
        }

        @Override
        public boolean isCaughtUp() {
            return this.caughtUp;
        }

        /**
         * @return {@code true} when the current token is a {@link ReplayToken}
         */
        @Override
        public boolean isReplaying() {
            return this.trackingToken instanceof ReplayToken;
        }

        /**
         * @return the current token, unwrapped first when it is a {@link ReplayToken}
         */
        @Override
        public TrackingToken getTrackingToken() {
            if (this.trackingToken instanceof ReplayToken) {
                return ((ReplayToken) this.trackingToken).unwrap();
            }
            return this.trackingToken;
        }
    }

    /* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor$TrackingSegmentWorker.class */
    private class TrackingSegmentWorker implements Runnable {
        private final Segment segment;

        public TrackingSegmentWorker(Segment segment) {
            this.segment = segment;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                TrackingEventProcessor.this.processingLoop(this.segment);
            } catch (Throwable th) {
                TrackingEventProcessor.logger.error("Processing loop ended due to uncaught exception. Processor pausing.", th);
                TrackingEventProcessor.this.state.set(State.PAUSED_ERROR);
            } finally {
                TrackingEventProcessor.this.activeSegments.remove(Integer.valueOf(this.segment.getSegmentId()));
            }
        }

        public String toString() {
            return "TrackingSegmentWorker{processor=" + TrackingEventProcessor.this.getName() + ", segment=" + this.segment + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor$WorkerLauncher.class */
    /**
     * Runnable that, while the processor is running, periodically looks up the segments known to the
     * token store, claims the ones this instance is not yet working on and starts a
     * {@link TrackingSegmentWorker} for each of them.
     */
    public class WorkerLauncher implements Runnable {
        private WorkerLauncher() {
        }

        /**
         * Repeats the following until the processor stops running:
         * <ol>
         * <li>fetch the segment ids for this processor, initializing the token segments first when
         * none exist and an initial segment count is configured;</li>
         * <li>for each segment not yet active (while below the thread limit), fetch its token in a
         * transaction, register it as active and dispatch a worker on a new thread;</li>
         * <li>when the thread limit has been reached, run the last created worker on the current
         * thread instead and return once it finishes;</li>
         * <li>otherwise wait 5 seconds before the next pass.</li>
         * </ol>
         * Failures while fetching segments are retried with a delay that doubles up to 60 seconds.
         */
        @Override
        public void run() {
            int waitTime = 1;
            String processorName = TrackingEventProcessor.this.getName();
            while (TrackingEventProcessor.this.getState().isRunning()) {
                // Initialized on every pass so it is definitely assigned when read after the try block,
                // also when fetching the segments fails before any worker was created.
                TrackingSegmentWorker workingInCurrentThread = null;
                try {
                    int[] segmentIds = TrackingEventProcessor.this.tokenStore.fetchSegments(processorName);
                    // The token store answered, so the retry back-off starts over.
                    waitTime = 1;
                    if (segmentIds.length == 0 && TrackingEventProcessor.this.segmentsSize > 0) {
                        segmentIds = (int[]) TrackingEventProcessor.this.transactionManager.fetchInTransaction(() -> {
                            TrackingEventProcessor.this.tokenStore.initializeTokenSegments(processorName, TrackingEventProcessor.this.segmentsSize);
                            return TrackingEventProcessor.this.tokenStore.fetchSegments(processorName);
                        });
                    }
                    Segment[] segments = Segment.computeSegments(segmentIds);
                    for (int i = 0; i < segments.length && TrackingEventProcessor.this.activeSegments.size() < TrackingEventProcessor.this.maxThreadCount; i++) {
                        Segment segment = segments[i];
                        if (TrackingEventProcessor.this.activeSegments.containsKey(Integer.valueOf(segment.getSegmentId()))) {
                            continue;
                        }
                        try {
                            TrackingEventProcessor.this.transactionManager.executeInTransaction(() -> {
                                TrackingEventProcessor.this.activeSegments.putIfAbsent(Integer.valueOf(segment.getSegmentId()), new TrackerStatus(segment, TrackingEventProcessor.this.tokenStore.fetchToken(processorName, segment.getSegmentId())));
                            });
                            TrackingSegmentWorker worker = new TrackingSegmentWorker(segment);
                            if (TrackingEventProcessor.this.threadFactory.activeThreads() >= TrackingEventProcessor.this.maxThreadCount) {
                                // No thread left to hand the worker to: this launcher thread runs it.
                                workingInCurrentThread = worker;
                                break;
                            }
                            TrackingEventProcessor.logger.info("Dispatching new tracking segment worker: {}", worker);
                            TrackingEventProcessor.this.threadFactory.newThread(worker).start();
                        } catch (UnableToClaimTokenException e) {
                            TrackingEventProcessor.logger.debug("Unable to claim the token for segment: {}. It is owned by another process", Integer.valueOf(segment.getSegmentId()));
                            TrackingEventProcessor.this.activeSegments.remove(Integer.valueOf(segment.getSegmentId()));
                        } catch (Exception e) {
                            TrackingEventProcessor.this.activeSegments.remove(Integer.valueOf(segment.getSegmentId()));
                            if (AxonNonTransientException.isCauseOf(e)) {
                                TrackingEventProcessor.logger.error("An unrecoverable error has occurred wile attempting to claim a token for segment: {}. Shutting down processor [{}].", Integer.valueOf(segment.getSegmentId()), TrackingEventProcessor.this.getName(), e);
                                TrackingEventProcessor.this.state.set(State.PAUSED_ERROR);
                                break;
                            }
                            TrackingEventProcessor.logger.info("An error occurred while attempting to claim a token for segment: {}. Will retry later...", Integer.valueOf(segment.getSegmentId()), e);
                        }
                    }
                } catch (Exception e) {
                    TrackingEventProcessor.logger.warn("Fetch Segments for Processor '{}' failed: {}. Preparing for retry in {}s", processorName, e.getMessage(), Integer.valueOf(waitTime));
                    TrackingEventProcessor.this.doSleepFor(TimeUnit.SECONDS.toMillis(waitTime));
                    // Cap the stored value itself so the logged delay matches the actual one and the
                    // counter cannot overflow after many consecutive failures.
                    waitTime = Math.min(waitTime * 2, 60);
                }
                if (Objects.nonNull(workingInCurrentThread)) {
                    TrackingEventProcessor.logger.info("Using current Thread for last segment segment worker: {}", workingInCurrentThread);
                    workingInCurrentThread.run();
                    return;
                }
                TrackingEventProcessor.this.doSleepFor(5000L);
            }
        }
    }

    /**
     * Creates a processor that uses {@link NoOpMessageMonitor#INSTANCE} as its message monitor and
     * the defaults of the six-argument constructor for everything else.
     *
     * @param str                     the name of the processor
     * @param eventHandlerInvoker     the component invoking the event handlers
     * @param streamableMessageSource the source to open event streams from
     * @param tokenStore              the store holding the tracking tokens
     * @param transactionManager      the manager used to run token and handler work in transactions
     */
    public TrackingEventProcessor(String str,
                                  EventHandlerInvoker eventHandlerInvoker,
                                  StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource,
                                  TokenStore tokenStore,
                                  TransactionManager transactionManager) {
        this(str, eventHandlerInvoker, streamableMessageSource, tokenStore, transactionManager,
             NoOpMessageMonitor.INSTANCE);
    }

    /**
     * Creates a processor with the given message monitor that rolls back on any throwable
     * ({@link RollbackConfigurationType#ANY_THROWABLE}), uses {@link PropagatingErrorHandler#INSTANCE}
     * as error handler and the configuration returned by
     * {@link TrackingEventProcessorConfiguration#forSingleThreadedProcessing()}.
     *
     * @param str                     the name of the processor
     * @param eventHandlerInvoker     the component invoking the event handlers
     * @param streamableMessageSource the source to open event streams from
     * @param tokenStore              the store holding the tracking tokens
     * @param transactionManager      the manager used to run token and handler work in transactions
     * @param messageMonitor          the monitor passed on to the superclass
     */
    public TrackingEventProcessor(String str,
                                  EventHandlerInvoker eventHandlerInvoker,
                                  StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource,
                                  TokenStore tokenStore,
                                  TransactionManager transactionManager,
                                  MessageMonitor<? super EventMessage<?>> messageMonitor) {
        this(str, eventHandlerInvoker, streamableMessageSource, tokenStore, transactionManager, messageMonitor,
             RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE,
             TrackingEventProcessorConfiguration.forSingleThreadedProcessing());
    }

    /**
     * Creates a fully configured processor and registers two interceptors: a
     * {@link TransactionManagingInterceptor} for the given transaction manager, and an interceptor
     * that keeps the token store in step with the unit of work being processed.
     *
     * @param str                                 the name of the processor
     * @param eventHandlerInvoker                 the component invoking the event handlers
     * @param streamableMessageSource             the source to open event streams from; must not be {@code null}
     * @param tokenStore                          the store holding the tracking tokens; must not be {@code null}
     * @param transactionManager                  the manager used to run token and handler work in transactions
     * @param messageMonitor                      the monitor passed on to the superclass
     * @param rollbackConfiguration               the rollback configuration passed on to the superclass
     * @param errorHandler                        the error handler passed on to the superclass
     * @param trackingEventProcessorConfiguration supplies batch size, initial segment count, maximum
     *                                            thread count and the thread factory
     * @throws NullPointerException when the message source or token store is {@code null}
     */
    public TrackingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource, TokenStore tokenStore, TransactionManager transactionManager, MessageMonitor<? super EventMessage<?>> messageMonitor, RollbackConfiguration rollbackConfiguration, ErrorHandler errorHandler, TrackingEventProcessorConfiguration trackingEventProcessorConfiguration) {
        super(str, eventHandlerInvoker, rollbackConfiguration, errorHandler, messageMonitor);
        this.state = new AtomicReference<>(State.NOT_STARTED);
        this.activeSegments = new ConcurrentSkipListMap<>();
        this.batchSize = trackingEventProcessorConfiguration.getBatchSize();
        this.messageSource = Objects.requireNonNull(streamableMessageSource);
        this.tokenStore = Objects.requireNonNull(tokenStore);
        this.segmentsSize = trackingEventProcessorConfiguration.getInitialSegmentsCount();
        this.transactionManager = transactionManager;
        this.maxThreadCount = trackingEventProcessorConfiguration.getMaxThreadCount();
        this.threadFactory = new ActivityCountingThreadFactory(trackingEventProcessorConfiguration.getThreadFactory(str));
        this.segmentIdResourceKey = "Processor[" + str + "]/SegmentId";
        this.lastTokenResourceKey = "Processor[" + str + "]/Token";
        registerInterceptor(new TransactionManagingInterceptor(transactionManager));
        registerInterceptor((unitOfWork, interceptorChain) -> {
            // Extend the claim once per unit of work: on the first message of a batch, or on every
            // message when the unit of work is not a batching one.
            if (!(unitOfWork instanceof BatchingUnitOfWork) || ((BatchingUnitOfWork) unitOfWork).isFirstMessage()) {
                tokenStore.extendClaim(getName(), ((Integer) unitOfWork.getResource(this.segmentIdResourceKey)).intValue());
            }
            // Store the token as part of the commit of the last message, so it is written once per batch.
            if (!(unitOfWork instanceof BatchingUnitOfWork) || ((BatchingUnitOfWork) unitOfWork).isLastMessage()) {
                // The callback parameter needs its own name: a lambda parameter may not redeclare the
                // enclosing lambda's 'unitOfWork' parameter.
                unitOfWork.onPrepareCommit(uow -> {
                    tokenStore.storeToken((TrackingToken) uow.getResource(this.lastTokenResourceKey), str, ((Integer) uow.getResource(this.segmentIdResourceKey)).intValue());
                });
            }
            return interceptorChain.proceed();
        });
    }

    @Override // org.axonframework.eventhandling.EventProcessor
    /**
     * Switches the processor to {@link State#STARTED} and launches the segment workers, unless the
     * processor was already in a running state.
     */
    public void start() {
        State previousState = this.state.getAndSet(State.STARTED);
        if (!previousState.isRunning()) {
            startSegmentWorkers();
        }
    }

    /**
     * Processes batches of events for the given segment for as long as the processor is running.
     * <p>
     * When the token cannot be claimed the loop waits 5 seconds and tries again. On any other
     * exception the claim is released, the stream is closed and the loop retries after a delay that
     * doubles on every consecutive failure, up to 60 seconds. However the loop ends, the stream is
     * closed and the claim on the token is released.
     *
     * @param segment the segment to process events for
     */
    protected void processingLoop(Segment segment) {
        MessageStream<TrackedEventMessage<?>> eventStream = null;
        long errorWaitSeconds = 1;
        try {
            while (this.state.get().isRunning()) {
                try {
                    try {
                        eventStream = ensureEventStreamOpened(eventStream, segment);
                        processBatch(segment, eventStream);
                        // A successful batch ends retry mode.
                        errorWaitSeconds = 1;
                    } catch (UnableToClaimTokenException claimFailure) {
                        if (errorWaitSeconds == 1) {
                            logger.info("Token is owned by another node. Waiting for it to become available...");
                        }
                        doSleepFor(5000L);
                    }
                } catch (Exception failure) {
                    // Only the first failure of a series is logged with its stack trace.
                    if (errorWaitSeconds == 1) {
                        logger.warn("Error occurred. Starting retry mode.", failure);
                    }
                    logger.warn("Releasing claim on token and preparing for retry in {}s", Long.valueOf(errorWaitSeconds));
                    releaseToken(segment);
                    IOUtils.closeQuietly(eventStream);
                    eventStream = null;
                    doSleepFor(TimeUnit.SECONDS.toMillis(errorWaitSeconds));
                    errorWaitSeconds = Math.min(errorWaitSeconds * 2, 60L);
                }
            }
        } finally {
            IOUtils.closeQuietly(eventStream);
            releaseToken(segment);
        }
    }

    /**
     * Releases this processor's claim on the token of the given segment, inside a transaction.
     * This is a best-effort operation: a failure never propagates to the caller, but it is logged
     * at debug level so it can be diagnosed.
     *
     * @param segment the segment whose token claim should be released
     */
    private void releaseToken(Segment segment) {
        try {
            this.transactionManager.executeInTransaction(() -> {
                this.tokenStore.releaseClaim(getName(), segment.getSegmentId());
            });
        } catch (Exception e) {
            // Deliberately not rethrown: callers use this during cleanup and retry handling.
            logger.debug("Failed to release the claim on the token for segment: {} of processor [{}]", Integer.valueOf(segment.getSegmentId()), getName(), e);
        }
    }

    /**
     * Reads and processes a single batch of events from the given stream.
     * <ul>
     * <li>When no event becomes available within one second, only the claim on the token is
     * extended.</li>
     * <li>Otherwise events are read until the batch holds {@code batchSize} handled events, ten
     * times {@code batchSize} events have been inspected, or the stream has nothing immediately
     * available. Events this segment cannot handle are reported as ignored.</li>
     * <li>When none of the inspected events can be handled, only the last seen token is stored.</li>
     * <li>Otherwise any directly following events carrying the same token as the last one read are
     * added as well, the batch is processed in a {@link BatchingUnitOfWork}, and the segment's status
     * is advanced to the last token.</li>
     * </ul>
     * An interruption puts the processor in the shut-down state and leaves the thread's interrupt
     * flag set.
     *
     * @param segment       the segment to process events for
     * @param messageStream the stream to read events from
     * @throws Exception when reading or processing the events fails
     */
    private void processBatch(Segment segment, MessageStream<TrackedEventMessage<?>> messageStream) throws Exception {
        ArrayList<TrackedEventMessage<?>> batch = new ArrayList<>();
        try {
            checkSegmentCaughtUp(segment, messageStream);
            if (!messageStream.hasNextAvailable(1, TimeUnit.SECONDS)) {
                // Nothing to process: keep the claim alive so the segment stays with this instance.
                this.transactionManager.executeInTransaction(() -> {
                    this.tokenStore.extendClaim(getName(), segment.getSegmentId());
                });
                return;
            }
            TrackingToken lastSeenToken = null;
            for (int i = 0; i < this.batchSize * 10 && batch.size() < this.batchSize && messageStream.hasNextAvailable(); i++) {
                TrackedEventMessage<?> eventMessage = messageStream.nextAvailable();
                lastSeenToken = eventMessage.trackingToken();
                if (canHandle(eventMessage, segment)) {
                    batch.add(eventMessage);
                } else {
                    reportIgnored(eventMessage);
                }
            }
            // Effectively final copy for use inside the lambdas below.
            final TrackingToken lastToken = lastSeenToken;
            if (batch.isEmpty()) {
                // Nothing to handle, but remember how far the stream was read.
                this.transactionManager.executeInTransaction(() -> {
                    this.tokenStore.storeToken(lastToken, getName(), segment.getSegmentId());
                });
                return;
            }
            // Events sharing the last token belong to the same position in the stream, so they must be
            // part of this batch: the stored token would otherwise skip them.
            while (lastToken != null && messageStream.peek().filter(event -> lastToken.equals(event.trackingToken())).isPresent()) {
                TrackedEventMessage<?> eventMessage = messageStream.nextAvailable();
                if (canHandle(eventMessage, segment)) {
                    batch.add(eventMessage);
                }
            }
            BatchingUnitOfWork unitOfWork = new BatchingUnitOfWork(batch);
            unitOfWork.resources().put(this.segmentIdResourceKey, Integer.valueOf(segment.getSegmentId()));
            unitOfWork.resources().put(this.lastTokenResourceKey, lastToken);
            processInUnitOfWork(batch, unitOfWork, segment);
            this.activeSegments.computeIfPresent(Integer.valueOf(segment.getSegmentId()), (segmentId, status) -> status.advancedTo(lastToken));
        } catch (InterruptedException e) {
            logger.error(String.format("Event processor [%s] was interrupted. Shutting down.", getName()), e);
            // Restore the interrupt flag BEFORE calling shutDown(): shutDown() waits until no processor
            // threads are active, and this thread may be one of them. With the flag set that wait is
            // interrupted right away instead of waiting for the calling thread itself to finish.
            Thread.currentThread().interrupt();
            shutDown();
        }
    }

    /**
     * Flags the status of the given segment as caught up when the stream has no event immediately
     * available. Does nothing when the segment is not among the active segments.
     *
     * @param segment       the segment whose status may be updated
     * @param messageStream the stream the segment reads from
     */
    private void checkSegmentCaughtUp(Segment segment, MessageStream<TrackedEventMessage<?>> messageStream) {
        if (!messageStream.hasNextAvailable()) {
            Integer segmentId = Integer.valueOf(segment.getSegmentId());
            this.activeSegments.computeIfPresent(segmentId, (id, status) -> status.caughtUp());
        }
    }

    /**
     * Returns the given stream when there is one. Otherwise, provided the processor is running,
     * fetches the segment's token and opens a new stream from it, each in its own transaction.
     *
     * @param messageStream the currently opened stream, or {@code null} when none is open
     * @param segment       the segment to fetch the token for
     * @return the stream to read from; {@code null} when none was open and the processor is not running
     */
    private MessageStream<TrackedEventMessage<?>> ensureEventStreamOpened(MessageStream<TrackedEventMessage<?>> messageStream, Segment segment) {
        if (messageStream != null || !this.state.get().isRunning()) {
            return messageStream;
        }
        TrackingToken startToken = (TrackingToken) this.transactionManager.fetchInTransaction(() -> this.tokenStore.fetchToken(getName(), segment.getSegmentId()));
        logger.info("Fetched token: {} for segment: {}", startToken, segment);
        return (MessageStream) this.transactionManager.fetchInTransaction(() -> doOpenStream(startToken));
    }

    /**
     * Opens a stream on the message source for the given token. For a {@link ReplayToken} the stream
     * is opened at the unwrapped token and wrapped in a {@link ReplayingMessageStream}.
     *
     * @param trackingToken the token to open the stream at
     * @return the opened stream
     */
    private MessageStream<TrackedEventMessage<?>> doOpenStream(TrackingToken trackingToken) {
        if (trackingToken instanceof ReplayToken) {
            ReplayToken replayToken = (ReplayToken) trackingToken;
            return new ReplayingMessageStream(replayToken, this.messageSource.openStream2(replayToken.unwrap()));
        }
        return this.messageSource.openStream2(trackingToken);
    }

    /**
     * Moves the processor to {@link State#PAUSED} when it is currently in a running state; any other
     * state is left as it is.
     */
    @Deprecated
    public void pause() {
        this.state.updateAndGet(current -> {
            if (current.isRunning()) {
                return State.PAUSED;
            }
            return current;
        });
    }

    /**
     * Resets the tokens of all segments so that events are replayed. Within a single transaction the
     * current token of every segment is fetched, the event handlers are asked to perform their reset,
     * and each token is replaced by a replay token created from it.
     * <p>
     * Requires that the handlers support a reset and that the processor is neither running nor has
     * active segments; the state assertions fail otherwise.
     */
    public void resetTokens() {
        Assert.state(supportsReset(), () -> "The handlers assigned to this Processor do not support a reset");
        Assert.state(!isRunning() && activeProcessorThreads() == 0, () -> "TrackingProcessor must be shut down before triggering a reset");
        this.transactionManager.executeInTransaction(() -> {
            int[] segmentIds = this.tokenStore.fetchSegments(getName());
            // Fetch all tokens before resetting the handlers.
            TrackingToken[] currentTokens = new TrackingToken[segmentIds.length];
            for (int index = 0; index < segmentIds.length; index++) {
                currentTokens[index] = this.tokenStore.fetchToken(getName(), segmentIds[index]);
            }
            eventHandlerInvoker().performReset();
            for (int index = 0; index < currentTokens.length; index++) {
                TrackingToken replayToken = ReplayToken.createReplayToken(currentTokens[index]);
                this.tokenStore.storeToken(replayToken, getName(), segmentIds[index]);
            }
        });
    }

    /**
     * @return whether the event handler invoker of this processor reports that it supports a reset
     */
    public boolean supportsReset() {
        final boolean resettable = eventHandlerInvoker().supportsReset();
        return resettable;
    }

    /**
     * @return {@code true} when the current state allows processing
     */
    public boolean isRunning() {
        State current = this.state.get();
        return current.isRunning();
    }

    /**
     * @return {@code true} when the processor is in the {@link State#PAUSED_ERROR} state
     */
    public boolean isError() {
        return State.PAUSED_ERROR == this.state.get();
    }

    @Override // org.axonframework.eventhandling.EventProcessor
    /**
     * Puts the processor in the {@link State#SHUT_DOWN} state and, unless it already was in that
     * state, blocks until the thread factory reports no active threads. When the waiting thread is
     * interrupted, the wait ends and the interrupt flag is restored.
     */
    public void shutDown() {
        State previousState = this.state.getAndSet(State.SHUT_DOWN);
        if (previousState == State.SHUT_DOWN) {
            return;
        }
        logger.info("Shutdown state set for Processor '{}'. Awaiting termination...", getName());
        try {
            while (this.threadFactory.activeThreads() > 0) {
                Thread.sleep(1L);
            }
        } catch (InterruptedException interrupted) {
            logger.info("Thread was interrupted while waiting for TrackingProcessor '{}' shutdown.", getName());
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the number of segments currently registered as active on this processor instance
     */
    public int activeProcessorThreads() {
        final int activeSegmentCount = this.activeSegments.size();
        return activeSegmentCount;
    }

    /**
     * @return a read-only view of the status of each active segment, keyed by segment id
     */
    public Map<Integer, EventTrackerStatus> processingStatus() {
        Map<Integer, EventTrackerStatus> readOnlyView = Collections.unmodifiableMap(this.activeSegments);
        return readOnlyView;
    }

    /**
     * @return the current lifecycle state of this processor
     */
    protected State getState() {
        final State current = this.state.get();
        return current;
    }

    /**
     * Starts a new thread, created by the processor's thread factory, that runs a
     * {@link WorkerLauncher}.
     */
    protected void startSegmentWorkers() {
        Thread launcherThread = this.threadFactory.newThread(new WorkerLauncher());
        launcherThread.start();
    }

    /**
     * Lets the current thread sleep for the given time, in slices of at most 100 ms so that the
     * sleep ends early when the processor stops running. An interruption puts the processor in the
     * shut-down state and leaves the thread's interrupt flag set.
     *
     * @param j the time to sleep, in milliseconds
     */
    protected void doSleepFor(long j) {
        long deadline = System.currentTimeMillis() + j;
        try {
            while (getState().isRunning()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                Thread.sleep(Math.min(remaining, 100L));
            }
        } catch (InterruptedException e) {
            logger.warn("Thread interrupted. Preparing to shut down event processor");
            // Restore the interrupt flag BEFORE calling shutDown(): shutDown() waits until no processor
            // threads are active, and this thread may be one of them. With the flag set that wait is
            // interrupted right away instead of waiting for the calling thread itself to finish.
            Thread.currentThread().interrupt();
            shutDown();
        }
    }
}
