package no.ks.eventstore2.saga;

import akka.ConfigurationException;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.ClusterEvent;
import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import no.ks.eventstore2.AkkaClusterInfo;
import no.ks.eventstore2.Event;
import no.ks.eventstore2.TakeBackup;
import no.ks.eventstore2.TakeSnapshot;
import no.ks.eventstore2.eventstore.AcknowledgePreviousEventsProcessed;
import no.ks.eventstore2.eventstore.IncompleteSubscriptionPleaseSendNew;
import no.ks.eventstore2.eventstore.Subscription;
import no.ks.eventstore2.projection.Subscriber;
import no.ks.eventstore2.reflection.HandlerFinder;
import no.ks.eventstore2.response.Success;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:no/ks/eventstore2/saga/SagaManager.class */
/**
 * Actor that routes {@link Event}s received from the event store to saga actors.
 *
 * <p>On start it scans {@code packageScanPath} for classes annotated with {@link ListensTo} or
 * {@link SagaEventIdProperty} and records which event classes map to which saga classes, and
 * which aggregates must be subscribed to. Events are only dispatched to sagas, and the
 * {@link SagaRepository} is only kept open, while {@link AkkaClusterInfo#isLeader()} reports
 * {@code true}. The latest journal id seen per aggregate is tracked on every node and is
 * persisted by the leader whenever a {@link TakeSnapshot} message arrives (scheduled to self
 * after one hour and then every two hours).
 *
 * <p>NOTE(review): the event-to-saga map and the aggregate set are static, i.e. shared by every
 * SagaManager instance in the JVM and retained across actor restarts.
 */
public class SagaManager extends UntypedActor {
    private static final Logger log = LoggerFactory.getLogger(SagaManager.class);

    /** Package scanned for sagas when the caller does not supply one. */
    private static final String DEFAULT_PACKAGE_SCAN_PATH = "no";
    /** Repository state key pair marking that the repository upgrade has been performed. */
    private static final String UPGRADE_STATE_SAGA = "Saga";
    private static final String UPGRADE_STATE_ID = "upgradedH2Db";
    private static final byte UPGRADED = 1;
    /** Pause applied right before the repository is opened. */
    private static final long REPOSITORY_OPEN_DELAY_MILLIS = 500L;

    /** Aggregate types that registered sagas subscribe to. */
    private static final Set<String> aggregates = new HashSet<>();
    /** Registered saga mappings keyed by the event class they handle. */
    private static final Map<Class<? extends Event>, ArrayList<SagaEventMapping>> eventToSagaMap = new HashMap<>();

    private final ActorRef commandDispatcher;
    private final SagaRepository repository;
    private final ActorRef eventstore;
    private final String packageScanPath;
    /** Created in {@link #preStart()}. */
    private AkkaClusterInfo akkaClusterInfo;
    /** Timestamp suffix for backup names; only used from this actor's message handling. */
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
    /** Live saga actors keyed by saga class and saga id. */
    private final Map<SagaCompositeId, ActorRef> sagas = new HashMap<>();
    /** Latest journal id received (or loaded from the repository) per aggregate type. */
    private final Map<String, String> latestJournalidReceived = new HashMap<>();
    /** Sends {@link TakeSnapshot} to self: first after 1 hour, then every 2 hours. */
    private final Cancellable snapshotSchedule = getContext().system().scheduler().schedule(
            Duration.create(1, TimeUnit.HOURS),
            Duration.create(2, TimeUnit.HOURS),
            getSelf(),
            new TakeSnapshot(),
            getContext().dispatcher(),
            (ActorRef) null);

    /**
     * Creates {@link Props} for a SagaManager that scans the default package {@code "no"}.
     *
     * @param commandDispatcher actor handed to every saga that is created
     * @param repository        in-memory saga repository
     * @param eventstore        actor that {@link Subscription} messages are sent to
     * @return props for creating the actor
     */
    public static Props mkProps(ActorRef commandDispatcher, SagaInMemoryRepository repository, ActorRef eventstore) {
        return mkProps(commandDispatcher, repository, eventstore, DEFAULT_PACKAGE_SCAN_PATH);
    }

    /**
     * Creates {@link Props} for a SagaManager.
     *
     * @param commandDispatcher actor handed to every saga that is created
     * @param repository        saga repository
     * @param eventstore        actor that {@link Subscription} messages are sent to
     * @param packageScanPath   base package scanned for saga classes
     * @return props for creating the actor
     */
    public static Props mkProps(ActorRef commandDispatcher, SagaRepository repository, ActorRef eventstore, String packageScanPath) {
        return Props.create(SagaManager.class, commandDispatcher, repository, eventstore, packageScanPath);
    }

    /**
     * @param commandDispatcher actor handed to every saga that is created
     * @param repository        saga repository
     * @param eventstore        actor that {@link Subscription} messages are sent to
     * @param packageScanPath   base package scanned for saga classes
     */
    public SagaManager(ActorRef commandDispatcher, SagaRepository repository, ActorRef eventstore, String packageScanPath) {
        this.commandDispatcher = commandDispatcher;
        this.repository = repository;
        this.eventstore = eventstore;
        this.packageScanPath = packageScanPath;
    }

    /** Cancels the snapshot schedule and closes the repository. */
    @Override
    public void postStop() {
        snapshotSchedule.cancel();
        repository.close();
    }

    /** Registers sagas, subscribes to cluster events and applies the initial leader state. */
    @Override
    public void preStart() {
        registerSagas();
        akkaClusterInfo = new AkkaClusterInfo(getContext().system());
        akkaClusterInfo.subscribeToClusterEvents(self());
        updateLeaderState(null);
    }

    /**
     * After the default restart handling, closes the repository and, when this node is leader,
     * reopens it.
     */
    @Override
    public void postRestart(Throwable reason) throws Exception {
        super.postRestart(reason);
        log.warn("Restarted sagamanager, restarting storage");
        repository.close();
        if (akkaClusterInfo.isLeader()) {
            pauseBeforeOpeningRepository();
            repository.open();
        }
    }

    /**
     * Dispatches an incoming message by type.
     *
     * <p>Every {@link Event} updates the latest journal id for its aggregate; it is forwarded to
     * sagas only when this node is leader. Messages that match no branch (including events on a
     * non-leader node) are ignored.
     *
     * @param message the received message
     * @throws Exception if reflective id-property access or a repository operation fails, or if
     *                   an {@link IncompleteSubscriptionPleaseSendNew} arrives for an aggregate
     *                   without a known journal id
     */
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Event) {
            Event event = (Event) message;
            if (log.isDebugEnabled()) {
                log.debug("SagaManager received event {} is leader {}", event, akkaClusterInfo.isLeader());
            }
            latestJournalidReceived.put(event.getAggregateType(), event.getJournalid());
            if (akkaClusterInfo.isLeader()) {
                dispatchToSagas(event);
                return;
            }
            // Not leader: fall through, exactly like any other message without a matching branch.
        }
        if (message instanceof ClusterEvent.LeaderChanged) {
            updateLeaderState((ClusterEvent.LeaderChanged) message);
            return;
        }
        if (message instanceof UpgradeSagaRepoStore && akkaClusterInfo.isLeader()) {
            upgradeRepository((UpgradeSagaRepoStore) message);
            return;
        }
        if (message instanceof IncompleteSubscriptionPleaseSendNew) {
            resubscribe(((IncompleteSubscriptionPleaseSendNew) message).getAggregateType());
            return;
        }
        if (message instanceof TakeBackup) {
            if (akkaClusterInfo.isLeader()) {
                repository.doBackup(((TakeBackup) message).getBackupdir(), "backupSagaRepo" + format.format(new Date()));
            }
            return;
        }
        if (message instanceof AcknowledgePreviousEventsProcessed) {
            if (akkaClusterInfo.isLeader()) {
                sender().tell(new Success(), self());
            } else {
                // Pass on to the leader's manager, keeping the original sender for the reply.
                getLeaderSagaManager().tell(message, sender());
            }
            return;
        }
        if (message instanceof TakeSnapshot && akkaClusterInfo.isLeader()) {
            saveLatestJournalIds();
        }
    }

    /** Sends the event to every saga registered for its class or one of its superclasses. */
    private void dispatchToSagas(Event event) throws ReflectiveOperationException {
        log.debug("SagaManager processing event {}", event);
        for (SagaEventMapping mapping : getSagaClassesForEvent(event.getClass())) {
            String sagaId = (String) mapping.getPropertyMethod().invoke(event);
            getOrCreateSaga(mapping.getSagaClass(), sagaId).tell(event, self());
        }
    }

    /** Copies all states from the old repository in the message unless already marked done. */
    private void upgradeRepository(UpgradeSagaRepoStore upgrade) {
        repository.open();
        if (repository.getState(UPGRADE_STATE_SAGA, UPGRADE_STATE_ID) != UPGRADED) {
            log.info("Upgrading sagaRepository");
            upgrade.getSagaRepository().readAllStatesToNewRepository(repository);
            repository.saveState(UPGRADE_STATE_SAGA, UPGRADE_STATE_ID, UPGRADED);
        }
    }

    /** Sends a fresh subscription for the aggregate, starting at the latest journal id seen. */
    private void resubscribe(String aggregateType) {
        String latestJournalid = latestJournalidReceived.get(aggregateType);
        log.debug("Sending new subscription on '{}' from latest journalid '{}'", aggregateType, latestJournalid);
        if (latestJournalid == null) {
            throw new RuntimeException("Missing latestJournalidReceived but got IncompleteSubscriptionPleaseSendNew");
        }
        eventstore.tell(new Subscription(aggregateType, latestJournalid), self());
    }

    /** Persists the latest journal id of every aggregate seen so far. */
    private void saveLatestJournalIds() {
        for (Map.Entry<String, String> entry : latestJournalidReceived.entrySet()) {
            log.info("Saving latestJournalId {} for sagaManager aggregate {}", entry.getValue(), entry.getKey());
            repository.saveLatestJournalId(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Sleeps briefly before the repository is opened.
     * NOTE(review): the reason for the delay is not visible in this file — presumably it gives a
     * previous owner time to release the store; confirm.
     */
    private void pauseBeforeOpeningRepository() {
        try {
            Thread.sleep(REPOSITORY_OPEN_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // Keep going as before, but do not lose the interrupt request.
            Thread.currentThread().interrupt();
        }
    }

    /** Looks up the saga manager on the current leader node at {@code /user/sagaManager}. */
    private ActorRef getLeaderSagaManager() {
        return getContext().actorFor(akkaClusterInfo.getLeaderAdress() + "/user/sagaManager");
    }

    /** Returns the saga actor for the given class and id, creating it as a child if absent. */
    private ActorRef getOrCreateSaga(Class<? extends Saga> sagaClass, String sagaId) {
        SagaCompositeId compositeId = new SagaCompositeId(sagaClass, sagaId);
        ActorRef saga = sagas.get(compositeId);
        if (saga == null) {
            saga = getContext().actorOf(Props.create(sagaClass, sagaId, commandDispatcher, repository));
            sagas.put(compositeId, saga);
        }
        return saga;
    }

    /** Scans {@code packageScanPath} for annotated, non-abstract saga classes and registers them. */
    private void registerSagas() {
        ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false);
        scanner.addIncludeFilter(new AnnotationTypeFilter(ListensTo.class));
        scanner.addIncludeFilter(new AnnotationTypeFilter(SagaEventIdProperty.class));
        for (BeanDefinition candidate : scanner.findCandidateComponents(packageScanPath)) {
            if (!candidate.isAbstract()) {
                register(candidate.getBeanClassName());
            }
        }
    }

    /**
     * Loads the named class and registers it via {@link ListensTo} if present, otherwise via
     * {@link Subscriber}/{@link SagaEventIdProperty}. Any failure is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    @SuppressWarnings("unchecked") // candidates come from the saga annotation scan; type is not verified here
    private void register(String className) {
        try {
            Class<? extends Saga> sagaClass = (Class<? extends Saga>) Class.forName(className);
            if (!handleOldStyleAnnotations(sagaClass)) {
                handleNewStyleAnnotations(sagaClass);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Registers a saga configured with {@link Subscriber} and {@link SagaEventIdProperty}: every
     * event class the saga has a handler for must expose a String getter for the id property.
     *
     * @throws InvalidSagaConfigurationException if an annotation is missing, or an event lacks
     *                                           the getter or it does not return String
     */
    private void handleNewStyleAnnotations(Class<? extends Saga> cls) {
        Subscriber subscriber = cls.getAnnotation(Subscriber.class);
        if (subscriber == null) {
            throw new InvalidSagaConfigurationException("Missing @Subscriber annotation, please annotate " + cls + " with @Subscriber to specify subscribed aggregate");
        }
        registerAggregate(subscriber.value());
        SagaEventIdProperty idPropertyAnnotation = cls.getAnnotation(SagaEventIdProperty.class);
        if (idPropertyAnnotation == null) {
            throw new InvalidSagaConfigurationException("Missing @SagaEventIdProperty annotation, please annotate " + cls + " with @SagaEventIdProperty to specify id-properties");
        }
        String value = idPropertyAnnotation.value();
        String getterName = propertyfy(value);
        for (Class<? extends Event> eventClass : HandlerFinder.getEventHandlers(cls).keySet()) {
            if (getterName == null) {
                // An empty property name would otherwise surface as a bare NullPointerException.
                throw new InvalidSagaConfigurationException("@SagaEventIdProperty on " + cls + " must name a non-empty id-property");
            }
            try {
                Method getter = eventClass.getMethod(getterName);
                if (!String.class.equals(getter.getReturnType())) {
                    throw new InvalidSagaConfigurationException("Event " + eventClass.getName() + "s " + value + " eventPropertyMethod does not return String, which is required for saga " + cls);
                }
                registerEventToSaga(eventClass, cls, getter);
            } catch (NoSuchMethodException e) {
                throw new InvalidSagaConfigurationException("Event " + eventClass.getName() + " does not implement the java property " + value + " which is required for saga " + cls, e);
            }
        }
    }

    /**
     * Turns a property name into its getter name, e.g. {@code "id"} into {@code "getId"}.
     *
     * @return the getter name, or {@code null} when the property name is null or empty
     */
    private String propertyfy(String str) {
        if (str == null || str.length() < 1) {
            return null;
        }
        // Locale.ROOT keeps the result independent of the JVM default locale (e.g. Turkish dotted I).
        return "get" + str.substring(0, 1).toUpperCase(Locale.ROOT) + str.substring(1);
    }

    private void registerAggregate(String aggregate) {
        registerAggregates(new String[]{aggregate});
    }

    /**
     * Registers a saga configured with {@link ListensTo}.
     *
     * @return {@code false} if the class has no {@link ListensTo} annotation, {@code true} otherwise
     * @throws IntrospectionException if an id property cannot be resolved on its event class
     */
    private boolean handleOldStyleAnnotations(Class<? extends Saga> cls) throws IntrospectionException {
        ListensTo listensTo = cls.getAnnotation(ListensTo.class);
        if (listensTo == null) {
            return false;
        }
        registerAggregates(listensTo.aggregates());
        for (EventIdBind bind : listensTo.value()) {
            Method getter = new PropertyDescriptor(bind.idProperty(), bind.eventClass()).getReadMethod();
            registerEventToSaga(bind.eventClass(), cls, getter);
        }
        return true;
    }

    private void registerAggregates(String[] aggregateTypes) {
        Collections.addAll(aggregates, aggregateTypes);
    }

    /**
     * Collects the mappings registered for the event class and for each of its superclasses,
     * stopping before {@code Object}.
     */
    private Set<SagaEventMapping> getSagaClassesForEvent(Class<? extends Event> cls) {
        Set<SagaEventMapping> mappings = new HashSet<>();
        // The null guard covers hierarchies that end without reaching Object (e.g. interfaces).
        for (Class<?> current = cls; current != null && current != Object.class; current = current.getSuperclass()) {
            ArrayList<SagaEventMapping> registered = eventToSagaMap.get(current);
            if (registered != null) {
                mappings.addAll(registered);
            }
        }
        return mappings;
    }

    /**
     * Records that the saga class handles the event class, with {@code method} as the id getter.
     * Identical registrations are skipped because the map is static and this runs on every
     * {@link #preStart()}.
     */
    private void registerEventToSaga(Class<? extends Event> eventClass, Class<? extends Saga> sagaClass, Method method) {
        ArrayList<SagaEventMapping> mappings = eventToSagaMap.get(eventClass);
        if (mappings == null) {
            mappings = new ArrayList<>();
            eventToSagaMap.put(eventClass, mappings);
        }
        for (SagaEventMapping existing : mappings) {
            if (sagaClass.equals(existing.getSagaClass()) && method.equals(existing.getPropertyMethod())) {
                return;
            }
        }
        mappings.add(new SagaEventMapping(sagaClass, method));
    }

    /**
     * Applies a leader change. When leadership is lost, all saga actors are stopped. While
     * leader, the repository is opened, the stored journal ids are loaded and a subscription per
     * aggregate is sent to the event store; otherwise the repository is closed.
     *
     * @param leaderChanged the cluster event, or {@code null} for the initial call from preStart
     */
    private void updateLeaderState(ClusterEvent.LeaderChanged leaderChanged) {
        try {
            boolean wasLeader = akkaClusterInfo.isLeader();
            akkaClusterInfo.updateLeaderState(leaderChanged);
            boolean isLeader = akkaClusterInfo.isLeader();
            if (wasLeader && !isLeader) {
                removeOldActorsWithWrongState();
            }
            if (!isLeader) {
                log.info("Closing repository for sagaManager");
                repository.close();
                return;
            }
            log.info("Opening repository for sagaManager");
            pauseBeforeOpeningRepository();
            repository.open();
            // Load every journal id first, then subscribe, as two separate passes.
            for (String aggregate : aggregates) {
                latestJournalidReceived.put(aggregate, repository.loadLatestJournalID(aggregate));
                log.info("SagaManager loaded aggregate {} latestJournalid {}", aggregate, latestJournalidReceived.get(aggregate));
            }
            for (String aggregate : aggregates) {
                eventstore.tell(new Subscription(aggregate, latestJournalidReceived.get(aggregate)), self());
            }
        } catch (ConfigurationException e) {
            log.debug("Not cluster system");
        }
    }

    /** Sends a PoisonPill to every saga actor and forgets them all. */
    private void removeOldActorsWithWrongState() {
        for (ActorRef saga : sagas.values()) {
            log.debug("Removing actor {}", saga.path());
            saga.tell(PoisonPill.getInstance(), (ActorRef) null);
        }
        sagas.clear();
    }
}
