package io.vlingo.symbio.store.journal.jdbc.postgres;

import com.google.gson.Gson;
import io.vlingo.actors.Actor;
import io.vlingo.actors.Definition;
import io.vlingo.common.Completes;
import io.vlingo.common.Failure;
import io.vlingo.common.Success;
import io.vlingo.common.Tuple2;
import io.vlingo.common.identity.IdentityGenerator;
import io.vlingo.symbio.BaseEntry;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.EntryAdapterProvider;
import io.vlingo.symbio.Source;
import io.vlingo.symbio.State;
import io.vlingo.symbio.StateAdapterProvider;
import io.vlingo.symbio.store.Result;
import io.vlingo.symbio.store.StorageException;
import io.vlingo.symbio.store.common.jdbc.Configuration;
import io.vlingo.symbio.store.journal.Journal;
import io.vlingo.symbio.store.journal.JournalListener;
import io.vlingo.symbio.store.journal.JournalReader;
import io.vlingo.symbio.store.journal.StreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;

/* loaded from: input_file:io/vlingo/symbio/store/journal/jdbc/postgres/PostgresJournalActor.class */
/**
 * Actor-backed {@link Journal} of text entries that writes entries and snapshots through JDBC
 * into the {@code vlingo_symbio_journal} and {@code vlingo_symbio_journal_snapshots} tables.
 *
 * <p>All statements run on the single {@link Connection} taken from the supplied
 * {@link Configuration}, and each append operation ends with an explicit
 * {@link Connection#commit()}; the connection is therefore expected to have auto-commit
 * disabled. When an insert or the commit fails, the pending transaction is rolled back so that
 * partially written rows are never committed by a later append and the connection stays usable,
 * the {@code AppendResultInterest} is informed of the failure, and an unchecked exception is
 * thrown.
 */
public class PostgresJournalActor extends Actor implements Journal<String> {
    private static final String INSERT_EVENT = "INSERT INTO vlingo_symbio_journal(entry_data, entry_metadata, entry_type, entry_type_version, stream_name, stream_version, id, entry_timestamp)VALUES(?::JSONB, ?::JSONB, ?, ?, ?, ?, ?, ?)";
    private static final String INSERT_SNAPSHOT = "INSERT INTO vlingo_symbio_journal_snapshots(stream_name, snapshot_type, snapshot_type_version, snapshot_data, snapshot_data_version, snapshot_metadata)VALUES(?, ?, ?, ?::JSONB, ?, ?::JSONB)";
    private final Configuration configuration;
    private final Connection connection;
    private final JournalListener<String> listener;
    private final PreparedStatement insertEvent;
    private final PreparedStatement insertSnapshot;
    private final Gson gson = new Gson();
    private final EntryAdapterProvider entryAdapterProvider = EntryAdapterProvider.instance(stage().world());
    private final StateAdapterProvider stateAdapterProvider = StateAdapterProvider.instance(stage().world());
    // Readers are created lazily and cached by the name they were requested with.
    private final Map<String, JournalReader<BaseEntry.TextEntry>> journalReaders = new HashMap<>();
    private final Map<String, StreamReader<String>> streamReaders = new HashMap<>();
    private final IdentityGenerator identityGenerator = new IdentityGenerator.TimeBasedIdentityGenerator();

    /**
     * Creates the journal and prepares the two insert statements on the configured connection.
     *
     * @param journalListener listener informed after every successfully committed append
     * @param configuration JDBC configuration whose {@code connection} is used for all statements
     * @throws SQLException if either insert statement cannot be prepared
     */
    public PostgresJournalActor(JournalListener<String> journalListener, Configuration configuration) throws SQLException {
        this.configuration = configuration;
        this.connection = configuration.connection;
        this.listener = journalListener;
        this.insertEvent = this.connection.prepareStatement(INSERT_EVENT);
        this.insertSnapshot = this.connection.prepareStatement(INSERT_SNAPSHOT);
    }

    /**
     * Appends a single source to the stream, commits, then notifies the listener and the interest.
     *
     * @param str the stream name
     * @param i the stream version stored with the entry
     * @param source the source to adapt into an entry and persist
     * @param appendResultInterest receiver of the success or failure outcome
     * @param obj opaque object handed back to {@code appendResultInterest}
     * @throws IllegalArgumentException if the source cannot be adapted to an entry
     * @throws IllegalStateException if the insert or the commit fails
     */
    public <S, ST> void append(String str, int i, Source<S> source, Journal.AppendResultInterest appendResultInterest, Object obj) {
        final Consumer<Exception> whenFailed = exc -> appendResultedInFailure(str, i, source, null, appendResultInterest, obj, exc);
        final Entry<String> entry = asEntry(source, whenFailed);
        insertEntry(str, i, entry, whenFailed);
        doCommit(whenFailed);
        this.listener.appended(entry);
        appendResultInterest.appendResultedIn(Success.of(Result.Success), str, i, source, Optional.empty(), obj);
    }

    /**
     * Appends a single source and, when {@code st} is non-null, a snapshot of it in the same
     * transaction; then notifies the listener and the interest.
     *
     * @param str the stream name
     * @param i the stream version stored with the entry and used for the snapshot
     * @param source the source to adapt into an entry and persist
     * @param st the snapshot state, or {@code null} for no snapshot
     * @param appendResultInterest receiver of the success or failure outcome
     * @param obj opaque object handed back to {@code appendResultInterest}
     * @throws IllegalArgumentException if the source cannot be adapted to an entry
     * @throws IllegalStateException if an insert or the commit fails
     */
    public <S, ST> void appendWith(String str, int i, Source<S> source, ST st, Journal.AppendResultInterest appendResultInterest, Object obj) {
        final Consumer<Exception> whenFailed = exc -> appendResultedInFailure(str, i, source, st, appendResultInterest, obj, exc);
        final Entry<String> entry = asEntry(source, whenFailed);
        insertEntry(str, i, entry, whenFailed);
        final Tuple2<Optional<ST>, Optional<State.TextState>> snapshot = toState(str, st, i);
        snapshot._2.ifPresent(raw -> insertSnapshot(str, raw, whenFailed));
        doCommit(whenFailed);
        // The listener receives null when no snapshot was supplied.
        this.listener.appendedWith(entry, snapshot._2.orElse(null));
        appendResultInterest.appendResultedIn(Success.of(Result.Success), str, i, source, snapshot._1, obj);
    }

    /**
     * Appends all sources in one transaction, numbering them consecutively starting at {@code i};
     * then notifies the listener and the interest.
     *
     * @param str the stream name
     * @param i the stream version of the first entry
     * @param list the sources to adapt into entries and persist, in order
     * @param appendResultInterest receiver of the success or failure outcome
     * @param obj opaque object handed back to {@code appendResultInterest}
     * @throws IllegalArgumentException if a source cannot be adapted to an entry
     * @throws IllegalStateException if an insert or the commit fails
     */
    public <S, ST> void appendAll(String str, int i, List<Source<S>> list, Journal.AppendResultInterest appendResultInterest, Object obj) {
        final Consumer<Exception> whenFailed = exc -> appendAllResultedInFailure(str, i, list, null, appendResultInterest, obj, exc);
        final List<Entry<String>> entries = asEntries(list, whenFailed);
        insertEntries(str, i, entries, whenFailed);
        doCommit(whenFailed);
        this.listener.appendedAll(entries);
        appendResultInterest.appendAllResultedIn(Success.of(Result.Success), str, i, list, Optional.empty(), obj);
    }

    /**
     * Appends all sources, numbered consecutively starting at {@code i}, and, when {@code st} is
     * non-null, a snapshot taken at version {@code i}, all in one transaction; then notifies the
     * listener and the interest.
     *
     * @param str the stream name
     * @param i the stream version of the first entry and of the snapshot
     * @param list the sources to adapt into entries and persist, in order
     * @param st the snapshot state, or {@code null} for no snapshot
     * @param appendResultInterest receiver of the success or failure outcome
     * @param obj opaque object handed back to {@code appendResultInterest}
     * @throws IllegalArgumentException if a source cannot be adapted to an entry
     * @throws IllegalStateException if an insert or the commit fails
     */
    public <S, ST> void appendAllWith(String str, int i, List<Source<S>> list, ST st, Journal.AppendResultInterest appendResultInterest, Object obj) {
        final Consumer<Exception> whenFailed = exc -> appendAllResultedInFailure(str, i, list, st, appendResultInterest, obj, exc);
        final List<Entry<String>> entries = asEntries(list, whenFailed);
        insertEntries(str, i, entries, whenFailed);
        final Tuple2<Optional<ST>, Optional<State.TextState>> snapshot = toState(str, st, i);
        snapshot._2.ifPresent(raw -> insertSnapshot(str, raw, whenFailed));
        doCommit(whenFailed);
        // The listener receives null when no snapshot was supplied.
        this.listener.appendedAllWith(entries, snapshot._2.orElse(null));
        appendResultInterest.appendAllResultedIn(Success.of(Result.Success), str, i, list, snapshot._1, obj);
    }

    /**
     * Answers the journal reader registered under the given name, creating its actor on first use.
     *
     * @param str the reader name; also used as suffix of the reader actor's address prefix
     * @return the eventual cached or newly created reader
     */
    public Completes<JournalReader<BaseEntry.TextEntry>> journalReader(String str) {
        final JournalReader<BaseEntry.TextEntry> reader = this.journalReaders.computeIfAbsent(str, name -> {
            return (JournalReader) stage().actorFor(
                    JournalReader.class,
                    Definition.has(PostgresJournalReaderActor.class, Definition.parameters(this.configuration, name)),
                    stage().world().addressFactory().uniquePrefixedWith("eventJournalReader-" + name));
        });
        return completes().with(reader);
    }

    /**
     * Answers the stream reader registered under the given name, creating its actor on first use.
     *
     * @param str the reader name; also used as suffix of the reader actor's address prefix
     * @return the eventual cached or newly created reader
     */
    public Completes<StreamReader<String>> streamReader(String str) {
        final StreamReader<String> reader = this.streamReaders.computeIfAbsent(str, name -> {
            return (StreamReader) stage().actorFor(
                    StreamReader.class,
                    Definition.has(PostgresStreamReaderActor.class, Definition.parameters(this.configuration)),
                    stage().world().addressFactory().uniquePrefixedWith("eventStreamReader-" + name));
        });
        return completes().with(reader);
    }

    /**
     * Inserts one entry row (without committing) and assigns the generated id to the entry.
     *
     * <p>The row id is a freshly generated time-based UUID whose timestamp is stored as the
     * entry timestamp. On any failure the transaction is rolled back and {@code consumer} is
     * notified before the exception is thrown.
     *
     * @param str the stream name
     * @param i the stream version of the entry
     * @param entry the entry to persist; must be a {@link BaseEntry} so its id can be set
     * @param consumer failure callback that informs the append interest
     * @throws IllegalStateException if the statement fails or does not insert exactly one row
     */
    protected final void insertEntry(String str, int i, Entry<String> entry, Consumer<Exception> consumer) {
        try {
            final UUID id = this.identityGenerator.generate();
            final long timestamp = id.timestamp();
            this.insertEvent.setString(1, entry.entryData());
            this.insertEvent.setString(2, this.gson.toJson(entry.metadata()));
            this.insertEvent.setString(3, entry.type());
            this.insertEvent.setInt(4, entry.typeVersion());
            this.insertEvent.setString(5, str);
            this.insertEvent.setInt(6, i);
            this.insertEvent.setObject(7, id);
            this.insertEvent.setLong(8, timestamp);
            if (this.insertEvent.executeUpdate() != 1) {
                final IllegalStateException failure = new IllegalStateException("vlingo/symbio-jdbc-postgres: Could not insert event");
                rollbackAfterFailure();
                consumer.accept(failure);
                logger().log("vlingo/symbio-jdbc-postgres: Could not insert event " + entry.toString());
                throw failure;
            }
            ((BaseEntry<String>) entry).__internal__setId(id.toString());
        } catch (SQLException e) {
            rollbackAfterFailure();
            consumer.accept(e);
            logger().log("vlingo/symbio-jdbc-postgres: Could not insert event " + entry.toString(), e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Inserts one snapshot row (without committing).
     *
     * <p>On any failure the transaction is rolled back and {@code consumer} is notified before
     * the exception is thrown.
     *
     * @param str the stream name the snapshot belongs to
     * @param textState the raw snapshot state to persist
     * @param consumer failure callback that informs the append interest
     * @throws IllegalStateException if the statement fails or does not insert exactly one row
     */
    protected final void insertSnapshot(String str, State.TextState textState, Consumer<Exception> consumer) {
        try {
            this.insertSnapshot.setString(1, str);
            this.insertSnapshot.setString(2, textState.type);
            this.insertSnapshot.setInt(3, textState.typeVersion);
            this.insertSnapshot.setString(4, textState.data);
            this.insertSnapshot.setInt(5, textState.dataVersion);
            this.insertSnapshot.setString(6, this.gson.toJson(textState.metadata));
            if (this.insertSnapshot.executeUpdate() != 1) {
                final IllegalStateException failure = new IllegalStateException("vlingo/symbio-jdbc-postgres: Could not insert snapshot");
                rollbackAfterFailure();
                consumer.accept(failure);
                logger().log("vlingo/symbio-jdbc-postgres: Could not insert snapshot with id " + textState.id);
                throw failure;
            }
        } catch (SQLException e) {
            rollbackAfterFailure();
            consumer.accept(e);
            logger().log("vlingo/symbio-jdbc-postgres: Could not insert snapshot with id " + textState.id, e);
            throw new IllegalStateException(e);
        }
    }

    /** Inserts the entries in order, giving them consecutive stream versions starting at {@code firstVersion}. */
    private void insertEntries(String streamName, int firstVersion, List<Entry<String>> entries, Consumer<Exception> whenFailed) {
        int version = firstVersion;
        for (final Entry<String> entry : entries) {
            insertEntry(streamName, version, entry, whenFailed);
            version++;
        }
    }

    /** Reports a failed single-source append to the interest, wrapping the cause in a {@link StorageException}. */
    private <S, ST> void appendResultedInFailure(String str, int i, Source<S> source, ST st, Journal.AppendResultInterest appendResultInterest, Object obj, Exception exc) {
        appendResultInterest.appendResultedIn(Failure.of(new StorageException(Result.Failure, exc.getMessage(), exc)), str, i, source, st == null ? Optional.empty() : Optional.of(st), obj);
    }

    /** Reports a failed multi-source append to the interest, wrapping the cause in a {@link StorageException}. */
    private <S, ST> void appendAllResultedInFailure(String str, int i, List<Source<S>> list, ST st, Journal.AppendResultInterest appendResultInterest, Object obj, Exception exc) {
        appendResultInterest.appendAllResultedIn(Failure.of(new StorageException(Result.Failure, exc.getMessage(), exc)), str, i, list, st == null ? Optional.empty() : Optional.of(st), obj);
    }

    /**
     * Commits the pending inserts; on failure rolls back, notifies {@code consumer} and throws.
     *
     * @throws IllegalStateException if the commit fails
     */
    private void doCommit(Consumer<Exception> consumer) {
        try {
            this.connection.commit();
        } catch (SQLException e) {
            rollbackAfterFailure();
            consumer.accept(e);
            logger().log("vlingo/symbio-jdbc-postgres: Could not complete transaction", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Discards the pending transaction after a failure. A rollback failure is only logged so that
     * the original failure remains the one reported to the caller.
     */
    private void rollbackAfterFailure() {
        try {
            this.connection.rollback();
        } catch (SQLException e) {
            logger().log("vlingo/symbio-jdbc-postgres: Could not roll back transaction", e);
        }
    }

    /** Adapts every source to an entry, preserving order. */
    private <S> List<Entry<String>> asEntries(List<Source<S>> list, Consumer<Exception> consumer) {
        final List<Entry<String>> entries = new ArrayList<>(list.size());
        for (final Source<S> source : list) {
            entries.add(asEntry(source, consumer));
        }
        return entries;
    }

    /**
     * Adapts a source to an entry through the {@link EntryAdapterProvider}.
     *
     * @throws IllegalArgumentException if the adapter fails; {@code consumer} is notified first
     */
    private <S> Entry<String> asEntry(Source<S> source, Consumer<Exception> consumer) {
        try {
            return this.entryAdapterProvider.asEntry(source);
        } catch (Exception e) {
            consumer.accept(e);
            logger().log("vlingo/symbio-jdbc-postgres: Cannot adapt source to entry because: ", e);
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Pairs the optional snapshot with its raw text form produced by the
     * {@link StateAdapterProvider}; both sides are empty when {@code st} is {@code null}.
     */
    private <ST> Tuple2<Optional<ST>, Optional<State.TextState>> toState(String str, ST st, int i) {
        if (st == null) {
            return Tuple2.from(Optional.empty(), Optional.empty());
        }
        return Tuple2.from(Optional.of(st), Optional.of(this.stateAdapterProvider.asRaw(str, st, i)));
    }
}
