package io.es4j.core.verticles;

import io.es4j.Aggregate;
import io.es4j.Aggregator;
import io.es4j.Behaviour;
import io.es4j.Command;
import io.es4j.core.CommandHandler;
import io.es4j.core.exceptions.Es4jException;
import io.es4j.core.objects.AggregateConfiguration;
import io.es4j.core.objects.AggregatorWrap;
import io.es4j.core.objects.BehaviourWrap;
import io.es4j.core.objects.ErrorSource;
import io.es4j.core.objects.Es4jError;
import io.es4j.core.objects.Es4jErrorBuilder;
import io.es4j.core.objects.LoadAggregate;
import io.es4j.infrastructure.Infrastructure;
import io.es4j.infrastructure.bus.AggregateBus;
import io.es4j.infrastructure.bus.ProjectionService;
import io.es4j.infrastructure.misc.Es4jServiceLoader;
import io.es4j.launcher.Es4jMain;
import io.reactiverse.contextual.logging.ContextualData;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.eventbus.DeliveryContext;
import io.vertx.mutiny.core.eventbus.Message;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/core/verticles/AggregateVerticle.class */
/**
 * Verticle that hosts the command side of a single aggregate type.
 *
 * <p>On start it loads the {@link Aggregator}s and {@link Behaviour}s that apply to the
 * aggregate class, starts the {@link Infrastructure}, registers the {@link ProjectionService}
 * and finally registers one command consumer per behaviour on the {@link AggregateBus}.
 *
 * <p>The instance registers itself as a CRaC {@link Resource} in its constructor: it is stopped
 * before a checkpoint and started again after a restore.
 *
 * @param <T> the aggregate type served by this verticle
 */
public class AggregateVerticle<T extends Aggregate> extends AbstractVerticle implements Resource {
    protected static final Logger LOGGER = LoggerFactory.getLogger(AggregateVerticle.class);
    public static final String ACTION = "action";
    private final Class<T> aggregateClass;
    private final String deploymentID;
    private AggregateConfiguration aggregateConfiguration;
    private CommandHandler<T> commandHandler;
    private List<BehaviourWrap> behaviourWraps;
    private List<AggregatorWrap> aggregatorWraps;
    private Infrastructure infrastructure;
    private ProjectionService projectionService;

    /**
     * Creates the verticle and registers it with the global CRaC context.
     *
     * @param cls the aggregate class handled by this verticle
     * @param str the deployment identifier used when registering on and stopping the aggregate bus
     */
    public AggregateVerticle(Class<T> cls, String str) {
        this.aggregateClass = cls;
        this.deploymentID = str;
        // NOTE(review): publishes `this` before construction has finished; kept because callers
        // rely on the registration happening here.
        Core.getGlobalContext().register(this);
    }

    /**
     * Bootstraps the aggregate: loads aggregators and behaviours, starts the infrastructure,
     * installs the logging interceptor, then registers the projection service followed by the
     * command consumers.
     *
     * @return a {@link Uni} completing once the projection service and the aggregate bus
     *     registrations have completed
     * @throws IllegalStateException if no aggregators or no behaviours are found for the aggregate
     */
    public Uni<Void> asyncStart() {
        config().put("schema", CommandHandler.camelToKebab(this.aggregateClass.getSimpleName()));
        this.aggregateConfiguration = config()
            .getJsonObject("aggregate-configuration", new JsonObject())
            .mapTo(AggregateConfiguration.class);
        LOGGER.info("Event.x starting {}::{}", this.aggregateClass.getSimpleName(), this.deploymentID);
        this.aggregatorWraps = loadAggregators(this.aggregateClass);
        this.behaviourWraps = loadBehaviours(this.aggregateClass);
        this.infrastructure = new Infrastructure(
            Es4jServiceLoader.loadCache(),
            Es4jServiceLoader.loadEventStore(),
            Optional.empty(),
            Es4jServiceLoader.loadOffsetStore());
        this.infrastructure.start(this.aggregateClass, this.vertx, config());
        this.vertx.eventBus().addInboundInterceptor(this::addContextualData);
        this.commandHandler = new CommandHandler<>(
            this.vertx,
            this.aggregateClass,
            this.aggregatorWraps,
            this.behaviourWraps,
            this.infrastructure,
            this.aggregateConfiguration);
        this.projectionService = new ProjectionService(
            this.infrastructure.offsetStore(),
            this.infrastructure.eventStore(),
            this.aggregateClass);
        return this.projectionService.register(this.vertx)
            .flatMap(registered -> registerAggregateBus());
    }

    /**
     * Registers a command consumer for every loaded behaviour and then waits for the bus
     * registration of this deployment.
     */
    private Uni<Void> registerAggregateBus() {
        return Multi.createFrom().iterable(this.behaviourWraps)
            .onItem().transformToUniAndMerge(behaviourWrap -> AggregateBus.registerCommandConsumer(
                this.vertx,
                this.aggregateClass,
                this.deploymentID,
                message -> messageHandler(behaviourWrap, message),
                behaviourWrap.commandClass()))
            .collect().asList()
            .flatMap(consumers -> AggregateBus.waitForRegistration(this.deploymentID, this.aggregateClass))
            .replaceWithVoid();
    }

    /**
     * Parses the incoming message into the behaviour's command type, processes it and replies
     * with the result, or fails the message when processing fails.
     *
     * <p>If the body cannot be parsed, {@link #parseCommand} has already failed the message and
     * the resulting {@link IllegalArgumentException} propagates to the caller.
     */
    private <A extends Aggregate, C extends Command> void messageHandler(BehaviourWrap<A, C> behaviourWrap, Message<JsonObject> message) {
        Command command = parseCommand(behaviourWrap.commandClass(), message);
        this.commandHandler.process(command)
            .subscribe()
            .with(message::reply, failure -> replyWithFailure(message, failure));
    }

    /**
     * Fails the message with the error carried by an {@link Es4jException}, or with a generic
     * 500 error for any other throwable (which is also logged).
     */
    private static void replyWithFailure(Message<JsonObject> message, Throwable failure) {
        if (failure instanceof Es4jException es4jException) {
            message.fail(
                es4jException.error().externalErrorCode().intValue(),
                JsonObject.mapFrom(es4jException.error()).encodePrettily());
        } else {
            LOGGER.error("Unexpected exception raised", failure);
            message.fail(
                500,
                JsonObject.mapFrom(new Es4jError(failure.getMessage(), failure.getLocalizedMessage(), 500)).encode());
        }
    }

    /**
     * Maps the message body onto the given command class.
     *
     * @param cls the command class to map the body to
     * @param message the incoming bus message
     * @return the parsed command
     * @throws IllegalArgumentException if the body cannot be mapped; the message has already been
     *     failed with code 400 when this is thrown
     */
    private static Command parseCommand(Class<? extends Command> cls, Message<JsonObject> message) {
        try {
            // Pretty-printing the body is only worth the cost when it is actually going to be logged.
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Incoming command {} {}", cls.getName(), message.body().encodePrettily());
            }
            return message.body().mapTo(cls);
        } catch (Exception e) {
            message.fail(400, JsonObject.mapFrom(Es4jErrorBuilder.builder()
                .cause("Invalid message body")
                .errorSource(ErrorSource.UNKNOWN)
                .hint(e.getMessage())
                .externalErrorCode(400)
                .internalCode("400L")
                .build()).encode());
            // Keep the original failure attached so the root cause is visible in logs.
            throw new IllegalArgumentException("Invalid message body for command " + cls.getName(), e);
        }
    }

    /** Tags the logging context with the aggregate name before the message is delivered. */
    private void addContextualData(DeliveryContext<Object> deliveryContext) {
        ContextualData.put("AGGREGATE", this.aggregateClass.getSimpleName());
        deliveryContext.next();
    }

    /**
     * Loads every behaviour whose aggregate type parameter is assignable from {@code cls} and
     * wraps it, appending a no-op behaviour for {@link LoadAggregate}.
     *
     * <p>As a side effect, the command classes (plus {@link LoadAggregate}) are recorded in
     * {@code Es4jMain.AGGREGATE_COMMANDS} the first time an aggregate class is seen.
     *
     * @param cls the aggregate class
     * @param <T> the aggregate type
     * @return a mutable list of behaviour wraps, never empty
     * @throws IllegalStateException if no behaviour targets {@code cls}
     * @throws IllegalArgumentException if a behaviour's generic types cannot be resolved
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <T extends Aggregate> List<BehaviourWrap> loadBehaviours(Class<T> cls) {
        List<BehaviourWrap> wraps = new ArrayList<>();
        for (Behaviour behaviour : Es4jServiceLoader.loadBehaviours()) {
            // Resolve the generic types once per behaviour and reuse them for filter and wrap.
            Tuple2<Class<? extends Aggregate>, Class<? extends Command>> types =
                parseCommandBehaviourGenericTypes(behaviour.getClass());
            if (types.getItem1().isAssignableFrom(cls)) {
                wraps.add(new BehaviourWrap(behaviour, cls, types.getItem2()));
            }
        }
        if (wraps.isEmpty()) {
            throw new IllegalStateException("Behaviours not found for " + cls.getSimpleName());
        }
        if (!Es4jMain.AGGREGATE_COMMANDS.containsKey(cls)) {
            // Raw list on purpose: the declared value type of AGGREGATE_COMMANDS is not visible here.
            List commandClasses = new ArrayList();
            for (BehaviourWrap wrap : wraps) {
                commandClasses.add(wrap.commandClass());
            }
            commandClasses.add(LoadAggregate.class);
            Es4jMain.AGGREGATE_COMMANDS.put(cls, commandClasses);
        }
        // LoadAggregate produces no events; it only needs a consumer registered for it.
        wraps.add(new BehaviourWrap((aggregate, command) -> {
            return List.of();
        }, cls, LoadAggregate.class));
        return wraps;
    }

    /**
     * Loads every aggregator whose aggregate type parameter is assignable from {@code cls}.
     *
     * <p>As a side effect, the event classes are recorded in {@code Es4jMain.AGGREGATE_EVENTS}
     * if the aggregate class has no entry yet.
     *
     * @param cls the aggregate class
     * @param <T> the aggregate type
     * @return the matching aggregator wraps, never empty
     * @throws IllegalStateException if no aggregator targets {@code cls}
     * @throws IllegalArgumentException if an aggregator's generic types cannot be resolved
     */
    public static <T extends Aggregate> List<AggregatorWrap> loadAggregators(Class<T> cls) {
        List<AggregatorWrap> matching = Es4jServiceLoader.loadAggregators().stream().map(aggregator -> {
            Tuple2<Class<? extends Aggregate>, Class<?>> types = parseAggregatorClass(aggregator.getClass());
            return new AggregatorWrap(aggregator, types.getItem1(), types.getItem2());
        }).filter(wrap -> {
            return wrap.entityAggregateClass().isAssignableFrom(cls);
        }).toList();
        if (matching.isEmpty()) {
            throw new IllegalStateException("Aggregators not found for " + cls.getSimpleName());
        }
        Es4jMain.AGGREGATE_EVENTS.putIfAbsent(cls, matching.stream().map(wrap -> {
            return wrap.eventClass();
        }).toList());
        return matching;
    }

    /**
     * Resolves the two type arguments of the single generic interface implemented by an
     * aggregator class.
     *
     * @param cls the aggregator implementation class
     * @return a tuple of (aggregate class, event class)
     * @throws IllegalArgumentException if the class does not implement exactly one interface, the
     *     interface is not parameterized, or a type argument cannot be resolved to a class
     */
    @SuppressWarnings("unchecked")
    public static Tuple2<Class<? extends Aggregate>, Class<?>> parseAggregatorClass(Class<? extends Aggregator> cls) {
        Type[] genericInterfaces = cls.getGenericInterfaces();
        if (genericInterfaces.length > 1) {
            throw new IllegalArgumentException(cls.getName() + " should only implement Aggregator interface");
        }
        if (genericInterfaces.length == 0) {
            throw new IllegalArgumentException(cls.getName() + " should implement Aggregator interface");
        }
        Type implemented = genericInterfaces[0];
        if (!(implemented instanceof ParameterizedType parameterized)) {
            throw new IllegalArgumentException("Invalid Interface -> " + implemented.getClass());
        }
        Type[] typeArguments = parameterized.getActualTypeArguments();
        try {
            // Unchecked: the first type argument is declared by this method's contract to be an aggregate.
            Class<? extends Aggregate> aggregateType = (Class<? extends Aggregate>) resolveTypeArgument(typeArguments[0]);
            Class<?> eventType = resolveTypeArgument(typeArguments[1]);
            return Tuple2.of(aggregateType, eventType);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unable to parse generic type", e);
        }
    }

    /**
     * Resolves the two type arguments of the single generic interface implemented by a
     * behaviour class.
     *
     * @param cls the behaviour implementation class
     * @return a tuple of (aggregate class, command class)
     * @throws IllegalArgumentException if the class does not implement exactly one interface, the
     *     interface is not parameterized, or a type argument cannot be resolved to a class
     */
    @SuppressWarnings("unchecked")
    public static Tuple2<Class<? extends Aggregate>, Class<? extends Command>> parseCommandBehaviourGenericTypes(Class<? extends Behaviour> cls) {
        Type[] genericInterfaces = cls.getGenericInterfaces();
        if (genericInterfaces.length > 1) {
            throw new IllegalArgumentException(cls.getName() + " should only implement Behaviour interface");
        }
        if (genericInterfaces.length == 0) {
            throw new IllegalArgumentException(cls.getName() + " should implement Behaviour interface");
        }
        Type implemented = genericInterfaces[0];
        if (!(implemented instanceof ParameterizedType parameterized)) {
            throw new IllegalArgumentException("Invalid interface -> " + implemented.getClass());
        }
        Type[] typeArguments = parameterized.getActualTypeArguments();
        try {
            // Unchecked: the type arguments are declared by this method's contract to be an
            // aggregate and a command respectively.
            Class<? extends Aggregate> aggregateType = (Class<? extends Aggregate>) resolveTypeArgument(typeArguments[0]);
            Class<? extends Command> commandType = (Class<? extends Command>) resolveTypeArgument(typeArguments[1]);
            return Tuple2.of(aggregateType, commandType);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unable to get behaviour generic types -> ", e);
        }
    }

    /**
     * Turns a reflective type argument into a class. A {@link Class} is returned as-is, which
     * avoids a name-based lookup through this class's loader; anything else falls back to
     * {@link Class#forName(String)} on the type name.
     */
    private static Class<?> resolveTypeArgument(Type typeArgument) throws ClassNotFoundException {
        if (typeArgument instanceof Class<?> resolved) {
            return resolved;
        }
        return Class.forName(typeArgument.getTypeName());
    }

    /**
     * Stops the aggregate bus consumers of this deployment and then the infrastructure.
     *
     * @return a {@link Uni} completing when the infrastructure has stopped, or immediately if the
     *     verticle was never started
     */
    public Uni<Void> asyncStop() {
        LOGGER.info("Stopping {} {}", this.aggregateClass.getSimpleName(), this.deploymentID);
        if (this.infrastructure == null) {
            // The instance is registered for checkpoints in the constructor, so a stop can be
            // requested before asyncStart ever ran; in that case nothing has been registered.
            return Uni.createFrom().voidItem();
        }
        AggregateBus.stop(this.vertx, this.aggregateClass, this.deploymentID);
        return this.infrastructure.stop();
    }

    /**
     * CRaC hook: stops the verticle and blocks until the stop has completed.
     *
     * @param context the CRaC context delivering the notification
     * @throws Exception if stopping throws or the wait is interrupted
     */
    public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
        Promise<Void> stopped = Promise.promise();
        stop(stopped);
        awaitLifecycle(stopped, "Stop before checkpoint");
    }

    /**
     * CRaC hook: starts the verticle again and blocks until the start has completed.
     *
     * @param context the CRaC context delivering the notification
     * @throws Exception if starting throws or the wait is interrupted
     */
    public void afterRestore(Context<? extends Resource> context) throws Exception {
        Promise<Void> started = Promise.promise();
        start(started);
        awaitLifecycle(started, "Start after restore");
    }

    /**
     * Blocks the calling thread until the lifecycle promise completes, logging the cause when it
     * completes with a failure instead of dropping it silently.
     */
    private void awaitLifecycle(Promise<Void> promise, String phase) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        promise.future().onComplete(result -> {
            if (result.failed()) {
                LOGGER.error("{} failed for {} {}", phase, this.aggregateClass.getSimpleName(), this.deploymentID, result.cause());
            }
            latch.countDown();
        });
        latch.await();
    }
}
