package nstream.adapter.postgresql;

import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import nstream.adapter.common.ingress.IngestorMetricsAgent;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.common.schedule.DeferrableException;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.postgresql.jdbc.PgConnection;
import swim.concurrent.TimerRef;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/postgresql/PostgresListenAgent.class */
public abstract class PostgresListenAgent extends IngestorMetricsAgent<PostgresListenSettings, PGNotification> {
    protected HikariDataSource pool;
    protected volatile Connection pooledConnection;
    protected volatile PgConnection connection;
    protected TimerRef pollTimer;

    protected void assignPool(HikariDataSource hikariDataSource) {
        this.pool = hikariDataSource;
    }

    protected void listen() {
        try {
            this.pooledConnection = this.pool.getConnection();
            this.connection = (PgConnection) this.pooledConnection.unwrap(PGConnection.class);
            Statement createStatement = this.connection.createStatement();
            try {
                createStatement.execute("LISTEN " + ((PostgresListenSettings) this.ingressSettings).channel());
                info("Started listening to channel '" + ((PostgresListenSettings) this.ingressSettings).channel() + "'");
                if (createStatement != null) {
                    createStatement.close();
                }
            } finally {
            }
        } catch (SQLException e) {
            throw new RuntimeException(nodeUri() + ": Failed to listen", e);
        }
    }

    protected void unlisten() {
        try {
            Statement createStatement = this.connection.createStatement();
            try {
                createStatement.execute("UNLISTEN " + ((PostgresListenSettings) this.ingressSettings).channel());
                info("Stopped listening to channel + '" + ((PostgresListenSettings) this.ingressSettings).channel() + "'");
                this.connection = null;
                this.pooledConnection.close();
                this.pooledConnection = null;
                if (createStatement != null) {
                    createStatement.close();
                }
            } finally {
            }
        } catch (SQLException e) {
            throw new RuntimeException(nodeUri() + ": Failed to un-listen", e);
        }
    }

    protected void ingestNotifications() throws DeferrableException {
        try {
            PGNotification[] notifications = this.connection.getNotifications(0);
            if (notifications != null) {
                for (PGNotification pGNotification : notifications) {
                    ingestOrContinue(pGNotification);
                }
            }
        } catch (SQLException e) {
            throw new DeferrableException("Failed to get notifications", e);
        }
    }

    protected void cancel() {
        cancelTimer();
        unlisten();
    }

    protected void cancelTimer() {
        if (this.pollTimer != null) {
            this.pollTimer.cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: parseIngressSettings, reason: merged with bridge method [inline-methods] */
    public PostgresListenSettings m0parseIngressSettings(Value value) {
        PostgresListenSettings postgresListenSettings = (PostgresListenSettings) PostgresListenSettings.form().cast(value);
        return postgresListenSettings == null ? PostgresListenSettings.defaultSettings() : postgresListenSettings;
    }

    protected void stageReception() {
        loadSettings("postgresListenConf");
        assignPool((HikariDataSource) ProvisionLoader.getProvision(((PostgresListenSettings) this.ingressSettings).connectionPoolProvisionName()).value());
        listen();
        this.pollTimer = scheduleWithFixedDelay(() -> {
            return this.pollTimer;
        }, 0L, 1000L, this::ingestNotifications);
    }
}
