package org.revenj;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Type;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.sql.DataSource;
import org.postgresql.Driver;
import org.postgresql.PGNotification;
import org.postgresql.core.BaseConnection;
import org.revenj.extensibility.SystemState;
import org.revenj.patterns.DataChangeNotification;
import org.revenj.patterns.DomainModel;
import org.revenj.patterns.EagerNotification;
import org.revenj.patterns.Repository;
import org.revenj.patterns.ServiceLocator;
import org.revenj.postgres.PostgresReader;
import org.revenj.postgres.converters.StringConverter;
import rx.Observable;
import rx.subjects.PublishSubject;

/* loaded from: input_file:org/revenj/PostgresDatabaseNotification.class */
final class PostgresDatabaseNotification implements EagerNotification, Closeable {
    /** Source of the connection used for listening; may be null, in which case only the JDBC URL fallback is tried. */
    private final DataSource dataSource;
    /** Domain model used by {@link #track(Class)} to resolve a notification name to a class. */
    private final Optional<DomainModel> domainModel;
    /** Receives "migration" events picked up by the polling thread. */
    private final SystemState systemState;
    /** Used to resolve repositories lazily in {@code getRepository}. */
    private final ServiceLocator locator;
    /** Configuration; read for the timeout, status, JDBC URL and credentials. */
    private final Properties properties;
    /** Number of consecutive setup attempts; scales the back-off sleep in {@code setupPooling}. */
    private int retryCount;
    /** Sleep between polls, in milliseconds, when no notifications are pending. */
    private final int timeout;
    /**
     * Stop flag for the polling loop. It is written by {@link #close()} and the shutdown hook and
     * read by the polling thread, so it must be volatile for the write to become visible there.
     */
    private volatile boolean isClosed;
    /** Sink into which both database and eager notifications are pushed. */
    private final PublishSubject<DataChangeNotification.NotifyInfo> subject = PublishSubject.create();
    /** Cache of resolved repositories, keyed by the tracked class. */
    private final ConcurrentMap<Class<?>, Repository> repositories = new ConcurrentHashMap<>();
    /** Cache of notification name to the classes (type plus its interfaces) it satisfies. */
    private final ConcurrentMap<String, HashSet<Class<?>>> targets = new ConcurrentHashMap<>();
    /** Read-only view of {@link #subject}; must be initialized after it. */
    private final Observable<DataChangeNotification.NotifyInfo> notifications = this.subject.asObservable();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/revenj/PostgresDatabaseNotification$Pooling.class */
    /**
     * Polling task that repeatedly asks a dedicated PostgreSQL connection for pending
     * notifications and forwards them to the enclosing instance.
     *
     * <p>Notifications on the {@code events} and {@code aggregate_roots} channels are parsed and
     * published to the subject; notifications on the {@code migration} channel are forwarded to the
     * system state. The loop ends when the enclosing instance is closed, or when the connection
     * fails, in which case a new polling setup is requested.
     */
    public class Pooling implements Runnable {
        private final BaseConnection connection;
        private final Statement ping;

        /**
         * @param baseConnection connection on which the LISTEN commands were issued
         * @param statement statement on that connection, executed before every poll
         */
        public Pooling(BaseConnection baseConnection, Statement statement) {
            this.connection = baseConnection;
            this.ping = statement;
        }

        @Override // java.lang.Runnable
        public void run() {
            PostgresReader reader = new PostgresReader();
            while (!PostgresDatabaseNotification.this.isClosed) {
                try {
                    // Empty statement executed before each poll; presumably the round trip lets
                    // the driver pick up pending notifications - confirm against pgjdbc docs.
                    this.ping.execute("");
                    PGNotification[] pending = this.connection.getNotifications();
                    if (pending == null || pending.length == 0) {
                        try {
                            Thread.sleep(PostgresDatabaseNotification.this.timeout);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        continue;
                    }
                    for (PGNotification notification : pending) {
                        dispatch(reader, notification);
                    }
                } catch (IOException | SQLException e) {
                    // Connection or parsing failure: back off briefly, drop this connection and
                    // hand over to a freshly set up poller.
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException interrupted) {
                        interrupted.printStackTrace();
                    }
                    PostgresDatabaseNotification.this.cleanupConnection(this.connection);
                    PostgresDatabaseNotification.this.setupPooling();
                    return;
                }
            }
            PostgresDatabaseNotification.this.cleanupConnection(this.connection);
        }

        /** Routes a single notification according to the channel it arrived on. */
        private void dispatch(PostgresReader reader, PGNotification notification) throws IOException {
            String channel = notification.getName();
            if ("events".equals(channel) || "aggregate_roots".equals(channel)) {
                publishChange(reader, notification.getParameter());
            } else if ("migration".equals(channel)) {
                PostgresDatabaseNotification.this.systemState.notify(
                        new SystemState.SystemEvent("migration", notification.getParameter()));
            }
        }

        /**
         * Parses a payload of the form {@code name:operation:uris} and publishes it to the subject.
         * Payloads without both separators are skipped so that one malformed message cannot
         * terminate the polling thread with an unchecked exception.
         */
        private void publishChange(PostgresReader reader, String payload) throws IOException {
            int nameEnd = payload.indexOf(':');
            int operationEnd = nameEnd < 0 ? -1 : payload.indexOf(':', nameEnd + 1);
            if (operationEnd < 0) {
                return;
            }
            String name = payload.substring(0, nameEnd);
            String operation = payload.substring(nameEnd + 1, operationEnd);
            reader.process(payload.substring(operationEnd + 1));
            List<String> uris = StringConverter.parseCollection(reader, 0, false);
            if (uris == null || uris.isEmpty()) {
                return;
            }
            PostgresDatabaseNotification.this.subject.onNext(
                    new DataChangeNotification.NotifyInfo(name, toOperation(operation), uris.toArray(new String[0])));
        }

        /** Maps the operation token of a payload to its enum value; unknown tokens mean Insert. */
        private DataChangeNotification.Operation toOperation(String operation) {
            switch (operation) {
                case "Update":
                    return DataChangeNotification.Operation.Update;
                case "Change":
                    return DataChangeNotification.Operation.Change;
                case "Delete":
                    return DataChangeNotification.Operation.Delete;
                default:
                    return DataChangeNotification.Operation.Insert;
            }
        }
    }

    /**
     * Creates the notification hub and, unless notifications are disabled, starts polling the
     * database and registers a shutdown hook that stops the polling loop.
     *
     * @param dataSource source of the listening connection; may be null
     * @param optional domain model used to resolve notification names in {@link #track(Class)}
     * @param properties configuration; {@code revenj.notifications.timeout} sets the poll sleep in
     *     milliseconds (default 500) and {@code revenj.notifications.status=disabled} turns
     *     polling off
     * @param systemState receiver of migration events
     * @param serviceLocator locator used to resolve repositories
     * @throws RuntimeException if the timeout property is present but not a valid integer
     */
    public PostgresDatabaseNotification(DataSource dataSource, Optional<DomainModel> optional, Properties properties, SystemState systemState, ServiceLocator serviceLocator) {
        this.dataSource = dataSource;
        this.domainModel = optional;
        this.properties = properties;
        this.systemState = systemState;
        this.locator = serviceLocator;
        String property = properties.getProperty("revenj.notifications.timeout");
        if (property != null) {
            try {
                this.timeout = Integer.parseInt(property);
            } catch (NumberFormatException e) {
                // Keep the original exception as the cause so the offending value is not lost.
                throw new RuntimeException("Error parsing notificationTimeout setting", e);
            }
        } else {
            this.timeout = 500;
        }
        if ("disabled".equals(properties.getProperty("revenj.notifications.status"))) {
            this.isClosed = true;
        } else {
            setupPooling();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                this.isClosed = true;
            }));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Obtains a PostgreSQL connection, subscribes it to the {@code events},
     * {@code aggregate_roots} and {@code migration} channels and starts a daemon thread polling it.
     *
     * <p>The connection is taken from the data source when it is (or wraps) a
     * {@code BaseConnection}; otherwise, if {@code revenj.jdbcUrl} is configured, a direct driver
     * connection is opened instead. On failure the acquired connection is released and the method
     * sleeps for a period proportional to the number of consecutive attempts before returning; it
     * does not retry by itself.
     */
    public void setupPooling() {
        this.retryCount++;
        if (this.retryCount > 60) {
            this.retryCount = 30;
        }
        Connection connection = null;
        try {
            if (this.dataSource != null) {
                connection = this.dataSource.getConnection();
            }
            BaseConnection baseConnection = null;
            if (connection instanceof BaseConnection) {
                baseConnection = (BaseConnection) connection;
            } else {
                if (connection != null) {
                    try {
                        if (connection.isWrapperFor(BaseConnection.class)) {
                            baseConnection = (BaseConnection) connection.unwrap(BaseConnection.class);
                        }
                    } catch (AbstractMethodError ignored) {
                        // Deliberately ignored: presumably raised by wrappers that do not
                        // implement isWrapperFor/unwrap; fall through to the direct connection.
                    }
                }
                if (baseConnection == null && this.properties.containsKey("revenj.jdbcUrl")) {
                    String user = this.properties.getProperty("revenj.user");
                    String password = this.properties.getProperty("revenj.password");
                    Driver driver = new Driver();
                    Properties driverProperties = new Properties(this.properties);
                    if (user != null && password != null) {
                        driverProperties.setProperty("user", user);
                        driverProperties.setProperty("password", password);
                    }
                    // The data source connection is of no use for listening; release it first.
                    cleanupConnection(connection);
                    connection = null;
                    connection = driver.connect(this.properties.getProperty("revenj.jdbcUrl"), driverProperties);
                    if (connection instanceof BaseConnection) {
                        baseConnection = (BaseConnection) connection;
                    }
                }
            }
            if (baseConnection != null) {
                Statement createStatement = baseConnection.createStatement();
                createStatement.execute("LISTEN events; LISTEN aggregate_roots; LISTEN migration;");
                this.retryCount = 0;
                Thread thread = new Thread(new Pooling(baseConnection, createStatement));
                thread.setDaemon(true);
                thread.start();
            } else {
                cleanupConnection(connection);
            }
        } catch (Exception e) {
            // No polling thread has been started at this point (start() is the last statement of
            // the try block), so nothing else owns the connection: release it instead of leaking.
            cleanupConnection(connection);
            try {
                Thread.sleep(1000 * this.retryCount);
            } catch (InterruptedException interrupted) {
                // Preserve the interrupt for the caller rather than swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the repository for the given class, resolving it through the service locator on
     * first use and caching it afterwards.
     *
     * @param cls class whose repository is requested
     * @return the cached or newly resolved repository
     * @throws RuntimeException if the locator fails to resolve the repository
     */
    private Repository getRepository(Class<?> cls) {
        return this.repositories.computeIfAbsent(cls, key -> {
            try {
                Object resolved = this.locator.resolve(Utils.makeGenericType(Repository.class, cls, new Type[0]));
                return (Repository) resolved;
            } catch (ReflectiveOperationException failure) {
                throw new RuntimeException("Repository is not registered for: " + cls, failure);
            }
        });
    }

    /**
     * Publishes the given notification to all subscribers of this instance.
     *
     * @param notifyInfo notification to publish
     */
    @Override // org.revenj.patterns.EagerNotification
    public void notify(DataChangeNotification.NotifyInfo notifyInfo) {
        PublishSubject<DataChangeNotification.NotifyInfo> sink = this.subject;
        sink.onNext(notifyInfo);
    }

    /**
     * @return the observable view of the notification subject
     */
    @Override // org.revenj.patterns.DataChangeNotification
    public Observable<DataChangeNotification.NotifyInfo> getNotifications() {
        Observable<DataChangeNotification.NotifyInfo> stream = this.notifications;
        return stream;
    }

    /**
     * Returns the notifications that concern the given class, each paired with a lazy lookup of
     * the affected instances.
     *
     * <p>A notification matches when its name resolves, through the domain model, to a type that
     * is the given class or directly declares it as an interface. The resolved set of classes is
     * cached per notification name.
     *
     * @param cls class to track
     * @return observable of track infos carrying the URIs and a callable that loads them
     */
    @Override // org.revenj.patterns.DataChangeNotification
    public <T> Observable<DataChangeNotification.TrackInfo<T>> track(Class<T> cls) {
        return this.notifications.filter(info -> {
            HashSet<Class<?>> cached = this.targets.get(info.name);
            if (cached != null) {
                return cached.contains(cls);
            }
            // First notification for this name: resolve the type and remember what it satisfies.
            HashSet<Class<?>> resolved = new HashSet<>();
            Optional<Class<?>> type = this.domainModel.get().find(info.name);
            if (type.isPresent()) {
                resolved.add(type.get());
                Collections.addAll(resolved, type.get().getInterfaces());
            }
            this.targets.put(info.name, resolved);
            return resolved.contains(cls);
        }).map(info -> new DataChangeNotification.TrackInfo(info.uris, () -> {
            return getRepository(cls).find(info.uris);
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the given connection if it is non-null and still open. A failure while closing is
     * printed to standard error and otherwise ignored.
     *
     * @param connection connection to close; may be null
     */
    public synchronized void cleanupConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            if (connection.isClosed()) {
                return;
            }
            connection.close();
        } catch (SQLException closeFailure) {
            closeFailure.printStackTrace();
        }
    }

    /**
     * Requests the polling loop to stop by raising the closed flag; the polling task checks the
     * flag on each iteration and releases its connection when it exits.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        isClosed = true;
    }
}
