/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.test;

import com.fasterxml.jackson.databind.JsonNode;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.ThrowingConsumer;
import io.fluxcapacitor.common.ThrowingFunction;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.SerializedObject;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.application.PropertySource;
import io.fluxcapacitor.common.application.SimplePropertySource;
import io.fluxcapacitor.common.handling.Handler;
import io.fluxcapacitor.common.handling.HandlerFilter;
import io.fluxcapacitor.common.handling.HandlerInvoker;
import io.fluxcapacitor.common.handling.ParameterResolver;
import io.fluxcapacitor.common.reflection.ReflectionUtils;
import io.fluxcapacitor.common.serialization.JsonUtils;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.ClientUtils;
import io.fluxcapacitor.javaclient.common.IdentityProvider;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.configuration.DefaultFluxCapacitor;
import io.fluxcapacitor.javaclient.configuration.FluxCapacitorBuilder;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.configuration.client.InMemoryClient;
import io.fluxcapacitor.javaclient.persisting.search.Search;
import io.fluxcapacitor.javaclient.publishing.DispatchInterceptor;
import io.fluxcapacitor.javaclient.scheduling.DefaultScheduler;
import io.fluxcapacitor.javaclient.scheduling.Schedule;
import io.fluxcapacitor.javaclient.scheduling.ScheduledCommand;
import io.fluxcapacitor.javaclient.scheduling.Scheduler;
import io.fluxcapacitor.javaclient.scheduling.client.LocalSchedulingClient;
import io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient;
import io.fluxcapacitor.javaclient.test.BeanParameterResolver;
import io.fluxcapacitor.javaclient.test.FixtureResult;
import io.fluxcapacitor.javaclient.test.Given;
import io.fluxcapacitor.javaclient.test.PredictableIdentityProvider;
import io.fluxcapacitor.javaclient.test.ResultValidator;
import io.fluxcapacitor.javaclient.test.TestClient;
import io.fluxcapacitor.javaclient.test.TestFluxCapacitor;
import io.fluxcapacitor.javaclient.test.TestUserProvider;
import io.fluxcapacitor.javaclient.test.Then;
import io.fluxcapacitor.javaclient.test.When;
import io.fluxcapacitor.javaclient.tracking.BatchInterceptor;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.ErrorHandler;
import io.fluxcapacitor.javaclient.tracking.Tracker;
import io.fluxcapacitor.javaclient.tracking.handling.HandleSchedule;
import io.fluxcapacitor.javaclient.tracking.handling.HandlerInterceptor;
import io.fluxcapacitor.javaclient.tracking.handling.HasLocalHandlers;
import io.fluxcapacitor.javaclient.tracking.handling.authentication.UnauthorizedException;
import io.fluxcapacitor.javaclient.tracking.handling.authentication.User;
import io.fluxcapacitor.javaclient.tracking.handling.authentication.UserProvider;
import io.fluxcapacitor.javaclient.web.WebRequest;
import io.fluxcapacitor.javaclient.web.WebResponse;
import java.beans.ConstructorProperties;
import java.lang.reflect.Executable;
import java.net.HttpCookie;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestFixture
implements Given,
When {
    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TestFixture.class);
    // Initial value copied into each fixture's resultTimeout (10 seconds unless reassigned).
    public static Duration defaultResultTimeout = Duration.ofSeconds(10L);
    // Initial value copied into each fixture's consumerTimeout (30 seconds unless reassigned).
    public static Duration defaultConsumerTimeout = Duration.ofSeconds(30L);
    // The FluxCapacitor instance this fixture drives.
    private final FluxCapacitor fluxCapacitor;
    // Builder used to create fluxCapacitor; reused when a derived fixture is created from this one.
    private final FluxCapacitorBuilder fluxCapacitorBuilder;
    // Interceptor registered on the builder and on the client's gateway monitors.
    private final GivenWhenThenInterceptor interceptor;
    // Maximum time getDispatchResult waits for a result when the fixture is not synchronous.
    private Duration resultTimeout = defaultResultTimeout;
    // Maximum time waitForConsumers waits on the consumers monitor.
    private Duration consumerTimeout = defaultConsumerTimeout;
    // When true, handlers are registered on the local handler registries and results are read without waiting.
    private final boolean synchronous;
    // When true, fluxCapacitor is a TestFluxCapacitor wrapping a TestClient (see resetMocks).
    private final boolean spying;
    // Accumulated handler registrations; cancelled by shutDownActiveFixtures.
    private Registration registration = Registration.noOp();
    // Messages per active consumer; waitForConsumers reports entries whose list is not empty.
    private final Map<ActiveConsumer, List<Message>> consumers = new ConcurrentHashMap<ActiveConsumer, List<Message>>();
    // Holder for collected results; replaced with a fresh instance by reset().
    private FixtureResult fixtureResult = new FixtureResult();
    // Resolver that receives beans registered through withBean.
    private final BeanParameterResolver beanParameterResolver = new BeanParameterResolver();
    // Backing map of the SimplePropertySource added to the builder; updated through withProperty.
    private final Map<String, String> testProperties = new HashMap<String, String>();
    // Cookies carried over between web requests executed by this fixture.
    private final List<HttpCookie> cookies = new ArrayList<HttpCookie>();
    // Every modifier applied through modifyFixture, replayed when a derived fixture is created.
    private final List<ThrowingConsumer<TestFixture>> modifiers = new CopyOnWriteArrayList<ThrowingConsumer<TestFixture>>();
    // Fixtures created on the current thread that have not been shut down yet.
    private static final ThreadLocal<List<TestFixture>> activeFixtures = ThreadLocal.withInitial(ArrayList::new);
    // Executor on which shutDownActiveFixtures runs the per-fixture shutdown tasks.
    private static final Executor shutdownExecutor = Executors.newFixedThreadPool(16);

    /**
     * Creates a synchronous fixture using a default {@link FluxCapacitorBuilder}.
     *
     * @param handlers the handlers to register with the fixture
     * @return the new fixture
     */
    public static TestFixture create(Object ... handlers) {
        FluxCapacitorBuilder builder = DefaultFluxCapacitor.builder();
        return create(builder, handlers);
    }

    /**
     * Creates a synchronous fixture from the given builder and a fixed set of handlers.
     *
     * @param fluxCapacitorBuilder the builder used to create the underlying {@link FluxCapacitor}
     * @param handlers             the handlers to register with the fixture
     * @return the new fixture
     */
    public static TestFixture create(FluxCapacitorBuilder fluxCapacitorBuilder, Object ... handlers) {
        // The handlers do not depend on the FluxCapacitor instance, so the factory ignores its argument.
        Function<FluxCapacitor, List<?>> handlersFactory = fc -> Arrays.asList(handlers);
        return create(fluxCapacitorBuilder, handlersFactory);
    }

    /**
     * Creates a synchronous fixture using a default {@link FluxCapacitorBuilder} and a handler factory.
     *
     * @param handlersFactory produces the handlers once the {@link FluxCapacitor} instance is available
     * @return the new fixture
     */
    public static TestFixture create(Function<FluxCapacitor, List<?>> handlersFactory) {
        FluxCapacitorBuilder builder = DefaultFluxCapacitor.builder();
        return create(builder, handlersFactory);
    }

    /**
     * Creates a synchronous fixture from the given builder and handler factory, backed by a new
     * {@link InMemoryClient}.
     *
     * @param fluxCapacitorBuilder the builder used to create the underlying {@link FluxCapacitor}
     * @param handlersFactory      produces the handlers once the {@link FluxCapacitor} instance is available
     * @return the new fixture
     */
    public static TestFixture create(FluxCapacitorBuilder fluxCapacitorBuilder, Function<FluxCapacitor, List<?>> handlersFactory) {
        Client client = InMemoryClient.newInstance(null);
        return new TestFixture(fluxCapacitorBuilder, handlersFactory, client, true);
    }

    /**
     * Creates an asynchronous fixture using a default {@link FluxCapacitorBuilder}.
     *
     * @param handlers the handlers to register with the fixture
     * @return the new fixture
     */
    public static TestFixture createAsync(Object ... handlers) {
        FluxCapacitorBuilder builder = DefaultFluxCapacitor.builder();
        return createAsync(builder, handlers);
    }

    /**
     * Creates an asynchronous fixture from the given builder and a fixed set of handlers.
     *
     * @param fluxCapacitorBuilder the builder used to create the underlying {@link FluxCapacitor}
     * @param handlers             the handlers to register with the fixture
     * @return the new fixture
     */
    public static TestFixture createAsync(FluxCapacitorBuilder fluxCapacitorBuilder, Object ... handlers) {
        // The handlers do not depend on the FluxCapacitor instance, so the factory ignores its argument.
        Function<FluxCapacitor, List<?>> handlersFactory = fc -> Arrays.asList(handlers);
        return createAsync(fluxCapacitorBuilder, handlersFactory);
    }

    /**
     * Creates an asynchronous fixture using a default {@link FluxCapacitorBuilder} and a handler factory.
     *
     * @param handlersFactory produces the handlers once the {@link FluxCapacitor} instance is available
     * @return the new fixture
     */
    public static TestFixture createAsync(Function<FluxCapacitor, List<?>> handlersFactory) {
        FluxCapacitorBuilder builder = DefaultFluxCapacitor.builder();
        return createAsync(builder, handlersFactory);
    }

    /**
     * Creates an asynchronous fixture from the given builder and handler factory, backed by a new
     * {@link InMemoryClient}.
     *
     * @param fluxCapacitorBuilder the builder used to create the underlying {@link FluxCapacitor}
     * @param handlersFactory      produces the handlers once the {@link FluxCapacitor} instance is available
     * @return the new fixture
     */
    public static TestFixture createAsync(FluxCapacitorBuilder fluxCapacitorBuilder, Function<FluxCapacitor, List<?>> handlersFactory) {
        Client client = InMemoryClient.newInstance(null);
        return new TestFixture(fluxCapacitorBuilder, handlersFactory, client, false);
    }

    /**
     * Creates an asynchronous fixture that uses the supplied client instead of a new in-memory one.
     *
     * @param fluxCapacitorBuilder the builder used to create the underlying {@link FluxCapacitor}
     * @param client               the client the {@link FluxCapacitor} instance is built on
     * @param handlers             the handlers to register with the fixture
     * @return the new fixture
     */
    public static TestFixture createAsync(FluxCapacitorBuilder fluxCapacitorBuilder, Client client, Object ... handlers) {
        Function<FluxCapacitor, List<?>> handlersFactory = fc -> Arrays.asList(handlers);
        return new TestFixture(fluxCapacitorBuilder, handlersFactory, client, false);
    }

    /**
     * Shuts down every fixture that was created on the current thread and has not been shut down yet.
     * <p>
     * The thread-local list of active fixtures is cleared first. Each fixture is then shut down on the
     * shared shutdown executor: its handler registration is cancelled and its client is shut down.
     * The client is shut down even if cancelling the registration throws, so a failing handler
     * registration cannot leave a client running.
     */
    public static void shutDownActiveFixtures() {
        List<TestFixture> fixtures = activeFixtures.get();
        if (fixtures.isEmpty()) {
            return;
        }
        activeFixtures.remove();
        for (TestFixture fixture : fixtures) {
            shutdownExecutor.execute(() -> {
                try {
                    Registration registration = fixture.registration;
                    if (registration != null) {
                        registration.cancel();
                    }
                } finally {
                    // Always release the client, also when cancelling the registration failed.
                    fixture.fluxCapacitor.client().shutDown();
                }
            });
        }
    }

    /**
     * Creates a fixture, builds its {@link FluxCapacitor} instance on the given client and registers the
     * handlers produced by the handler factory.
     * <p>
     * The fixture adds itself to the current thread's list of active fixtures. The builder is configured
     * with the fixture's property source, bean parameter resolver and interceptor before it is built, and
     * the clock is fixed at the moment of construction.
     *
     * @param fluxCapacitorBuilder the builder used to create the {@link FluxCapacitor} instance
     * @param handlerFactory       produces the handlers to register once the instance is available
     * @param client               the client the instance is built on
     * @param synchronous          whether the fixture handles messages synchronously
     */
    protected TestFixture(FluxCapacitorBuilder fluxCapacitorBuilder, Function<FluxCapacitor, List<?>> handlerFactory, Client client, boolean synchronous) {
        activeFixtures.get().add(this);
        this.synchronous = synchronous;
        this.spying = false;
        Optional<TestUserProvider> userProvider = Optional.ofNullable(UserProvider.defaultUserSupplier).map(TestUserProvider::new);
        if (userProvider.isPresent()) {
            fluxCapacitorBuilder = fluxCapacitorBuilder.registerUserProvider((UserProvider)userProvider.get());
        }
        if (synchronous) {
            // Scheduled commands are dispatched by the handler registered further below instead.
            fluxCapacitorBuilder.disableScheduledCommandHandler();
        }
        fluxCapacitorBuilder.addPropertySource((PropertySource)new SimplePropertySource(this.testProperties));
        this.interceptor = new GivenWhenThenInterceptor(this);
        // Let the interceptor observe everything that is dispatched through the client, for every message type.
        Arrays.stream(MessageType.values()).forEach(type -> client.getGatewayClient(type).registerMonitor(messages -> this.interceptor.interceptClientDispatch((List<SerializedMessage>)messages, (MessageType)type)));
        this.fluxCapacitorBuilder = fluxCapacitorBuilder = fluxCapacitorBuilder.disableShutdownHook().addParameterResolver((ParameterResolver)this.beanParameterResolver).addDispatchInterceptor((DispatchInterceptor)this.interceptor, new MessageType[0]).replaceIdentityProvider(p -> p == IdentityProvider.defaultIdentityProvider ? PredictableIdentityProvider.defaultPredictableIdentityProvider() : p).addBatchInterceptor((BatchInterceptor)this.interceptor, new MessageType[0]).addHandlerInterceptor((HandlerInterceptor)this.interceptor, true, new MessageType[0]);
        this.fluxCapacitor = fluxCapacitorBuilder.build(client);
        if (synchronous) {
            this.localHandlerRegistries(this.fluxCapacitor).forEach(r -> r.setSelfHandlerFilter(HandlerFilter.ALWAYS_HANDLE));
        }
        this.withClock(Clock.fixed(Instant.now(), ZoneId.systemDefault()));
        // A list of plain objects: it holds both the anonymous schedule handler and the user's handlers.
        List<Object> handlers = new ArrayList<>();
        if (synchronous) {
            handlers.add(new Object(){

                // Re-sends the command carried by an expired ScheduledCommand, stamped with the current time.
                @HandleSchedule
                void handle(ScheduledCommand schedule) {
                    SerializedMessage command = schedule.getCommand();
                    command.setTimestamp(Long.valueOf(FluxCapacitor.currentTime().toEpochMilli()));
                    TestFixture.this.fluxCapacitor.serializer().deserializeMessages(Stream.of(command), MessageType.COMMAND).findFirst().map(DeserializingMessage::toMessage).ifPresent(FluxCapacitor::sendAndForgetCommand);
                }
            });
        }
        handlers.addAll(handlerFactory.apply(this.fluxCapacitor));
        this.registerHandlers(handlers);
    }

    /**
     * Creates a fixture derived from an existing one, with possibly different synchronous and spying modes.
     * <p>
     * All fixtures that are active on the current thread are shut down first. The new fixture reuses the
     * builder and interceptor of {@code currentFixture} and replays every modifier that was applied to it.
     *
     * @param currentFixture the fixture to derive from
     * @param synchronous    whether the new fixture handles messages synchronously
     * @param spying         whether the new fixture wraps its client and FluxCapacitor instance in test doubles
     */
    protected TestFixture(TestFixture currentFixture, boolean synchronous, boolean spying) {
        TestFixture.shutDownActiveFixtures();
        activeFixtures.get().add(this);
        this.synchronous = synchronous;
        this.spying = spying;
        this.fluxCapacitorBuilder = currentFixture.fluxCapacitorBuilder;
        this.interceptor = currentFixture.interceptor;
        // The shared interceptor must report to the new fixture from now on.
        currentFixture.interceptor.testFixture = this;
        Client currentClient = currentFixture.fluxCapacitor.client().unwrap();
        // An in-memory client is replaced by a new instance; any other client is reused as is.
        Client newClient = currentClient instanceof InMemoryClient ? InMemoryClient.newInstance(null) : currentClient;
        Arrays.stream(MessageType.values()).forEach(type -> newClient.getGatewayClient(type).registerMonitor(messages -> this.interceptor.interceptClientDispatch((List<SerializedMessage>)messages, (MessageType)type)));
        this.fluxCapacitor = spying ? new TestFluxCapacitor(this.fluxCapacitorBuilder.build((Client)new TestClient(newClient))) : this.fluxCapacitorBuilder.build(newClient);
        // Synchronous fixtures always handle locally; otherwise only handlers that are not self-tracking do.
        this.localHandlerRegistries(this.fluxCapacitor).forEach(r -> r.setSelfHandlerFilter(synchronous ? HandlerFilter.ALWAYS_HANDLE : (t, m) -> !ClientUtils.isSelfTracking((Class)t, (Executable)m)));
        // Replay the modifiers of the previous fixture (handlers, clock, properties, givens, ...) on this one.
        currentFixture.modifiers.forEach(this::modifyFixture);
    }

    /**
     * Sets the maximum time to wait for a dispatch result.
     *
     * @param resultTimeout the new timeout
     * @return the fixture returned by {@link #modifyFixture}
     */
    public TestFixture resultTimeout(Duration resultTimeout) {
        ThrowingConsumer<TestFixture> modifier = fixture -> fixture.resultTimeout = resultTimeout;
        return modifyFixture(modifier);
    }

    /**
     * Sets the maximum time to wait for consumers to finish processing.
     *
     * @param consumerTimeout the new timeout
     * @return the fixture returned by {@link #modifyFixture}
     */
    public TestFixture consumerTimeout(Duration consumerTimeout) {
        ThrowingConsumer<TestFixture> modifier = fixture -> fixture.consumerTimeout = consumerTimeout;
        return modifyFixture(modifier);
    }

    /**
     * Returns an asynchronous version of this fixture.
     *
     * @return this fixture if it is already asynchronous, otherwise a new fixture derived from this one
     */
    public TestFixture async() {
        if (!synchronous) {
            return this;
        }
        return new TestFixture(this, false, spying);
    }

    /**
     * Returns a synchronous version of this fixture.
     *
     * @return this fixture if it is already synchronous, otherwise a new fixture derived from this one
     */
    public TestFixture sync() {
        if (synchronous) {
            return this;
        }
        return new TestFixture(this, true, spying);
    }

    /**
     * Returns a spying version of this fixture.
     *
     * @return this fixture if it is already spying, otherwise a new fixture derived from this one
     */
    public TestFixture spy() {
        if (spying) {
            return this;
        }
        return new TestFixture(this, synchronous, true);
    }

    /**
     * Registers the given handlers with the fixture.
     * <p>
     * A warning is logged for every handler whose type was already seen earlier in the list. The type of a
     * handler is the class itself when a {@link Class} is passed, the target class when a {@link Handler}
     * is passed, and the runtime class of the object otherwise. The warning reports that handler type
     * (rather than the runtime class of the duplicate object, which would be {@code java.lang.Class} for
     * handlers passed as a class).
     * <p>
     * An asynchronous fixture registers the handlers through {@code FluxCapacitor.registerHandlers}. A
     * synchronous fixture registers each handler on every local handler registry with a filter that
     * accepts everything.
     *
     * @param handlers the handlers to register; an empty list is a no-op
     * @return the fixture returned by {@link #modifyFixture}
     */
    public TestFixture registerHandlers(List<?> handlers) {
        return this.modifyFixture(fixture -> {
            FluxCapacitor fc = fixture.getFluxCapacitor();
            if (handlers.isEmpty()) {
                return;
            }
            // Detect handlers of the same type so accidental double registration is reported.
            Map<Class<?>, Object> handlersByType = new HashMap<>();
            for (Object handler : handlers) {
                Class<?> handlerType;
                if (handler instanceof Class) {
                    handlerType = (Class<?>)handler;
                } else if (handler instanceof Handler) {
                    handlerType = ((Handler)handler).getTargetClass();
                } else {
                    handlerType = handler.getClass();
                }
                if (handlersByType.putIfAbsent(handlerType, handler) != null) {
                    log.warn("Handler of type {} is registered more than once. Please make sure this is intentional.", handlerType);
                }
            }
            if (!fixture.synchronous) {
                fixture.registration = fixture.registration.merge(fc.registerHandlers(handlers));
                return;
            }
            HandlerFilter handlerFilter = (c, e) -> true;
            Registration registration = (Registration)fc.apply(f -> handlers.stream().flatMap(h -> this.localHandlerRegistries((FluxCapacitor)f).map(r -> r.registerHandler(h, handlerFilter))).reduce(Registration::merge).orElse(Registration.noOp()));
            fixture.registration = fixture.registration.merge(registration);
        });
    }

    /**
     * Lists the components of the given instance on which local handlers can be registered.
     *
     * @param fluxCapacitor the instance to inspect
     * @return the command, query, event, error, web request and metrics gateways and the event store,
     *         followed by the scheduler if it is a {@link DefaultScheduler}
     */
    protected Stream<HasLocalHandlers> localHandlerRegistries(FluxCapacitor fluxCapacitor) {
        Stream<HasLocalHandlers> gateways = Stream.of(
                fluxCapacitor.commandGateway(),
                fluxCapacitor.queryGateway(),
                fluxCapacitor.eventGateway(),
                fluxCapacitor.eventStore(),
                fluxCapacitor.errorGateway(),
                fluxCapacitor.webRequestGateway(),
                fluxCapacitor.metricsGateway());
        Scheduler scheduler = fluxCapacitor.scheduler();
        if (!(scheduler instanceof DefaultScheduler)) {
            return gateways;
        }
        return Stream.concat(gateways, Stream.of((DefaultScheduler) scheduler));
    }

    /**
     * Registers the given handlers with the fixture.
     *
     * @param handlers the handlers to register
     * @return the fixture returned by {@link #registerHandlers(List)}
     */
    public TestFixture registerHandlers(Object ... handlers) {
        List<Object> handlerList = Arrays.asList(handlers);
        return registerHandlers(handlerList);
    }

    /**
     * Makes the fixture use the given clock.
     *
     * @param clock the clock to use
     * @return the fixture returned by {@link #modifyFixture}
     */
    @Override
    public TestFixture withClock(Clock clock) {
        ThrowingConsumer<TestFixture> modifier = fixture -> fixture.setClock(clock);
        return modifyFixture(modifier);
    }

    /**
     * Fixes the fixture's clock at the given instant, using the system default time zone.
     *
     * @param time the instant the clock should report
     * @return the fixture returned by {@link #withClock}
     */
    @Override
    public TestFixture atFixedTime(Instant time) {
        Clock fixedClock = Clock.fixed(time, ZoneId.systemDefault());
        return withClock(fixedClock);
    }

    /**
     * Sets or removes a test property.
     *
     * @param name  the property name
     * @param value the property value, stored as {@code value.toString()}; {@code null} removes the property
     * @return the fixture returned by {@link #modifyFixture}
     */
    @Override
    public TestFixture withProperty(String name, Object value) {
        ThrowingConsumer<TestFixture> modifier = fixture -> {
            if (value == null) {
                fixture.testProperties.remove(name);
            } else {
                fixture.testProperties.put(name, value.toString());
            }
        };
        return modifyFixture(modifier);
    }

    /**
     * Registers a bean with the fixture's bean parameter resolver.
     *
     * @param bean the bean to register
     * @return the fixture returned by {@link #modifyFixture}
     */
    @Override
    public TestFixture withBean(Object bean) {
        ThrowingConsumer<TestFixture> modifier = fixture -> fixture.beanParameterResolver.registerBean(bean);
        return modifyFixture(modifier);
    }

    /**
     * Records the given modifier and applies it to this fixture.
     * <p>
     * The modifier is added to the list of modifiers before it is run, so that it is replayed when a new
     * fixture is derived from this one. It is run via {@code FluxCapacitor.apply} on this fixture's instance.
     *
     * @param modifier the change to apply to the fixture
     * @return the result of applying the modifier, which is this fixture
     */
    protected TestFixture modifyFixture(ThrowingConsumer<TestFixture> modifier) {
        modifiers.add(modifier);
        return (TestFixture) fluxCapacitor.apply(fc -> {
            modifier.accept(this);
            return this;
        });
    }

    /**
     * Sends the given commands as part of the test's preconditions, one modification per argument.
     *
     * @param commands the commands to send; each argument is expanded via {@code asMessages}
     * @return this fixture
     */
    @Override
    public TestFixture givenCommands(Object ... commands) {
        Class<?> callerClass = ReflectionUtils.getCallerClass();
        for (Object command : commands) {
            ThrowingConsumer<TestFixture> sendCommand = fixture -> fixture.asMessages(callerClass, command)
                    .forEach(c -> fixture.getDispatchResult(fixture.getFluxCapacitor().commandGateway().send(c)));
            givenModification(sendCommand);
        }
        return this;
    }

    /**
     * Sends the given commands on behalf of a user as part of the test's preconditions.
     *
     * @param userRep  the user, or a value from which the user can be resolved via {@code getUser}
     * @param commands the commands to send; each argument is expanded via {@code asMessages}
     * @return this fixture
     */
    @Override
    public TestFixture givenCommandsByUser(Object userRep, Object ... commands) {
        Class<?> callerClass = ReflectionUtils.getCallerClass();
        for (Object command : commands) {
            ThrowingConsumer<TestFixture> sendCommand = fixture -> fixture.asMessages(callerClass, command)
                    .map(c -> fixture.addUser(this.getUser(userRep), c))
                    .forEach(c -> fixture.getDispatchResult(fixture.getFluxCapacitor().commandGateway().send(c)));
            givenModification(sendCommand);
        }
        return this;
    }

    /**
     * Applies the given events to an aggregate as part of the test's preconditions.
     *
     * @param aggregateId    the id of the aggregate
     * @param aggregateClass the type of the aggregate
     * @param events         the events to apply; expanded via {@code asMessages}
     * @return the fixture returned by {@link #givenModification}
     */
    @Override
    public TestFixture givenAppliedEvents(String aggregateId, Class<?> aggregateClass, Object ... events) {
        Class<?> callerClass = ReflectionUtils.getCallerClass();
        ThrowingConsumer<TestFixture> applyAll = fixture -> fixture.applyEvents(
                aggregateId, aggregateClass, fixture.getFluxCapacitor(),
                fixture.asMessages(callerClass, events).toList());
        return givenModification(applyAll);
    }

    /**
     * Publishes the given events as part of the test's preconditions, one modification per argument.
     *
     * @param events the events to publish; each argument is expanded via {@code asMessages}
     * @return this fixture
     */
    @Override
    public TestFixture givenEvents(Object ... events) {
        Class<?> callerClass = ReflectionUtils.getCallerClass();
        for (Object event : events) {
            ThrowingConsumer<TestFixture> publishEvent = fixture -> fixture.asMessages(callerClass, event)
                    .forEach(e -> fixture.getFluxCapacitor().eventGateway().publish(e));
            givenModification(publishEvent);
        }
        return this;
    }

    /**
     * Indexes a document in the document store as part of the test's preconditions and waits for the
     * indexing to complete.
     *
     * @param document   the document to index
     * @param id         the document id
     * @param collection the collection to index into
     * @param timestamp  passed to the document store as the document's start
     * @param end        passed to the document store as the document's end
     * @return the fixture returned by {@link #givenModification}
     */
    @Override
    public TestFixture givenDocument(Object document, Object id, Object collection, Instant timestamp, Instant end) {
        ThrowingConsumer<TestFixture> indexDocument = fixture -> fixture.getFluxCapacitor().documentStore()
                .index(document, id, collection, timestamp, end).get();
        return givenModification(indexDocument);
    }

    /**
     * Indexes a document in the document store as part of the test's preconditions and waits for the
     * indexing to complete.
     *
     * @param document the document to index
     * @return this fixture
     */
    @Override
    public TestFixture givenDocument(Object document) {
        ThrowingConsumer<TestFixture> indexDocument =
                fixture -> fixture.getFluxCapacitor().documentStore().index(document).get();
        givenModification(indexDocument);
        return this;
    }

    /**
     * Indexes one or more documents into a collection as part of the test's preconditions, one
     * modification per document.
     *
     * @param collection     the collection to index into
     * @param firstDocument  the first document to index
     * @param otherDocuments any further documents to index
     * @return this fixture
     */
    @Override
    public TestFixture givenDocuments(Object collection, Object firstDocument, Object ... otherDocuments) {
        // Collect all documents up front so an invalid argument fails before anything is indexed.
        List<Object> documents = new ArrayList<>();
        documents.add(firstDocument);
        documents.addAll(Arrays.asList(otherDocuments));
        for (Object document : documents) {
            ThrowingConsumer<TestFixture> indexDocument =
                    fixture -> fixture.getFluxCapacitor().documentStore().index(document, collection).get();
            givenModification(indexDocument);
        }
        return this;
    }

    /**
     * Executes a web request as part of the test's preconditions.
     *
     * @param webRequest the request to execute; it is passed through {@code parseObject} first
     * @return the fixture returned by {@link #givenModification}
     */
    @Override
    public TestFixture givenWebRequest(WebRequest webRequest) {
        Class<?> callerClass = ReflectionUtils.getCallerClass();
        ThrowingConsumer<TestFixture> execute = fixture -> {
            WebRequest parsed = (WebRequest) fixture.parseObject(webRequest, callerClass);
            fixture.executeWebRequest(parsed);
        };
        return givenModification(execute);
    }

    /**
     * Delegates to the default implementation in {@link Given} and narrows the return type.
     *
     * @param path    the request path
     * @param payload the request payload
     * @return the result of the default implementation, cast to {@link TestFixture}
     */
    @Override
    public TestFixture givenPost(String path, Object payload) {
        Object result = Given.super.givenPost(path, payload);
        return (TestFixture) result;
    }

    /**
     * Delegates to the default implementation in {@link Given} and narrows the return type.
     *
     * @param path the request path
     * @return the result of the default implementation, cast to {@link TestFixture}
     */
    @Override
    public TestFixture givenGet(String path) {
        Object result = Given.super.givenGet(path);
        return (TestFixture) result;
    }

    /**
     * Moves the fixture's clock to the given instant as part of the test's preconditions.
     *
     * @param instant the instant to advance to
     * @return the fixture returned by {@link #givenModification}
     */
    @Override
    public TestFixture givenTimeAdvancedTo(Instant instant) {
        ThrowingConsumer<TestFixture> advance = fixture -> fixture.advanceTimeTo(instant);
        return givenModification(advance);
    }

    /**
     * Moves the fixture's clock forward by the given duration as part of the test's preconditions.
     *
     * @param duration the amount of time to advance
     * @return the fixture returned by {@link #givenModification}
     */
    @Override
    public TestFixture givenElapsedTime(Duration duration) {
        ThrowingConsumer<TestFixture> advance = fixture -> fixture.advanceTimeBy(duration);
        return givenModification(advance);
    }

    /**
     * Runs an arbitrary action against the fixture's {@link FluxCapacitor} as part of the test's
     * preconditions.
     *
     * @param condition the action to run
     * @return the fixture returned by {@link #givenModification}
     */
    @Override
    public TestFixture given(ThrowingConsumer<FluxCapacitor> condition) {
        ThrowingConsumer<TestFixture> runCondition = fixture -> condition.accept(fixture.getFluxCapacitor());
        return givenModification(runCondition);
    }

    /**
     * Applies a precondition to the fixture.
     * <p>
     * Expired schedules are handled locally before and after the modifier runs, after which the fixture
     * waits for its consumers. Any failure is rethrown as an {@link IllegalStateException}.
     *
     * @param modifier the precondition to apply
     * @return the fixture returned by {@link #modifyFixture}
     * @throws IllegalStateException if the modifier or any surrounding step throws
     */
    protected TestFixture givenModification(ThrowingConsumer<TestFixture> modifier) {
        ThrowingConsumer<TestFixture> guarded = fixture -> {
            try {
                fixture.handleExpiredSchedulesLocally(false);
                modifier.accept(fixture);
                fixture.handleExpiredSchedulesLocally(false);
                fixture.waitForConsumers();
            } catch (Throwable e) {
                throw new IllegalStateException("Failed to execute given", e);
            }
        };
        return modifyFixture(guarded);
    }

    /**
     * Sends the given command as the action under test.
     *
     * @param command the command to send
     * @return a {@link Then} for validating the outcome
     */
    @Override
    public Then<Object> whenCommand(Object command) {
        Object message = trace(command);
        ThrowingFunction<FluxCapacitor, Object> action =
                fc -> getDispatchResult(fc.commandGateway().send(message));
        return whenApplying(action);
    }

    /**
     * Sends the given command on behalf of a user as the action under test.
     *
     * @param user    the user, or a value from which the user can be resolved via {@code getUser}
     * @param command the command to send
     * @return a {@link Then} for validating the outcome
     */
    @Override
    public Then<Object> whenCommandByUser(Object user, Object command) {
        Object message = trace(command);
        ThrowingFunction<FluxCapacitor, Object> action =
                fc -> getDispatchResult(fc.commandGateway().send((Object) addUser(getUser(user), message)));
        return whenApplying(action);
    }

    /**
     * Sends the given query as the action under test.
     *
     * @param query the query to send
     * @return a {@link Then} for validating the outcome
     */
    @Override
    public Then<Object> whenQuery(Object query) {
        Object message = trace(query);
        ThrowingFunction<FluxCapacitor, Object> action =
                fc -> getDispatchResult(fc.queryGateway().send(message));
        return whenApplying(action);
    }

    /**
     * Sends the given query on behalf of a user as the action under test.
     *
     * @param user  the user, or a value from which the user can be resolved via {@code getUser}
     * @param query the query to send
     * @return a {@link Then} for validating the outcome
     */
    @Override
    public Then<Object> whenQueryByUser(Object user, Object query) {
        Object message = trace(query);
        ThrowingFunction<FluxCapacitor, Object> action =
                fc -> getDispatchResult(fc.queryGateway().send((Object) addUser(getUser(user), message)));
        return whenApplying(action);
    }

    /**
     * Publishes the given event with the {@code STORED} guarantee as the action under test and waits for
     * the publication to complete.
     *
     * @param event the event to publish
     * @return a {@link Then} for validating the outcome
     */
    @Override
    public Then<?> whenEvent(Object event) {
        Object message = trace(event);
        ThrowingConsumer<FluxCapacitor> action = fc -> ClientUtils.runSilently(
                () -> fc.eventGateway().publish(message, Guarantee.STORED).get());
        return whenExecuting(action);
    }

    /**
     * Applies the given events to an aggregate as the action under test.
     *
     * @param aggregateId    the id of the aggregate
     * @param aggregateClass the type of the aggregate
     * @param events         the events to apply; expanded via {@code asMessages}
     * @return a {@link Then} for validating the outcome
     */
    @Override
    public Then<?> whenEventsAreApplied(String aggregateId, Class<?> aggregateClass, Object ... events) {
        Class<?> callerClass = ReflectionUtils.getCallerClass();
        ThrowingConsumer<FluxCapacitor> action = fc -> applyEvents(
                aggregateId, aggregateClass, fc,
                asMessages(callerClass, events).collect(Collectors.toList()));
        return whenExecuting(action);
    }

    /**
     * Searches a collection of the document store as the action under test.
     *
     * @param collection  the collection to search
     * @param searchQuery refines the search before all matches are fetched
     * @param <R>         the type of the fetched documents
     * @return a {@link Then} for validating the fetched documents
     */
    @Override
    public <R> Then<List<R>> whenSearching(Object collection, UnaryOperator<Search> searchQuery) {
        ThrowingFunction<FluxCapacitor, List<R>> action = fc -> {
            Search refined = searchQuery.apply(fc.documentStore().search(collection));
            return refined.fetchAll();
        };
        return whenApplying(action);
    }

    /**
     * Executes the given web request as the action under test.
     * <p>
     * For a synchronous fixture a non-null response is registered as a web response when it has a payload
     * or when the request method is not a websocket method.
     *
     * @param request the request to execute
     * @return a {@link Then} for validating the response
     */
    @Override
    public Then<Object> whenWebRequest(WebRequest request) {
        WebRequest message = (WebRequest) trace(request);
        return whenApplying(fc -> {
            WebResponse response = executeWebRequest(message);
            if (response != null && synchronous) {
                boolean register = response.getPayload() != null || !request.getMethod().isWebsocket();
                if (register) {
                    registerWebResponse(response);
                }
            }
            return response;
        });
    }

    /**
     * Schedules the given value at the fixture's current time as the action under test.
     *
     * @param schedule the schedule payload or message
     * @return a {@link Then} for validating the outcome
     */
    @Override
    public Then<?> whenScheduleExpires(Object schedule) {
        Object message = trace(schedule);
        ThrowingConsumer<FluxCapacitor> action = fc -> fc.scheduler().schedule(message, getCurrentTime());
        return whenExecuting(action);
    }

    /**
     * Moves the fixture's clock forward by the given duration as the action under test.
     *
     * @param duration the amount of time to advance
     * @return a {@link Then} for validating the outcome
     */
    @Override
    public Then<?> whenTimeElapses(Duration duration) {
        ThrowingConsumer<FluxCapacitor> action = fc -> advanceTimeBy(duration);
        return whenExecuting(action);
    }

    /**
     * Moves the fixture's clock to the given instant as the action under test.
     *
     * @param instant the instant to advance to
     * @return a {@link Then} for validating the outcome
     */
    @Override
    public Then<?> whenTimeAdvancesTo(Instant instant) {
        ThrowingConsumer<FluxCapacitor> action = fc -> advanceTimeTo(instant);
        return whenExecuting(action);
    }

    /**
     * Runs the action under test and captures its outcome.
     * <p>
     * Before the action runs, expired schedules are handled, consumers are awaited, mocks are reset and
     * result collection is switched on. If the action returns a {@link CompletableFuture}, the future's
     * dispatch result is used as the result. If the action throws, the error is registered and used as
     * the result. Afterwards consumers are awaited and expired schedules are handled once more.
     *
     * @param action the action to run against the fixture's {@link FluxCapacitor}
     * @param <R>    the result type of the action
     * @return a {@link ResultValidator} for this fixture
     */
    @Override
    public <R> Then<R> whenApplying(ThrowingFunction<FluxCapacitor, R> action) {
        return (Then)this.fluxCapacitor.apply(fc -> {
            Object result;
            // Settle everything left over from the given phase before results are collected.
            this.handleExpiredSchedulesLocally(true);
            this.waitForConsumers();
            this.resetMocks();
            this.setCollectingResults(true);
            try {
                result = action.apply(fc);
                if (result instanceof CompletableFuture) {
                    CompletableFuture future = (CompletableFuture)result;
                    result = this.getDispatchResult(future);
                }
            }
            catch (Throwable e) {
                // A failure is a valid outcome: record it and expose it as the result.
                this.registerError(e);
                result = e;
            }
            this.setResult(result);
            // Let side effects of the action play out before the result is validated.
            this.waitForConsumers();
            this.handleExpiredSchedulesLocally(true);
            return new ResultValidator(this);
        });
    }

    /**
     * Sends a web request through the web request gateway and returns the response.
     * <p>
     * A websocket request without a {@code sessionId} metadata entry gets the session id
     * {@code testSession}. Cookies remembered by the fixture are added unless the request already has a
     * cookie with the same name. Cookies in the response replace remembered ones; expired cookies are
     * dropped.
     *
     * @param request the request to send
     * @return the response to the request
     */
    protected WebResponse executeWebRequest(WebRequest request) {
        boolean websocketWithoutSession = request.getMethod().isWebsocket()
                && !request.getMetadata().containsKey((Object)"sessionId");
        if (websocketWithoutSession) {
            request = request.addMetadata("sessionId", (Object)"testSession");
        }
        if (!cookies.isEmpty()) {
            WebRequest.Builder builder = request.toBuilder();
            for (HttpCookie stored : cookies) {
                if (request.getCookie(stored.getName()).isEmpty()) {
                    builder.cookie(stored);
                }
            }
            request = builder.build();
        }
        WebResponse response = (WebResponse) getDispatchResult(getFluxCapacitor().webRequestGateway().send(request));
        response.getCookies().forEach(received -> {
            // Remove first so a re-sent cookie replaces the remembered one.
            cookies.remove(received);
            if (!received.hasExpired()) {
                cookies.add((HttpCookie) received);
            }
        });
        return response;
    }

    /**
     * Resolves a user from either a {@link User} instance or a user id.
     *
     * @param userOrId a {@link User}, or an id that is looked up through the user provider
     * @return the resolved user, never {@code null}
     * @throws UnauthorizedException if no user could be resolved
     */
    protected User getUser(Object userOrId) {
        User result;
        if (userOrId instanceof User) {
            result = (User) userOrId;
        } else {
            result = (User) fluxCapacitor.apply(fc -> fc.userProvider().getUserById(userOrId));
        }
        if (result == null) {
            throw new UnauthorizedException("User %s could not be provided".formatted(userOrId));
        }
        return result;
    }

    /**
     * Loads an aggregate and applies the given events to it.
     * <p>
     * Each event gets the metadata entries {@code $aggregateId} and {@code $aggregateType} before it is
     * applied.
     *
     * @param aggregateId    the id of the aggregate
     * @param aggregateClass the type of the aggregate; its name is stored as {@code $aggregateType}
     * @param fc             the instance whose aggregate repository is used
     * @param events         the events to apply
     */
    protected void applyEvents(String aggregateId, Class<?> aggregateClass, FluxCapacitor fc, List<Message> events) {
        var aggregate = fc.aggregateRepository().load((Object)aggregateId, aggregateClass);
        List<Message> enriched = events.stream()
                .map(event -> event.withMetadata(event.getMetadata().with(
                        new Object[]{"$aggregateId", aggregateId, "$aggregateType", aggregateClass.getName()})))
                .toList();
        aggregate.apply(enriched);
    }

    /**
     * Removes expired schedules from a local scheduling client and handles them locally.
     * <p>
     * Does nothing for an asynchronous fixture or when the scheduling client is not a
     * {@link LocalSchedulingClient}. The removed schedules are only handled when the scheduler is a
     * {@link DefaultScheduler}. Any failure is rethrown.
     *
     * @param collectErrors whether a failure is also registered as an error of the fixture
     */
    protected void handleExpiredSchedulesLocally(boolean collectErrors) {
        if (!synchronous) {
            return;
        }
        try {
            SchedulingClient schedulingClient = getFluxCapacitor().client().getSchedulingClient();
            if (!(schedulingClient instanceof LocalSchedulingClient)) {
                return;
            }
            LocalSchedulingClient localClient = (LocalSchedulingClient) schedulingClient;
            // Remove the expired schedules first; this happens regardless of the scheduler type.
            var expired = localClient.removeExpiredSchedules(getFluxCapacitor().serializer());
            Scheduler scheduler = getFluxCapacitor().scheduler();
            if (scheduler instanceof DefaultScheduler) {
                DefaultScheduler defaultScheduler = (DefaultScheduler) scheduler;
                expired.forEach(schedule -> defaultScheduler.handleLocally(schedule));
            }
        } catch (Throwable e) {
            if (collectErrors) {
                registerError(e);
            }
            throw e;
        }
    }

    /**
     * Returns the schedules currently held by the scheduling client.
     *
     * @return the schedules of a {@link LocalSchedulingClient}, or an empty list for any other client
     */
    protected List<Schedule> getFutureSchedules() {
        SchedulingClient schedulingClient = getFluxCapacitor().client().getSchedulingClient();
        if (!(schedulingClient instanceof LocalSchedulingClient)) {
            return Collections.emptyList();
        }
        LocalSchedulingClient localClient = (LocalSchedulingClient) schedulingClient;
        return localClient.getSchedules(getFluxCapacitor().serializer());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Waits until the fixture's consumers are done, or until the consumer timeout has passed.
     * <p>
     * Returns immediately for a synchronous fixture. Otherwise, if {@code checkConsumers()} reports
     * {@code false}, the method waits once on the consumers monitor for at most the consumer timeout. If
     * the check still fails afterwards, a warning is logged that lists each consumer with pending messages
     * and the simple class names of their payloads ({@code Void} for a null payload).
     */
    protected void waitForConsumers() {
        if (synchronous) {
            return;
        }
        synchronized (consumers) {
            if (!checkConsumers()) {
                try {
                    consumers.wait(consumerTimeout.toMillis());
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag so callers can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
            if (checkConsumers()) {
                return;
            }
            List<String> waiting = new ArrayList<>();
            for (Map.Entry<ActiveConsumer, List<Message>> entry : consumers.entrySet()) {
                List<Message> pending = entry.getValue();
                if (pending.isEmpty()) {
                    continue;
                }
                String payloadTypes = pending.stream()
                        .map(m -> m.getPayload() == null ? "Void" : m.getPayload().getClass().getSimpleName())
                        .collect(Collectors.joining(", "));
                waiting.add(entry.getKey().getName() + " : " + payloadTypes);
            }
            log.warn("Some consumers in the test fixture did not finish processing all messages. This may cause your test to fail. Waiting consumers: {}", waiting);
        }
    }

    /**
     * Resets mocks and replaces the fixture result with a fresh instance.
     *
     * @return this fixture
     */
    protected TestFixture reset() {
        resetMocks();
        fixtureResult = new FixtureResult();
        return this;
    }

    /**
     * Resets the mocks of the test client and the test FluxCapacitor. Does nothing unless the fixture is
     * spying.
     */
    protected void resetMocks() {
        if (!spying) {
            return;
        }
        TestClient testClient = (TestClient) fluxCapacitor.client();
        testClient.resetMocks();
        TestFluxCapacitor testFluxCapacitor = (TestFluxCapacitor) fluxCapacitor;
        testFluxCapacitor.resetMocks();
    }

    /**
     * Moves the fixture's clock forward by the given duration, relative to the fixture's current time.
     *
     * @param duration the amount of time to advance
     */
    protected void advanceTimeBy(Duration duration) {
        Instant target = getCurrentTime().plus(duration);
        advanceTimeTo(target);
    }

    /**
     * Fixes the fixture's clock at the given instant, using the system default time zone.
     *
     * @param instant the instant the clock should report
     */
    protected void advanceTimeTo(Instant instant) {
        Clock fixedClock = Clock.fixed(instant, ZoneId.systemDefault());
        setClock(fixedClock);
    }

    /**
     * Sets the clock on the {@link FluxCapacitor} instance and on its scheduling client.
     * <p>
     * Only a {@link LocalSchedulingClient} can be updated; for any other scheduling client a warning is
     * logged.
     *
     * @param clock the clock to use
     */
    protected void setClock(Clock clock) {
        getFluxCapacitor().withClock(clock);
        SchedulingClient schedulingClient = getFluxCapacitor().client().getSchedulingClient();
        if (!(schedulingClient instanceof LocalSchedulingClient)) {
            log.warn("Could not update clock of scheduling client. Timing tests may not work.");
            return;
        }
        ((LocalSchedulingClient) schedulingClient).setClock(clock);
    }

    /**
     * Adds a command to the collection returned by {@code getCommands()}.
     *
     * @param command the command to record
     */
    protected void registerCommand(Message command) {
        getCommands().add(command);
    }

    /**
     * Adds a query to the collection returned by {@code getQueries()}.
     *
     * @param query the query to record
     */
    protected void registerQuery(Message query) {
        getQueries().add(query);
    }

    /**
     * Adds a metric to the collection returned by {@code getMetrics()}.
     *
     * @param metric the metric to record
     */
    protected void registerMetric(Message metric) {
        getMetrics().add(metric);
    }

    /**
     * Adds an event to the collection returned by {@code getEvents()}.
     *
     * @param event the event to record
     */
    protected void registerEvent(Message event) {
        getEvents().add(event);
    }

    /**
     * Adds a web request to the collection returned by {@code getWebRequests()}.
     *
     * @param request the web request to record
     */
    protected void registerWebRequest(Message request) {
        getWebRequests().add(request);
    }

    /**
     * Adds a web response to the collection returned by {@code getWebResponses()}, unless its metadata
     * contains the entry {@code function=ack}.
     *
     * @param response the web response to record
     */
    protected void registerWebResponse(WebResponse response) {
        boolean isAck = response.getMetadata().contains((Object)"function", (Object)"ack");
        if (isAck) {
            return;
        }
        getWebResponses().add((Message) response);
    }

    /**
     * Adds a schedule to the collection returned by {@code getSchedules()}.
     *
     * @param schedule the schedule to record
     */
    protected void registerSchedule(Schedule schedule) {
        getSchedules().add(schedule);
    }

    /**
     * Records the given error in the list returned by {@link #getErrors()} if the list does not
     * already contain it.
     *
     * @param e the error to record
     */
    protected void registerError(Throwable e) {
        CopyOnWriteArrayList<Throwable> recorded = getErrors();
        recorded.addIfAbsent(e);
    }

    /**
     * Obtains the outcome of a dispatch.
     *
     * <p>In synchronous mode the future is polled with a timeout of zero milliseconds; otherwise
     * the call waits for at most {@code resultTimeout}.
     *
     * <p>NOTE(review): this method rethrows checked exceptions without declaring them, so the
     * original source presumably relied on a sneaky-throw facility. Confirm before compiling
     * this decompiled form as-is.
     *
     * @param dispatchResult the future holding the dispatch outcome
     * @param <R>            the result type expected by the caller (unchecked cast)
     * @return the value the future completed with
     * @throws Throwable        the cause of an {@link ExecutionException} if the dispatch failed, or
     *                          the {@link ExecutionException} itself if it carries no cause
     * @throws TimeoutException if no result became available in time; the original timeout is
     *                          attached as the cause
     */
    protected <R> R getDispatchResult(CompletableFuture<?> dispatchResult) {
        try {
            long waitMillis = this.synchronous ? 0L : this.resultTimeout.toMillis();
            return (R)dispatchResult.get(waitMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            // Never 'throw null': fall back to the wrapper when no cause is attached.
            Throwable cause = e.getCause();
            throw cause == null ? e : cause;
        }
        catch (TimeoutException e) {
            TimeoutException timeout = new TimeoutException("Test fixture did not receive a dispatch result in time. Perhaps some messages did not have handlers?");
            // TimeoutException has no cause-accepting constructor, so chain the original explicitly.
            timeout.initCause(e);
            throw timeout;
        }
    }

    /**
     * Converts the given values into a stream of {@link Message} instances.
     *
     * <p>Each input value is first flattened: {@code null} values are dropped, and collections and
     * object arrays are expanded into their elements. Every remaining value is passed through
     * {@link #parseObject(Object, Class)}, the parsed result is flattened once more by the same
     * rules, and each final value is converted with {@code Message.asMessage}.
     *
     * <p>Only arrays of references ({@code Object[]} and subtypes) are expanded. A primitive array
     * such as {@code byte[]} is treated as a single value, because it cannot be cast to
     * {@code Object[]}.
     *
     * @param callerClass class passed on to {@link #parseObject(Object, Class)}
     * @param messages    the values to convert
     * @return a stream with one message per flattened, parsed value
     */
    protected Stream<Message> asMessages(Class<?> callerClass, Object ... messages) {
        // Shared flattening rule, applied both before and after parsing.
        Function<Object, Stream<?>> flatten = value -> {
            if (value == null) {
                return Stream.empty();
            }
            if (value instanceof Collection) {
                return ((Collection<?>)value).stream();
            }
            if (value instanceof Object[]) {
                return Arrays.stream((Object[])value);
            }
            return Stream.of(value);
        };
        return Arrays.stream(messages)
                .<Object>flatMap(flatten)
                .map(value -> this.<Object>parseObject(value, callerClass))
                .<Object>flatMap(flatten)
                .map(Message::asMessage);
    }

    /**
     * Parses the given object into a {@link Message}, stores it as the fixture's traced message
     * and returns it.
     *
     * <p>The caller class is obtained from {@code ReflectionUtils.getCallerClass()} before the
     * conversion is delegated to {@code fluxCapacitor.apply(...)}.
     *
     * @param object the value to convert (see {@link #parseObject(Object, Class)})
     * @param <M>    the message type expected by the caller (unchecked cast)
     * @return the message that was stored as traced message
     */
    protected <M extends Message> M trace(Object object) {
        // Must be resolved here, directly in this method, before entering the lambda.
        Class<?> caller = ReflectionUtils.getCallerClass();
        Message traced = (Message)this.fluxCapacitor.apply(flux -> Message.asMessage(parseObject(object, caller)));
        setTracedMessage(traced);
        return (M)traced;
    }

    /**
     * Parses the given value into a {@link Message} and returns the result of calling
     * {@code addUser(user)} on that message.
     *
     * <p>The caller class is obtained from {@code ReflectionUtils.getCallerClass()} before the
     * conversion is delegated to {@code fluxCapacitor.apply(...)}.
     *
     * @param user  the user to add to the message
     * @param value the value to convert (see {@link #parseObject(Object, Class)})
     * @return the message with the user added
     */
    protected Message addUser(User user, Object value) {
        // Must be resolved here, directly in this method, before entering the lambda.
        Class<?> caller = ReflectionUtils.getCallerClass();
        return (Message)this.fluxCapacitor.apply(flux -> {
            Message parsed = Message.asMessage(parseObject(value, caller));
            return parsed.addUser(user);
        });
    }

    /**
     * Resolves a test input into the object that should actually be used.
     *
     * <ul>
     *   <li>A {@code WebRequest} whose payload is a string ending in {@code ".json"} is rebuilt with
     *       the payload replaced by the {@link JsonNode} loaded via
     *       {@code JsonUtils.fromFile(callerClass, path, JsonNode.class)}; its {@code Content-Type}
     *       header is cleared and its content type set to {@code application/json}.</li>
     *   <li>Any other {@link Message} is returned with its payload parsed recursively.</li>
     *   <li>A string ending in {@code ".json"} is replaced by the result of
     *       {@code JsonUtils.fromFile(callerClass, path)}.</li>
     *   <li>A {@link SerializedObject} (given directly or loaded from file) is deserialized with the
     *       fixture's serializer. If its data value is not a {@code byte[]}, it is serialized
     *       first.</li>
     *   <li>Anything else, including {@code null}, is returned unchanged.</li>
     * </ul>
     *
     * @param object      the value to resolve
     * @param callerClass class passed to {@code JsonUtils.fromFile} when loading {@code .json} files
     * @param <T>         the type expected by the caller (unchecked cast)
     * @return the resolved object
     */
    public <T> T parseObject(Object object, Class<?> callerClass) {
        if (object instanceof WebRequest) {
            WebRequest request = (WebRequest)object;
            Object requestPayload = request.getPayload();
            if (requestPayload instanceof String && ((String)requestPayload).endsWith(".json")) {
                return (T)request.toBuilder().payload(JsonUtils.fromFile(callerClass, (String)requestPayload, JsonNode.class)).clearHeader("Content-Type").contentType("application/json").build();
            }
        }
        if (object instanceof Message) {
            // Separate, correctly typed local: a Message cannot be stored in a WebRequest variable.
            Message message = (Message)object;
            return (T)message.withPayload(this.parseObject(message.getPayload(), callerClass));
        }
        Object value = object;
        if (value instanceof String && ((String)value).endsWith(".json")) {
            value = JsonUtils.fromFile(callerClass, (String)value);
        }
        if (value instanceof SerializedObject) {
            SerializedObject s = (SerializedObject)value;
            SerializedObject eventBytes = s.data().getValue() instanceof byte[] ? s : this.fluxCapacitor.serializer().serialize((Object)s);
            value = this.fluxCapacitor.serializer().deserialize(eventBytes);
        }
        return (T)value;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks whether the consumers tracked by this fixture have nothing left to process.
     *
     * <p>In synchronous mode this is always {@code true}. Otherwise, while holding the monitor of
     * the {@code consumers} map, every pending message of every consumer must be a {@link Schedule}
     * whose deadline lies after the current time. If that holds, all threads waiting on the map's
     * monitor are notified.
     *
     * @return {@code true} if no consumer has outstanding work, {@code false} otherwise
     */
    protected boolean checkConsumers() {
        if (this.synchronous) {
            return true;
        }
        Map<ActiveConsumer, List<Message>> pending = this.consumers;
        synchronized (pending) {
            for (List<Message> queue : pending.values()) {
                for (Message candidate : queue) {
                    boolean futureSchedule = candidate instanceof Schedule
                            && ((Schedule)candidate).getDeadline().isAfter(this.getCurrentTime());
                    if (!futureSchedule) {
                        return false;
                    }
                }
            }
            pending.notifyAll();
            return true;
        }
    }

    /**
     * @return the value of the {@code fluxCapacitorBuilder} field
     */
    @Generated
    FluxCapacitorBuilder getFluxCapacitorBuilder() {
        return fluxCapacitorBuilder;
    }

    /**
     * @return the value of the {@code interceptor} field
     */
    @Generated
    GivenWhenThenInterceptor getInterceptor() {
        return interceptor;
    }

    /**
     * @return the value of the {@code resultTimeout} field, which {@code getDispatchResult} uses as
     *         the maximum wait when the fixture is not synchronous
     */
    @Generated
    Duration getResultTimeout() {
        return resultTimeout;
    }

    /**
     * @return the value of the {@code consumerTimeout} field
     */
    @Generated
    Duration getConsumerTimeout() {
        return consumerTimeout;
    }

    /**
     * @return the value of the {@code synchronous} flag
     */
    @Generated
    boolean isSynchronous() {
        return synchronous;
    }

    /**
     * @return the value of the {@code spying} flag, which gates {@code resetMocks()}
     */
    @Generated
    boolean isSpying() {
        return spying;
    }

    /**
     * @return the value of the {@code registration} field
     */
    @Generated
    Registration getRegistration() {
        return registration;
    }

    /**
     * @return the {@code consumers} map itself (not a copy), mapping each active consumer to its
     *         list of pending messages
     */
    @Generated
    Map<ActiveConsumer, List<Message>> getConsumers() {
        return consumers;
    }

    /**
     * @return the current value of the {@code fixtureResult} field, which {@code reset()} replaces
     */
    @Generated
    FixtureResult getFixtureResult() {
        return fixtureResult;
    }

    /**
     * @return the value of the {@code beanParameterResolver} field
     */
    @Generated
    BeanParameterResolver getBeanParameterResolver() {
        return beanParameterResolver;
    }

    /**
     * @return the {@code testProperties} map itself (not a copy)
     */
    @Generated
    Map<String, String> getTestProperties() {
        return testProperties;
    }

    /**
     * @return the {@code cookies} list itself (not a copy)
     */
    @Generated
    List<HttpCookie> getCookies() {
        return cookies;
    }

    /**
     * @return the {@code modifiers} list itself (not a copy)
     */
    @Generated
    List<ThrowingConsumer<TestFixture>> getModifiers() {
        return modifiers;
    }

    /**
     * @return the value of the {@code fluxCapacitor} field
     */
    @Override
    @Generated
    public FluxCapacitor getFluxCapacitor() {
        return fluxCapacitor;
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the fixture result's {@code collectingResults} flag
     */
    @Generated
    public boolean isCollectingResults() {
        FixtureResult current = getFixtureResult();
        return current.isCollectingResults();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the fixture result's traced message
     */
    @Generated
    public Message getTracedMessage() {
        FixtureResult current = getFixtureResult();
        return current.getTracedMessage();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the fixture result's {@code result} value
     */
    @Generated
    public Object getResult() {
        FixtureResult current = getFixtureResult();
        return current.getResult();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the list of recorded commands, to which {@code registerCommand} appends
     */
    @Generated
    public CopyOnWriteArrayList<Message> getCommands() {
        FixtureResult current = getFixtureResult();
        return current.getCommands();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the list of recorded queries, to which {@code registerQuery} appends
     */
    @Generated
    public CopyOnWriteArrayList<Message> getQueries() {
        FixtureResult current = getFixtureResult();
        return current.getQueries();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the list of recorded events, to which {@code registerEvent} appends
     */
    @Generated
    public CopyOnWriteArrayList<Message> getEvents() {
        FixtureResult current = getFixtureResult();
        return current.getEvents();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the list of recorded web requests, to which {@code registerWebRequest} appends
     */
    @Generated
    public CopyOnWriteArrayList<Message> getWebRequests() {
        FixtureResult current = getFixtureResult();
        return current.getWebRequests();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the list of recorded web responses, to which {@code registerWebResponse} appends
     */
    @Generated
    public CopyOnWriteArrayList<Message> getWebResponses() {
        FixtureResult current = getFixtureResult();
        return current.getWebResponses();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the list of recorded metrics messages, to which {@code registerMetric} appends
     */
    @Generated
    public CopyOnWriteArrayList<Message> getMetrics() {
        FixtureResult current = getFixtureResult();
        return current.getMetrics();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the list of recorded schedules, to which {@code registerSchedule} appends
     */
    @Generated
    public CopyOnWriteArrayList<Schedule> getSchedules() {
        FixtureResult current = getFixtureResult();
        return current.getSchedules();
    }

    /**
     * Delegates to the current fixture result.
     *
     * @return the list of recorded errors, to which {@code registerError} adds entries
     */
    @Generated
    public CopyOnWriteArrayList<Throwable> getErrors() {
        FixtureResult current = getFixtureResult();
        return current.getErrors();
    }

    /**
     * Sets the {@code collectingResults} flag on the current fixture result.
     *
     * @param collectingResults the new flag value
     */
    @Generated
    public void setCollectingResults(boolean collectingResults) {
        FixtureResult current = getFixtureResult();
        current.setCollectingResults(collectingResults);
    }

    /**
     * Stores the given message as traced message on the current fixture result.
     *
     * @param tracedMessage the message to store
     */
    @Generated
    public void setTracedMessage(Message tracedMessage) {
        FixtureResult current = getFixtureResult();
        current.setTracedMessage(tracedMessage);
    }

    /**
     * Stores the given value as {@code result} on the current fixture result.
     *
     * @param result the value to store
     */
    @Generated
    public void setResult(Object result) {
        FixtureResult current = getFixtureResult();
        current.setResult(result);
    }

    /**
     * Interceptor through which the enclosing {@link TestFixture} observes dispatched and handled
     * messages.
     *
     * <p>It has two responsibilities, both visible in this class:
     * <ul>
     *   <li>recording dispatched messages on the fixture (commands, queries, events, schedules, web
     *       requests and responses, metrics) while the fixture is collecting results;</li>
     *   <li>maintaining, per active consumer, the list of messages that have been published but not
     *       yet processed, and calling {@code checkConsumers()} on the fixture whenever messages
     *       are removed from such a list.</li>
     * </ul>
     *
     * <p>Compound updates of the fixture's {@code consumers} map are performed while holding the
     * map's monitor.
     */
    protected static class GivenWhenThenInterceptor
    implements DispatchInterceptor,
    BatchInterceptor,
    HandlerInterceptor {
        private TestFixture testFixture;
        /** Schedules seen by {@link #monitorDispatch}; used to seed consumers of schedules that register later. */
        private final List<Schedule> publishedSchedules = new CopyOnWriteArrayList<Schedule>();
        /** Ids of messages seen by {@link #monitorDispatch} while the fixture was collecting results. */
        private final Set<String> interceptedMessageIds = new CopyOnWriteArraySet<String>();

        /**
         * Monitors serialized messages that were not already seen by {@link #monitorDispatch}.
         *
         * <p>Only active while the fixture is collecting results. Messages whose id is already
         * known are skipped; the others are deserialized and passed to {@link #monitorDispatch}.
         * Failures are logged, including the exception, and do not propagate.
         *
         * @param messages    the serialized messages that were dispatched
         * @param messageType the type of the dispatched messages
         */
        protected void interceptClientDispatch(List<SerializedMessage> messages, MessageType messageType) {
            if (this.testFixture.isCollectingResults()) {
                try {
                    this.testFixture.fluxCapacitor.serializer()
                            .deserializeMessages(messages.stream().filter(m -> !this.interceptedMessageIds.contains(m.getMessageId())), messageType)
                            .map(DeserializingMessage::toMessage)
                            .forEach(m -> this.monitorDispatch((Message)m, messageType));
                }
                catch (Exception e) {
                    // Best effort: keep the test running, but pass the exception to the logger so the reason is visible.
                    log.warn("Failed to intercept a published message. This may cause your test to fail.", e);
                }
            }
        }

        /**
         * Returns the message unchanged; observation happens in {@link #monitorDispatch}.
         */
        public Message interceptDispatch(Message message, MessageType messageType) {
            return message;
        }

        /**
         * Records a dispatched message.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>while collecting results, remember the message id;</li>
         *   <li>for schedules, store the message in the published schedules (replacing an earlier
         *       schedule with the same schedule id);</li>
         *   <li>under the monitor of the fixture's {@code consumers} map, add the message to the
         *       pending list of every consumer whose message type matches (a {@code NOTIFICATION}
         *       consumer also matches {@code EVENT} messages) and whose type filter, if any,
         *       matches the payload's class name;</li>
         *   <li>if {@link #captureMessage} allows it, register the message on the fixture according
         *       to its message type.</li>
         * </ol>
         *
         * @param message     the dispatched message
         * @param messageType the type of the dispatched message
         */
        public void monitorDispatch(Message message, MessageType messageType) {
            if (this.testFixture.isCollectingResults()) {
                this.interceptedMessageIds.add(message.getMessageId());
            }
            if (messageType == MessageType.SCHEDULE) {
                this.addMessage(this.publishedSchedules, (Schedule)message);
            }
            Map<ActiveConsumer, List<Message>> map = this.testFixture.consumers;
            synchronized (map) {
                this.testFixture.consumers.entrySet().stream().filter(t -> {
                    ActiveConsumer configuration = (ActiveConsumer)t.getKey();
                    boolean typeMatches = configuration.getMessageType() == messageType
                            || configuration.getMessageType() == MessageType.NOTIFICATION && messageType == MessageType.EVENT;
                    // NOTE(review): a null payload combined with a type filter would throw a NullPointerException here - confirm whether that can occur.
                    return typeMatches && Optional.ofNullable(configuration.getTypeFilter()).map(f -> message.getPayload().getClass().getName().matches((String)f)).orElse(true);
                }).forEach(e -> this.addMessage((List)e.getValue(), message));
            }
            if (this.captureMessage(message)) {
                switch (messageType) {
                    case COMMAND: {
                        this.testFixture.registerCommand(message);
                        break;
                    }
                    case QUERY: {
                        this.testFixture.registerQuery(message);
                        break;
                    }
                    case EVENT: {
                        this.testFixture.registerEvent(message);
                        break;
                    }
                    case SCHEDULE: {
                        this.testFixture.registerSchedule((Schedule)message);
                        break;
                    }
                    case WEBREQUEST: {
                        this.testFixture.registerWebRequest(message);
                        break;
                    }
                    case WEBRESPONSE: {
                        this.testFixture.registerWebResponse((WebResponse)message);
                        break;
                    }
                    case METRICS: {
                        this.testFixture.registerMetric(message);
                    }
                }
            }
        }

        /**
         * Decides whether a dispatched message should be registered on the fixture.
         *
         * @param message the dispatched message
         * @return {@code true} if the fixture is collecting results and the message id differs from
         *         that of the traced message (or no traced message is set)
         */
        protected Boolean captureMessage(Message message) {
            return this.testFixture.isCollectingResults()
                    && Optional.ofNullable(this.testFixture.getFixtureResult().getTracedMessage()).map(t -> !Objects.equals(t.getMessageId(), message.getMessageId())).orElse(true);
        }

        /**
         * Adds a message to the given list. If the message is a {@link Schedule}, any schedule with
         * the same schedule id is removed from the list first.
         *
         * @param messages the list to add to
         * @param message  the message to add
         */
        protected <T extends Message> void addMessage(List<T> messages, T message) {
            if (message instanceof Schedule) {
                messages.removeIf(m -> m instanceof Schedule && ((Schedule)m).getScheduleId().equals(((Schedule)message).getScheduleId()));
            }
            messages.add(message);
        }

        /**
         * Wraps a batch consumer so that processed messages are removed from the pending list of
         * the tracker's consumer.
         *
         * <p>On invocation, a pending list is created for the tracker's consumer if none exists.
         * For consumers of schedules it is seeded with the already published schedules that pass
         * the consumer's type filter. After the wrapped consumer has handled a batch, the batch's
         * message ids are removed from the pending list and the fixture re-checks its consumers,
         * both under the monitor of the {@code consumers} map.
         *
         * @param consumer the batch consumer to wrap
         * @param tracker  the tracker the consumer belongs to
         * @return the wrapping consumer
         */
        public Consumer<MessageBatch> intercept(Consumer<MessageBatch> consumer, Tracker tracker) {
            List messages = this.testFixture.consumers.computeIfAbsent(
                    new ActiveConsumer(tracker.getConfiguration(), tracker.getMessageType()),
                    c -> (c.getMessageType() == MessageType.SCHEDULE ? this.publishedSchedules : Collections.emptyList()).stream().filter(m -> Optional.ofNullable(c.getTypeFilter()).map(f -> m.getPayload().getClass().getName().matches((String)f)).orElse(true)).collect(Collectors.toCollection(CopyOnWriteArrayList::new)));
            return b -> {
                consumer.accept((MessageBatch)b);
                Collection messageIds = b.getMessages().stream().map(SerializedMessage::getMessageId).collect(Collectors.toSet());
                Map<ActiveConsumer, List<Message>> map = this.testFixture.consumers;
                synchronized (map) {
                    messages.removeIf(m -> messageIds.contains(m.getMessageId()));
                    this.testFixture.checkConsumers();
                }
            };
        }

        /**
         * Wraps a handler function to record errors and to clear requests from the pending lists
         * when they were not handled as part of the current tracker batch.
         *
         * <p>Exceptions thrown by the handler are registered on the fixture and rethrown. After
         * handling, successful or not, the message is removed from the pending lists of all
         * consumers of the same message type and the fixture re-checks its consumers, provided
         * that all of the following hold:
         * <ul>
         *   <li>the message type is a request type;</li>
         *   <li>there is no current tracker, or the current tracker's batch does not contain the
         *       message;</li>
         *   <li>the handler has no local-handler annotation, or that annotation has
         *       {@code logMessage} disabled.</li>
         * </ul>
         *
         * @param function the handler function to wrap
         * @param invoker  the invoker describing the target handler method
         * @return the wrapping function
         */
        public Function<DeserializingMessage, Object> interceptHandling(Function<DeserializingMessage, Object> function, HandlerInvoker invoker) {
            return m -> {
                try {
                    Object r = function.apply((DeserializingMessage)m);
                    return r;
                }
                catch (Exception e2) {
                    this.testFixture.registerError(e2);
                    throw e2;
                }
                finally {
                    if (m.getMessageType().isRequest()
                            && Tracker.current().map(Tracker::getMessageBatch).map(batch -> batch.getMessages().stream().noneMatch(bm -> bm.getMessageId().equals(m.getMessageId()))).orElse(true).booleanValue()
                            && ClientUtils.getLocalHandlerAnnotation((Class)invoker.getTargetClass(), (Executable)invoker.getMethod()).map(l -> !l.logMessage()).orElse(true).booleanValue()) {
                        Map<ActiveConsumer, List<Message>> map = this.testFixture.consumers;
                        synchronized (map) {
                            this.testFixture.consumers.entrySet().stream().filter(t -> ((ActiveConsumer)t.getKey()).getMessageType() == m.getMessageType()).forEach(e -> ((List)e.getValue()).removeIf(m2 -> m2.getMessageId().equals(m.getMessageId())));
                        }
                        this.testFixture.checkConsumers();
                    }
                }
            };
        }

        /**
         * Removes the pending list of the tracker's consumer and lets the fixture re-check its
         * consumers.
         *
         * @param tracker the tracker that is shutting down
         */
        public void shutdown(Tracker tracker) {
            this.testFixture.consumers.remove(new ActiveConsumer(tracker.getConfiguration(), tracker.getMessageType()));
            this.testFixture.checkConsumers();
        }

        /**
         * @param testFixture the fixture this interceptor reports to
         */
        @ConstructorProperties(value={"testFixture"})
        @Generated
        public GivenWhenThenInterceptor(TestFixture testFixture) {
            this.testFixture = testFixture;
        }
    }

    /**
     * Immutable key identifying a running consumer: a {@link ConsumerConfiguration} combined with
     * the {@link MessageType} it tracks. Equality, hash code and string form are based on both
     * components. All remaining accessors simply forward to the wrapped configuration.
     */
    protected static final class ActiveConsumer {
        private final ConsumerConfiguration configuration;
        private final MessageType messageType;

        /**
         * @param configuration the consumer configuration
         * @param messageType   the message type tracked by the consumer
         */
        @ConstructorProperties(value={"configuration", "messageType"})
        @Generated
        public ActiveConsumer(ConsumerConfiguration configuration, MessageType messageType) {
            this.configuration = configuration;
            this.messageType = messageType;
        }

        /** @return the wrapped consumer configuration */
        @Generated
        public ConsumerConfiguration getConfiguration() {
            return configuration;
        }

        /** @return the message type tracked by the consumer */
        @Generated
        public MessageType getMessageType() {
            return messageType;
        }

        /**
         * Two active consumers are equal when both their configuration and message type are equal
         * (or both {@code null}).
         */
        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ActiveConsumer)) {
                return false;
            }
            ActiveConsumer that = (ActiveConsumer)o;
            return Objects.equals(configuration, that.configuration)
                    && Objects.equals(messageType, that.messageType);
        }

        /**
         * Combines the hash codes of configuration and message type with multiplier 59; a
         * {@code null} component contributes 43.
         */
        @Generated
        public int hashCode() {
            int configurationHash = configuration == null ? 43 : configuration.hashCode();
            int messageTypeHash = messageType == null ? 43 : messageType.hashCode();
            return (59 + configurationHash) * 59 + messageTypeHash;
        }

        @Generated
        public String toString() {
            return "TestFixture.ActiveConsumer(configuration=" + configuration + ", messageType=" + messageType + ")";
        }

        // ---- Accessors forwarded to the wrapped ConsumerConfiguration ----

        @Generated
        public ConsumerConfiguration.Builder toBuilder() {
            return configuration.toBuilder();
        }

        @Generated
        public String getName() {
            return configuration.getName();
        }

        @Generated
        public Predicate<Object> getHandlerFilter() {
            return configuration.getHandlerFilter();
        }

        @Generated
        public ErrorHandler getErrorHandler() {
            return configuration.getErrorHandler();
        }

        @Generated
        public int getThreads() {
            return configuration.getThreads();
        }

        @Generated
        public String getTypeFilter() {
            return configuration.getTypeFilter();
        }

        @Generated
        public int getMaxFetchSize() {
            return configuration.getMaxFetchSize();
        }

        @Generated
        public Duration getMaxWaitDuration() {
            return configuration.getMaxWaitDuration();
        }

        @Generated
        public List<BatchInterceptor> getBatchInterceptors() {
            return configuration.getBatchInterceptors();
        }

        @Generated
        public List<HandlerInterceptor> getHandlerInterceptors() {
            return configuration.getHandlerInterceptors();
        }

        @Generated
        public boolean filterMessageTarget() {
            return configuration.filterMessageTarget();
        }

        @Generated
        public boolean ignoreSegment() {
            return configuration.ignoreSegment();
        }

        @Generated
        public boolean singleTracker() {
            return configuration.singleTracker();
        }

        @Generated
        public boolean clientControlledIndex() {
            return configuration.clientControlledIndex();
        }

        @Generated
        public Long getMinIndex() {
            return configuration.getMinIndex();
        }

        @Generated
        public Long getMaxIndexExclusive() {
            return configuration.getMaxIndexExclusive();
        }

        @Generated
        public boolean exclusive() {
            return configuration.exclusive();
        }

        @Generated
        public boolean passive() {
            return configuration.passive();
        }

        @Generated
        public Function<Client, String> getTrackerIdFactory() {
            return configuration.getTrackerIdFactory();
        }

        @Generated
        public Duration getPurgeDelay() {
            return configuration.getPurgeDelay();
        }
    }
}

