package io.vlingo.symbio.store.journal.jdbc.postgres;

import com.google.gson.Gson;
import io.vlingo.actors.Actor;
import io.vlingo.common.Completes;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.Metadata;
import io.vlingo.symbio.State;
import io.vlingo.symbio.store.common.jdbc.Configuration;
import io.vlingo.symbio.store.journal.Stream;
import io.vlingo.symbio.store.journal.StreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/* loaded from: input_file:io/vlingo/symbio/store/journal/jdbc/postgres/PostgresStreamReaderActor.class */
/**
 * {@link StreamReader} for text entries that reads a named stream from the
 * {@code vlingo_symbio_journal} table and its snapshot from the
 * {@code vlingo_symbio_journal_snapshots} table.
 *
 * <p>Both queries are prepared once in the constructor on the connection taken from the
 * supplied {@link Configuration} and are re-bound and re-executed on every read.
 */
public class PostgresStreamReaderActor extends Actor implements StreamReader<String> {
    private static final String QUERY_EVENTS = "SELECT id, entry_data, entry_metadata, entry_type, entry_type_version FROM vlingo_symbio_journal WHERE stream_name = ? AND stream_version >= ?";
    private static final String QUERY_SNAPSHOT = "SELECT snapshot_type, snapshot_type_version, snapshot_data, snapshot_data_version, snapshot_metadata FROM vlingo_symbio_journal_snapshots WHERE stream_name = ?";

    private final Connection connection;
    private final PreparedStatement queryEventsStatement;
    private final PreparedStatement queryLatestSnapshotStatement;
    private final Gson gson = new Gson();

    /**
     * Prepares the entry and snapshot queries on the configured connection.
     *
     * @param configuration supplies the JDBC connection used for every read
     * @throws SQLException if either statement cannot be prepared
     */
    public PostgresStreamReaderActor(Configuration configuration) throws SQLException {
        this.connection = configuration.connection;
        this.queryEventsStatement = this.connection.prepareStatement(QUERY_EVENTS);
        this.queryLatestSnapshotStatement = this.connection.prepareStatement(QUERY_SNAPSHOT);
    }

    /**
     * Reads the named stream starting from stream version 1.
     *
     * @param streamName the name of the stream to read
     * @return the eventual stream; see {@link #streamFor(String, int)} for failure behavior
     */
    public Completes<Stream<String>> streamFor(String streamName) {
        return streamFor(streamName, 1);
    }

    /**
     * Reads the named stream starting from the given stream version.
     *
     * <p>Any failure while reading is logged and answered with an empty stream at version 1
     * carrying the {@code Null} snapshot, rather than being propagated to the caller.
     *
     * @param streamName the name of the stream to read
     * @param fromStreamVersion the lowest stream version requested
     * @return the eventual stream of entries, possibly with a snapshot
     */
    public Completes<Stream<String>> streamFor(String streamName, int fromStreamVersion) {
        try {
            return completes().with(eventsFromOffset(streamName, fromStreamVersion));
        } catch (Exception e) {
            logger().log("vlingo/symbio-postgresql: " + e.getMessage(), e);
            final Stream<String> emptyStream =
                    new Stream<>(streamName, 1, Collections.emptyList(), State.TextState.Null);
            return completes().with(emptyStream);
        }
    }

    /**
     * Loads the snapshot (if any) and the entries of the named stream.
     *
     * <p>When a snapshot exists whose data version is greater than {@code offset}, the entries
     * are read from that data version instead and the snapshot is attached to the result;
     * otherwise entries are read from {@code offset} and the {@code Null} snapshot is attached.
     *
     * @param streamName the name of the stream to read
     * @param offset the lowest stream version requested by the caller
     * @return the stream built from the rows read
     * @throws Exception if a query fails or an entry/snapshot type cannot be loaded by name
     */
    private Stream<String> eventsFromOffset(String streamName, int offset) throws Exception {
        final State<String> snapshot = latestSnapshotOf(streamName);
        final List<Entry<String>> entries = new ArrayList<>();

        int startVersion = offset;
        State<String> referenceSnapshot = State.TextState.Null;
        if (snapshot != State.TextState.Null && snapshot.dataVersion > offset) {
            startVersion = snapshot.dataVersion;
            referenceSnapshot = snapshot;
        }

        this.queryEventsStatement.setString(1, streamName);
        this.queryEventsStatement.setInt(2, startVersion);

        // Close the result set deterministically, including when a row fails to convert.
        try (ResultSet resultSet = this.queryEventsStatement.executeQuery()) {
            while (resultSet.next()) {
                final String id = resultSet.getString(1);
                final String entryData = resultSet.getString(2);
                final String entryMetadata = resultSet.getString(3);
                final String entryTypeName = resultSet.getString(4);
                final Class<?> entryType = Class.forName(entryTypeName);
                final int entryTypeVersion = resultSet.getInt(5);
                final Metadata metadata = this.gson.fromJson(entryMetadata, Metadata.class);

                entries.add(new Entry.TextEntry(id, entryType, entryTypeVersion, entryData, metadata));
            }
        }

        // NOTE(review): the reported stream version is start version + number of entries read;
        // confirm against callers whether "next version" or "last version" is expected.
        return new Stream<>(streamName, startVersion + entries.size(), entries, referenceSnapshot);
    }

    /**
     * Reads the first snapshot row stored for the named stream.
     *
     * @param streamName the name of the stream whose snapshot is wanted
     * @return the snapshot state, or {@code State.TextState.Null} when no row exists
     * @throws Exception if the query fails or the snapshot type cannot be loaded by name
     */
    private State<String> latestSnapshotOf(String streamName) throws Exception {
        this.queryLatestSnapshotStatement.setString(1, streamName);

        try (ResultSet resultSet = this.queryLatestSnapshotStatement.executeQuery()) {
            if (!resultSet.next()) {
                return State.TextState.Null;
            }

            final String snapshotTypeName = resultSet.getString(1);
            final Class<?> snapshotType = Class.forName(snapshotTypeName);
            final int snapshotTypeVersion = resultSet.getInt(2);
            final String snapshotData = resultSet.getString(3);
            final int snapshotDataVersion = resultSet.getInt(4);
            final Metadata metadata = this.gson.fromJson(resultSet.getString(5), Metadata.class);

            return new State.TextState(
                    streamName, snapshotType, snapshotTypeVersion, snapshotData, snapshotDataVersion, metadata);
        }
    }
}
