package io.es4j.infra.pg;

import com.google.auto.service.AutoService;
import io.es4j.Aggregate;
import io.es4j.core.CommandHandler;
import io.es4j.infra.pg.mappers.EventStoreMapper;
import io.es4j.infra.pg.models.EventRecord;
import io.es4j.infra.pg.models.EventRecordKey;
import io.es4j.infra.pg.models.EventRecordQuery;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.models.AggregateEventStream;
import io.es4j.infrastructure.models.AppendInstruction;
import io.es4j.infrastructure.models.Event;
import io.es4j.infrastructure.models.EventStream;
import io.es4j.infrastructure.models.PruneEventStream;
import io.es4j.infrastructure.models.StartStream;
import io.es4j.sql.LiquibaseHandler;
import io.es4j.sql.Repository;
import io.es4j.sql.RepositoryHandler;
import io.es4j.sql.exceptions.NotFound;
import io.es4j.sql.models.BaseRecord;
import io.es4j.sql.models.QueryOptions;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * PostgreSQL-backed {@link EventStore}, registered as a service through {@link AutoService}.
 *
 * <p>Events are persisted as {@link EventRecord} rows through a {@link Repository} that is created
 * in {@link #start(Class, Vertx, JsonObject)}. Every read or write method therefore requires
 * {@code start} to have been called first.
 */
@AutoService(EventStore.class)
public class PgEventStore implements EventStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(PgEventStore.class);

    /** Journal repository; {@code null} until {@link #start(Class, Vertx, JsonObject)} has run. */
    private Repository<EventRecordKey, EventRecord, EventRecordQuery> eventJournal;

    /**
     * Creates the journal repository on top of a pool obtained from
     * {@link RepositoryHandler#leasePool}.
     *
     * @param cls        aggregate class (not used by this implementation)
     * @param vertx      Vert.x instance handed to the pool
     * @param jsonObject configuration handed to the pool
     */
    public void start(Class<? extends Aggregate> cls, Vertx vertx, JsonObject jsonObject) {
        this.eventJournal = new Repository<>(EventStoreMapper.INSTANCE, RepositoryHandler.leasePool(jsonObject, vertx));
    }

    /**
     * Loads the events of a single aggregate.
     *
     * <p>A {@link NotFound} failure from the repository is turned into an empty list. The returned
     * events are built without the record id.
     *
     * @param aggregateEventStream selection criteria for the aggregate's events
     * @return the matching events, possibly empty
     */
    public <T extends Aggregate> Uni<List<Event>> fetch(AggregateEventStream<T> aggregateEventStream) {
        return this.eventJournal.query(eventJournalQuery(aggregateEventStream))
            .onFailure(NotFound.class).recoverWithItem(new ArrayList<EventRecord>())
            .map(records -> records.stream().map(PgEventStore::toEvent).toList());
    }

    /**
     * Streams the events of a single aggregate, handing each one to {@code consumer}.
     *
     * @param aggregateEventStream selection criteria for the aggregate's events
     * @param consumer             callback invoked once per event (built without the record id)
     * @return the {@link Uni} produced by the repository's {@code stream} call
     */
    public <T extends Aggregate> Uni<Void> stream(AggregateEventStream<T> aggregateEventStream, Consumer<Event> consumer) {
        return this.eventJournal.stream(
            eventRecord -> consumer.accept(toEvent(eventRecord)),
            eventJournalQuery(aggregateEventStream));
    }

    /**
     * Streams the events selected by {@code eventStream}, handing each one to {@code consumer}.
     *
     * @param eventStream selection criteria
     * @param consumer    callback invoked once per event (built without the record id)
     * @return the {@link Uni} produced by the repository's {@code stream} call
     */
    public Uni<Void> stream(EventStream eventStream, Consumer<Event> consumer) {
        return this.eventJournal.stream(
            eventRecord -> consumer.accept(toEvent(eventRecord)),
            eventJournalQuery(eventStream));
    }

    /**
     * Loads the events selected by {@code eventStream}; each returned event carries its record id.
     *
     * <p>NOTE(review): unlike the aggregate-scoped {@code fetch}, a {@link NotFound} failure is
     * not recovered here and propagates to the subscriber. Confirm that callers rely on this.
     *
     * @param eventStream selection criteria
     * @return the matching events
     */
    public Uni<List<Event>> fetch(EventStream eventStream) {
        return this.eventJournal.query(eventJournalQuery(eventStream))
            .map(records -> records.stream().map(PgEventStore::toEventWithOffset).toList());
    }

    /**
     * Inserts all events of {@code appendInstruction} into the journal as one batch.
     *
     * @param appendInstruction events to persist
     * @return the {@link Uni} produced by the repository's {@code insertBatch} call
     */
    public <T extends Aggregate> Uni<Void> append(AppendInstruction<T> appendInstruction) {
        return this.eventJournal.insertBatch(parseInstruction(appendInstruction));
    }

    /** No-op for this store; returns an already completed {@link Uni}. */
    public <T extends Aggregate> Uni<Void> startStream(StartStream<T> startStream) {
        return Uni.createFrom().voidItem();
    }

    /**
     * Closes the repository handler behind the journal.
     *
     * @return the {@link Uni} of the close call, or a completed one if the store was never started
     */
    public Uni<Void> stop() {
        if (this.eventJournal == null) {
            // start() never ran, so there is nothing to close.
            return Uni.createFrom().voidItem();
        }
        return this.eventJournal.repositoryHandler().close();
    }

    /**
     * Runs the {@code pg-event-store.xml} Liquibase change log, passing a {@code schema} parameter
     * computed by {@link CommandHandler#camelToKebab} from the aggregate's simple class name.
     *
     * @param cls        aggregate class whose simple name determines the {@code schema} parameter
     * @param vertx      Vert.x instance handed to Liquibase
     * @param jsonObject configuration handed to Liquibase
     * @return the {@link Uni} produced by {@link LiquibaseHandler#liquibaseString}
     */
    public Uni<Void> setup(Class<? extends Aggregate> cls, Vertx vertx, JsonObject jsonObject) {
        // NOTE(review): the whole configuration object is logged; confirm it holds no credentials.
        LOGGER.debug("Migrating database for {} with configuration {}", cls.getSimpleName(), jsonObject);
        return LiquibaseHandler.liquibaseString(vertx, jsonObject, "pg-event-store.xml", Map.of("schema", CommandHandler.camelToKebab(cls.getSimpleName())));
    }

    /** No-op for this store; returns an already completed {@link Uni}. */
    public <T extends Aggregate> Uni<Void> trim(PruneEventStream<T> pruneEventStream) {
        return Uni.createFrom().voidItem();
    }

    /** Maps a journal row to an {@link Event} without the record id. */
    private static Event toEvent(EventRecord eventRecord) {
        return new Event(eventRecord.aggregateId(), eventRecord.eventClass(), eventRecord.eventVersion(), eventRecord.event(), eventRecord.baseRecord().tenant(), eventRecord.commandId(), eventRecord.tags(), eventRecord.schemaVersion());
    }

    /** Maps a journal row to an {@link Event} that also carries the record id. */
    private static Event toEventWithOffset(EventRecord eventRecord) {
        return new Event(eventRecord.id(), eventRecord.aggregateId(), eventRecord.eventClass(), eventRecord.eventVersion(), eventRecord.event(), eventRecord.baseRecord().tenant(), eventRecord.commandId(), eventRecord.tags(), eventRecord.schemaVersion());
    }

    /** Converts every event of the instruction into a new {@link EventRecord} for its tenant. */
    private <T extends Aggregate> List<EventRecord> parseInstruction(AppendInstruction<T> appendInstruction) {
        return appendInstruction.events().stream()
            .map(event -> new EventRecord(event.aggregateId(), event.eventClass(), event.eventVersion(), event.event(), event.commandId(), event.tags(), event.schemaVersion(), BaseRecord.newRecord(event.tenantId())))
            .toList();
    }

    /**
     * Builds the journal query for one aggregate: filtered by aggregate id and event version
     * offset, with options built from {@link EventStoreMapper#EVENT_VERSION}, the stream's max
     * size, the value of {@link #startingOffset} and the tenant.
     */
    private <T extends Aggregate> EventRecordQuery eventJournalQuery(AggregateEventStream<T> aggregateEventStream) {
        return new EventRecordQuery(List.of(aggregateEventStream.aggregateId()), null, null, null, aggregateEventStream.eventVersionOffset(), null, null, null, new QueryOptions(EventStoreMapper.EVENT_VERSION, false, (Instant) null, (Instant) null, (Instant) null, (Instant) null, 0, aggregateEventStream.maxSize(), startingOffset(aggregateEventStream), aggregateEventStream.tenantId()));
    }

    /**
     * Computes the offset text passed to {@link QueryOptions} for an aggregate stream.
     *
     * @return {@code "0"} when the journal offset is set and equals zero; otherwise, when a
     *         {@code startFrom} event class is set, a SQL subquery selecting the highest id of
     *         that event class for the aggregate; otherwise {@code null}
     */
    private static <T extends Aggregate> String startingOffset(AggregateEventStream<T> aggregateEventStream) {
        if (aggregateEventStream.journalOffset() != null && aggregateEventStream.journalOffset().longValue() == 0) {
            return String.valueOf(0);
        }
        if (aggregateEventStream.startFrom() != null) {
            // The subquery is returned as raw SQL text, so the interpolated values are escaped to
            // keep a quote inside an aggregate id from terminating the literal.
            // NOTE(review): the column is spelled "aggregateId" here - confirm it matches the
            // event_journal schema.
            return "(select max(id) from event_journal where event_class = " + sqlLiteral(aggregateEventStream.startFrom().getName())
                + " and aggregateId = " + sqlLiteral(aggregateEventStream.aggregateId()) + ")";
        }
        return null;
    }

    /**
     * Renders {@code value} as a single-quoted SQL string literal, doubling embedded single quotes.
     *
     * <p>This assumes PostgreSQL's {@code standard_conforming_strings} is on, so that backslashes
     * have no special meaning inside the literal.
     */
    private static String sqlLiteral(Object value) {
        return "'" + String.valueOf(value).replace("'", "''") + "'";
    }

    /**
     * Builds the journal query for a cross-aggregate stream: filtered by aggregate ids, event
     * class names, tags, upper version bound and offset, with options built from
     * {@link EventStoreMapper#ID}, the stream's from/to instants and the tenant.
     */
    private EventRecordQuery eventJournalQuery(EventStream eventStream) {
        List<String> eventClassNames = eventStream.events() != null
            ? eventStream.events().stream().map(Class::getName).toList()
            : null;
        return new EventRecordQuery(eventStream.aggregateIds(), eventClassNames, null, eventStream.tags(), null, eventStream.versionTo(), eventStream.offset(), null, new QueryOptions(EventStoreMapper.ID, false, eventStream.from(), eventStream.to(), (Instant) null, (Instant) null, (Integer) null, (Integer) null, (String) null, eventStream.tenantId()));
    }
}
