package io.es4j.launcher;

import io.es4j.Aggregate;
import io.es4j.Command;
import io.es4j.Es4jDeployment;
import io.es4j.Event;
import io.es4j.core.CommandHandler;
import io.es4j.core.verticles.AggregateBridge;
import io.es4j.infrastructure.config.Es4jConfigurationHandler;
import io.es4j.infrastructure.misc.Es4jServiceLoader;
import io.es4j.infrastructure.models.AvailableAggregate;
import io.reactiverse.contextual.logging.ContextualData;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiOnItem;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.vertx.UniHelper;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.impl.cpu.CpuCoreSensor;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.eventbus.DeliveryContext;
import io.vertx.mutiny.core.eventbus.Message;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/launcher/Es4jMain.class */
public class Es4jMain extends AbstractVerticle implements Resource {
    private static final Logger LOGGER = LoggerFactory.getLogger(Es4jMain.class);
    public static final List<Es4jDeployment> AGGREGATES = Es4jServiceLoader.bootstrapList();
    private static final List<AggregateDeployer<? extends Aggregate>> AGGREGATE_DEPLOYERS = new ArrayList();
    public static final Map<Class<? extends Aggregate>, List<Class<? extends Command>>> AGGREGATE_COMMANDS = new HashMap();
    public static final Map<Class<? extends Aggregate>, List<Class<Event>>> AGGREGATE_EVENTS = new HashMap();
    public static final Set<String> bridges = new HashSet();

    public Es4jMain() {
        Core.getGlobalContext().register(this);
    }

    public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
        Promise<Void> promise = Promise.promise();
        stop(promise);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        promise.future().onComplete(asyncResult -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }

    public void afterRestore(Context<? extends Resource> context) throws Exception {
        Promise<Void> promise = Promise.promise();
        start(promise);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        promise.future().onComplete(asyncResult -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }

    public void start(Promise<Void> promise) {
        LOGGER.info(" ---- Starting {}::{} ---- ", getClass().getName(), this.context.deploymentID());
        Infrastructure.setDroppedExceptionHandler(th -> {
            LOGGER.error("[-- [Es4j]  had to drop the following exception --]", th);
        });
        this.vertx.exceptionHandler(this::handleException);
        addEventBusInterceptors();
        this.vertx.eventBus().consumer("/es4j/available-aggregates", this::availableAggregates);
        startAggregateResources(promise);
    }

    private void availableAggregates(Message<JsonObject> message) {
        message.reply(new JsonArray(AGGREGATES.stream().map(es4jDeployment -> {
            return new AvailableAggregate(CommandHandler.camelToKebab(es4jDeployment.aggregateClass().getSimpleName()), es4jDeployment.tenants());
        }).toList()));
    }

    private void addEventBusInterceptors() {
        this.vertx.eventBus().addOutboundInterceptor(this::addContextualData);
        this.vertx.eventBus().addInboundInterceptor(this::addContextualData);
    }

    private void addContextualData(DeliveryContext<Object> deliveryContext) {
        String str = deliveryContext.message().headers().get("TENANT");
        String str2 = deliveryContext.message().headers().get("AGGREGATE");
        if (str != null) {
            ContextualData.put("TENANT", str);
        }
        if (str2 != null) {
            ContextualData.put("AGGREGATE", str2);
        }
        deliveryContext.next();
    }

    private void startAggregateResources(Promise<Void> promise) {
        Stream<R> map = AGGREGATES.stream().map(es4jDeployment -> {
            return new AggregateDeployer(es4jDeployment.aggregateClass(), es4jDeployment, this.vertx, this.context.deploymentID());
        });
        List<AggregateDeployer<? extends Aggregate>> list = AGGREGATE_DEPLOYERS;
        Objects.requireNonNull(list);
        map.forEach((v1) -> {
            r1.add(v1);
        });
        List list2 = AGGREGATE_DEPLOYERS.stream().map(aggregateDeployer -> {
            Promise<Void> promise2 = Promise.promise();
            aggregateDeployer.deploy(promise2);
            return UniHelper.toUni(promise2.future());
        }).toList();
        if (list2.isEmpty()) {
            throw new IllegalStateException("Aggregates not found");
        }
        Uni.join().all(list2).andFailFast().flatMap(list3 -> {
            return deployBridges();
        }).subscribe().with(r7 -> {
            promise.complete();
            LOGGER.info(" ----  {}::{} Started  ---- ", getClass().getName(), this.context.deploymentID());
        }, th -> {
            LOGGER.error(" ----  {}::{} Stopped  ---- ", new Object[]{getClass().getName(), this.context.deploymentID(), th});
            this.vertx.closeAndForget();
            promise.fail(th);
        });
    }

    private Uni<Void> deployBridges() {
        Uni deployVerticle = this.vertx.deployVerticle(AggregateBridge::new, new DeploymentOptions().setInstances(CpuCoreSensor.availableProcessors() * 2));
        Set<String> set = bridges;
        Objects.requireNonNull(set);
        return deployVerticle.map((v1) -> {
            return r1.add(v1);
        }).replaceWithVoid();
    }

    private void handleException(Throwable th) {
        LOGGER.error("[-- Es4j  Main had to drop the following exception --]", th);
    }

    public void stop(Promise<Void> promise) {
        LOGGER.warn(" ---- Stopping  {}::{}  ---- ", getClass().getName(), this.context.deploymentID());
        UniSubscribe subscribe = close().subscribe();
        Consumer consumer = r3 -> {
            promise.complete();
        };
        Objects.requireNonNull(promise);
        subscribe.with(consumer, promise::fail);
    }

    private Uni<Void> close() {
        Es4jConfigurationHandler.close();
        ArrayList arrayList = new ArrayList();
        if (!bridges.isEmpty()) {
            MultiOnItem onItem = Multi.createFrom().iterable(bridges).onItem();
            Vertx vertx = this.vertx;
            Objects.requireNonNull(vertx);
            arrayList.add(onItem.transformToUniAndMerge(vertx::undeploy).collect().asList().replaceWithVoid());
        }
        return Multi.createFrom().iterable(AGGREGATE_DEPLOYERS).onItem().transformToUniAndMerge((v0) -> {
            return v0.close();
        }).collect().asList().replaceWithVoid();
    }
}
