package io.es4j.infrastructure.bus;

import com.fasterxml.jackson.databind.JsonNode;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaGenerator;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaVersion;
import io.es4j.Aggregate;
import io.es4j.core.CommandHandler;
import io.es4j.core.exceptions.Es4jException;
import io.es4j.core.objects.AggregatorWrap;
import io.es4j.core.objects.BehaviourWrap;
import io.es4j.core.objects.Es4jError;
import io.es4j.core.objects.Offset;
import io.es4j.core.objects.OffsetBuilder;
import io.es4j.core.objects.OffsetKey;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.OffsetStore;
import io.es4j.infrastructure.models.AvailableTypes;
import io.es4j.infrastructure.models.Event;
import io.es4j.infrastructure.models.EventFilter;
import io.es4j.infrastructure.models.EventStreamBuilder;
import io.es4j.infrastructure.models.FetchNextEvents;
import io.es4j.infrastructure.models.OffsetFilter;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.eventbus.Message;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/bus/Es4jService.class */
public class Es4jService {
    private final OffsetStore offsetStore;
    private final EventStore eventStore;
    private final Class<? extends Aggregate> aClass;
    protected static final Logger LOGGER = LoggerFactory.getLogger(Es4jService.class);
    private final Set<String> eventTypes;
    private final Map<String, JsonNode> commandSchemas;

    public Es4jService(OffsetStore offsetStore, EventStore eventStore, Class<? extends Aggregate> cls, List<AggregatorWrap> list, List<BehaviourWrap> list2) {
        this.offsetStore = offsetStore;
        this.eventStore = eventStore;
        this.aClass = cls;
        SchemaGenerator schemaGenerator = new SchemaGenerator(new SchemaGeneratorConfigBuilder(SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON).build());
        this.commandSchemas = (Map) list2.stream().map((v0) -> {
            return v0.commandClass();
        }).map(cls2 -> {
            try {
                schemaGenerator.generateSchema(cls2, new Type[0]);
                return Tuple2.of(CommandHandler.camelToKebab(cls2.getSimpleName()), schemaGenerator.generateSchema(cls2, new Type[0]));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toMap((v0) -> {
            return v0.getItem1();
        }, (v0) -> {
            return v0.getItem2();
        }));
        this.eventTypes = (Set) list.stream().map(aggregatorWrap -> {
            return aggregatorWrap.delegate().eventType();
        }).collect(Collectors.toSet());
    }

    public Uni<Void> register(Vertx vertx) {
        return vertx.eventBus().consumer(fetchNextEventsAddress(this.aClass)).handler(message -> {
            fetchNextEvents((FetchNextEvents) ((JsonObject) message.body()).mapTo(FetchNextEvents.class)).subscribe().with(list -> {
                message.reply(new JsonArray(list));
            }, th -> {
                handleThrowable(message, th);
            });
        }).exceptionHandler(this::handle).completionHandler().flatMap(r6 -> {
            return vertx.eventBus().consumer(resetOffsetAddress(this.aClass)).handler(message2 -> {
                reset(((JsonObject) message2.body()).getString("projectionId"), ((JsonObject) message2.body()).getString("tenant", "default"), ((JsonObject) message2.body()).getLong("idOffset", 0L)).subscribe().with(r4 -> {
                    message2.reply("void");
                }, th -> {
                    handleThrowable(message2, th);
                });
            }).exceptionHandler(this::handle).completionHandler().flatMap(r5 -> {
                return vertx.eventBus().consumer(fetchEventsAddress(this.aClass)).handler(message3 -> {
                    fetchEvents((EventFilter) ((JsonObject) message3.body()).mapTo(EventFilter.class)).subscribe().with(list -> {
                        message3.reply(new JsonArray(list));
                    }, th -> {
                        handleThrowable(message3, th);
                    });
                }).exceptionHandler(this::handle).completionHandler();
            }).flatMap(r52 -> {
                return vertx.eventBus().consumer(fetchOffsetsAddress(this.aClass)).handler(message3 -> {
                    offsets((OffsetFilter) ((JsonObject) message3.body()).mapTo(OffsetFilter.class)).subscribe().with(list -> {
                        message3.reply(new JsonArray(list));
                    }, th -> {
                        handleThrowable(message3, th);
                    });
                }).exceptionHandler(this::handle).completionHandler();
            }).flatMap(r53 -> {
                return vertx.eventBus().consumer(availableTypes(this.aClass)).handler(message3 -> {
                    message3.reply(JsonObject.mapFrom(new AvailableTypes(this.eventTypes, this.commandSchemas)));
                }).exceptionHandler(this::handle).completionHandler();
            });
        });
    }

    public static String resetOffsetAddress(Class<? extends Aggregate> cls) {
        return "/%s/event-consumer/reset/offset".formatted(CommandHandler.camelToKebab(cls.getSimpleName()));
    }

    public static String fetchNextEventsAddress(Class<? extends Aggregate> cls) {
        return "/%s/event-consumer/next".formatted(CommandHandler.camelToKebab(cls.getSimpleName()));
    }

    public static String fetchEventsAddress(Class<? extends Aggregate> cls) {
        return "/%s/event".formatted(CommandHandler.camelToKebab(cls.getSimpleName()));
    }

    public static String fetchOffsetsAddress(Class<? extends Aggregate> cls) {
        return "/%s/event-consumers".formatted(CommandHandler.camelToKebab(cls.getSimpleName()));
    }

    public static String availableTypes(Class<? extends Aggregate> cls) {
        return "/%s/available-types".formatted(CommandHandler.camelToKebab(cls.getSimpleName()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void handleThrowable(Message<JsonObject> message, Throwable th) {
        if (!(th instanceof Es4jException)) {
            message.fail(500, JsonObject.mapFrom(new Es4jError(th.getMessage(), Objects.nonNull(th.getCause()) ? th.getCause().getMessage() : th.getLocalizedMessage(), 500)).encode());
        } else {
            Es4jException es4jException = (Es4jException) th;
            message.fail(es4jException.error().externalErrorCode().intValue(), JsonObject.mapFrom(es4jException.error()).encode());
        }
    }

    private void handle(Throwable th) {
        LOGGER.error("Unhandled exception", th);
    }

    public Uni<List<Event>> fetchNextEvents(FetchNextEvents fetchNextEvents) {
        return this.offsetStore.get(new OffsetKey(fetchNextEvents.projectionId(), fetchNextEvents.tenantId())).flatMap(offset -> {
            return this.eventStore.fetch(EventStreamBuilder.builder().offset(offset.idOffSet()).batchSize(fetchNextEvents.batchSize()).tenantId(fetchNextEvents.tenantId()).tags(fetchNextEvents.tags()).aggregateIds(fetchNextEvents.aggregateIds()).build()).flatMap(list -> {
                return this.offsetStore.put(offset.updateOffset(list)).replaceWith(list);
            });
        });
    }

    public Uni<List<Event>> fetchEvents(EventFilter eventFilter) {
        return this.eventStore.fetch(EventStreamBuilder.builder().offset(eventFilter.offset()).batchSize(eventFilter.batchSize()).tenantId(eventFilter.tenantId()).tags(eventFilter.tags()).to(eventFilter.to()).from(eventFilter.from()).eventTypes(eventFilter.eventTypes()).versionFrom(eventFilter.versionFrom()).versionTo(eventFilter.versionTo()).aggregateIds(eventFilter.aggregateIds()).build());
    }

    public Uni<List<Offset>> offsets(OffsetFilter offsetFilter) {
        return this.offsetStore.projections(offsetFilter);
    }

    public Uni<Void> reset(String str, String str2, Long l) {
        return this.offsetStore.get(new OffsetKey(str, str2)).flatMap(offset -> {
            return this.offsetStore.put(OffsetBuilder.builder(offset).idOffSet((Long) Objects.requireNonNullElse(l, 0L)).build());
        }).replaceWithVoid();
    }
}
