package io.datakernel.ot;

import io.datakernel.async.function.AsyncSupplier;
import io.datakernel.async.util.LogUtils;
import io.datakernel.codec.StructuredCodec;
import io.datakernel.codec.StructuredCodecs;
import io.datakernel.codec.json.JsonUtils;
import io.datakernel.common.Preconditions;
import io.datakernel.common.Utils;
import io.datakernel.common.parse.ParseException;
import io.datakernel.common.sql.SqlUtils;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.api.JmxAttribute;
import io.datakernel.ot.OTCommitFactory;
import io.datakernel.ot.util.IdGenerator;
import io.datakernel.promise.Promise;
import io.datakernel.promise.Promises;
import io.datakernel.promise.RetryPolicy;
import io.datakernel.promise.jmx.PromiseStats;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/ot/OTRepositoryMySql.class */
public class OTRepositoryMySql<D> implements OTRepositoryEx<Long, D>, EventloopJmxMBeanEx {
    /** Smoothing window handed to every {@link PromiseStats} instance created below. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);
    /** Default table name substituted for the {@code {revisions}} placeholder in SQL templates. */
    public static final String DEFAULT_REVISION_TABLE = "ot_revisions";
    /** Default table name substituted for the {@code {diffs}} placeholder in SQL templates. */
    public static final String DEFAULT_DIFFS_TABLE = "ot_diffs";
    /** Default table name substituted for the {@code {backup}} placeholder in SQL templates. */
    public static final String DEFAULT_BACKUP_TABLE = "ot_revisions_backup";
    // Collaborators supplied through the constructor.
    private final Eventloop eventloop;
    // Executor on which the blocking JDBC callables of this class are run.
    private final Executor executor;
    private final DataSource dataSource;
    // Source of new commit ids, see createCommitId().
    private final IdGenerator<Long> idGenerator;
    // Used to squash diff lists when loading and saving snapshots.
    private final OTSystem<D> otSystem;
    // Codec used by toJson()/fromJson() for lists of diffs.
    private final StructuredCodec<List<D>> diffsCodec;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    // Table names; may be overridden with withCustomTableNames().
    private String tableRevision = DEFAULT_REVISION_TABLE;
    private String tableDiffs = DEFAULT_DIFFS_TABLE;

    // A null backup table disables backup(): initialize() skips its DDL and backup() rejects the call.
    @Nullable
    private String tableBackup = DEFAULT_BACKUP_TABLE;
    // Value written to the `created_by` column by doPush(); null unless set with withCreatedBy().
    private String createdBy = null;
    // Per-operation promise statistics, exposed through the @JmxAttribute getters of this class.
    private final PromiseStats promiseCreateCommitId = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promisePush = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseGetHeads = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseLoadCommit = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseIsSnapshot = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseUpdateHeads = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseHasSnapshot = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseLoadSnapshot = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseSaveSnapshot = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);

    /**
     * Stores the supplied collaborators; instances are obtained through {@code create},
     * which builds the list codec passed in here.
     */
    private OTRepositoryMySql(
            Eventloop eventloop,
            Executor executor,
            DataSource dataSource,
            IdGenerator<Long> idGenerator,
            OTSystem<D> otSystem,
            StructuredCodec<List<D>> diffsCodec) {
        this.eventloop = eventloop;
        this.executor = executor;
        this.dataSource = dataSource;
        this.idGenerator = idGenerator;
        this.otSystem = otSystem;
        this.diffsCodec = diffsCodec;
    }

    /**
     * Creates a repository that uses the default table names.
     *
     * <p>The per-diff codec is wrapped into a list codec and passed through
     * {@code JsonUtils.indent} with a tab character before being handed to the constructor.
     */
    public static <D> OTRepositoryMySql<D> create(Eventloop eventloop, Executor executor, DataSource dataSource, IdGenerator<Long> idGenerator, OTSystem<D> oTSystem, StructuredCodec<D> structuredCodec) {
        StructuredCodec<List<D>> indentedListCodec = JsonUtils.indent(StructuredCodecs.ofList(structuredCodec), "\t");
        return new OTRepositoryMySql<>(eventloop, executor, dataSource, idGenerator, oTSystem, indentedListCodec);
    }

    /**
     * Sets the value stored in the {@code created_by} column of pushed revisions.
     *
     * @param str creator label; may be {@code null}
     * @return this repository, for chaining
     */
    public OTRepositoryMySql<D> withCreatedBy(String str) {
        createdBy = str;
        return this;
    }

    /**
     * Overrides the table names substituted into the SQL templates.
     *
     * @param str  name used for the {@code {revisions}} placeholder
     * @param str2 name used for the {@code {diffs}} placeholder
     * @param str3 name used for the {@code {backup}} placeholder, or {@code null} to disable backups
     * @return this repository, for chaining
     */
    public OTRepositoryMySql<D> withCustomTableNames(String str, String str2, @Nullable String str3) {
        tableRevision = str;
        tableDiffs = str2;
        tableBackup = str3;
        return this;
    }

    /** Returns the data source this repository obtains its JDBC connections from. */
    public DataSource getDataSource() {
        return dataSource;
    }

    /** Returns the codec used to convert lists of diffs to and from JSON. */
    public StructuredCodec<List<D>> getDiffsCodec() {
        return diffsCodec;
    }

    /**
     * Expands the {@code {revisions}}, {@code {diffs}} and {@code {backup}} placeholders of an
     * SQL template with the configured table names. A {@code null} backup table expands to an
     * empty string.
     */
    private String sql(String str) {
        String withRevisions = str.replace("{revisions}", tableRevision);
        String withDiffs = withRevisions.replace("{diffs}", tableDiffs);
        String backupName = tableBackup == null ? "" : tableBackup;
        return withDiffs.replace("{backup}", backupName);
    }

    /**
     * Runs the supplier through {@code Promises.retry} with an exponential backoff policy built
     * from 1 millisecond and 1 second.
     *
     * <p>The predicate evaluates to {@code true} unless the failure is a
     * {@link SQLTransactionRollbackException}. NOTE(review): presumably the predicate is the
     * stop condition, so only transaction rollbacks are retried; confirm against
     * {@code Promises.retry}.
     */
    private static <T> Promise<T> retryRollbacks(AsyncSupplier<T> asyncSupplier) {
        // instanceof is false for null, so a successful attempt (no exception) also yields true.
        return Promises.retry(
                asyncSupplier,
                (ignoredResult, failure) -> !(failure instanceof SQLTransactionRollbackException),
                RetryPolicy.exponentialBackoff(Duration.ofMillis(1L), Duration.ofSeconds(1L)));
    }

    /**
     * Executes the bundled SQL resources for the diffs and revisions tables and, when a backup
     * table is configured, for the backup table as well. Each resource is loaded, expanded with
     * the configured table names and executed before the next one is loaded.
     *
     * @throws IOException  declared for resource loading
     * @throws SQLException declared for statement execution
     */
    public void initialize() throws IOException, SQLException {
        logger.trace("Initializing tables");
        for (String resource : new String[]{"sql/ot_diffs.sql", "sql/ot_revisions.sql"}) {
            String script = new String(Utils.loadResource(resource), StandardCharsets.UTF_8);
            SqlUtils.execute(dataSource, sql(script));
        }
        if (tableBackup == null) {
            return;
        }
        String backupScript = new String(Utils.loadResource("sql/ot_revisions_backup.sql"), StandardCharsets.UTF_8);
        SqlUtils.execute(dataSource, sql(backupScript));
    }

    /**
     * Truncates the diffs table and then the revisions table. The backup table is not touched.
     *
     * @throws SQLException if obtaining the connection or executing a statement fails
     */
    public void truncateTables() throws SQLException {
        logger.trace("Truncate tables");
        // Both resources are closed in reverse order on every path; previously the Statement
        // was never closed explicitly.
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {
            statement.execute(sql("TRUNCATE TABLE {diffs}"));
            statement.execute(sql("TRUNCATE TABLE {revisions}"));
        }
    }

    /**
     * Requests a new commit id from the id generator.
     *
     * <p>The outcome is recorded into {@code promiseCreateCommitId}, so the JMX attribute of
     * that name reflects actual calls, in the same way the other operations of this class
     * record into their own statistics.
     *
     * @return promise of the generated id
     */
    public Promise<Long> createCommitId() {
        return idGenerator.createId().whenComplete(promiseCreateCommitId.recordStats());
    }

    /**
     * Creates a commit with a freshly generated id, epoch {@code 0} and the given parents.
     *
     * @param map parent commit ids mapped to their diffs and levels
     * @return promise of the new commit; nothing is written to the database here
     */
    @Override
    public Promise<OTCommit<Long, D>> createCommit(Map<Long, OTCommitFactory.DiffsWithLevel<D>> map) {
        Promise<Long> newId = createCommitId();
        return newId.map(id -> OTCommit.of(0, id, map));
    }

    /** Serializes a list of diffs to JSON with the configured diffs codec. */
    private String toJson(List<D> list) {
        return JsonUtils.toJson(diffsCodec, list);
    }

    /**
     * Parses a JSON string into a list of diffs with the configured diffs codec.
     *
     * @throws ParseException propagated from {@code JsonUtils.fromJson}
     */
    private List<D> fromJson(String str) throws ParseException {
        // No cast: the codec is typed StructuredCodec<List<D>>, so the result is already
        // List<D>. The former raw (List) cast discarded that and caused an unchecked conversion.
        return JsonUtils.fromJson(diffsCodec, str);
    }

    /**
     * Stores the given commits. An empty collection completes immediately without touching
     * the database; otherwise the work is delegated to {@code doPush} through
     * {@code retryRollbacks}.
     */
    @Override
    public Promise<Void> push(Collection<OTCommit<Long, D>> collection) {
        if (collection.isEmpty()) {
            return Promise.complete();
        }
        return retryRollbacks(() -> doPush(collection));
    }

    /**
     * Inserts the given commits and their parent diffs in a single transaction.
     *
     * <p>For every commit one row is inserted into the revisions table with type
     * {@code 'INNER'}, followed by one row in the diffs table per parent. The transaction is
     * committed only after all commits have been written. The blocking work runs on the
     * configured executor.
     *
     * @param collection commits to store
     * @return promise completing when the transaction is committed, or failing with the
     *         exception raised by JDBC or JSON serialization
     */
    @NotNull
    private Promise<Void> doPush(Collection<OTCommit<Long, D>> collection) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            try (Connection connection = this.dataSource.getConnection()) {
                connection.setAutoCommit(false);
                connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
                for (OTCommit<Long, D> commit : collection) {
                    // try-with-resources closes each statement even when binding or executing
                    // fails; previously a failure left the statement open.
                    try (PreparedStatement insertRevision = connection.prepareStatement(sql("INSERT INTO {revisions}(`id`, `epoch`, `type`, `created_by`, `level`) VALUES (?, ?, 'INNER', ?, ?)"))) {
                        insertRevision.setLong(1, commit.getId());
                        insertRevision.setInt(2, commit.getEpoch());
                        insertRevision.setString(3, this.createdBy);
                        insertRevision.setLong(4, commit.getLevel());
                        insertRevision.executeUpdate();
                    }
                    for (Long parentId : commit.getParents().keySet()) {
                        List<D> parentDiffs = commit.getParents().get(parentId);
                        try (PreparedStatement insertDiff = connection.prepareStatement(sql("INSERT INTO {diffs}(`revision_id`, `parent_id`, `diff`) VALUES (?, ?, ?)"))) {
                            insertDiff.setLong(1, commit.getId());
                            insertDiff.setLong(2, parentId);
                            insertDiff.setString(3, toJson(parentDiffs));
                            insertDiff.executeUpdate();
                        }
                    }
                }
                connection.commit();
            }
            // The cast fixes the callable's result type to Void for the chained calls below.
            return (Void) null;
        }).whenComplete(this.promisePush.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{collection}));
    }

    /**
     * Marks the revisions in {@code set} as {@code HEAD} and those in {@code set2} as
     * {@code INNER}, delegating to {@code doUpdateHeads} through {@code retryRollbacks}.
     */
    @Override
    @NotNull
    public Promise<Void> updateHeads(Set<Long> set, Set<Long> set2) {
        return retryRollbacks(() -> doUpdateHeads(set, set2));
    }

    /**
     * Updates revision types in a single transaction: ids in {@code set} become
     * {@code HEAD} first, then ids in {@code set2} become {@code INNER}.
     *
     * @param set  revision ids to mark as {@code HEAD}
     * @param set2 revision ids to mark as {@code INNER}
     * @return promise completing when the transaction is committed
     */
    @NotNull
    private Promise<Void> doUpdateHeads(Set<Long> set, Set<Long> set2) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            // try-with-resources replaces the hand-expanded close/addSuppressed sequence.
            try (Connection connection = this.dataSource.getConnection()) {
                connection.setAutoCommit(false);
                connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
                updateRevisions(set, connection, "HEAD");
                updateRevisions(set2, connection, "INNER");
                connection.commit();
            }
            // The cast fixes the callable's result type to Void for the chained calls below.
            return (Void) null;
        }).whenComplete(this.promiseUpdateHeads.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{set, set2}));
    }

    /**
     * Loads the ids of all revisions whose type is {@code HEAD}.
     *
     * @return promise of the head ids; the set is empty when no revision is a head
     */
    @Override
    @NotNull
    public Promise<Set<Long>> getAllHeads() {
        return Promise.ofBlockingCallable(this.executor, () -> {
            // Connection, statement and result set are all released on every path.
            try (Connection connection = this.dataSource.getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql("SELECT `id` FROM {revisions} WHERE `type`='HEAD'"));
                 ResultSet rows = statement.executeQuery()) {
                Set<Long> heads = new HashSet<>();
                while (rows.next()) {
                    heads.add(rows.getLong(1));
                }
                return heads;
            }
        }).whenComplete(this.promiseGetHeads.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[0]));
    }

    /**
     * Loads a commit of type {@code HEAD} or {@code INNER} together with its parent diffs.
     *
     * <p>The query left-joins the diffs table, so each returned row carries the revision's
     * epoch, level and timestamp plus at most one parent diff. Every parent is stored with
     * level {@code level - 1}.
     *
     * @param l id of the commit to load
     * @return promise of the commit; fails with {@link IOException} when the timestamp read
     *         from the database is {@code 0}, which is how a missing commit is detected
     */
    @Override
    @NotNull
    public Promise<OTCommit<Long, D>> loadCommit(@NotNull Long l) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            // The statement is now closed on failure as well as on success.
            try (Connection connection = this.dataSource.getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql("SELECT  {revisions}.`epoch`, {revisions}.`level`, UNIX_TIMESTAMP({revisions}.`timestamp`) AS `timestamp`,  {diffs}.`parent_id`,  {diffs}.`diff` FROM {revisions} LEFT JOIN {diffs} ON {diffs}.`revision_id`={revisions}.`id` WHERE {revisions}.`id`=? AND `type` IN ('HEAD', 'INNER')"))) {
                statement.setLong(1, l);
                Map<Long, OTCommitFactory.DiffsWithLevel<D>> parents = new HashMap<>();
                int epoch = 0;
                long timestamp = 0;
                try (ResultSet rows = statement.executeQuery()) {
                    while (rows.next()) {
                        epoch = rows.getInt(1);
                        long level = rows.getLong(2);
                        // Presumably converts UNIX_TIMESTAMP seconds to milliseconds.
                        timestamp = rows.getLong(3) * 1000;
                        long parentId = rows.getLong(4);
                        String diffJson = rows.getString(5);
                        // A null diff means the LEFT JOIN found no parent row for this revision.
                        if (diffJson != null) {
                            parents.put(parentId, new OTCommitFactory.DiffsWithLevel<>(level - 1, fromJson(diffJson)));
                        }
                    }
                }
                // No row leaves the timestamp at its initial 0; a stored timestamp of 0 is
                // indistinguishable from that and is reported as missing too.
                if (timestamp == 0) {
                    throw new IOException("No commit with id: " + l);
                }
                return OTCommit.of(epoch, l, parents).withTimestamp(timestamp);
            }
        }).whenComplete(this.promiseLoadCommit.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{l}));
    }

    /**
     * Checks whether the revision with the given id has a non-null snapshot.
     *
     * @param l revision id
     * @return promise of {@code true} when the row exists and its {@code snapshot} column is
     *         not null; {@code false} when there is no such row or the column is null
     */
    @Override
    @NotNull
    public Promise<Boolean> hasSnapshot(@NotNull Long l) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            // One try-with-resources replaces the per-return close calls and also releases
            // the statement when the query fails.
            try (Connection connection = this.dataSource.getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql("SELECT `snapshot` IS NOT NULL FROM {revisions} WHERE `id`=?"))) {
                statement.setLong(1, l);
                try (ResultSet rows = statement.executeQuery()) {
                    return rows.next() && rows.getBoolean(1);
                }
            }
        }).whenComplete(this.promiseHasSnapshot.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{l}));
    }

    /**
     * Loads the snapshot stored for the given revision.
     *
     * @param l revision id
     * @return promise of the squashed snapshot diffs, or an empty {@link Optional} when the
     *         revision does not exist or its {@code snapshot} column is null
     */
    @Override
    @NotNull
    public Promise<Optional<List<D>>> loadSnapshot(@NotNull Long l) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            // One try-with-resources replaces the per-return close calls and also releases
            // the statement when the query or JSON parsing fails.
            try (Connection connection = this.dataSource.getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql("SELECT `snapshot` FROM {revisions} WHERE `id`=?"))) {
                statement.setLong(1, l);
                try (ResultSet rows = statement.executeQuery()) {
                    String snapshotJson = rows.next() ? rows.getString(1) : null;
                    if (snapshotJson == null) {
                        return Optional.<List<D>>empty();
                    }
                    // The stored diffs are squashed again after parsing, as before.
                    return Optional.of(this.otSystem.squash(fromJson(snapshotJson)));
                }
            }
        }).whenComplete(this.promiseLoadSnapshot.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{l}));
    }

    /**
     * Squashes the given diffs and stores them as JSON in the {@code snapshot} column of the
     * revision with the given id. The update runs with auto-commit enabled.
     *
     * @param l    revision id
     * @param list diffs making up the snapshot
     * @return promise completing when the update has been executed; no check is made that a
     *         row was actually updated
     */
    @Override
    @NotNull
    public Promise<Void> saveSnapshot(@NotNull Long l, @NotNull List<D> list) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            try (Connection connection = this.dataSource.getConnection()) {
                connection.setAutoCommit(true);
                connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
                String snapshotJson = toJson(this.otSystem.squash(list));
                // The statement is now closed on failure as well as on success.
                try (PreparedStatement statement = connection.prepareStatement(sql("UPDATE {revisions} SET `snapshot`=? WHERE `id`=?"))) {
                    statement.setString(1, snapshotJson);
                    statement.setLong(2, l);
                    statement.executeUpdate();
                }
            }
            // The cast fixes the callable's result type to Void for the chained calls below.
            return (Void) null;
        }).whenComplete(this.promiseSaveSnapshot.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{l, list}));
    }

    /**
     * Removes old revisions relative to the given revision, delegating to {@code doCleanup}
     * through {@code retryRollbacks}.
     */
    @Override
    public Promise<Void> cleanup(Long l) {
        return retryRollbacks(() -> doCleanup(l));
    }

    /**
     * Deletes old revisions and orphaned diffs in a single transaction.
     *
     * <p>First, revisions of type {@code HEAD} or {@code INNER} whose level is lower than the
     * level of revision {@code l} minus one are deleted. Then every diff whose
     * {@code revision_id} no longer matches a revision is deleted.
     *
     * @param l id of the revision whose level defines the cut-off
     * @return promise completing when the transaction is committed
     */
    @NotNull
    private Promise<Void> doCleanup(Long l) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            try (Connection connection = this.dataSource.getConnection()) {
                connection.setAutoCommit(false);
                connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
                // Each statement is closed even when its execution fails.
                try (PreparedStatement deleteRevisions = connection.prepareStatement(sql("DELETE FROM {revisions} WHERE `type` in ('HEAD', 'INNER') AND `level` <   (SELECT t2.`level` FROM (SELECT t.`level` FROM {revisions} t WHERE t.`id`=?) AS t2)-1"))) {
                    deleteRevisions.setLong(1, l);
                    deleteRevisions.executeUpdate();
                }
                try (PreparedStatement deleteOrphanDiffs = connection.prepareStatement(sql("DELETE FROM {diffs} WHERE NOT EXISTS (SELECT * FROM {revisions} WHERE {revisions}.`id`={diffs}.`revision_id`)"))) {
                    deleteOrphanDiffs.executeUpdate();
                }
                connection.commit();
            }
            // The cast fixes the callable's result type to Void for the chained call below.
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{l}));
    }

    /**
     * Inserts a row with the commit's id, epoch and level plus the given diffs (as JSON) into
     * the backup table.
     *
     * <p>The backup table must be configured; the precondition is checked before any work is
     * scheduled.
     *
     * @param oTCommit commit being backed up
     * @param list     snapshot diffs stored alongside the commit
     * @return promise completing when the insert has been executed
     */
    @Override
    public Promise<Void> backup(OTCommit<Long, D> oTCommit, List<D> list) {
        Preconditions.checkNotNull(this.tableBackup, "Cannot backup when backup table is null");
        return Promise.ofBlockingCallable(this.executor, () -> {
            // The statement is now closed on failure as well as on success.
            try (Connection connection = this.dataSource.getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql("INSERT INTO {backup}(`id`, `epoch`, `level`, `snapshot`) VALUES (?, ?, ?, ?)"))) {
                statement.setLong(1, oTCommit.getId());
                statement.setInt(2, oTCommit.getEpoch());
                statement.setLong(3, oTCommit.getLevel());
                statement.setString(4, toJson(list));
                statement.executeUpdate();
            }
            // The cast fixes the callable's result type to Void for the chained call below.
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{oTCommit.getId(), list}));
    }

    /** Returns the eventloop this repository was created with. */
    @NotNull
    public Eventloop getEventloop() {
        return eventloop;
    }

    /**
     * Sets the {@code type} column of all listed revisions with one {@code UPDATE ... IN (...)}
     * statement. Does nothing for an empty collection.
     *
     * @param collection ids of the revisions to update
     * @param connection connection to run the statement on; it is not closed here
     * @param str        new type value; it is concatenated into the SQL text, so it must only
     *                   ever be a trusted constant (the callers in this class pass
     *                   {@code "HEAD"} and {@code "INNER"})
     * @throws SQLException if preparing or executing the statement fails
     */
    private void updateRevisions(Collection<Long> collection, Connection connection, String str) throws SQLException {
        if (collection.isEmpty()) {
            return;
        }
        // One "?" per id, rendered as "(?, ?, ...)".
        String placeholders = collection.stream()
                .map(id -> "?")
                .collect(Collectors.joining(", ", "(", ")"));
        // try-with-resources replaces the hand-expanded close/addSuppressed sequence.
        try (PreparedStatement statement = connection.prepareStatement(sql("UPDATE {revisions} SET `type`='" + str + "' WHERE `id` IN " + placeholders))) {
            int index = 1;
            for (Long id : collection) {
                statement.setLong(index++, id);
            }
            statement.executeUpdate();
        }
    }

    /** Returns the {@code promiseCreateCommitId} statistics, exposed as a JMX attribute. */
    @JmxAttribute
    public PromiseStats getPromiseCreateCommitId() {
        return promiseCreateCommitId;
    }

    /** Returns the statistics recorded for push transactions, exposed as a JMX attribute. */
    @JmxAttribute
    public PromiseStats getPromisePush() {
        return promisePush;
    }

    /** Returns the statistics recorded by {@code getAllHeads}, exposed as a JMX attribute. */
    @JmxAttribute
    public PromiseStats getPromiseGetHeads() {
        return promiseGetHeads;
    }

    /** Returns the statistics recorded by {@code loadCommit}, exposed as a JMX attribute. */
    @JmxAttribute
    public PromiseStats getPromiseLoadCommit() {
        return promiseLoadCommit;
    }

    /**
     * Returns the {@code promiseIsSnapshot} statistics, exposed as a JMX attribute.
     * No method of this class records into this object.
     */
    @JmxAttribute
    public PromiseStats getPromiseIsSnapshot() {
        return promiseIsSnapshot;
    }

    /** Returns the statistics recorded by {@code hasSnapshot}, exposed as a JMX attribute. */
    @JmxAttribute
    public PromiseStats getPromiseHasSnapshot() {
        return promiseHasSnapshot;
    }

    /** Returns the statistics recorded by {@code loadSnapshot}, exposed as a JMX attribute. */
    @JmxAttribute
    public PromiseStats getPromiseLoadSnapshot() {
        return promiseLoadSnapshot;
    }

    /** Returns the statistics recorded by {@code saveSnapshot}, exposed as a JMX attribute. */
    @JmxAttribute
    public PromiseStats getPromiseSaveSnapshot() {
        return promiseSaveSnapshot;
    }
}
