package io.es4j.core;

import io.es4j.Aggregate;
import io.es4j.Command;
import io.es4j.Event;
import io.es4j.core.exceptions.CommandRejected;
import io.es4j.core.exceptions.UnknownCommand;
import io.es4j.core.exceptions.UnknownEvent;
import io.es4j.core.objects.AggregateConfiguration;
import io.es4j.core.objects.AggregateState;
import io.es4j.core.objects.AggregatorWrap;
import io.es4j.core.objects.BehaviourWrap;
import io.es4j.core.objects.Es4jError;
import io.es4j.core.objects.EventbusLiveStreams;
import io.es4j.core.objects.LoadAggregate;
import io.es4j.core.objects.SnapshotEvent;
import io.es4j.infrastructure.Infrastructure;
import io.es4j.infrastructure.misc.EventParser;
import io.es4j.infrastructure.models.AggregateEventStream;
import io.es4j.infrastructure.models.AggregateKey;
import io.es4j.infrastructure.models.AppendInstruction;
import io.es4j.infrastructure.models.EventStream;
import io.es4j.infrastructure.models.EventStreamBuilder;
import io.es4j.infrastructure.models.PruneEventStream;
import io.es4j.infrastructure.models.StartStream;
import io.es4j.sql.exceptions.Conflict;
import io.smallrye.mutiny.Uni;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.mutiny.core.Vertx;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/core/CommandHandler.class */
public class CommandHandler<T extends Aggregate> {
    private final List<BehaviourWrap> behaviours;
    private final List<AggregatorWrap> aggregators;
    private final Infrastructure infrastructure;
    private static final Logger LOGGER = LoggerFactory.getLogger(CommandHandler.class);
    private final Class<T> aggregateClass;
    private final AggregateConfiguration aggregateConfiguration;
    private final Vertx vertx;

    public CommandHandler(Vertx vertx, Class<T> cls, List<AggregatorWrap> list, List<BehaviourWrap> list2, Infrastructure infrastructure, AggregateConfiguration aggregateConfiguration) {
        this.vertx = vertx;
        this.infrastructure = infrastructure;
        this.aggregateClass = cls;
        this.aggregators = list;
        this.behaviours = list2;
        if (list2.isEmpty()) {
            throw new IllegalStateException("Empty behaviours");
        }
        if (list.isEmpty()) {
            throw new IllegalStateException("Empty behaviours");
        }
        this.aggregateConfiguration = aggregateConfiguration;
    }

    private Uni<JsonObject> replay(LoadAggregate loadAggregate) {
        return (Objects.nonNull(loadAggregate.dateTo()) || Objects.nonNull(loadAggregate.versionTo())) ? replayAndAggregate(loadAggregate).map((v0) -> {
            return v0.toJson();
        }) : replayAggregateAndCache(loadAggregate.aggregateId(), loadAggregate.tenant()).map((v0) -> {
            return v0.toJson();
        });
    }

    public Uni<JsonObject> process(Command command) {
        return command instanceof LoadAggregate ? replay((LoadAggregate) command) : command.options().simulate() ? replayAndSimulate(command) : replayAndAppend(command);
    }

    private Uni<JsonObject> replayAndAppend(Command command) {
        return replayAggregateAndCache(command.aggregateId(), command.tenant()).flatMap(aggregateState -> {
            return processCommand(aggregateState, command).onFailure(Conflict.class).recoverWithUni(() -> {
                return playFromLastJournalOffset(command.aggregateId(), command.tenant(), aggregateState).flatMap(aggregateState -> {
                    return processCommand(aggregateState, command);
                }).onFailure(Conflict.class).retry().atMost(5L);
            }).onFailure().invoke(th -> {
                logRejectedCommand(th, command);
            });
        }).map((v0) -> {
            return v0.toJson();
        });
    }

    private Uni<JsonObject> replayAndSimulate(Command command) {
        return replayAggregateAndCache(command.aggregateId(), command.tenant()).map(aggregateState -> {
            checkCommandId(aggregateState, command);
            applyEvents(aggregateState, applyCommandBehaviour(aggregateState, command));
            return aggregateState.toJson();
        });
    }

    private T aggregateEvent(T t, Event event) {
        Event event2 = event;
        AggregatorWrap findAggregator = findAggregator(event);
        LOGGER.debug("Applying {} schema versionTo {} ", findAggregator.delegate().getClass().getSimpleName(), Integer.valueOf(findAggregator.delegate().currentSchemaVersion()));
        if (findAggregator.delegate().currentSchemaVersion() != event.schemaVersion()) {
            LOGGER.debug("Schema versionTo mismatch, migrating event {} {} ", event.getClass().getName(), JsonObject.mapFrom(event).encodePrettily());
            event2 = findAggregator.delegate().transformFrom(event.schemaVersion(), JsonObject.mapFrom(event));
        }
        T t2 = (T) findAggregator.delegate().apply(t, event2);
        LOGGER.debug("State after aggregation {}", t2);
        return t2;
    }

    private AggregatorWrap findAggregator(Event event) {
        return this.aggregators.stream().filter(aggregatorWrap -> {
            return aggregatorWrap.eventClass().getName().equals(event.getClass().getName());
        }).findFirst().orElseThrow(() -> {
            return UnknownEvent.unknown(event.getClass());
        });
    }

    private List<Event> applyCommandBehaviour(T t, Command command) {
        BehaviourWrap findBehaviour = findBehaviour(command);
        LOGGER.debug("Applying {} {} ", findBehaviour.delegate().getClass().getSimpleName(), JsonObject.mapFrom(command));
        List<Event> process = findBehaviour.process(t, command);
        LOGGER.debug("{} behaviour produced {}", findBehaviour.delegate().getClass().getSimpleName(), new JsonArray(process).encodePrettily());
        return process;
    }

    private BehaviourWrap findBehaviour(Command command) {
        return this.behaviours.stream().filter(behaviourWrap -> {
            return behaviourWrap.commandClass().getName().equals(command.getClass().getName());
        }).findFirst().orElseThrow(() -> {
            return UnknownCommand.unknown(command.getClass());
        });
    }

    private Uni<AggregateState<T>> replayAggregateAndCache(String str, String str2) {
        AggregateState<T> aggregateState = null;
        AggregateKey<T> aggregateKey = new AggregateKey<>(this.aggregateClass, str, str2);
        if (this.infrastructure.cache().isPresent()) {
            LOGGER.info("Fetching from cache-store {}", aggregateKey);
            aggregateState = this.infrastructure.cache().get().get(aggregateKey);
        }
        if (aggregateState != null) {
            return Uni.createFrom().item(aggregateState);
        }
        LOGGER.info("Fetching from event-store {}", aggregateKey);
        return playFromLastJournalOffset(str, str2, new AggregateState<>(this.aggregateClass));
    }

    private Uni<AggregateState<T>> playFromLastJournalOffset(String str, String str2, AggregateState<T> aggregateState) {
        AggregateEventStream<T> streamInstruction = streamInstruction(str, str2, aggregateState);
        LOGGER.debug("Playing aggregate stream with instruction {}", streamInstruction.toJson().encodePrettily());
        return this.infrastructure.eventStore().fetch(streamInstruction).map(list -> {
            list.forEach(event -> {
                applyEvent(aggregateState, event);
            });
            return cacheState(aggregateState);
        });
    }

    private Uni<AggregateState<T>> replayAndAggregate(LoadAggregate loadAggregate) {
        AggregateState aggregateState = new AggregateState(this.aggregateClass);
        EventStream eventStreamInstruction = eventStreamInstruction(loadAggregate);
        LOGGER.debug("Playing aggregate stream with instruction {}", eventStreamInstruction);
        return this.infrastructure.eventStore().stream(eventStreamInstruction, event -> {
            applyEvent(aggregateState, event);
        }).replaceWith(aggregateState);
    }

    private EventStream eventStreamInstruction(LoadAggregate loadAggregate) {
        return EventStreamBuilder.builder().aggregateIds(List.of(loadAggregate.aggregateId())).tenantId(loadAggregate.tenant()).offset(0L).to(loadAggregate.dateTo()).versionTo(loadAggregate.versionTo()).build();
    }

    private AggregateEventStream<T> streamInstruction(String str, String str2, AggregateState<T> aggregateState) {
        return new AggregateEventStream<>(str, str2, aggregateState.currentVersion(), aggregateState.currentJournalOffset(), SnapshotEvent.class, this.aggregateConfiguration.snapshotThreshold());
    }

    private void applyEvents(AggregateState<T> aggregateState, List<io.es4j.infrastructure.models.Event> list) {
        list.stream().sorted(Comparator.comparingLong((v0) -> {
            return v0.eventVersion();
        })).forEachOrdered(event -> {
            Event event = EventParser.getEvent(event.eventClass(), event.event());
            if (event.getClass().isAssignableFrom(SnapshotEvent.class)) {
                LOGGER.debug("Aggregating snapshot {}", event.event().encodePrettily());
                applySnapshot(aggregateState, event, event);
            } else {
                applyEvent(aggregateState, event, event);
            }
            aggregateState.addKnownCommand(event.commandId()).setCurrentVersion(event.eventVersion());
        });
    }

    private void applyEvent(AggregateState<T> aggregateState, io.es4j.infrastructure.models.Event event, Event event2) {
        T aggregateEvent = aggregateEvent(aggregateState.state(), event2);
        if (aggregateState.knownCommands().stream().noneMatch(str -> {
            return str.equals(event.commandId());
        })) {
            LOGGER.debug("Acknowledging command {}", event.commandId());
            aggregateState.knownCommands().add(event.commandId());
        }
        aggregateState.setState(aggregateEvent);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void applySnapshot(AggregateState<T> aggregateState, io.es4j.infrastructure.models.Event event, Event event2) {
        LOGGER.debug("Applying snapshot {}", JsonObject.mapFrom(event).encodePrettily());
        if (event2.schemaVersion() != aggregateState.state().schemaVersion()) {
            LOGGER.debug("Aggregate schema versionTo mismatch, migrating schema from {} to {}", Integer.valueOf(event2.schemaVersion()), Integer.valueOf(aggregateState.state().schemaVersion()));
            aggregateState.setState((Aggregate) aggregateState.aggregateClass().cast(aggregateState.state().transformSnapshot(event2.schemaVersion(), event.event())));
        } else {
            LOGGER.debug("Applying snapshot with schema versionTo {} to aggregate with schema versionTo {}", Integer.valueOf(event2.schemaVersion()), Integer.valueOf(aggregateState.state().schemaVersion()));
            aggregateState.setState((Aggregate) JsonObject.mapFrom(((SnapshotEvent) event2).state()).mapTo(aggregateState.aggregateClass()));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void applyEvent(AggregateState<T> aggregateState, io.es4j.infrastructure.models.Event event) {
        LOGGER.info("Aggregating event {} ", event.eventClass());
        Event event2 = EventParser.getEvent(event.eventClass(), event.event());
        if (event2.getClass().isAssignableFrom(SnapshotEvent.class)) {
            SnapshotEvent snapshotEvent = (SnapshotEvent) event2;
            aggregateState.knownCommands().clear();
            aggregateState.setState((Aggregate) JsonObject.mapFrom(snapshotEvent.state()).mapTo(aggregateState.aggregateClass())).addKnownCommands(snapshotEvent.knownCommands()).setCurrentVersion(event.eventVersion()).setCurrentJournalOffset(event.journalOffset());
        } else {
            T aggregateEvent = aggregateEvent(aggregateState.state(), event2);
            LOGGER.debug("State after aggregation {} ", JsonObject.mapFrom(aggregateEvent).encodePrettily());
            aggregateState.setState(aggregateEvent).addKnownCommand(event.commandId()).setCurrentJournalOffset(event.journalOffset()).setCurrentVersion(event.eventVersion());
        }
    }

    private List<io.es4j.infrastructure.models.Event> applyCommandBehaviour(AggregateState<T> aggregateState, Command command) {
        ArrayList<io.es4j.infrastructure.models.Event> transformEvents = transformEvents(aggregateState, command, (Event[]) applyCommandBehaviour((CommandHandler<T>) aggregateState.state(), command).toArray(new Event[0]));
        addOptionalSnapshot(aggregateState, command, transformEvents);
        return transformEvents;
    }

    public static <X extends Aggregate> ArrayList<io.es4j.infrastructure.models.Event> transformEvents(AggregateState<X> aggregateState, Command command, Event[] eventArr) {
        long longValue = aggregateState.currentVersion() == null ? 0L : aggregateState.currentVersion().longValue();
        return new ArrayList<>(IntStream.range(1, eventArr.length + 1).mapToObj(i -> {
            Event event = eventArr[i - 1];
            return new io.es4j.infrastructure.models.Event(command.aggregateId(), event.getClass().getName(), Long.valueOf(longValue + i), JsonObject.mapFrom(event), command.tenant(), command.uniqueId(), event.tags(), Integer.valueOf(event.schemaVersion()));
        }).toList());
    }

    private void addOptionalSnapshot(AggregateState<T> aggregateState, Command command, ArrayList<io.es4j.infrastructure.models.Event> arrayList) {
        if (aggregateState.state() == null || !Objects.nonNull(this.aggregateConfiguration.snapshotThreshold())) {
            return;
        }
        if (this.aggregateConfiguration.snapshotThreshold().intValue() <= Math.floorMod(aggregateState.currentVersion().longValue(), this.aggregateConfiguration.snapshotThreshold().intValue())) {
            aggregateState.setCurrentVersion(Long.valueOf(aggregateState.currentVersion().longValue() + 1));
            SnapshotEvent snapshotEvent = new SnapshotEvent(JsonObject.mapFrom(aggregateState.state()).getMap(), aggregateState.knownCommands().stream().toList(), aggregateState.currentVersion());
            LOGGER.debug("Appending a snapshot {}", JsonObject.mapFrom(snapshotEvent).encodePrettily());
            arrayList.add(new io.es4j.infrastructure.models.Event(command.aggregateId(), SnapshotEvent.class.getName(), aggregateState.currentVersion(), JsonObject.mapFrom(snapshotEvent), command.tenant(), command.uniqueId(), List.of("system"), Integer.valueOf(aggregateState.state().schemaVersion())));
        }
    }

    private <C extends Command> Uni<AggregateState<T>> processCommand(AggregateState<T> aggregateState, C c) {
        checkCommandId(aggregateState, c);
        List<io.es4j.infrastructure.models.Event> applyCommandBehaviour = applyCommandBehaviour(aggregateState, c);
        applyEvents(aggregateState, applyCommandBehaviour);
        return appendEvents(aggregateState, applyCommandBehaviour).map(r5 -> {
            return cacheState(aggregateState);
        }).invoke(aggregateState2 -> {
            publishToEventStream(aggregateState, applyCommandBehaviour);
        }).invoke(aggregateState3 -> {
            publishToStateStream(aggregateState);
        });
    }

    private void publishToEventStream(AggregateState<T> aggregateState, List<io.es4j.infrastructure.models.Event> list) {
        list.forEach(event -> {
            String eventLiveStream = EventbusLiveStreams.eventLiveStream(aggregateState.aggregateClass(), aggregateState.state().aggregateId(), aggregateState.state().tenant());
            try {
                this.vertx.eventBus().publish(eventLiveStream, JsonObject.mapFrom(event), new DeliveryOptions().setLocalOnly(false).setTracingPolicy(TracingPolicy.ALWAYS));
            } catch (Exception e) {
                LOGGER.error("Unable to publish state update for {}::{} on address {}", new Object[]{aggregateState.aggregateClass().getSimpleName(), aggregateState.state().aggregateId(), eventLiveStream});
            }
            LOGGER.debug("Event stream published for {}::{} to address {}", new Object[]{aggregateState.aggregateClass().getSimpleName(), aggregateState.state().aggregateId(), eventLiveStream});
        });
    }

    private void publishToStateStream(AggregateState<T> aggregateState) {
        String stateLiveStream = EventbusLiveStreams.stateLiveStream(aggregateState.aggregateClass(), aggregateState.state().aggregateId(), aggregateState.state().tenant());
        try {
            this.vertx.eventBus().publish(stateLiveStream, aggregateState.toJson(), new DeliveryOptions().setLocalOnly(false).setTracingPolicy(TracingPolicy.ALWAYS));
        } catch (Exception e) {
            LOGGER.error("Unable to publish state update for {}::{} on address {}", new Object[]{aggregateState.aggregateClass().getSimpleName(), aggregateState.state().aggregateId(), stateLiveStream});
        }
        LOGGER.debug("State update published for {}::{} to address {}", new Object[]{aggregateState.aggregateClass().getSimpleName(), aggregateState.state().aggregateId(), stateLiveStream});
    }

    private <C extends Command> void checkCommandId(AggregateState<T> aggregateState, C c) {
        if (aggregateState.knownCommands() == null || aggregateState.knownCommands().isEmpty()) {
            return;
        }
        aggregateState.knownCommands().stream().filter(str -> {
            return str.equals(c.uniqueId());
        }).findAny().ifPresent(str2 -> {
            throw new CommandRejected(new Es4jError("Command was already processed", "Command was already processed by aggregate", 400));
        });
    }

    private AggregateState<T> cacheState(AggregateState<T> aggregateState) {
        if (aggregateState.state() != null) {
            this.infrastructure.cache().ifPresent(aggregateCache -> {
                aggregateCache.put(new AggregateKey<>(this.aggregateClass, aggregateState.state().aggregateId(), aggregateState.state().tenant()), aggregateState);
            });
        }
        return aggregateState;
    }

    private Uni<Void> appendEvents(AggregateState<T> aggregateState, List<io.es4j.infrastructure.models.Event> list) {
        Uni<Void> voidItem = Uni.createFrom().voidItem();
        if (list.stream().anyMatch(event -> {
            return event.eventVersion().longValue() == 1;
        })) {
            voidItem = this.infrastructure.eventStore().startStream(new StartStream<>(this.aggregateClass, aggregateState.state().aggregateId(), aggregateState.state().tenant()));
        }
        return voidItem.flatMap(r11 -> {
            return this.infrastructure.eventStore().append(new AppendInstruction<>(this.aggregateClass, aggregateState.state().aggregateId(), aggregateState.state().tenant(), list));
        }).invoke(r7 -> {
            dumpToSecondaryStore(aggregateState, list);
        });
    }

    private void dumpToSecondaryStore(AggregateState<T> aggregateState, List<io.es4j.infrastructure.models.Event> list) {
        if (this.infrastructure.secondaryEventStore().isPresent() && list.stream().anyMatch(event -> {
            return event.eventClass().equals(SnapshotEvent.class.getName());
        })) {
            this.infrastructure.eventStore().fetch(new AggregateEventStream<>(aggregateState.state().aggregateId(), aggregateState.state().tenant(), 0L, null, null, null)).flatMap(list2 -> {
                Long findMaxSnapshotVersion = findMaxSnapshotVersion(list2);
                return this.infrastructure.secondaryEventStore().get().append(new AppendInstruction<>(aggregateState.aggregateClass(), aggregateState.state().aggregateId(), aggregateState.state().tenant(), getEventsToTrim(list2, findMaxSnapshotVersion))).replaceWith(findMaxSnapshotVersion);
            }).flatMap(l -> {
                return this.infrastructure.eventStore().trim(new PruneEventStream<>(aggregateState.aggregateClass(), aggregateState.state().aggregateId(), aggregateState.state().tenant(), l));
            }).subscribe().with(r3 -> {
                LOGGER.info("Primary event store pruned");
            }, th -> {
                LOGGER.info("Unable to prune primary store", th);
            });
        }
    }

    private static List<io.es4j.infrastructure.models.Event> getEventsToTrim(List<io.es4j.infrastructure.models.Event> list, Long l) {
        return list.stream().filter(event -> {
            return event.eventVersion().longValue() < l.longValue();
        }).toList();
    }

    private static Long findMaxSnapshotVersion(List<io.es4j.infrastructure.models.Event> list) {
        return (Long) list.stream().filter(event -> {
            return event.eventClass().equals(SnapshotEvent.class.getName());
        }).max(Comparator.comparingLong((v0) -> {
            return v0.eventVersion();
        })).map((v0) -> {
            return v0.eventVersion();
        }).orElseThrow(() -> {
            return new IllegalStateException("Snapshot not found");
        });
    }

    private void logRejectedCommand(Throwable th, Command command) {
        LOGGER.error("{} command rejected {}", new Object[]{command.getClass().getName(), JsonObject.mapFrom(command).encodePrettily(), th});
    }

    public static String camelToKebab(String str) {
        return str.replaceAll("([a-z])([A-Z]+)", "$1-$2").toLowerCase();
    }
}
