package org.factcast.store.internal.listen;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import lombok.NonNull;
import org.factcast.core.util.FactCastJson;
import org.factcast.store.StoreConfigurationProperties;
import org.factcast.store.internal.PgConstants;
import org.factcast.store.internal.PgMetrics;
import org.factcast.store.internal.StoreMetrics;
import org.postgresql.PGNotification;
import org.postgresql.jdbc.PgConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:org/factcast/store/internal/listen/PgListener.class */
public class PgListener implements InitializingBean, DisposableBean {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PgListener.class);

    @NonNull
    private final PgConnectionSupplier pgConnectionSupplier;

    @NonNull
    private final EventBus eventBus;

    @NonNull
    private final StoreConfigurationProperties props;

    @NonNull
    private final PgMetrics pgMetrics;
    private Thread listenerThread;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    /* loaded from: input_file:org/factcast/store/internal/listen/PgListener$BlacklistChangeSignal.class */
    public static final class BlacklistChangeSignal {
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public BlacklistChangeSignal() {
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public boolean equals(Object obj) {
            return obj == this || (obj instanceof BlacklistChangeSignal);
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int hashCode() {
            return 1;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "PgListener.BlacklistChangeSignal()";
        }
    }

    /* loaded from: input_file:org/factcast/store/internal/listen/PgListener$FactInsertionSignal.class */
    public static final class FactInsertionSignal {

        @NonNull
        private final String name;
        private final String ns;
        private final String type;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public FactInsertionSignal(@NonNull String str, String str2, String str3) {
            Objects.requireNonNull(str, "name is marked non-null but is null");
            this.name = str;
            this.ns = str2;
            this.type = str3;
        }

        @NonNull
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String name() {
            return this.name;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String ns() {
            return this.ns;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String type() {
            return this.type;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof FactInsertionSignal)) {
                return false;
            }
            FactInsertionSignal factInsertionSignal = (FactInsertionSignal) obj;
            String name = name();
            String name2 = factInsertionSignal.name();
            if (name == null) {
                if (name2 != null) {
                    return false;
                }
            } else if (!name.equals(name2)) {
                return false;
            }
            String ns = ns();
            String ns2 = factInsertionSignal.ns();
            if (ns == null) {
                if (ns2 != null) {
                    return false;
                }
            } else if (!ns.equals(ns2)) {
                return false;
            }
            String type = type();
            String type2 = factInsertionSignal.type();
            return type == null ? type2 == null : type.equals(type2);
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int hashCode() {
            String name = name();
            int hashCode = (1 * 59) + (name == null ? 43 : name.hashCode());
            String ns = ns();
            int hashCode2 = (hashCode * 59) + (ns == null ? 43 : ns.hashCode());
            String type = type();
            return (hashCode2 * 59) + (type == null ? 43 : type.hashCode());
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "PgListener.FactInsertionSignal(name=" + name() + ", ns=" + ns() + ", type=" + type() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    /* loaded from: input_file:org/factcast/store/internal/listen/PgListener$NotificationReceiverLoop.class */
    public class NotificationReceiverLoop implements Runnable {
        protected NotificationReceiverLoop() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (PgListener.this.running.get()) {
                try {
                    PgConnection pgConnection = PgListener.this.pgConnectionSupplier.get();
                    try {
                        PgListener.this.connectionSetup(pgConnection);
                        while (PgListener.this.running.get()) {
                            PgListener.this.processNotifications(PgListener.this.receiveNotifications(pgConnection));
                        }
                        if (pgConnection != null) {
                            pgConnection.close();
                        }
                    } catch (Throwable th) {
                        if (pgConnection != null) {
                            try {
                                pgConnection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                        break;
                    }
                } catch (Exception e) {
                    if (PgListener.this.running.get()) {
                        PgListener.log.warn("While waiting for Notifications", e);
                        PgListener.this.sleep();
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/factcast/store/internal/listen/PgListener$SchemaStoreChangeSignal.class */
    public static final class SchemaStoreChangeSignal {

        @NonNull
        private final String ns;

        @NonNull
        private final String type;

        @NonNull
        private final Integer version;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public SchemaStoreChangeSignal(@NonNull String str, @NonNull String str2, @NonNull Integer num) {
            Objects.requireNonNull(str, "ns is marked non-null but is null");
            Objects.requireNonNull(str2, "type is marked non-null but is null");
            Objects.requireNonNull(num, "version is marked non-null but is null");
            this.ns = str;
            this.type = str2;
            this.version = num;
        }

        @NonNull
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String ns() {
            return this.ns;
        }

        @NonNull
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String type() {
            return this.type;
        }

        @NonNull
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public Integer version() {
            return this.version;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof SchemaStoreChangeSignal)) {
                return false;
            }
            SchemaStoreChangeSignal schemaStoreChangeSignal = (SchemaStoreChangeSignal) obj;
            Integer version = version();
            Integer version2 = schemaStoreChangeSignal.version();
            if (version == null) {
                if (version2 != null) {
                    return false;
                }
            } else if (!version.equals(version2)) {
                return false;
            }
            String ns = ns();
            String ns2 = schemaStoreChangeSignal.ns();
            if (ns == null) {
                if (ns2 != null) {
                    return false;
                }
            } else if (!ns.equals(ns2)) {
                return false;
            }
            String type = type();
            String type2 = schemaStoreChangeSignal.type();
            return type == null ? type2 == null : type.equals(type2);
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int hashCode() {
            Integer version = version();
            int hashCode = (1 * 59) + (version == null ? 43 : version.hashCode());
            String ns = ns();
            int hashCode2 = (hashCode * 59) + (ns == null ? 43 : ns.hashCode());
            String type = type();
            return (hashCode2 * 59) + (type == null ? 43 : type.hashCode());
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "PgListener.SchemaStoreChangeSignal(ns=" + ns() + ", type=" + type() + ", version=" + version() + ")";
        }
    }

    /* loaded from: input_file:org/factcast/store/internal/listen/PgListener$TransformationStoreChangeSignal.class */
    public static final class TransformationStoreChangeSignal {

        @NonNull
        private final String ns;

        @NonNull
        private final String type;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TransformationStoreChangeSignal(@NonNull String str, @NonNull String str2) {
            Objects.requireNonNull(str, "ns is marked non-null but is null");
            Objects.requireNonNull(str2, "type is marked non-null but is null");
            this.ns = str;
            this.type = str2;
        }

        @NonNull
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String ns() {
            return this.ns;
        }

        @NonNull
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String type() {
            return this.type;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof TransformationStoreChangeSignal)) {
                return false;
            }
            TransformationStoreChangeSignal transformationStoreChangeSignal = (TransformationStoreChangeSignal) obj;
            String ns = ns();
            String ns2 = transformationStoreChangeSignal.ns();
            if (ns == null) {
                if (ns2 != null) {
                    return false;
                }
            } else if (!ns.equals(ns2)) {
                return false;
            }
            String type = type();
            String type2 = transformationStoreChangeSignal.type();
            return type == null ? type2 == null : type.equals(type2);
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int hashCode() {
            String ns = ns();
            int hashCode = (1 * 59) + (ns == null ? 43 : ns.hashCode());
            String type = type();
            return (hashCode * 59) + (type == null ? 43 : type.hashCode());
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "PgListener.TransformationStoreChangeSignal(ns=" + ns() + ", type=" + type() + ")";
        }
    }

    @VisibleForTesting
    protected void listen() {
        log.trace("Starting instance Listener");
        this.listenerThread = new Thread(new NotificationReceiverLoop(), "PG Instance Listener");
        this.listenerThread.setDaemon(true);
        this.listenerThread.setUncaughtExceptionHandler((thread, th) -> {
            log.error("thread " + thread + " encountered an unhandled exception", th);
        });
        this.listenerThread.start();
        try {
            log.info("Waiting to establish postgres listener (max 15sec.)");
            log.info("postgres listener " + (this.countDownLatch.await(15L, TimeUnit.SECONDS) ? "" : "not ") + "established");
        } catch (InterruptedException e) {
        }
    }

    private void connectionSetup(PgConnection pgConnection) throws SQLException {
        setupPostgresListeners(pgConnection);
        this.countDownLatch.countDown();
        informSubscribersAboutFreshConnection();
    }

    @VisibleForTesting
    protected void setupPostgresListeners(PgConnection pgConnection) throws SQLException {
        PreparedStatement prepareStatement = pgConnection.prepareStatement(PgConstants.LISTEN_SQL);
        try {
            prepareStatement.execute();
            if (prepareStatement != null) {
                prepareStatement.close();
            }
            PreparedStatement prepareStatement2 = pgConnection.prepareStatement(PgConstants.LISTEN_ROUNDTRIP_CHANNEL_SQL);
            try {
                prepareStatement2.execute();
                if (prepareStatement2 != null) {
                    prepareStatement2.close();
                }
                PreparedStatement prepareStatement3 = pgConnection.prepareStatement(PgConstants.LISTEN_BLACKLIST_CHANGE_CHANNEL_SQL);
                try {
                    prepareStatement3.execute();
                    if (prepareStatement3 != null) {
                        prepareStatement3.close();
                    }
                    PreparedStatement prepareStatement4 = pgConnection.prepareStatement(PgConstants.LISTEN_SCHEMASTORE_CHANGE_CHANNEL_SQL);
                    try {
                        prepareStatement4.execute();
                        if (prepareStatement4 != null) {
                            prepareStatement4.close();
                        }
                        prepareStatement4 = pgConnection.prepareStatement(PgConstants.LISTEN_TRANSFORMATIONSTORE_CHANGE_CHANNEL_SQL);
                        try {
                            prepareStatement4.execute();
                            if (prepareStatement4 != null) {
                                prepareStatement4.close();
                            }
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                    if (prepareStatement3 != null) {
                        try {
                            prepareStatement3.close();
                        } catch (Throwable th) {
                            th.addSuppressed(th);
                        }
                    }
                }
            } finally {
                if (prepareStatement2 != null) {
                    try {
                        prepareStatement2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            }
        } finally {
            if (prepareStatement != null) {
                try {
                    prepareStatement.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        }
    }

    @VisibleForTesting
    protected void informSubscribersAboutFreshConnection() {
        postFactInsertionSignal(PgConstants.CHANNEL_SCHEDULED_POLL);
        postBlacklistChangeSignal();
    }

    @VisibleForTesting
    protected void processNotifications(PGNotification[] pGNotificationArr) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Arrays.asList(pGNotificationArr).forEach(pGNotification -> {
            String name = pGNotification.getName();
            log.trace("Received notification on channel: {}.", name);
            if (PgConstants.CHANNEL_BLACKLIST_CHANGE.equals(name)) {
                postBlacklistChangeSignal();
                return;
            }
            if (PgConstants.CHANNEL_SCHEMASTORE_CHANGE.equals(name)) {
                processSchemaStoreChangeNotification(pGNotification);
                return;
            }
            if (PgConstants.CHANNEL_TRANSFORMATIONSTORE_CHANGE.equals(name)) {
                processTransformationStoreChangeNotification(pGNotification);
            } else if (PgConstants.CHANNEL_FACT_INSERT.equals(name)) {
                processFactInsertNotification(pGNotification, atomicBoolean);
            } else {
                if (PgConstants.CHANNEL_ROUNDTRIP.equals(name)) {
                    return;
                }
                log.warn("Ignored notification from unknown channel: {}", name);
            }
        });
    }

    private void processSchemaStoreChangeNotification(PGNotification pGNotification) {
        try {
            JsonNode readTree = FactCastJson.readTree(pGNotification.getParameter());
            postSchemaStoreChangeSignal(new SchemaStoreChangeSignal(readTree.get(PgConstants.ALIAS_NS).asText(), readTree.get(PgConstants.ALIAS_TYPE).asText(), Integer.valueOf(readTree.get(PgConstants.COLUMN_VERSION).asInt())));
        } catch (JsonProcessingException | NullPointerException e) {
            log.warn("Unparesable JSON parameter from notification: {}.", pGNotification.getName());
        }
    }

    private void processTransformationStoreChangeNotification(PGNotification pGNotification) {
        try {
            JsonNode readTree = FactCastJson.readTree(pGNotification.getParameter());
            postTransformationStoreChangeSignal(new TransformationStoreChangeSignal(readTree.get(PgConstants.ALIAS_NS).asText(), readTree.get(PgConstants.ALIAS_TYPE).asText()));
        } catch (JsonProcessingException | NullPointerException e) {
            log.warn("Unparesable JSON parameter from notification: {}.", pGNotification.getName());
        }
    }

    private void processFactInsertNotification(PGNotification pGNotification, AtomicBoolean atomicBoolean) {
        try {
            JsonNode readTree = FactCastJson.readTree(pGNotification.getParameter());
            postFactInsertionSignal(new FactInsertionSignal(PgConstants.CHANNEL_FACT_INSERT, readTree.get(PgConstants.ALIAS_NS).asText(), readTree.get(PgConstants.ALIAS_TYPE).asText()));
        } catch (JsonProcessingException | NullPointerException e) {
            if (atomicBoolean.getAndSet(true)) {
                return;
            }
            log.warn("Unparesable JSON header from Notification: {}. Notifying everyone - just in case", pGNotification.getName());
            postFactInsertionSignal(PgConstants.CHANNEL_FACT_INSERT);
        }
    }

    private void postBlacklistChangeSignal() {
        log.trace("Potential blacklist change detected");
        this.eventBus.post(new BlacklistChangeSignal());
    }

    @VisibleForTesting
    protected void postSchemaStoreChangeSignal(SchemaStoreChangeSignal schemaStoreChangeSignal) {
        log.trace("Schema store change detected");
        this.eventBus.post(schemaStoreChangeSignal);
    }

    @VisibleForTesting
    protected void postTransformationStoreChangeSignal(TransformationStoreChangeSignal transformationStoreChangeSignal) {
        log.trace("Transformation store change detected");
        this.eventBus.post(transformationStoreChangeSignal);
    }

    @VisibleForTesting
    protected PGNotification[] receiveNotifications(PgConnection pgConnection) throws SQLException {
        PGNotification[] notifications = pgConnection.getNotifications(this.props.getFactNotificationBlockingWaitTimeInMillis());
        if (notifications == null || notifications.length == 0) {
            notifications = checkDatabaseConnectionHealthy(pgConnection);
        }
        return notifications;
    }

    @VisibleForTesting
    protected PGNotification[] checkDatabaseConnectionHealthy(PgConnection pgConnection) throws SQLException {
        long nanoTime = System.nanoTime();
        pgConnection.prepareCall(PgConstants.NOTIFY_ROUNDTRIP).execute();
        PGNotification[] notifications = pgConnection.getNotifications(this.props.getFactNotificationMaxRoundTripLatencyInMillis());
        if (notifications == null || notifications.length == 0) {
            this.pgMetrics.counter(StoreMetrics.EVENT.MISSED_ROUNDTRIP).increment();
            throw new SQLException("Missed roundtrip notification from channel '" + PgConstants.CHANNEL_ROUNDTRIP + "'");
        }
        this.pgMetrics.timer(StoreMetrics.OP.NOTIFY_ROUNDTRIP).record(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
        return notifications;
    }

    @VisibleForTesting
    protected void sleep() {
        try {
            Thread.sleep(this.props.getFactNotificationNewConnectionWaitTimeInMillis());
        } catch (InterruptedException e) {
        }
    }

    @VisibleForTesting
    protected void postFactInsertionSignal(@NonNull String str) {
        Objects.requireNonNull(str, "name is marked non-null but is null");
        postFactInsertionSignal(new FactInsertionSignal(str, null, null));
    }

    @VisibleForTesting
    protected void postFactInsertionSignal(@NonNull FactInsertionSignal factInsertionSignal) {
        Objects.requireNonNull(factInsertionSignal, "signal is marked non-null but is null");
        if (this.running.get()) {
            log.trace("notifying consumers for '{}' with ns={}, type={}", new Object[]{factInsertionSignal.name(), factInsertionSignal.ns(), factInsertionSignal.type()});
            this.eventBus.post(factInsertionSignal);
        }
    }

    public void afterPropertiesSet() {
        listen();
    }

    public void destroy() {
        this.running.set(false);
        if (this.listenerThread != null) {
            this.listenerThread.interrupt();
        }
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public PgListener(@NonNull PgConnectionSupplier pgConnectionSupplier, @NonNull EventBus eventBus, @NonNull StoreConfigurationProperties storeConfigurationProperties, @NonNull PgMetrics pgMetrics) {
        Objects.requireNonNull(pgConnectionSupplier, "pgConnectionSupplier is marked non-null but is null");
        Objects.requireNonNull(eventBus, "eventBus is marked non-null but is null");
        Objects.requireNonNull(storeConfigurationProperties, "props is marked non-null but is null");
        Objects.requireNonNull(pgMetrics, "pgMetrics is marked non-null but is null");
        this.pgConnectionSupplier = pgConnectionSupplier;
        this.eventBus = eventBus;
        this.props = storeConfigurationProperties;
        this.pgMetrics = pgMetrics;
    }
}
