package org.axonframework.eventhandling.replay;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.axonframework.common.DirectExecutor;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.Cluster;
import org.axonframework.eventhandling.ClusterMetaData;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.axonframework.unitofwork.TransactionManager;

/* loaded from: input_file:org/axonframework/eventhandling/replay/ReplayingCluster.class */
/**
 * A {@link Cluster} decorator that can replay the events of an {@link EventStoreManagement} onto the
 * cluster it wraps.
 * <p>
 * While a replay is running, messages passed to {@link #publish(EventMessage...)} are not forwarded to
 * the wrapped cluster but handed to the configured {@link IncomingMessageHandler} instead. All other
 * {@link Cluster} operations are forwarded to the wrapped cluster. Listeners that implement
 * {@link ReplayAware} are additionally tracked so they can be notified before and after a replay.
 */
public class ReplayingCluster implements Cluster {
    /** The wrapped cluster that receives replayed events and, outside of a replay, regular events. */
    private final Cluster delegate;
    /** Source of the events to replay. */
    private final EventStoreManagement replayingEventStore;
    /** Used to start, commit and roll back the transaction(s) the replay runs in. */
    private final TransactionManager transactionManager;
    /** Event count that triggers an intermediate commit; values of zero or less disable it. */
    private final int commitThreshold;
    /** Decides what happens to messages that are published while a replay is running. */
    private final IncomingMessageHandler incomingMessageHandler;
    /** Subscribed listeners that want to be told when a replay starts and finishes. */
    private final List<ReplayAware> replayAwareListeners = new CopyOnWriteArrayList<ReplayAware>();
    /** {@code true} from the moment a replay starts until it completes or fails. */
    private volatile boolean inReplay = false;

    /**
     * The unit of work that performs a single replay. Instances are created by
     * {@link ReplayingCluster#startReplay(Executor)} only.
     */
    public class ReplayEventsTask implements Runnable {

        /**
         * Publishes each visited event to the wrapped cluster, committing the running transaction and
         * starting a new one whenever the commit threshold is exceeded.
         */
        private class ReplayingEventVisitor implements EventVisitor {
            private int eventCounter = 0;
            private Object currentTransaction;

            /**
             * @param obj the already started transaction in which the first events are processed
             */
            public ReplayingEventVisitor(Object obj) {
                this.currentTransaction = obj;
            }

            /**
             * Publishes the given event to the wrapped cluster and then reports it to the incoming
             * message handler via {@code releaseMessage}.
             *
             * @param domainEventMessage the event read from the event store
             */
            @Override
            public void doWithEvent(DomainEventMessage domainEventMessage) {
                // The counter is only maintained when intermediate commits are enabled.
                // NOTE(review): the counter restarts at 0 although this event is published in the new
                // transaction, so the first batch holds commitThreshold events and later batches hold
                // commitThreshold + 1. Left unchanged; confirm whether that is intended.
                if (commitThreshold > 0 && ++eventCounter > commitThreshold) {
                    eventCounter = 0;
                    transactionManager.commitTransaction(currentTransaction);
                    currentTransaction = transactionManager.startTransaction();
                }
                delegate.publish(domainEventMessage);
                incomingMessageHandler.releaseMessage(domainEventMessage);
            }

            /**
             * @return the transaction that is currently open; this changes after every intermediate commit
             */
            public Object getTransaction() {
                return currentTransaction;
            }
        }

        private ReplayEventsTask() {
        }

        /**
         * Runs the replay: prepares the incoming message handler, switches the cluster into replay
         * mode, notifies the {@link ReplayAware} listeners, feeds all events of the event store to the
         * wrapped cluster, lets the handler process its backlog and commits the transaction.
         * <p>
         * When a {@link RuntimeException} occurs, the incoming message handler is informed through
         * {@code onReplayFailed}, the open transaction (if one was started) is rolled back and the
         * exception is rethrown. Replay mode is always switched off when this method exits.
         */
        @Override
        public void run() {
            incomingMessageHandler.prepareForReplay(delegate);
            inReplay = true;
            // Stays null until a transaction has actually been started, so the failure path knows
            // whether there is anything to roll back.
            ReplayingEventVisitor visitor = null;
            try {
                // Starting the transaction inside the guarded region ensures that a failure here
                // cannot leave the cluster stuck in replay mode.
                visitor = new ReplayingEventVisitor(transactionManager.startTransaction());
                for (ReplayAware listener : replayAwareListeners) {
                    listener.beforeReplay();
                }
                replayingEventStore.visitEvents(visitor);
                for (ReplayAware listener : replayAwareListeners) {
                    listener.afterReplay();
                }
                incomingMessageHandler.processBacklog(delegate);
                transactionManager.commitTransaction(visitor.getTransaction());
            } catch (RuntimeException e) {
                try {
                    incomingMessageHandler.onReplayFailed(delegate, e);
                } finally {
                    // Roll back even if the failure callback itself throws.
                    if (visitor != null) {
                        transactionManager.rollbackTransaction(visitor.getTransaction());
                    }
                }
                throw e;
            } finally {
                inReplay = false;
            }
        }
    }

    /**
     * Creates a cluster that wraps the given {@code cluster} and is able to replay events onto it.
     *
     * @param cluster                the cluster to forward operations and replayed events to
     * @param eventStoreManagement   the event store providing the events to replay
     * @param transactionManager     manages the transaction(s) in which the replay is executed
     * @param i                      the commit threshold: once more than this many events have been
     *                               counted, the running transaction is committed and a new one is
     *                               started; zero or a negative value disables intermediate commits
     * @param incomingMessageHandler receives the messages that are published during a replay
     */
    public ReplayingCluster(Cluster cluster, EventStoreManagement eventStoreManagement, TransactionManager transactionManager, int i, IncomingMessageHandler incomingMessageHandler) {
        this.delegate = cluster;
        this.replayingEventStore = eventStoreManagement;
        this.transactionManager = transactionManager;
        this.commitThreshold = i;
        this.incomingMessageHandler = incomingMessageHandler;
    }

    /**
     * Runs a replay on the calling thread and returns when it has finished.
     *
     * @throws ReplayFailedException when the replay fails with an exception, or when the thread is
     *                               interrupted while waiting for the result (the interrupt flag is
     *                               restored in that case)
     */
    public void startReplay() {
        try {
            startReplay(DirectExecutor.INSTANCE).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplayFailedException("Replay failed because it was interrupted", e);
        } catch (ExecutionException e2) {
            // Unwrap so callers see the actual failure rather than the FutureTask wrapper.
            throw new ReplayFailedException("Replay failed due to an exception.", e2.getCause());
        }
    }

    /**
     * Submits a replay to the given {@code executor}.
     *
     * @param executor the executor that runs the replay task
     * @return a future that completes with {@code null} when the replay finishes, or fails with the
     *         exception thrown by the replay
     */
    public Future<Void> startReplay(Executor executor) {
        FutureTask<Void> replayTask = new FutureTask<Void>(new ReplayEventsTask(), null);
        executor.execute(replayTask);
        return replayTask;
    }

    /**
     * @return {@code true} while a replay is in progress, {@code false} otherwise
     */
    public boolean isInReplayMode() {
        return inReplay;
    }

    /** Returns the name of the wrapped cluster. */
    @Override
    public String getName() {
        return delegate.getName();
    }

    /**
     * Forwards the messages to the wrapped cluster, unless a replay is in progress, in which case they
     * are handed to the incoming message handler.
     *
     * @param eventMessageArr the messages to publish
     */
    @Override
    public void publish(EventMessage... eventMessageArr) {
        if (inReplay) {
            incomingMessageHandler.onIncomingMessages(delegate, eventMessageArr);
        } else {
            delegate.publish(eventMessageArr);
        }
    }

    /**
     * Subscribes the listener to the wrapped cluster and, if it implements {@link ReplayAware},
     * registers it for replay notifications.
     *
     * @param eventListener the listener to subscribe
     */
    @Override
    public void subscribe(EventListener eventListener) {
        delegate.subscribe(eventListener);
        if (eventListener instanceof ReplayAware) {
            replayAwareListeners.add((ReplayAware) eventListener);
        }
    }

    /**
     * Removes the listener from the replay notifications (when it implements {@link ReplayAware}) and
     * unsubscribes it from the wrapped cluster.
     *
     * @param eventListener the listener to unsubscribe
     */
    @Override
    public void unsubscribe(EventListener eventListener) {
        if (eventListener instanceof ReplayAware) {
            replayAwareListeners.remove(eventListener);
        }
        delegate.unsubscribe(eventListener);
    }

    /** Returns the members of the wrapped cluster. */
    @Override
    public Set<EventListener> getMembers() {
        return delegate.getMembers();
    }

    /** Returns the meta data of the wrapped cluster. */
    @Override
    public ClusterMetaData getMetaData() {
        return delegate.getMetaData();
    }
}
