package io.memoria.jutils.core.eventsourcing.stateless;

import io.memoria.jutils.core.eventsourcing.Command;
import io.memoria.jutils.core.eventsourcing.CommandHandler;
import io.memoria.jutils.core.eventsourcing.Decider;
import io.memoria.jutils.core.eventsourcing.Event;
import io.memoria.jutils.core.eventsourcing.Evolver;
import io.memoria.jutils.core.eventsourcing.State;
import io.memoria.jutils.core.transformer.StringTransformer;
import io.vavr.collection.Iterator;
import io.vavr.collection.List;
import io.vavr.control.Try;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import javax.sql.PooledConnection;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/memoria/jutils/core/eventsourcing/stateless/SqlCommandHandler.class */
public final class SqlCommandHandler<S extends State, C extends Command> implements CommandHandler<C> {
    private static final String ID_COL = "id";
    private static final String CREATED_AT_COL = "createdAt";
    private static final String PAYLOAD_COL = "payload";
    private final PooledConnection pooledConnection;
    private final StringTransformer stringTransformer;
    private final S initialState;
    private final Evolver<S> evolver;
    private final Decider<S, C> decider;
    private final Scheduler scheduler;

    public SqlCommandHandler(PooledConnection pooledConnection, StringTransformer stringTransformer, S s, Evolver<S> evolver, Decider<S, C> decider, Scheduler scheduler) {
        this.pooledConnection = pooledConnection;
        this.stringTransformer = stringTransformer;
        this.initialState = s;
        this.evolver = evolver;
        this.decider = decider;
        this.scheduler = scheduler;
    }

    public Flux<Event> apply(C c) {
        return Mono.fromCallable(() -> {
            Connection connection = this.pooledConnection.getConnection();
            connection.setAutoCommit(false);
            connection.setTransactionIsolation(2);
            String tableName = toTableName(c.aggId().value());
            createTableIfNotExists(connection, tableName);
            List<Event> list = (List) ((Try) this.decider.apply(this.evolver.apply((Evolver<S>) this.initialState, query(connection, tableName)), c)).get();
            if (add(connection, tableName, list) == list.length()) {
                connection.commit();
                return list;
            }
            connection.rollback();
            throw new SQLException("Couldn't commit events, rolling back");
        }).flatMapMany((v0) -> {
            return Flux.fromIterable(v0);
        }).subscribeOn(this.scheduler);
    }

    private int add(Connection connection, String str, List<Event> list) throws SQLException {
        PreparedStatement prepareStatement = connection.prepareStatement("INSERT INTO %s (%s, %s, %s) ".formatted(str, ID_COL, CREATED_AT_COL, PAYLOAD_COL) + "VALUES(?, ?, ?)");
        Iterator it = list.iterator();
        while (it.hasNext()) {
            Event event = (Event) it.next();
            String str2 = (String) this.stringTransformer.serialize(event).get();
            prepareStatement.setString(1, event.eventId().value());
            prepareStatement.setTimestamp(2, Timestamp.valueOf(event.createdAt()));
            prepareStatement.setString(3, str2);
            prepareStatement.addBatch();
        }
        return prepareStatement.executeBatch().length;
    }

    private List<Event> query(Connection connection, String str) throws SQLException {
        ResultSet executeQuery = connection.prepareStatement("Select %s from %s".formatted(PAYLOAD_COL, str)).executeQuery();
        ArrayList arrayList = new ArrayList();
        while (executeQuery.next()) {
            arrayList.add((Event) this.stringTransformer.deserialize(executeQuery.getString(PAYLOAD_COL), Event.class).get());
        }
        return List.ofAll(arrayList);
    }

    private static boolean createTableIfNotExists(Connection connection, String str) throws SQLException {
        return connection.prepareStatement("  CREATE TABLE IF NOT EXISTS %s (\n  id VARCHAR(36) NOT NULL,\n  createdAt TIMESTAMP NOT NULL,\n  payload TEXT NOT NULL,\n  PRIMARY KEY (id)\n)\n".formatted(str)).execute();
    }

    private static String toTableName(String str) {
        return str.replace(" ", "").replaceAll("[^A-Za-z0-9]", "");
    }
}
