package io.vlingo.symbio.store.journal.jdbc;

import com.google.gson.Gson;
import io.vlingo.actors.Actor;
import io.vlingo.actors.Definition;
import io.vlingo.common.Completes;
import io.vlingo.common.Failure;
import io.vlingo.common.Success;
import io.vlingo.common.Tuple2;
import io.vlingo.common.identity.IdentityGenerator;
import io.vlingo.symbio.BaseEntry;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.EntryAdapterProvider;
import io.vlingo.symbio.Metadata;
import io.vlingo.symbio.Source;
import io.vlingo.symbio.State;
import io.vlingo.symbio.StateAdapterProvider;
import io.vlingo.symbio.store.Result;
import io.vlingo.symbio.store.StorageException;
import io.vlingo.symbio.store.common.jdbc.Configuration;
import io.vlingo.symbio.store.common.jdbc.DatabaseType;
import io.vlingo.symbio.store.dispatch.Dispatchable;
import io.vlingo.symbio.store.dispatch.Dispatcher;
import io.vlingo.symbio.store.dispatch.DispatcherControl;
import io.vlingo.symbio.store.dispatch.control.DispatcherControlActor;
import io.vlingo.symbio.store.journal.Journal;
import io.vlingo.symbio.store.journal.JournalReader;
import io.vlingo.symbio.store.journal.StreamReader;
import io.vlingo.symbio.store.journal.jdbc.JDBCJournalReaderActor;
import io.vlingo.symbio.store.journal.jdbc.JDBCStreamReaderActor;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/* loaded from: input_file:io/vlingo/symbio/store/journal/jdbc/JDBCJournalActor.class */
/**
 * Actor implementing a {@link Journal} of text entries on top of a JDBC connection.
 *
 * <p>Every append writes the entries, an optional snapshot and one dispatchable record
 * inside a single transaction (auto-commit is switched off in the constructor). When the
 * transaction commits, the dispatchable is handed to the configured dispatchers and the
 * {@link Journal.AppendResultInterest} is told about the success. When any step fails, the
 * interest is told about the failure, the transaction is rolled back so that no partial
 * write can be committed by a later append, and the causing exception is rethrown.
 *
 * <p>NOTE(review): the journal and stream readers created here receive the same
 * {@link Configuration} instance, and presumably share its connection - confirm that a
 * rollback issued by this actor cannot discard work they have pending.
 */
public class JDBCJournalActor extends Actor implements Journal<String> {
    /** Third argument handed to the dispatcher control when the short constructors are used. */
    private static final long DEFAULT_CHECK_CONFIRMATION_EXPIRATION_INTERVAL = 1000L;
    /** Fourth argument handed to the dispatcher control when the short constructors are used. */
    private static final long DEFAULT_CONFIRMATION_EXPIRATION = 1000L;

    private final EntryAdapterProvider entryAdapterProvider;
    private final StateAdapterProvider stateAdapterProvider;
    private final Configuration configuration;
    private final Connection connection;
    private final DatabaseType databaseType;
    private final Gson gson;
    private final Map<String, JournalReader<BaseEntry.TextEntry>> journalReaders;
    private final Map<String, StreamReader<String>> streamReaders;
    private final IdentityGenerator dispatchablesIdentityGenerator;
    private final List<Dispatcher<Dispatchable<Entry<String>, State.TextState>>> dispatchers;
    private final DispatcherControl dispatcherControl;
    private final JDBCQueries queries;

    /**
     * Creates a journal without dispatchers; nothing is dispatched after an append.
     *
     * @param configuration the JDBC configuration supplying the connection and database type
     * @throws Exception if the connection cannot be prepared or the tables cannot be created
     */
    public JDBCJournalActor(final Configuration configuration) throws Exception {
        this(null, configuration, DEFAULT_CHECK_CONFIRMATION_EXPIRATION_INTERVAL, DEFAULT_CONFIRMATION_EXPIRATION);
    }

    /**
     * Creates a journal with a single dispatcher and the default dispatcher control timing.
     *
     * @param dispatcher the dispatcher that receives every committed dispatchable
     * @param configuration the JDBC configuration supplying the connection and database type
     * @throws Exception if the connection cannot be prepared or the tables cannot be created
     */
    public JDBCJournalActor(final Dispatcher<Dispatchable<Entry<String>, State.TextState>> dispatcher, final Configuration configuration) throws Exception {
        this(Arrays.asList(dispatcher), configuration, DEFAULT_CHECK_CONFIRMATION_EXPIRATION_INTERVAL, DEFAULT_CONFIRMATION_EXPIRATION);
    }

    /**
     * Creates a journal, prepares the connection (auto-commit off), creates the tables and,
     * when dispatchers are given, starts a {@link DispatcherControl} actor for them.
     *
     * @param dispatchers the dispatchers to notify, or {@code null} for none
     * @param configuration the JDBC configuration supplying the connection and database type
     * @param checkConfirmationExpirationInterval forwarded as the third argument of the
     *        dispatcher control instantiator (presumably milliseconds - confirm against
     *        {@code DispatcherControlInstantiator})
     * @param confirmationExpiration forwarded as the fourth argument of the dispatcher control
     *        instantiator (presumably milliseconds - confirm against
     *        {@code DispatcherControlInstantiator})
     * @throws Exception if the connection cannot be prepared or the tables cannot be created
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public JDBCJournalActor(final List<Dispatcher<Dispatchable<Entry<String>, State.TextState>>> dispatchers, final Configuration configuration, final long checkConfirmationExpirationInterval, final long confirmationExpiration) throws Exception {
        this.configuration = configuration;
        this.connection = configuration.connection;
        this.databaseType = configuration.databaseType;
        this.connection.setAutoCommit(false);
        this.queries = JDBCQueries.queriesFor(configuration.connection);
        this.queries.createTables();
        this.gson = new Gson();
        this.entryAdapterProvider = EntryAdapterProvider.instance(stage().world());
        this.stateAdapterProvider = StateAdapterProvider.instance(stage().world());
        this.journalReaders = new HashMap<>();
        this.streamReaders = new HashMap<>();
        this.dispatchablesIdentityGenerator = new IdentityGenerator.RandomIdentityGenerator();
        if (dispatchers == null) {
            this.dispatchers = null;
            this.dispatcherControl = null;
        } else {
            this.dispatchers = dispatchers;
            // The control delegate works on a clone of the configuration, not on this actor's own.
            this.dispatcherControl = (DispatcherControl) stage().actorFor(DispatcherControl.class, Definition.has(DispatcherControlActor.class, new DispatcherControl.DispatcherControlInstantiator(dispatchers, new JDBCDispatcherControlDelegate(Configuration.cloneOf(configuration), stage().world().defaultLogger()), checkConfirmationExpirationInterval, confirmationExpiration)));
        }
    }

    /**
     * Stops the dispatcher control (if any), closes the queries and then stops this actor.
     * A failure to close the queries is logged and does not prevent the actor from stopping.
     */
    public void stop() {
        if (this.dispatcherControl != null) {
            this.dispatcherControl.stop();
        }
        try {
            this.queries.close();
        } catch (SQLException e) {
            logger().error(logPrefix() + ": Could not close queries", e);
        }
        super.stop();
    }

    /**
     * Appends a single source to the stream and reports the outcome to {@code interest}.
     *
     * @param streamName the name of the stream to append to
     * @param streamVersion the version under which the entry is stored
     * @param source the source to adapt into an entry and persist
     * @param metadata the metadata stored with the entry
     * @param interest the receiver of the success or failure outcome
     * @param object an opaque value passed through to {@code interest}
     * @throws IllegalArgumentException if the source cannot be adapted to an entry
     * @throws IllegalStateException if a row cannot be written or the commit fails
     */
    public <S, ST> void append(final String streamName, final int streamVersion, final Source<S> source, final Metadata metadata, final Journal.AppendResultInterest interest, final Object object) {
        final Consumer<Exception> whenFailed = cause -> appendResultedInFailure(streamName, streamVersion, source, null, interest, object, cause);
        final Entry<String> entry = asEntry(source, streamVersion, metadata, whenFailed);
        final Dispatchable<Entry<String>, State.TextState> dispatchable = persist(streamName, streamVersion, Collections.singletonList(entry), null, whenFailed);
        dispatch(dispatchable);
        interest.appendResultedIn(Success.of(Result.Success), streamName, streamVersion, source, Optional.empty(), object);
    }

    /**
     * Appends a single source together with an optional snapshot and reports the outcome
     * to {@code interest}.
     *
     * @param streamName the name of the stream to append to
     * @param streamVersion the version under which the entry and the snapshot are stored
     * @param source the source to adapt into an entry and persist
     * @param metadata the metadata stored with the entry
     * @param snapshot the state to store as snapshot, or {@code null} for none
     * @param interest the receiver of the success or failure outcome
     * @param object an opaque value passed through to {@code interest}
     * @throws IllegalArgumentException if the source cannot be adapted to an entry
     * @throws IllegalStateException if a row cannot be written or the commit fails
     */
    public <S, ST> void appendWith(final String streamName, final int streamVersion, final Source<S> source, final Metadata metadata, final ST snapshot, final Journal.AppendResultInterest interest, final Object object) {
        final Consumer<Exception> whenFailed = cause -> appendResultedInFailure(streamName, streamVersion, source, snapshot, interest, object, cause);
        final Entry<String> entry = asEntry(source, streamVersion, metadata, whenFailed);
        final Dispatchable<Entry<String>, State.TextState> dispatchable = persist(streamName, streamVersion, Collections.singletonList(entry), snapshot, whenFailed);
        dispatch(dispatchable);
        interest.appendResultedIn(Success.of(Result.Success), streamName, streamVersion, source, Optional.ofNullable(snapshot), object);
    }

    /**
     * Appends several sources to the stream in one transaction and reports the outcome to
     * {@code interest}. The first entry is stored under {@code fromStreamVersion}; each
     * following entry uses the next higher version.
     *
     * @param streamName the name of the stream to append to
     * @param fromStreamVersion the version of the first entry
     * @param sources the sources to adapt into entries and persist, in order
     * @param metadata the metadata stored with every entry
     * @param interest the receiver of the success or failure outcome
     * @param object an opaque value passed through to {@code interest}
     * @throws IllegalArgumentException if a source cannot be adapted to an entry
     * @throws IllegalStateException if a row cannot be written or the commit fails
     */
    public <S, ST> void appendAll(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final Metadata metadata, final Journal.AppendResultInterest interest, final Object object) {
        final Consumer<Exception> whenFailed = cause -> appendAllResultedInFailure(streamName, fromStreamVersion, sources, null, interest, object, cause);
        final List<Entry<String>> entries = asEntries(sources, fromStreamVersion, metadata, whenFailed);
        final Dispatchable<Entry<String>, State.TextState> dispatchable = persist(streamName, fromStreamVersion, entries, null, whenFailed);
        dispatch(dispatchable);
        interest.appendAllResultedIn(Success.of(Result.Success), streamName, fromStreamVersion, sources, Optional.empty(), object);
    }

    /**
     * Appends several sources together with an optional snapshot in one transaction and
     * reports the outcome to {@code interest}. The first entry is stored under
     * {@code fromStreamVersion}; each following entry uses the next higher version. The
     * snapshot is stored under {@code fromStreamVersion}.
     *
     * @param streamName the name of the stream to append to
     * @param fromStreamVersion the version of the first entry and of the snapshot
     * @param sources the sources to adapt into entries and persist, in order
     * @param metadata the metadata stored with every entry
     * @param snapshot the state to store as snapshot, or {@code null} for none
     * @param interest the receiver of the success or failure outcome
     * @param object an opaque value passed through to {@code interest}
     * @throws IllegalArgumentException if a source cannot be adapted to an entry
     * @throws IllegalStateException if a row cannot be written or the commit fails
     */
    public <S, ST> void appendAllWith(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final Metadata metadata, final ST snapshot, final Journal.AppendResultInterest interest, final Object object) {
        final Consumer<Exception> whenFailed = cause -> appendAllResultedInFailure(streamName, fromStreamVersion, sources, snapshot, interest, object, cause);
        final List<Entry<String>> entries = asEntries(sources, fromStreamVersion, metadata, whenFailed);
        final Dispatchable<Entry<String>, State.TextState> dispatchable = persist(streamName, fromStreamVersion, entries, snapshot, whenFailed);
        dispatch(dispatchable);
        interest.appendAllResultedIn(Success.of(Result.Success), streamName, fromStreamVersion, sources, Optional.ofNullable(snapshot), object);
    }

    /**
     * Answers the journal reader registered under {@code name}, creating its actor on first use.
     *
     * @param name the name of the reader
     * @return the eventual reader
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Completes<JournalReader<? extends Entry<?>>> journalReader(final String name) {
        return completes().with(this.journalReaders.computeIfAbsent(name, key -> {
            return (JournalReader) stage().actorFor(JournalReader.class, Definition.has(JDBCJournalReaderActor.class, new JDBCJournalReaderActor.JDBCJournalReaderInstantiator(this.configuration, name)), stage().world().addressFactory().uniquePrefixedWith("eventJournalReader-" + name));
        }));
    }

    /**
     * Answers the stream reader registered under {@code name}, creating its actor on first use.
     *
     * @param name the name of the reader
     * @return the eventual reader
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Completes<StreamReader<String>> streamReader(final String name) {
        return completes().with(this.streamReaders.computeIfAbsent(name, key -> {
            return (StreamReader) stage().actorFor(StreamReader.class, Definition.has(JDBCStreamReaderActor.class, new JDBCStreamReaderActor.JDBCStreamReaderInstantiator(this.configuration)), stage().world().addressFactory().uniquePrefixedWith("eventStreamReader-" + key));
        }));
    }

    /**
     * Inserts one entry row and stores the resulting id on the entry. The id is the one
     * supplied with the prepared query when present; otherwise the generated key is used
     * if it is greater than zero.
     *
     * @param streamName the name of the stream the entry belongs to
     * @param streamVersion the version under which the entry is stored
     * @param entry the entry to insert; must be a {@link BaseEntry}
     * @param whenFailed notified with the cause before this method throws
     * @throws IllegalStateException if the insert does not affect exactly one row or fails
     *         with an {@link SQLException}
     */
    protected final void insertEntry(final String streamName, final int streamVersion, final Entry<String> entry, final Consumer<Exception> whenFailed) {
        try {
            final Tuple2<PreparedStatement, Optional<String>> insert = this.queries.prepareInsertEntryQuery(streamName, streamVersion, entry.entryData(), entry.typeName(), entry.typeVersion(), this.gson.toJson(entry.metadata()));
            if (insert._1.executeUpdate() != 1) {
                logger().error(logPrefix() + ": Could not insert event " + entry.toString());
                final IllegalStateException failure = new IllegalStateException(logPrefix() + ": Could not insert event");
                // Report the row-count failure as well, so the interest is never left without an outcome.
                whenFailed.accept(failure);
                throw failure;
            }
            if (insert._2.isPresent()) {
                ((BaseEntry<String>) entry).__internal__setId(insert._2.get());
            } else {
                final long generatedKey = this.queries.generatedKeyFrom(insert._1);
                if (generatedKey > 0) {
                    ((BaseEntry<String>) entry).__internal__setId(String.valueOf(generatedKey));
                }
            }
        } catch (SQLException e) {
            whenFailed.accept(e);
            logger().error(logPrefix() + ": Could not insert event " + entry.toString(), e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Inserts one snapshot row.
     *
     * @param streamName the name of the stream the snapshot belongs to
     * @param streamVersion the version under which the snapshot is stored
     * @param snapshotState the raw state to store
     * @param whenFailed notified with the cause before this method throws
     * @throws IllegalStateException if the insert does not affect exactly one row or fails
     *         with an {@link SQLException}
     */
    protected final void insertSnapshot(final String streamName, final int streamVersion, final State.TextState snapshotState, final Consumer<Exception> whenFailed) {
        try {
            final PreparedStatement insert = this.queries.prepareInsertSnapshotQuery(streamName, streamVersion, snapshotState.data, snapshotState.dataVersion, snapshotState.type, snapshotState.typeVersion, this.gson.toJson(snapshotState.metadata))._1;
            if (insert.executeUpdate() != 1) {
                logger().error(logPrefix() + ": Could not insert snapshot with id " + snapshotState.id);
                final IllegalStateException failure = new IllegalStateException(logPrefix() + ": Could not insert snapshot");
                whenFailed.accept(failure);
                throw failure;
            }
        } catch (SQLException e) {
            whenFailed.accept(e);
            logger().error(logPrefix() + ": Could not insert snapshot with id " + snapshotState.id, e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Inserts one dispatchable row. The ids of the dispatchable's entries are stored as a
     * single {@code |}-separated string (empty when there are no entries); the state columns
     * are filled only when the dispatchable carries a state.
     *
     * @param dispatchable the dispatchable to store
     * @param whenFailed notified with the cause before this method throws
     * @throws IllegalStateException if the insert does not affect exactly one row or fails
     *         with an {@link SQLException}
     */
    protected final void insertDispatchable(final Dispatchable<Entry<String>, State.TextState> dispatchable, final Consumer<Exception> whenFailed) {
        try {
            final String entryIds = dispatchable.hasEntries()
                    ? dispatchable.entries().stream().map(Entry::id).collect(Collectors.joining("|"))
                    : "";
            final String dispatchId = dispatchable.id();
            final Tuple2<PreparedStatement, Optional<String>> insert;
            if (dispatchable.state().isPresent()) {
                final State<String> state = dispatchable.typedState();
                insert = this.queries.prepareInsertDispatchableQuery(dispatchId, this.configuration.originatorId, state.id, state.data, state.dataVersion, state.type, state.typeVersion, this.gson.toJson(state.metadata), entryIds);
            } else {
                insert = this.queries.prepareInsertDispatchableQuery(dispatchId, this.configuration.originatorId, null, null, 0, null, 0, null, entryIds);
            }
            if (insert._1.executeUpdate() != 1) {
                logger().error(logPrefix() + ": Could not insert dispatchable with id " + dispatchable.id());
                final IllegalStateException failure = new IllegalStateException(logPrefix() + ": Could not insert dispatchable");
                whenFailed.accept(failure);
                throw failure;
            }
        } catch (SQLException e) {
            whenFailed.accept(e);
            logger().error(logPrefix() + ": Could not insert dispatchable with id " + dispatchable.id(), e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Writes the entries, the optional snapshot and the dispatchable, then commits.
     * On any failure the transaction is rolled back before the exception is rethrown.
     *
     * @return the dispatchable that was stored and committed
     */
    private <ST> Dispatchable<Entry<String>, State.TextState> persist(final String streamName, final int streamVersion, final List<Entry<String>> entries, final ST snapshot, final Consumer<Exception> whenFailed) {
        try {
            int entryVersion = streamVersion;
            for (final Entry<String> entry : entries) {
                insertEntry(streamName, entryVersion++, entry, whenFailed);
            }
            final Optional<State.TextState> rawSnapshot = toRawState(streamName, snapshot, streamVersion);
            rawSnapshot.ifPresent(raw -> insertSnapshot(streamName, streamVersion, raw, whenFailed));
            final Dispatchable<Entry<String>, State.TextState> dispatchable = buildDispatchable(streamName, streamVersion, entries, rawSnapshot.orElse(null));
            insertDispatchable(dispatchable, whenFailed);
            doCommit(whenFailed);
            return dispatchable;
        } catch (RuntimeException e) {
            // Without this, rows written before the failure would be committed by the next append.
            rollback();
            throw e;
        }
    }

    /** Rolls back the current transaction; a failure to do so is logged, never thrown. */
    private void rollback() {
        try {
            this.connection.rollback();
        } catch (SQLException e) {
            logger().error(logPrefix() + ": Could not roll back transaction", e);
        }
    }

    /** Reports a failed single append to the interest, wrapping the cause in a StorageException. */
    private <S, ST> void appendResultedInFailure(final String streamName, final int streamVersion, final Source<S> source, final ST snapshot, final Journal.AppendResultInterest interest, final Object object, final Exception cause) {
        interest.appendResultedIn(Failure.of(new StorageException(Result.Failure, cause.getMessage(), cause)), streamName, streamVersion, source, Optional.ofNullable(snapshot), object);
    }

    /** Reports a failed multi-source append to the interest, wrapping the cause in a StorageException. */
    private <S, ST> void appendAllResultedInFailure(final String streamName, final int streamVersion, final List<Source<S>> sources, final ST snapshot, final Journal.AppendResultInterest interest, final Object object, final Exception cause) {
        interest.appendAllResultedIn(Failure.of(new StorageException(Result.Failure, cause.getMessage(), cause)), streamName, streamVersion, sources, Optional.ofNullable(snapshot), object);
    }

    /** Commits the current transaction; on failure notifies {@code whenFailed}, logs and throws. */
    private void doCommit(final Consumer<Exception> whenFailed) {
        try {
            this.connection.commit();
        } catch (SQLException e) {
            whenFailed.accept(e);
            logger().error(logPrefix() + ": Could not complete transaction", e);
            throw new IllegalStateException(e);
        }
    }

    /** Adapts every source to an entry, numbering them consecutively from {@code fromVersion}. */
    private <S> List<Entry<String>> asEntries(final List<Source<S>> sources, final int fromVersion, final Metadata metadata, final Consumer<Exception> whenFailed) {
        final List<Entry<String>> entries = new ArrayList<>(sources.size());
        int version = fromVersion;
        for (final Source<S> source : sources) {
            entries.add(asEntry(source, version++, metadata, whenFailed));
        }
        return entries;
    }

    /** Adapts one source to an entry; on failure notifies {@code whenFailed}, logs and throws. */
    private <S> Entry<String> asEntry(final Source<S> source, final int version, final Metadata metadata, final Consumer<Exception> whenFailed) {
        try {
            return this.entryAdapterProvider.asEntry(source, version, metadata);
        } catch (Exception e) {
            whenFailed.accept(e);
            logger().error(logPrefix() + ": Cannot adapt source to entry because: ", e);
            throw new IllegalArgumentException(e);
        }
    }

    /** Converts the snapshot to its raw text state, or answers empty when there is no snapshot. */
    private <ST> Optional<State.TextState> toRawState(final String streamName, final ST snapshot, final int streamVersion) {
        if (snapshot == null) {
            return Optional.empty();
        }
        return Optional.of(this.stateAdapterProvider.asRaw(streamName, snapshot, streamVersion));
    }

    /** Hands the dispatchable to every configured dispatcher; does nothing when there are none. */
    private void dispatch(final Dispatchable<Entry<String>, State.TextState> dispatchable) {
        if (this.dispatchers != null) {
            this.dispatchers.forEach(dispatcher -> dispatcher.dispatch(dispatchable));
        }
    }

    /** Builds a dispatchable with a fresh id, the current local time, the state and the entries. */
    private Dispatchable<Entry<String>, State.TextState> buildDispatchable(final String streamName, final int streamVersion, final List<Entry<String>> entries, final State.TextState snapshot) {
        return new Dispatchable<>(getDispatchId(streamName, streamVersion), LocalDateTime.now(), snapshot, entries);
    }

    /** Composes the dispatch id as {@code streamName:streamVersion:generatedIdentity}. */
    private String getDispatchId(final String streamName, final int streamVersion) {
        return streamName + ":" + streamVersion + ":" + this.dispatchablesIdentityGenerator.generate().toString();
    }

    /** Prefix shared by every log and exception message of this journal. */
    private String logPrefix() {
        return "vlingo-symbio-jdbc:journal-" + this.databaseType;
    }
}
