package io.vlingo.symbio.store.journal.jdbc.postgres;

import com.google.gson.Gson;
import io.vlingo.actors.Actor;
import io.vlingo.common.Completes;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.Metadata;
import io.vlingo.symbio.State;
import io.vlingo.symbio.store.common.jdbc.Configuration;
import io.vlingo.symbio.store.journal.JournalReader;
import io.vlingo.symbio.store.journal.Stream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;

/* loaded from: input_file:io/vlingo/symbio/store/journal/jdbc/postgres/PostgresJournalReaderActor.class */
public class PostgresJournalReaderActor extends Actor implements JournalReader<String> {
    private static final String QUERY_CURRENT_OFFSET = "SELECT reader_offset FROM vlingo_symbio_journal_offsets WHERE reader_name=?";
    private static final String UPDATE_CURRENT_OFFSET = "INSERT INTO vlingo_symbio_journal_offsets(reader_offset, reader_name) VALUES(?, ?) ON CONFLICT (reader_name) DO UPDATE SET reader_offset=?";
    private static final String QUERY_SINGLE = "SELECT id, entry_data, entry_metadata, entry_type, entry_type_version, entry_timestamp FROM vlingo_symbio_journal WHERE entry_timestamp >= ?";
    private static final String QUERY_BATCH = "SELECT id, entry_data, entry_metadata, entry_type, entry_type_version, entry_timestamp FROM vlingo_symbio_journal WHERE entry_timestamp > ?";
    private static final String QUERY_LAST_OFFSET = "SELECT MAX(entry_timestamp) FROM vlingo_symbio_journal";
    private final Connection connection;
    private final String name;
    private final PreparedStatement queryCurrentOffset;
    private final PreparedStatement updateCurrentOffset;
    private final PreparedStatement querySingleEvent;
    private final PreparedStatement queryEventBatch;
    private final PreparedStatement queryLastOffset;
    private final Gson gson = new Gson();
    private long offset;

    public PostgresJournalReaderActor(Configuration configuration, String str) throws SQLException {
        this.connection = configuration.connection;
        this.name = str;
        this.queryCurrentOffset = this.connection.prepareStatement(QUERY_CURRENT_OFFSET);
        this.updateCurrentOffset = this.connection.prepareStatement(UPDATE_CURRENT_OFFSET);
        this.querySingleEvent = this.connection.prepareStatement(QUERY_SINGLE);
        this.queryEventBatch = this.connection.prepareStatement(QUERY_BATCH);
        this.queryLastOffset = this.connection.prepareStatement(QUERY_LAST_OFFSET);
        retrieveCurrentOffset();
    }

    public Completes<String> name() {
        return completes().with(this.name);
    }

    public Completes<Entry<String>> readNext() {
        try {
            this.querySingleEvent.setLong(1, this.offset);
            ResultSet executeQuery = this.querySingleEvent.executeQuery();
            if (executeQuery.next()) {
                this.offset = nextOffsetFromResultSet(executeQuery);
                updateCurrentOffset();
                return completes().with(eventFromResultSet(executeQuery));
            }
        } catch (Exception e) {
            logger().log("vlingo/symbio-postgres: " + e.getMessage(), e);
        }
        return completes().with((Object) null);
    }

    public Completes<Stream<String>> readNext(int i) {
        try {
            ArrayList arrayList = new ArrayList(i);
            this.queryEventBatch.setLong(1, this.offset);
            this.queryEventBatch.setMaxRows(i);
            ResultSet executeQuery = this.queryEventBatch.executeQuery();
            while (executeQuery.next()) {
                arrayList.add(eventFromResultSet(executeQuery));
                if (executeQuery.isLast()) {
                    this.offset = nextOffsetFromResultSet(executeQuery);
                }
            }
            updateCurrentOffset();
            return completes().with(new Stream(this.name, (int) this.offset, arrayList, State.TextState.Null));
        } catch (Exception e) {
            logger().log("vlingo/symbio-postgres: " + e.getMessage(), e);
            return completes().with((Object) null);
        }
    }

    public void rewind() {
        this.offset = 1L;
        updateCurrentOffset();
    }

    public Completes<String> seekTo(String str) {
        boolean z = -1;
        switch (str.hashCode()) {
            case 60:
                if (str.equals("<")) {
                    z = false;
                    break;
                }
                break;
            case 61:
                if (str.equals("=")) {
                    z = 2;
                    break;
                }
                break;
            case 62:
                if (str.equals(">")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                this.offset = 1L;
                updateCurrentOffset();
                break;
            case true:
                this.offset = retrieveLatestOffset() + 1;
                updateCurrentOffset();
                break;
            case true:
                break;
            default:
                this.offset = Integer.parseInt(str);
                updateCurrentOffset();
                break;
        }
        return completes().with(String.valueOf(this.offset));
    }

    private Entry<String> eventFromResultSet(ResultSet resultSet) throws SQLException, ClassNotFoundException {
        String string = resultSet.getString(1);
        String string2 = resultSet.getString(2);
        String string3 = resultSet.getString(3);
        String string4 = resultSet.getString(4);
        return new Entry.TextEntry(string, Class.forName(string4), resultSet.getInt(5), string2, (Metadata) this.gson.fromJson(string3, Metadata.class));
    }

    private void retrieveCurrentOffset() {
        this.offset = 1L;
        try {
            this.queryCurrentOffset.setString(1, this.name);
            ResultSet executeQuery = this.queryCurrentOffset.executeQuery();
            if (executeQuery.next()) {
                this.offset = executeQuery.getLong(1);
            }
        } catch (Exception e) {
            logger().log("vlingo/symbio-postgres: " + e.getMessage(), e);
            logger().log("vlingo/symbio-postgres: Rewinding the offset");
        }
    }

    private void updateCurrentOffset() {
        try {
            this.updateCurrentOffset.setLong(1, this.offset);
            this.updateCurrentOffset.setString(2, this.name);
            this.updateCurrentOffset.setLong(3, this.offset);
            this.updateCurrentOffset.executeUpdate();
            this.connection.commit();
        } catch (Exception e) {
            logger().log("vlingo/symbio-postgres: Could not persist the offset. Will retry on next read.");
            logger().log("vlingo/symbio-postgres: " + e.getMessage(), e);
        }
    }

    private long retrieveLatestOffset() {
        try {
            ResultSet executeQuery = this.queryLastOffset.executeQuery();
            if (executeQuery.next()) {
                return executeQuery.getLong(1);
            }
        } catch (Exception e) {
            logger().log("vlingo/symbio-postgres: Could not retrieve latest offset, using current.");
        }
        return this.offset;
    }

    private long nextOffsetFromResultSet(ResultSet resultSet) throws SQLException {
        return resultSet.getLong(6) + 1;
    }
}
