package io.vlingo.xoom.symbio.store.journal.jdbc;

import com.google.gson.Gson;
import io.vlingo.xoom.actors.Logger;
import io.vlingo.xoom.common.Failure;
import io.vlingo.xoom.common.Outcome;
import io.vlingo.xoom.common.Success;
import io.vlingo.xoom.common.Tuple2;
import io.vlingo.xoom.common.identity.IdentityGenerator;
import io.vlingo.xoom.symbio.BaseEntry;
import io.vlingo.xoom.symbio.Entry;
import io.vlingo.xoom.symbio.State;
import io.vlingo.xoom.symbio.store.Result;
import io.vlingo.xoom.symbio.store.StorageException;
import io.vlingo.xoom.symbio.store.common.jdbc.Configuration;
import io.vlingo.xoom.symbio.store.common.jdbc.DatabaseType;
import io.vlingo.xoom.symbio.store.dispatch.Dispatchable;
import io.vlingo.xoom.symbio.store.dispatch.Dispatcher;
import io.vlingo.xoom.symbio.store.dispatch.DispatcherControl;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/* loaded from: input_file:io/vlingo/xoom/symbio/store/journal/jdbc/JDBCJournalInstantWriter.class */
/**
 * A {@link JDBCJournalWriter} that persists every append immediately: the entries, the optional
 * snapshot and the dispatchable record are inserted and committed within the append call itself,
 * and the dispatchable is handed to the dispatchers only after the commit succeeded.
 *
 * <p>Auto-commit is disabled on the configured connection in the constructor, so each append call
 * forms one transaction. If any insert or the commit fails, the transaction is rolled back before
 * the failure is propagated, so that partially written rows cannot be committed by a later append.
 *
 * <p>A logger is optional: when none has been supplied through {@link #setLogger(Logger)}, error
 * reporting is skipped instead of failing with a {@code NullPointerException}.
 */
public class JDBCJournalInstantWriter implements JDBCJournalWriter {
    /** Common prefix of every log and exception message; the database type is appended to it. */
    private static final String LOG_PREFIX = "xoom-symbio-jdbc:journal-";

    private final Configuration configuration;
    private final Connection connection;
    private final JDBCQueries queries;
    private final List<Dispatcher<Dispatchable<Entry<String>, State.TextState>>> dispatchers;
    private final DispatcherControl dispatcherControl;
    private final Gson gson = new Gson();
    private final IdentityGenerator dispatchablesIdentityGenerator = new IdentityGenerator.RandomIdentityGenerator();
    // May stay null until setLogger is called; always go through logError to report problems.
    private Logger logger;

    /**
     * Creates the writer on top of the connection held by {@code configuration} and switches that
     * connection to manual commit mode.
     *
     * @param configuration the JDBC configuration providing the connection, database type and originator id
     * @param list the dispatchers to notify after each committed append; may be {@code null}
     * @param dispatcherControl the control stopped together with this writer; may be {@code null}
     * @throws Exception if disabling auto-commit or obtaining the queries for the connection fails
     */
    public JDBCJournalInstantWriter(Configuration configuration, List<Dispatcher<Dispatchable<Entry<String>, State.TextState>>> list, DispatcherControl dispatcherControl) throws Exception {
        this.configuration = configuration;
        this.connection = configuration.connection;
        this.dispatchers = list;
        this.dispatcherControl = dispatcherControl;
        this.connection.setAutoCommit(false);
        this.queries = JDBCQueries.queriesFor(configuration.connection);
    }

    /**
     * Inserts a single entry, the optional snapshot and the dispatchable record, commits, dispatches
     * and finally reports success to {@code consumer}.
     *
     * @param str first key passed to the insert queries (presumably the stream name; confirm against
     *            the {@code JDBCJournalWriter} contract)
     * @param i second key passed to the insert queries (presumably the stream version; confirm
     *          against the {@code JDBCJournalWriter} contract)
     * @param entry the entry to insert; its id is assigned from the insert result
     * @param optional the snapshot to insert alongside the entry, if any
     * @param consumer receives a {@code Failure} when a SQL error occurs, otherwise a {@code Success}
     * @throws IllegalStateException if any insert or the commit fails; the transaction is rolled back first
     */
    @Override // io.vlingo.xoom.symbio.store.journal.jdbc.JDBCJournalWriter
    public void appendEntry(String str, int i, Entry<String> entry, Optional<State.TextState> optional, Consumer<Outcome<StorageException, Result>> consumer) {
        Dispatchable<Entry<String>, State.TextState> dispatchable =
                writeInTransaction(str, i, Collections.singletonList(entry), optional, consumer);
        dispatch(dispatchable);
        consumer.accept(Success.of(Result.Success));
    }

    /**
     * Inserts all entries (the first with {@code i}, each following one with the next higher value),
     * the optional snapshot and the dispatchable record, commits, dispatches and finally reports
     * success to {@code consumer}.
     *
     * @param str first key passed to the insert queries (presumably the stream name; confirm against
     *            the {@code JDBCJournalWriter} contract)
     * @param i value used for the first entry, the snapshot and the dispatch id (presumably the
     *          starting stream version; confirm against the {@code JDBCJournalWriter} contract)
     * @param list the entries to insert in iteration order; their ids are assigned from the insert results
     * @param optional the snapshot to insert alongside the entries, if any
     * @param consumer receives a {@code Failure} when a SQL error occurs, otherwise a {@code Success}
     * @throws IllegalStateException if any insert or the commit fails; the transaction is rolled back first
     */
    @Override // io.vlingo.xoom.symbio.store.journal.jdbc.JDBCJournalWriter
    public void appendEntries(String str, int i, List<Entry<String>> list, Optional<State.TextState> optional, Consumer<Outcome<StorageException, Result>> consumer) {
        Dispatchable<Entry<String>, State.TextState> dispatchable =
                writeInTransaction(str, i, list, optional, consumer);
        dispatch(dispatchable);
        consumer.accept(Success.of(Result.Success));
    }

    /** Does nothing: every append is committed within the append call, so nothing is pending. */
    @Override // io.vlingo.xoom.symbio.store.journal.jdbc.JDBCJournalWriter
    public void flush() {
    }

    /**
     * Stops the dispatcher control (when present) and closes the queries. Closing is best-effort:
     * a failure is logged but not propagated.
     */
    @Override // io.vlingo.xoom.symbio.store.journal.jdbc.JDBCJournalWriter
    public void stop() {
        if (this.dispatcherControl != null) {
            this.dispatcherControl.stop();
        }
        try {
            this.queries.close();
        } catch (SQLException e) {
            logError(logPrefix() + ": Could not close queries", e);
        }
    }

    /**
     * Sets the logger used for error reporting.
     *
     * @param logger the logger; when {@code null}, errors are not logged
     */
    @Override // io.vlingo.xoom.symbio.store.journal.jdbc.JDBCJournalWriter
    public void setLogger(Logger logger) {
        this.logger = logger;
    }

    /**
     * Performs all inserts of one append and commits them. Any runtime failure on the way triggers
     * a rollback before it is rethrown, so the shared connection never carries half an append into
     * the next transaction.
     *
     * @return the dispatchable that was inserted and committed
     */
    private Dispatchable<Entry<String>, State.TextState> writeInTransaction(String str, int i, List<Entry<String>> entries, Optional<State.TextState> snapshot, Consumer<Outcome<StorageException, Result>> consumer) {
        try {
            int version = i;
            for (Entry<String> entry : entries) {
                insertEntry(str, version++, entry, consumer);
            }
            // NOTE(review): the snapshot is stored with the starting value i, not the value of the
            // last inserted entry; kept as-is, confirm this is intended.
            if (snapshot.isPresent()) {
                insertSnapshot(str, i, snapshot.get(), consumer);
            }
            Dispatchable<Entry<String>, State.TextState> dispatchable =
                    insertDispatchable(str, i, entries, snapshot.orElse(null), consumer);
            doCommit(consumer);
            return dispatchable;
        } catch (RuntimeException e) {
            rollbackQuietly();
            throw e;
        }
    }

    /** Builds a dispatch id of the form {@code str:i:<generated identity>}. */
    private String buildDispatchId(String str, int i) {
        return str + ":" + i + ":" + this.dispatchablesIdentityGenerator.generate().toString();
    }

    /** Hands the dispatchable to every configured dispatcher; does nothing without dispatchers. */
    private void dispatch(Dispatchable<Entry<String>, State.TextState> dispatchable) {
        if (this.dispatchers == null) {
            return;
        }
        this.dispatchers.forEach(dispatcher -> dispatcher.dispatch(dispatchable));
    }

    /**
     * Commits the current transaction.
     *
     * @throws IllegalStateException wrapping the {@code SQLException} if the commit fails
     */
    private void doCommit(Consumer<Outcome<StorageException, Result>> consumer) {
        try {
            this.connection.commit();
        } catch (SQLException e) {
            consumer.accept(Failure.of(new StorageException(Result.Failure, e.getMessage(), e)));
            logError(logPrefix() + ": Could not complete transaction", e);
            throw new IllegalStateException(e);
        }
    }

    /** Rolls the current transaction back; a failing rollback is logged and otherwise ignored. */
    private void rollbackQuietly() {
        try {
            this.connection.rollback();
        } catch (SQLException e) {
            // The original failure is about to be rethrown by the caller; do not replace it.
            logError(logPrefix() + ": Could not roll back transaction", e);
        }
    }

    /**
     * Creates the dispatchable for the given entries and state and inserts its record.
     *
     * @param textState the snapshot state carried by the dispatchable, or {@code null} for none
     * @return the dispatchable whose record was inserted
     * @throws IllegalStateException if the insert does not affect exactly one row or a SQL error occurs
     */
    private Dispatchable<Entry<String>, State.TextState> insertDispatchable(String str, int i, List<Entry<String>> list, State.TextState textState, Consumer<Outcome<StorageException, Result>> consumer) {
        Dispatchable<Entry<String>, State.TextState> dispatchable =
                new Dispatchable<>(buildDispatchId(str, i), LocalDateTime.now(), textState, list);
        String prefix = logPrefix();
        try {
            // The ids of all entries are stored as one '|'-separated string.
            String entryIds = dispatchable.hasEntries()
                    ? dispatchable.entries().stream().map(Entry::id).collect(Collectors.joining("|"))
                    : "";
            String id = dispatchable.id();
            Tuple2<PreparedStatement, Optional<String>> insert;
            if (dispatchable.state().isPresent()) {
                State.TextState state = dispatchable.typedState();
                insert = this.queries.prepareInsertDispatchableQuery(id, this.configuration.originatorId, state.id, state.data, state.dataVersion, state.type, state.typeVersion, this.gson.toJson(state.metadata), entryIds);
            } else {
                insert = this.queries.prepareInsertDispatchableQuery(id, this.configuration.originatorId, null, null, 0, null, 0, null, entryIds);
            }
            if (insert._1.executeUpdate() == 1) {
                return dispatchable;
            }
            logError(prefix + ": Could not insert dispatchable with id " + dispatchable.id(), null);
            throw new IllegalStateException(prefix + ": Could not insert dispatchable");
        } catch (SQLException e) {
            consumer.accept(Failure.of(new StorageException(Result.Failure, e.getMessage(), e)));
            logError(prefix + ": Could not insert dispatchable with id " + dispatchable.id(), e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Inserts the snapshot state.
     *
     * @throws IllegalStateException if the insert does not affect exactly one row or a SQL error occurs
     */
    private void insertSnapshot(String str, int i, State.TextState textState, Consumer<Outcome<StorageException, Result>> consumer) {
        String prefix = logPrefix();
        try {
            Tuple2<PreparedStatement, Optional<String>> insert = this.queries.prepareInsertSnapshotQuery(str, i, textState.data, textState.dataVersion, textState.type, textState.typeVersion, this.gson.toJson(textState.metadata));
            if (insert._1.executeUpdate() != 1) {
                logError(prefix + ": Could not insert snapshot with id " + textState.id, null);
                throw new IllegalStateException(prefix + ": Could not insert snapshot");
            }
        } catch (SQLException e) {
            consumer.accept(Failure.of(new StorageException(Result.Failure, e.getMessage(), e)));
            logError(prefix + ": Could not insert snapshot with id " + textState.id, e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Inserts one entry and assigns its id: the id supplied by the prepared query when present,
     * otherwise the generated key reported for the statement when that key is positive.
     *
     * @throws IllegalStateException if the insert does not affect exactly one row or a SQL error occurs
     */
    private void insertEntry(String str, int i, Entry<String> entry, Consumer<Outcome<StorageException, Result>> consumer) {
        String prefix = logPrefix();
        try {
            Tuple2<PreparedStatement, Optional<String>> insert = this.queries.prepareInsertEntryQuery(str, i, entry.entryData(), entry.typeName(), entry.typeVersion(), this.gson.toJson(entry.metadata()));
            if (insert._1.executeUpdate() != 1) {
                logError(prefix + ": Could not insert event " + entry.toString(), null);
                throw new IllegalStateException(prefix + ": Could not insert event");
            }
            if (insert._2.isPresent()) {
                ((BaseEntry<String>) entry).__internal__setId(insert._2.get());
            } else {
                long generatedKey = this.queries.generatedKeyFrom(insert._1);
                if (generatedKey > 0) {
                    ((BaseEntry<String>) entry).__internal__setId(String.valueOf(generatedKey));
                }
            }
        } catch (SQLException e) {
            consumer.accept(Failure.of(new StorageException(Result.Failure, e.getMessage(), e)));
            logError(prefix + ": Could not insert event " + entry.toString(), e);
            throw new IllegalStateException(e);
        }
    }

    /** Returns the message prefix including the configured database type. */
    private String logPrefix() {
        return LOG_PREFIX + this.configuration.databaseType;
    }

    /**
     * Logs an error if a logger has been set; otherwise does nothing, so that a missing logger
     * never hides the failure that is being reported.
     *
     * @param message the message to log
     * @param cause the exception to attach, or {@code null} for none
     */
    private void logError(String message, Throwable cause) {
        if (this.logger == null) {
            return;
        }
        if (cause == null) {
            this.logger.error(message);
        } else {
            this.logger.error(message, cause);
        }
    }
}
