package io.es4j.infrastructure.bus;

import io.es4j.Aggregate;
import io.es4j.Command;
import io.es4j.core.exceptions.CommandRejected;
import io.es4j.core.exceptions.Es4jException;
import io.es4j.core.exceptions.NodeUnavailable;
import io.es4j.core.exceptions.UnknownCommand;
import io.es4j.core.objects.AggregateState;
import io.es4j.core.objects.ErrorSource;
import io.es4j.core.objects.Es4jError;
import io.es4j.infrastructure.models.AggregatePlainKey;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.FixedDemandPacer;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.json.JsonObject;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.eventbus.Message;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.function.Consumer;
import org.ishugaliy.allgood.consistent.hash.HashRing;
import org.ishugaliy.allgood.consistent.hash.hasher.DefaultHasher;
import org.ishugaliy.allgood.consistent.hash.node.SimpleNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/bus/AggregateBus.class */
public class AggregateBus {
    public static final String COMMAND_BRIDGE = "command-bridge";
    private static final Logger LOGGER = LoggerFactory.getLogger(AggregateBus.class);
    public static final Map<Class<? extends Aggregate>, HashRing<SimpleNode>> HASH_RING_MAP = new HashMap();

    private AggregateBus() {
    }

    private static HashRing<SimpleNode> startHashRing(Class<? extends Aggregate> cls) {
        return HashRing.newBuilder().name(cls.getName()).hasher(DefaultHasher.MURMUR_3).partitionRate(100000).build();
    }

    public static <T extends Aggregate> Uni<Void> startChannel(Vertx vertx, Class<T> cls, String str) {
        HASH_RING_MAP.put(cls, startHashRing(cls));
        return vertx.eventBus().consumer(AddressResolver.invokeChannel(cls)).handler(message -> {
            broadcastActorAddress(vertx, cls, str);
        }).exceptionHandler(th -> {
            handlerThrowable(th, cls);
        }).completionHandler().call(r5 -> {
            return vertx.eventBus().consumer(AddressResolver.broadcastChannel(cls)).handler(message2 -> {
                synchronizeChannel(message2, cls);
            }).exceptionHandler(th2 -> {
                handlerThrowable(th2, cls);
            }).completionHandler();
        });
    }

    public static <T extends Aggregate> Uni<Void> waitForRegistration(String str, Class<T> cls) {
        return Multi.createBy().repeating().supplier(() -> {
            return Boolean.valueOf(HASH_RING_MAP.get(cls).getNodes().stream().filter(simpleNode -> {
                return simpleNode.getKey().equals(AddressResolver.nodeAddress(cls, str));
            }).toList().isEmpty());
        }).atMost(10L).capDemandsTo(1L).paceDemand().using(new FixedDemandPacer(1L, Duration.ofMillis(500L))).collect().last().map(Unchecked.function(bool -> {
            if (Boolean.TRUE.equals(bool)) {
                throw new NodeUnavailable(new Es4jError(null, "Hash ring synchronizer was still empty after 10 seconds", null, null, null, null));
            }
            return bool;
        })).replaceWithVoid();
    }

    public static <T extends Aggregate> void broadcastActorAddress(Vertx vertx, Class<T> cls, String str) {
        LOGGER.debug("Publishing [{}] address[{}] ", cls.getSimpleName(), AddressResolver.nodeAddress(cls, str));
        vertx.eventBus().publish(AddressResolver.broadcastChannel(cls), AddressResolver.nodeAddress(cls, str), new DeliveryOptions().setLocalOnly(false).setTracingPolicy(TracingPolicy.ALWAYS).addHeader(Actions.ACTION.name(), Actions.ADD.name()));
    }

    public static <T extends Aggregate> void stop(Vertx vertx, Class<T> cls, String str) {
        vertx.eventBus().publish(AddressResolver.broadcastChannel(cls), AddressResolver.nodeAddress(cls, str), new DeliveryOptions().addHeader(Actions.ACTION.name(), Actions.REMOVE.name()));
    }

    public static <T extends Aggregate> void invokeActorsBroadcast(Class<T> cls, Vertx vertx) {
        vertx.eventBus().publish(AddressResolver.invokeChannel(cls), "", new DeliveryOptions().setLocalOnly(false));
    }

    public static <T extends Aggregate, C extends Command> Uni<Void> registerCommandConsumer(Vertx vertx, Class<T> cls, String str, Consumer<Message<JsonObject>> consumer, Class<C> cls2) {
        return registerEventBusBridge(vertx, cls, cls2).flatMap(r11 -> {
            return registerEventbusCommandConsumer(vertx, cls, str, consumer, cls2);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T extends Aggregate, C extends Command> Uni<Void> registerEventbusCommandConsumer(Vertx vertx, Class<T> cls, String str, Consumer<Message<JsonObject>> consumer, Class<C> cls2) {
        return vertx.eventBus().consumer(AddressResolver.commandConsumer(cls, str, cls2)).handler(consumer).exceptionHandler(th -> {
            dropped(cls, th, cls2);
        }).completionHandler().invoke(r7 -> {
            broadcastActorAddress(vertx, cls, str);
        });
    }

    private static <T extends Aggregate> Uni<Void> registerEventBusBridge(Vertx vertx, Class<T> cls, Class<? extends Command> cls2) {
        return vertx.eventBus().consumer(AddressResolver.commandBridge(cls, cls2)).handler(message -> {
            request(vertx, cls, (Command) ((JsonObject) message.body()).mapTo(cls2)).subscribe().with(aggregateState -> {
                message.reply(aggregateState.toJson());
            }, th -> {
                if (!(th instanceof Es4jException)) {
                    message.fail(400, th.getMessage());
                } else {
                    Es4jException es4jException = (Es4jException) th;
                    message.fail(es4jException.error().externalErrorCode().intValue(), JsonObject.mapFrom(es4jException.error()).encode());
                }
            });
        }).exceptionHandler(th -> {
            dropped(cls, th, cls2);
        }).completionHandler();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void dropped(Class<?> cls, Throwable th, Class<? extends Command> cls2) {
        LOGGER.error("[-- {} channel had to drop an exception during the handling of command {} --]", new Object[]{cls.getSimpleName(), cls2.getName(), th});
    }

    private static void addNode(String str, HashRing<SimpleNode> hashRing) {
        SimpleNode of = SimpleNode.of(str);
        if (hashRing.contains(of)) {
            return;
        }
        LOGGER.debug("Adding {} to {} hash-ring", hashRing.getName(), str);
        hashRing.add(of);
    }

    public static <T extends Aggregate> Uni<AggregateState<T>> request(Vertx vertx, Class<T> cls, Command command) {
        AggregatePlainKey aggregatePlainKey = new AggregatePlainKey(cls.getName(), command.aggregateId(), command.tenant());
        JsonObject mapFrom = JsonObject.mapFrom(command);
        String resolveNode = resolveNode(cls, aggregatePlainKey, command);
        LOGGER.debug("Proxying  {} -> {}", resolveNode, mapFrom.encodePrettily());
        return vertx.eventBus().request(resolveNode, mapFrom, new DeliveryOptions().setTracingPolicy(TracingPolicy.ALWAYS).setLocalOnly(!vertx.isClustered()).setSendTimeout(2000L)).map(message -> {
            return AggregateState.fromJson((JsonObject) message.body(), cls);
        }).onFailure().transform(Unchecked.function(AggregateBus::transformError));
    }

    public static <T extends Aggregate> Uni<AggregateState<T>> requestWithRoles(Vertx vertx, Class<T> cls, Command command, List<String> list) {
        AggregatePlainKey aggregatePlainKey = new AggregatePlainKey(cls.getName(), command.aggregateId(), command.tenant());
        JsonObject mapFrom = JsonObject.mapFrom(command);
        String resolveNode = resolveNode(cls, aggregatePlainKey, command);
        StringJoiner stringJoiner = new StringJoiner(",");
        Objects.requireNonNull(stringJoiner);
        list.forEach((v1) -> {
            r1.add(v1);
        });
        LOGGER.debug("Proxying  {} -> {}", resolveNode, mapFrom.encodePrettily());
        return vertx.eventBus().request(resolveNode, mapFrom, new DeliveryOptions().setTracingPolicy(TracingPolicy.ALWAYS).setLocalOnly(!vertx.isClustered()).setSendTimeout(2000L).addHeader("auth-roles", stringJoiner.toString())).map(message -> {
            return AggregateState.fromJson((JsonObject) message.body(), cls);
        }).onFailure().transform(Unchecked.function(AggregateBus::transformError));
    }

    private static Throwable transformError(Throwable th) {
        if (!(th instanceof ReplyException)) {
            LOGGER.error("Unknown exception from handler", th);
            return new CommandRejected(new Es4jError(th.getMessage(), null, 500));
        }
        ReplyException replyException = (ReplyException) th;
        if (replyException.failureType() != ReplyFailure.RECIPIENT_FAILURE) {
            return new CommandRejected(new Es4jError(ErrorSource.INFRASTRUCTURE, AggregateBus.class.getName(), replyException.getMessage(), replyException.failureType().name(), String.valueOf(replyException.failureCode()), 500));
        }
        try {
            return new CommandRejected((Es4jError) new JsonObject(replyException.getLocalizedMessage()).mapTo(Es4jError.class));
        } catch (IllegalArgumentException e) {
            LOGGER.error("Unable to parse command", e);
            return new CommandRejected(new Es4jError(th.getMessage(), null, 500));
        }
    }

    public static <T extends Aggregate> String resolveNode(Class<T> cls, AggregatePlainKey aggregatePlainKey, Command command) {
        return AddressResolver.resolveCommandConsumer(((SimpleNode) HASH_RING_MAP.get(cls).locate(cls.getSimpleName() + aggregatePlainKey.aggregateId()).orElse((SimpleNode) HASH_RING_MAP.get(cls).getNodes().stream().findFirst().orElseThrow(() -> {
            return new NodeUnavailable(aggregatePlainKey.aggregateId());
        }))).getKey(), command.getClass());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void handlerThrowable(Throwable th, Class<?> cls) {
        LOGGER.error("[-- Channel for entity {} had to drop the following exception --]", cls.getSimpleName(), th);
    }

    private static void removeActor(String str, HashRing<SimpleNode> hashRing) {
        SimpleNode of = SimpleNode.of(str);
        if (!hashRing.contains(of)) {
            LOGGER.info("{} not present in hash-ring {}", str, hashRing.getName());
        } else {
            LOGGER.info("Removing {} form hash-ring {}", str, hashRing.getName());
            hashRing.remove(of);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void synchronizeChannel(Message<String> message, Class<? extends Aggregate> cls) {
        HASH_RING_MAP.computeIfAbsent(cls, cls2 -> {
            return HASH_RING_MAP.put(cls2, startHashRing(cls2));
        });
        switch (Actions.valueOf(message.headers().get(Actions.ACTION.name()))) {
            case ADD:
                addNode((String) message.body(), HASH_RING_MAP.get(cls));
                return;
            case REMOVE:
                removeActor((String) message.body(), HASH_RING_MAP.get(cls));
                return;
            default:
                throw UnknownCommand.unknown(((String) message.body()).getClass());
        }
    }
}
