package io.es4j.core.verticles;

import io.es4j.infrastructure.misc.Es4jServiceLoader;
import io.es4j.queue.MessageProcessor;
import io.es4j.queue.QueueTransactionManager;
import io.es4j.queue.TaskSubscriber;
import io.es4j.queue.exceptions.MessageException;
import io.es4j.queue.exceptions.QueueError;
import io.es4j.queue.models.MessageProcessorManager;
import io.es4j.queue.models.MessageProcessorWrapper;
import io.es4j.queue.models.QueueConfiguration;
import io.es4j.queue.models.QueueImplementation;
import io.es4j.queue.models.TransactionProvider;
import io.es4j.queue.postgres.PgQueueTransaction;
import io.es4j.queue.postgres.PgTaskSubscriber;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.ServiceLoader;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/core/verticles/TaskProcessorVerticle.class */
public class TaskProcessorVerticle extends AbstractVerticle implements Resource {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskProcessorVerticle.class);

    /** Processor implementations discovered through {@link ServiceLoader} when this instance is created. */
    private final List<MessageProcessor> messageProcessors = ServiceLoader.load(MessageProcessor.class)
            .stream()
            .map(ServiceLoader.Provider::get)
            .toList();

    /** Queue subscriber; assigned by {@link #asyncStart()}. */
    private TaskSubscriber subscriber;

    /** Settings mapped from the {@code "task-queue"} section of the verticle config in {@link #asyncStart()}. */
    private QueueConfiguration taskConfiguration;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.es4j.core.verticles.TaskProcessorVerticle$1, reason: invalid class name */
    /* loaded from: input_file:io/es4j/core/verticles/TaskProcessorVerticle$1.class */
    /**
     * Decompiler-emitted holder for the ordinal-indexed lookup tables used by
     * switch statements over {@link QueueImplementation} and {@link TransactionProvider}.
     * Each table maps an enum ordinal to a 1-based case number; slots that are never
     * assigned stay at 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$es4j$queue$models$TransactionProvider;
        static final /* synthetic */ int[] $SwitchMap$io$es4j$queue$models$QueueImplementation;

        static {
            // Queue table first, then the transaction table: same initialization order as before.
            int[] queueCases = new int[QueueImplementation.values().length];
            try {
                queueCases[QueueImplementation.PG_QUEUE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant missing at run time: its slot simply stays 0.
            }
            try {
                queueCases[QueueImplementation.RABBITMQ.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant missing at run time: its slot simply stays 0.
            }
            try {
                queueCases[QueueImplementation.SOLACE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant missing at run time: its slot simply stays 0.
            }
            $SwitchMap$io$es4j$queue$models$QueueImplementation = queueCases;

            int[] transactionCases = new int[TransactionProvider.values().length];
            try {
                transactionCases[TransactionProvider.VERTX_PG_CLIENT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant missing at run time: its slot simply stays 0.
            }
            $SwitchMap$io$es4j$queue$models$TransactionProvider = transactionCases;
        }
    }

    /**
     * CRaC hook: stops this verticle and blocks the calling thread until the stop
     * promise has completed, successfully or not.
     *
     * <p>A failed stop is logged instead of being silently dropped; it is not rethrown,
     * so the outcome seen by the caller is unchanged.
     *
     * @param context the CRaC context delivering the notification (unused)
     * @throws Exception if {@code stop} throws synchronously or the wait is interrupted
     */
    public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
        Promise<Void> stopped = Promise.promise();
        stop(stopped);
        CountDownLatch completion = new CountDownLatch(1);
        stopped.future().onComplete(outcome -> {
            if (outcome.failed()) {
                LOGGER.error("Unable to stop task processor before checkpoint", outcome.cause());
            }
            completion.countDown();
        });
        // NOTE(review): waits without a timeout; presumably never invoked on the thread
        // that has to complete the promise - confirm.
        completion.await();
    }

    /**
     * CRaC hook: starts this verticle again and blocks the calling thread until the
     * start promise has completed, successfully or not.
     *
     * <p>A failed start is logged instead of being silently dropped; it is not rethrown,
     * so the outcome seen by the caller is unchanged.
     *
     * @param context the CRaC context delivering the notification (unused)
     * @throws Exception if {@code start} throws synchronously or the wait is interrupted
     */
    public void afterRestore(Context<? extends Resource> context) throws Exception {
        Promise<Void> started = Promise.promise();
        start(started);
        CountDownLatch completion = new CountDownLatch(1);
        started.future().onComplete(outcome -> {
            if (outcome.failed()) {
                LOGGER.error("Unable to restart task processor after restore", outcome.cause());
            }
            completion.countDown();
        });
        // NOTE(review): waits without a timeout; presumably never invoked on the thread
        // that has to complete the promise - confirm.
        completion.await();
    }

    /**
     * Creates the verticle and registers it as a CRaC {@link Resource} with the global context.
     */
    public TaskProcessorVerticle() {
        // Registration happens at construction time, i.e. before asyncStart() has run.
        Core.getGlobalContext().register(this);
    }

    /**
     * Returns an already-completed {@code Uni<Void>}; neither argument is used.
     *
     * <p>NOTE(review): nothing is deployed here - confirm whether this is an intentional stub.
     *
     * @param vertx      unused
     * @param jsonObject unused
     * @return a {@code Uni} that emits a void item
     */
    public static Uni<Void> deploy(Vertx vertx, JsonObject jsonObject) {
        return Uni.createFrom().voidItem();
    }

    /**
     * Unsubscribes the task subscriber.
     *
     * <p>The subscriber is only assigned inside {@link #asyncStart()}, while the instance
     * is registered for checkpoint notifications in its constructor, so a stop can arrive
     * before a subscriber exists. In that case there is nothing to release and a completed
     * {@code Uni} is returned instead of failing with a {@link NullPointerException}.
     *
     * @return a {@code Uni} completing once the subscriber has unsubscribed, or immediately
     *         when no subscriber was created
     */
    public Uni<Void> asyncStop() {
        if (this.subscriber == null) {
            return Uni.createFrom().voidItem();
        }
        return this.subscriber.unsubscribe();
    }

    /**
     * Reads the {@code "task-queue"} section of the verticle config (an empty object when
     * absent), builds the transaction manager and the subscriber from it, and subscribes a
     * {@link MessageProcessorManager} wired with the bootstrapped processors.
     *
     * @return a {@code Uni} completing when the subscription call completes
     */
    public Uni<Void> asyncStart() {
        JsonObject queueSection = config().getJsonObject("task-queue", new JsonObject());
        this.taskConfiguration = queueSection.mapTo(QueueConfiguration.class);

        // The transaction manager is created before the subscriber, as before.
        QueueTransactionManager transactions = getTransactionManager();
        this.subscriber = getSubscriber();

        MessageProcessorManager manager = new MessageProcessorManager(
                this.taskConfiguration,
                bootstrapProcessors(deploymentID()),
                transactions,
                this.vertx);
        return this.subscriber.subscribe(manager).replaceWithVoid();
    }

    /**
     * Creates the transaction manager selected by the task queue configuration.
     *
     * <p>Switches on the enum constant directly rather than on an ordinal-indexed lookup
     * table, and reports an unhandled constant with a descriptive
     * {@link IllegalStateException} instead of a bare {@link IncompatibleClassChangeError}.
     *
     * @return the transaction manager for the configured provider
     * @throws IllegalStateException if the configured provider has no implementation here
     */
    private QueueTransactionManager getTransactionManager() {
        TransactionProvider provider = this.taskConfiguration.transactionManagerImplementation();
        return switch (provider) {
            case VERTX_PG_CLIENT -> new PgQueueTransaction(this.vertx, config());
            default -> throw new IllegalStateException("Unsupported transaction provider: " + provider);
        };
    }

    /**
     * Creates the task subscriber selected by the task queue configuration.
     *
     * <p>Switches on the enum constant directly rather than on an ordinal-indexed lookup
     * table, and reports an unhandled constant with a descriptive
     * {@link IllegalStateException} instead of a bare {@link IncompatibleClassChangeError}.
     *
     * @return the subscriber for the configured queue implementation
     * @throws MessageException      if the configured implementation is RabbitMQ or Solace,
     *                               neither of which is implemented
     * @throws IllegalStateException if the configured implementation is not handled here
     */
    private TaskSubscriber getSubscriber() {
        QueueImplementation implementation = this.taskConfiguration.queueImplementation();
        return switch (implementation) {
            case PG_QUEUE -> new PgTaskSubscriber(this.vertx, config());
            case RABBITMQ -> throw new MessageException(
                    new QueueError("queue type not supported", "rabbit task queue is not yet implemented", 999));
            case SOLACE -> throw new MessageException(
                    new QueueError("queue type not supported", "solace task queue is not yet implemented", 999));
            default -> throw new IllegalStateException("Unsupported queue implementation: " + implementation);
        };
    }

    /**
     * Groups the discovered processors by the payload class reported by
     * {@link Es4jServiceLoader#getFirstGenericType} and builds one wrapper per payload class.
     *
     * @param str identifier handed to every wrapper (the caller in this class passes the deployment id)
     * @return one wrapper per payload class, as an unmodifiable list
     * @throws IllegalStateException  if a payload class has more than one default processor,
     *                                or more than one processor for the same tenant list
     * @throws NoSuchElementException if a payload class has no default processor
     */
    public List<MessageProcessorWrapper> bootstrapProcessors(String str) {
        Map<Class<?>, List<MessageProcessor>> processorsByPayload = new HashMap<>();
        for (MessageProcessor processor : this.messageProcessors) {
            processorsByPayload
                    .computeIfAbsent(Es4jServiceLoader.getFirstGenericType(processor), payload -> new ArrayList<>())
                    .add(processor);
        }
        return processorsByPayload.entrySet().stream()
                .map(group -> buildWrapper(str, group.getKey(), group.getValue()))
                .toList();
    }

    /** Validates the processors of one payload class, wraps them and logs the resulting setup. */
    private MessageProcessorWrapper buildWrapper(String deploymentId, Class<?> payloadClass, List<MessageProcessor> candidates) {
        validateProcessors(candidates, payloadClass);

        // A processor with no tenant list is the default for its payload class.
        MessageProcessor defaultProcessor = candidates.stream()
                .filter(candidate -> candidate.tenants() == null)
                .findFirst()
                .orElseThrow(() -> new NoSuchElementException(
                        "No default implementation (tenants() == null) for -> " + payloadClass));

        // validateProcessors has already rejected duplicate tenant lists, so every key maps
        // to exactly one processor and toMap cannot hit a duplicate key.
        // NOTE(review): values are single processors rather than one-element lists because
        // logQueueConfiguration reads each value as a processor - confirm against the
        // declaration of MessageProcessorWrapper.customProcessors.
        Map<?, MessageProcessor> tenantProcessors = candidates.stream()
                .filter(candidate -> candidate.tenants() != null)
                .collect(Collectors.toMap(candidate -> candidate.tenants(), candidate -> candidate));

        MessageProcessorWrapper wrapper =
                new MessageProcessorWrapper(deploymentId, defaultProcessor, tenantProcessors, payloadClass);
        logQueueConfiguration(wrapper, this.taskConfiguration);
        return wrapper;
    }

    /**
     * Rejects ambiguous processor sets for one payload class.
     *
     * @param list processors registered for the payload class
     * @param cls  the payload class, used in the error messages
     * @throws IllegalStateException if more than one processor has a {@code null} tenant list,
     *                               or more than one processor shares the same tenant list
     */
    private static void validateProcessors(List<MessageProcessor> list, Class<?> cls) {
        long defaultCount = list.stream()
                .filter(candidate -> candidate.tenants() == null)
                .count();
        if (defaultCount > 1) {
            throw new IllegalStateException("More than one default implementation for -> " + cls);
        }

        var processorsByTenants = list.stream()
                .filter(candidate -> candidate.tenants() != null)
                .collect(Collectors.groupingBy(candidate -> candidate.tenants()));
        for (var group : processorsByTenants.entrySet()) {
            if (group.getValue().size() > 1) {
                throw new IllegalStateException(
                        "More than one custom implementation for tenant " + group.getKey() + " queue -> " + cls);
            }
        }
    }

    /**
     * Logs, at INFO level, a JSON summary of one queue: the default processor class, the
     * processor class registered for each tenant, the payload class and the queue configuration.
     *
     * @param messageProcessorWrapper the wrapper to describe
     * @param queueConfiguration      the configuration to include in the summary
     * @param <T>                     payload type of the wrapper
     */
    public static <T> void logQueueConfiguration(MessageProcessorWrapper<T> messageProcessorWrapper, QueueConfiguration queueConfiguration) {
        // One entry per tenant, pointing at the class name of that tenant's processor.
        JsonObject tenantOverrides = new JsonObject();
        messageProcessorWrapper.customProcessors().forEach((tenants, processor) -> {
            for (String tenant : tenants) {
                tenantOverrides.put(tenant, processor.getClass().getName());
            }
        });

        JsonObject summary = new JsonObject();
        summary.put("defaultProcessor", messageProcessorWrapper.defaultProcessor().getClass().getName());
        // Nested objects are stored as pretty-printed strings, exactly as before.
        summary.put("customProcessors", tenantOverrides.encodePrettily());
        summary.put("payloadClass", messageProcessorWrapper.payloadClass().getName());
        summary.put("configuration", JsonObject.mapFrom(queueConfiguration).encodePrettily());

        LOGGER.info("Queue configuration {} ", summary.encodePrettily());
    }
}
