package io.es4j.launcher;

import io.es4j.Aggregate;
import io.es4j.Bootstrap;
import io.es4j.Command;
import io.es4j.Event;
import io.es4j.core.tasks.AggregateHeartbeat;
import io.es4j.core.tasks.EventProjectionPoller;
import io.es4j.core.tasks.StateProjectionPoller;
import io.es4j.core.verticles.AggregateBridge;
import io.es4j.infrastructure.config.Es4jConfigurationHandler;
import io.es4j.infrastructure.misc.Es4jServiceLoader;
import io.es4j.task.CronTaskDeployer;
import io.es4j.task.TimerTaskDeployer;
import io.reactiverse.contextual.logging.ContextualData;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.vertx.UniHelper;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.impl.cpu.CpuCoreSensor;
import io.vertx.mutiny.core.eventbus.DeliveryContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/launcher/Es4jMain.class */
public class Es4jMain extends AbstractVerticle implements Resource {
    private CronTaskDeployer cronTaskDeployer;
    private TimerTaskDeployer timerTaskDeployer;
    protected static final Logger LOGGER = LoggerFactory.getLogger(Es4jMain.class);
    public static final List<Bootstrap> AGGREGATES = Es4jServiceLoader.bootstrapList();
    public static final List<EventProjectionPoller> EVENT_PROJECTIONS = new ArrayList();
    public static final List<StateProjectionPoller> STATE_PROJECTIONS = new ArrayList();
    private static final List<AggregateDeployer<? extends Aggregate>> AGGREGATE_DEPLOYERS = new ArrayList();
    public static final Map<Class<? extends Aggregate>, List<Class<? extends Command>>> AGGREGATE_COMMANDS = new HashMap();
    public static final Map<Class<? extends Aggregate>, List<Class<Event>>> AGGREGATE_EVENTS = new HashMap();
    public static final List<AggregateHeartbeat<? extends Aggregate>> HEARTBEATS = new ArrayList();

    public Es4jMain() {
        Core.getGlobalContext().register(this);
    }

    public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
        Promise<Void> promise = Promise.promise();
        stop(promise);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        promise.future().onComplete(asyncResult -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }

    public void afterRestore(Context<? extends Resource> context) throws Exception {
        Promise<Void> promise = Promise.promise();
        start(promise);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        promise.future().onComplete(asyncResult -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }

    public void start(Promise<Void> promise) {
        LOGGER.info(" ---- Starting {}::{} ---- ", getClass().getName(), this.context.deploymentID());
        Infrastructure.setDroppedExceptionHandler(th -> {
            LOGGER.error("[-- [Event.x]  had to drop the following exception --]", th);
        });
        this.vertx.exceptionHandler(this::handleException);
        this.cronTaskDeployer = new CronTaskDeployer(this.vertx);
        this.timerTaskDeployer = new TimerTaskDeployer(this.vertx);
        addEventBusInterceptors();
        startAggregateResources(promise);
    }

    private void addEventBusInterceptors() {
        this.vertx.eventBus().addOutboundInterceptor(this::addContextualData);
        this.vertx.eventBus().addInboundInterceptor(this::addContextualData);
    }

    private void addContextualData(DeliveryContext<Object> deliveryContext) {
        String str = deliveryContext.message().headers().get("TENANT");
        String str2 = deliveryContext.message().headers().get("AGGREGATE");
        if (str != null) {
            ContextualData.put("TENANT", str);
        }
        if (str2 != null) {
            ContextualData.put("AGGREGATE", str2);
        }
        deliveryContext.next();
    }

    private void startAggregateResources(Promise<Void> promise) {
        Stream<R> map = AGGREGATES.stream().map(bootstrap -> {
            return new AggregateDeployer(bootstrap.fileConfigurations(), bootstrap.aggregateClass(), this.vertx, this.context.deploymentID());
        });
        List<AggregateDeployer<? extends Aggregate>> list = AGGREGATE_DEPLOYERS;
        Objects.requireNonNull(list);
        map.forEach((v1) -> {
            r1.add(v1);
        });
        List list2 = AGGREGATE_DEPLOYERS.stream().map(aggregateDeployer -> {
            Promise<Void> promise2 = Promise.promise();
            aggregateDeployer.deploy(promise2);
            return UniHelper.toUni(promise2.future());
        }).toList();
        if (list2.isEmpty()) {
            throw new IllegalStateException("Aggregates not found");
        }
        Uni.join().all(list2).andFailFast().invoke(list3 -> {
            deployHeartBeat();
        }).invoke(list4 -> {
            deployProjections();
        }).flatMap(list5 -> {
            return deployBridges();
        }).subscribe().with(r7 -> {
            promise.complete();
            LOGGER.info(" ----  {}::{} Started  ---- ", getClass().getName(), this.context.deploymentID());
        }, th -> {
            LOGGER.error(" ----  {}::{} Stopped  ---- ", new Object[]{getClass().getName(), this.context.deploymentID(), th});
            this.vertx.closeAndForget();
            promise.fail(th);
        });
    }

    private void deployProjections() {
        List<EventProjectionPoller> list = EVENT_PROJECTIONS;
        CronTaskDeployer cronTaskDeployer = this.cronTaskDeployer;
        Objects.requireNonNull(cronTaskDeployer);
        list.forEach((v1) -> {
            r1.deploy(v1);
        });
        List<StateProjectionPoller> list2 = STATE_PROJECTIONS;
        CronTaskDeployer cronTaskDeployer2 = this.cronTaskDeployer;
        Objects.requireNonNull(cronTaskDeployer2);
        list2.forEach((v1) -> {
            r1.deploy(v1);
        });
    }

    private void deployHeartBeat() {
        List<AggregateHeartbeat<? extends Aggregate>> list = HEARTBEATS;
        TimerTaskDeployer timerTaskDeployer = this.timerTaskDeployer;
        Objects.requireNonNull(timerTaskDeployer);
        list.forEach((v1) -> {
            r1.deploy(v1);
        });
    }

    private Uni<Void> deployBridges() {
        return this.vertx.deployVerticle(AggregateBridge::new, new DeploymentOptions().setInstances(CpuCoreSensor.availableProcessors() * 2)).replaceWithVoid();
    }

    private void handleException(Throwable th) {
        LOGGER.error("[-- Event.x Main had to drop the following exception --]", th);
    }

    public void stop(Promise<Void> promise) {
        LOGGER.warn(" ---- Stopping  {}::{}  ---- ", getClass().getName(), this.context.deploymentID());
        UniSubscribe subscribe = close().subscribe();
        Consumer consumer = r3 -> {
            promise.complete();
        };
        Objects.requireNonNull(promise);
        subscribe.with(consumer, promise::fail);
    }

    private Uni<Void> close() {
        Es4jConfigurationHandler.close();
        this.timerTaskDeployer.close();
        this.cronTaskDeployer.close();
        return Multi.createFrom().iterable(AGGREGATE_DEPLOYERS).onItem().transformToUniAndMerge((v0) -> {
            return v0.close();
        }).collect().asList().replaceWithVoid();
    }
}
