package org.springframework.statemachine.support;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineException;
import org.springframework.statemachine.StateMachineSystemConstants;
import org.springframework.statemachine.state.JoinPseudoState;
import org.springframework.statemachine.state.PseudoStateKind;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.support.StateMachineExecutor;
import org.springframework.statemachine.transition.AbstractTransition;
import org.springframework.statemachine.transition.Transition;
import org.springframework.statemachine.transition.TransitionConflictPolicy;
import org.springframework.statemachine.trigger.DefaultTriggerContext;
import org.springframework.statemachine.trigger.TimerTrigger;
import org.springframework.statemachine.trigger.Trigger;
import org.springframework.statemachine.trigger.TriggerListener;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.retry.Retry;

/* loaded from: input_file:BOOT-INF/lib/spring-statemachine-core-3.1.0.jar:org/springframework/statemachine/support/ReactiveStateMachineExecutor.class */
public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport implements StateMachineExecutor<S, E> {
    private static final Log log = LogFactory.getLog((Class<?>) ReactiveStateMachineExecutor.class);
    private static final String REACTOR_CONTEXT_TRIGGER_ERRORS = "stateMachineTriggerErrors";
    private final StateMachine<S, E> stateMachine;
    private final StateMachine<S, E> relayStateMachine;
    private final Map<Trigger<S, E>, Transition<S, E>> triggerToTransitionMap;
    private final List<Transition<S, E>> triggerlessTransitions;
    private final Collection<Transition<S, E>> transitions;
    private final Transition<S, E> initialTransition;
    private final Message<E> initialEvent;
    private final TransitionComparator<S, E> transitionComparator;
    private final TransitionConflictPolicy transitionConflictPolicy;
    private volatile Message<E> forwardedInitialEvent;
    private StateMachineExecutor.StateMachineExecutorTransit<S, E> stateMachineExecutorTransit;
    private Sinks.Many<ReactiveStateMachineExecutor<S, E>.TriggerQueueItem> triggerSink;
    private Flux<Void> triggerFlux;
    private Disposable triggerDisposable;
    private final Queue<Message<E>> deferList = new ConcurrentLinkedQueue();
    private final AtomicBoolean initialHandled = new AtomicBoolean(false);
    private final StateMachineInterceptorList<S, E> interceptors = new StateMachineInterceptorList<>();
    private volatile Message<E> queuedMessage = null;
    private final Set<Transition<S, E>> joinSyncTransitions = new HashSet();
    private final Set<State<S, E>> joinSyncStates = new HashSet();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-statemachine-core-3.1.0.jar:org/springframework/statemachine/support/ReactiveStateMachineExecutor$TriggerQueueItem.class */
    public class TriggerQueueItem {
        Trigger<S, E> trigger;
        Message<E> message;
        StateMachineExecutor.StateMachineExecutorCallback callback;
        StateMachineExecutor.StateMachineExecutorCallback triggerCallback;

        public TriggerQueueItem(Trigger<S, E> trigger, Message<E> message, StateMachineExecutor.StateMachineExecutorCallback stateMachineExecutorCallback, StateMachineExecutor.StateMachineExecutorCallback stateMachineExecutorCallback2) {
            this.trigger = trigger;
            this.message = message;
            this.callback = stateMachineExecutorCallback;
            this.triggerCallback = stateMachineExecutorCallback2;
        }

        public String toString() {
            return "TriggerQueueItem [message=" + this.message + ", trigger=" + this.trigger + "]";
        }
    }

    public ReactiveStateMachineExecutor(StateMachine<S, E> stateMachine, StateMachine<S, E> stateMachine2, Collection<Transition<S, E>> collection, Map<Trigger<S, E>, Transition<S, E>> map, List<Transition<S, E>> list, Transition<S, E> transition, Message<E> message, TransitionConflictPolicy transitionConflictPolicy) {
        this.stateMachine = stateMachine;
        this.relayStateMachine = stateMachine2;
        this.triggerToTransitionMap = map;
        this.triggerlessTransitions = list;
        this.transitions = collection;
        this.initialTransition = transition;
        this.initialEvent = message;
        this.transitionComparator = new TransitionComparator<>(transitionConflictPolicy);
        this.transitionConflictPolicy = transitionConflictPolicy;
        this.triggerlessTransitions.sort(this.transitionComparator);
        registerTriggerListener();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.statemachine.support.LifecycleObjectSupport
    public void onInit() throws Exception {
        this.triggerSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.triggerFlux = this.triggerSink.asFlux().flatMap(triggerQueueItem -> {
            return handleTrigger(triggerQueueItem);
        }, 1);
    }

    @Override // org.springframework.statemachine.support.LifecycleObjectSupport
    protected Mono<Void> doPreStartReactively() {
        return Mono.defer(() -> {
            Mono<Void> startTriggers = startTriggers();
            if (this.triggerDisposable == null) {
                this.triggerDisposable = this.triggerFlux.subscribe();
            }
            if (!this.initialHandled.getAndSet(true)) {
                new ArrayList().add(this.initialTransition);
                startTriggers = this.initialEvent != null ? startTriggers.then(handleInitialTrans(this.initialTransition, this.initialEvent)) : startTriggers.then(handleInitialTrans(this.initialTransition, this.forwardedInitialEvent));
            }
            return startTriggers.then(handleTriggerlessTransitions(null, null));
        });
    }

    @Override // org.springframework.statemachine.support.LifecycleObjectSupport
    protected Mono<Void> doPreStopReactively() {
        return stopTriggers().and(Mono.fromRunnable(() -> {
            if (this.triggerDisposable != null) {
                this.triggerDisposable.dispose();
                this.triggerDisposable = null;
            }
            this.initialHandled.set(false);
        }));
    }

    @Override // org.springframework.statemachine.support.StateMachineExecutor
    public void queueDeferredEvent(Message<E> message) {
        if (log.isDebugEnabled()) {
            log.debug("Deferring message " + message);
        }
        this.deferList.add(message);
    }

    @Override // org.springframework.statemachine.support.StateMachineExecutor
    public Mono<Void> executeTriggerlessTransitions(StateContext<S, E> stateContext, State<S, E> state) {
        if (this.stateMachine.getState() == null) {
            return Mono.empty();
        }
        if (log.isDebugEnabled()) {
            log.debug("About to handleTriggerlessTransitions");
        }
        return handleTriggerlessTransitions(stateContext, state);
    }

    @Override // org.springframework.statemachine.support.StateMachineExecutor
    public void setInitialEnabled(boolean z) {
        this.initialHandled.set(!z);
    }

    @Override // org.springframework.statemachine.support.StateMachineExecutor
    public void setForwardedInitialEvent(Message<E> message) {
        this.forwardedInitialEvent = message;
    }

    @Override // org.springframework.statemachine.support.StateMachineExecutor
    public void setStateMachineExecutorTransit(StateMachineExecutor.StateMachineExecutorTransit<S, E> stateMachineExecutorTransit) {
        this.stateMachineExecutorTransit = stateMachineExecutorTransit;
    }

    @Override // org.springframework.statemachine.support.StateMachineExecutor
    public void addStateMachineInterceptor(StateMachineInterceptor<S, E> stateMachineInterceptor) {
        this.interceptors.add(stateMachineInterceptor);
    }

    @Override // org.springframework.statemachine.support.StateMachineExecutor
    public Mono<Void> queueEvent(Mono<Message<E>> mono, StateMachineExecutor.StateMachineExecutorCallback stateMachineExecutorCallback) {
        Flux merge = Flux.merge(mono, Flux.fromIterable(this.deferList));
        StateMachineExecutor.MonoSinkStateMachineExecutorCallback monoSinkStateMachineExecutorCallback = new StateMachineExecutor.MonoSinkStateMachineExecutorCallback();
        return merge.flatMap(message -> {
            return handleEvent(message, stateMachineExecutorCallback, monoSinkStateMachineExecutorCallback);
        }).flatMap(triggerQueueItem -> {
            return Mono.fromRunnable(() -> {
                this.triggerSink.emitNext(triggerQueueItem, Sinks.EmitFailureHandler.FAIL_FAST);
            }).retryWhen(Retry.fixedDelay(10L, Duration.ofMillis(10L)));
        }).then().and(Mono.create(monoSinkStateMachineExecutorCallback));
    }

    private Mono<ReactiveStateMachineExecutor<S, E>.TriggerQueueItem> handleEvent(Message<E> message, StateMachineExecutor.StateMachineExecutorCallback stateMachineExecutorCallback, StateMachineExecutor.StateMachineExecutorCallback stateMachineExecutorCallback2) {
        if (log.isDebugEnabled()) {
            log.debug("Handling message " + message);
        }
        return Mono.defer(() -> {
            State<S, E> state = this.stateMachine.getState();
            if (state == null || !state.shouldDefer(message)) {
                DefaultTriggerContext defaultTriggerContext = new DefaultTriggerContext(message.getPayload());
                return Flux.fromIterable(this.transitions).filter(transition -> {
                    return transition.getTrigger() != null;
                }).filter(transition2 -> {
                    return StateMachineUtils.containsAtleastOne(transition2.getSource().getIds(), state.getIds());
                }).flatMap(transition3 -> {
                    return Mono.from(transition3.getTrigger().evaluate(defaultTriggerContext)).flatMap(bool -> {
                        return bool.booleanValue() ? Mono.just(transition3.getTrigger()) : Mono.empty();
                    });
                }).next().doOnNext(trigger -> {
                    this.deferList.remove(message);
                }).map(trigger2 -> {
                    return new TriggerQueueItem(trigger2, message, stateMachineExecutorCallback, stateMachineExecutorCallback2);
                });
            }
            log.info("Current state " + state + " deferred event " + message);
            return Mono.just(new TriggerQueueItem(null, message, stateMachineExecutorCallback, stateMachineExecutorCallback2));
        });
    }

    private Mono<Void> handleTrigger(ReactiveStateMachineExecutor<S, E>.TriggerQueueItem triggerQueueItem) {
        return Mono.defer(() -> {
            Transition<S, E> transition;
            Mono<Void> mono = null;
            State<S, E> state = this.stateMachine.getState();
            if (triggerQueueItem != null && state != null) {
                if (log.isDebugEnabled()) {
                    log.debug("Process trigger item " + triggerQueueItem + " " + this);
                }
                this.queuedMessage = triggerQueueItem.message;
                E payload = this.queuedMessage != null ? this.queuedMessage.getPayload() : null;
                ArrayList arrayList = new ArrayList();
                if (payload != null) {
                    ArrayList arrayList2 = new ArrayList(state.getIds());
                    Collections.reverse(arrayList2);
                    Iterator<E> it = arrayList2.iterator();
                    while (it.hasNext()) {
                        E next = it.next();
                        for (Map.Entry<Trigger<S, E>, Transition<S, E>> entry : this.triggerToTransitionMap.entrySet()) {
                            E event = entry.getKey().getEvent();
                            Transition<S, E> value = entry.getValue();
                            if (payload.equals(event) && value.getSource().getId().equals(next) && !arrayList.contains(value)) {
                                arrayList.add(value);
                            }
                        }
                    }
                }
                if (arrayList.isEmpty() && (transition = this.triggerToTransitionMap.get(triggerQueueItem.trigger)) != null) {
                    arrayList.add(transition);
                }
                arrayList.sort(this.transitionComparator);
                mono = handleTriggerTrans(arrayList, this.queuedMessage).then();
            }
            ArrayList arrayList3 = new ArrayList();
            for (Transition<S, E> transition2 : this.triggerlessTransitions) {
                if (((AbstractTransition) transition2).getGuard() != null) {
                    arrayList3.add(transition2);
                }
            }
            if (mono == null) {
                mono = Mono.empty();
            }
            return mono;
        }).onErrorResume(resumeTriggerErrorToContext()).and(Mono.deferContextual((v0) -> {
            return Mono.just(v0);
        }).doOnNext(contextView -> {
            if (triggerQueueItem.callback != null) {
                contextView.getOrEmpty(StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS).ifPresent(executorExceptionHolder -> {
                    if (executorExceptionHolder.getError() != null) {
                        triggerQueueItem.callback.error(new StateMachineException("Execution error", executorExceptionHolder.getError()));
                    } else {
                        triggerQueueItem.callback.complete();
                    }
                });
            }
            if (triggerQueueItem.triggerCallback != null) {
                contextView.getOrEmpty(REACTOR_CONTEXT_TRIGGER_ERRORS).ifPresent(executorExceptionHolder2 -> {
                    if (executorExceptionHolder2.getError() != null) {
                        triggerQueueItem.triggerCallback.error(new StateMachineException("Execution error", executorExceptionHolder2.getError()));
                    } else {
                        triggerQueueItem.triggerCallback.complete();
                    }
                });
            }
        })).contextWrite(Context.of(StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new StateMachineExecutor.ExecutorExceptionHolder(), REACTOR_CONTEXT_TRIGGER_ERRORS, new StateMachineExecutor.ExecutorExceptionHolder()));
    }

    private Mono<Void> handleInitialTrans(Transition<S, E> transition, Message<E> message) {
        return Mono.defer(() -> {
            StateContext<S, E> buildStateContext = buildStateContext(message, transition, this.relayStateMachine);
            return transition.transit(buildStateContext).then(this.stateMachineExecutorTransit.transit(transition, buildStateContext, message));
        });
    }

    private Mono<Void> handleTriggerlessTransitions(StateContext<S, E> stateContext, State<S, E> state) {
        return Flux.concat(Flux.generate(synchronousSink -> {
            synchronousSink.next(handleTriggerTrans(this.triggerlessTransitions, stateContext != null ? stateContext.getMessage() : null, state));
        })).takeUntil(bool -> {
            return !bool.booleanValue();
        }).then();
    }

    private Mono<Boolean> handleTriggerTrans(List<Transition<S, E>> list, Message<E> message) {
        return handleTriggerTrans(list, message, null);
    }

    private Mono<Boolean> handleTriggerTrans(List<Transition<S, E>> list, Message<E> message, State<S, E> state) {
        return Flux.fromIterable(list).filter(transition -> {
            State<S, E> state2;
            State<S, E> source = transition.getSource();
            if (source == null || (state2 = this.stateMachine.getState()) == null || !StateMachineUtils.containsAtleastOne(source.getIds(), state2.getIds())) {
                return false;
            }
            if (this.transitionConflictPolicy == TransitionConflictPolicy.PARENT || state == null || source.getId().equals(state.getId())) {
                return true;
            }
            return !source.isOrthogonal() && StateMachineUtils.isSubstate(source, state);
        }).flatMap(transition2 -> {
            if (!StateMachineUtils.isPseudoState(transition2.getTarget(), PseudoStateKind.JOIN)) {
                StateContext<S, E> buildStateContext = buildStateContext(message, transition2, this.relayStateMachine);
                return Mono.just(buildStateContext).map(stateContext -> {
                    return this.interceptors.preTransition(buildStateContext);
                }).then(transition2.transit(buildStateContext).flatMap(bool -> {
                    return bool.booleanValue() ? this.stateMachineExecutorTransit.transit(transition2, buildStateContext, message).thenReturn(true).doOnNext(bool -> {
                        this.interceptors.postTransition(buildStateContext);
                    }) : Mono.just(false);
                }));
            }
            if (this.joinSyncStates.isEmpty()) {
                Iterator<List<State<S, E>>> it = ((JoinPseudoState) transition2.getTarget().getPseudoState()).getJoins().iterator();
                while (it.hasNext()) {
                    this.joinSyncStates.addAll(it.next());
                }
            }
            this.joinSyncTransitions.add(transition2);
            return this.joinSyncStates.remove(transition2.getSource()) & this.joinSyncStates.isEmpty() ? Flux.fromIterable(this.joinSyncTransitions).flatMap(transition2 -> {
                StateContext<S, E> buildStateContext2 = buildStateContext(message, transition2, this.relayStateMachine);
                return transition2.transit(buildStateContext2).then(this.stateMachineExecutorTransit.transit(transition2, buildStateContext2, message));
            }).doFinally(signalType -> {
                this.joinSyncTransitions.clear();
            }).then(Mono.just(true)) : Mono.just(false);
        }).takeUntil(bool -> {
            return bool.booleanValue();
        }).last(false);
    }

    private StateContext<S, E> buildStateContext(Message<E> message, Transition<S, E> transition, StateMachine<S, E> stateMachine) {
        HashMap hashMap = new HashMap(message != null ? message.getHeaders() : new MessageHeaders(new HashMap()));
        if (!hashMap.containsKey(StateMachineSystemConstants.STATEMACHINE_IDENTIFIER)) {
            hashMap.put(StateMachineSystemConstants.STATEMACHINE_IDENTIFIER, stateMachine.getUuid());
        }
        return new DefaultStateContext(StateContext.Stage.TRANSITION, message, new MessageHeaders(hashMap), stateMachine.getExtendedState(), transition, stateMachine, null, null, null);
    }

    private void registerTriggerListener() {
        for (final Trigger<S, E> trigger : this.triggerToTransitionMap.keySet()) {
            if (trigger instanceof TimerTrigger) {
                ((TimerTrigger) trigger).addTriggerListener(new TriggerListener() { // from class: org.springframework.statemachine.support.ReactiveStateMachineExecutor.1
                    @Override // org.springframework.statemachine.trigger.TriggerListener
                    public void triggered() {
                        if (ReactiveStateMachineExecutor.log.isDebugEnabled()) {
                            ReactiveStateMachineExecutor.log.debug("TimedTrigger triggered " + trigger);
                        }
                        Mono.just(new TriggerQueueItem(trigger, null, null, null)).flatMap(triggerQueueItem -> {
                            return Mono.fromCallable(() -> {
                                ReactiveStateMachineExecutor.this.triggerSink.emitNext(triggerQueueItem, Sinks.EmitFailureHandler.FAIL_FAST);
                                return null;
                            }).retryWhen(Retry.fixedDelay(10L, Duration.ofNanos(10L)));
                        }).subscribe();
                    }
                });
            }
        }
    }

    private Mono<Void> startTriggers() {
        Stream<Trigger<S, E>> stream = this.triggerToTransitionMap.keySet().stream();
        Class<StateMachineReactiveLifecycle> cls = StateMachineReactiveLifecycle.class;
        StateMachineReactiveLifecycle.class.getClass();
        Stream<Trigger<S, E>> filter = stream.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Class<StateMachineReactiveLifecycle> cls2 = StateMachineReactiveLifecycle.class;
        StateMachineReactiveLifecycle.class.getClass();
        return Flux.fromIterable((List) filter.map((v1) -> {
            return r1.cast(v1);
        }).collect(Collectors.toList())).flatMap((v0) -> {
            return v0.startReactively();
        }).then();
    }

    private Mono<Void> stopTriggers() {
        Stream<Trigger<S, E>> stream = this.triggerToTransitionMap.keySet().stream();
        Class<StateMachineReactiveLifecycle> cls = StateMachineReactiveLifecycle.class;
        StateMachineReactiveLifecycle.class.getClass();
        Stream<Trigger<S, E>> filter = stream.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Class<StateMachineReactiveLifecycle> cls2 = StateMachineReactiveLifecycle.class;
        StateMachineReactiveLifecycle.class.getClass();
        return Flux.fromIterable((List) filter.map((v1) -> {
            return r1.cast(v1);
        }).collect(Collectors.toList())).flatMap((v0) -> {
            return v0.stopReactively();
        }).then();
    }

    private static Function<? super Throwable, Mono<Void>> resumeTriggerErrorToContext() {
        return th -> {
            return Mono.deferContextual((v0) -> {
                return Mono.just(v0);
            }).doOnNext(contextView -> {
                contextView.getOrEmpty(REACTOR_CONTEXT_TRIGGER_ERRORS).ifPresent(executorExceptionHolder -> {
                    executorExceptionHolder.setError(th);
                });
            }).then();
        };
    }
}
