package org.revenj;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.sql.DataSource;
import org.postgresql.Driver;
import org.postgresql.PGNotification;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.Notification;
import org.postgresql.core.PGStream;
import org.postgresql.util.HostSpec;
import org.revenj.database.postgres.ConnectionFactory;
import org.revenj.database.postgres.PostgresReader;
import org.revenj.database.postgres.converters.StringConverter;
import org.revenj.extensibility.SystemState;
import org.revenj.patterns.DataChangeNotification;
import org.revenj.patterns.DomainModel;
import org.revenj.patterns.EagerNotification;
import org.revenj.patterns.Repository;
import org.revenj.patterns.ServiceLocator;
import rx.Observable;
import rx.subjects.PublishSubject;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/revenj/PostgresDatabaseNotification.class */
public final class PostgresDatabaseNotification implements EagerNotification, Closeable {
    // Connection source used by setupPooling(); that method does nothing when this is null.
    private final DataSource dataSource;
    // Used by track() to map a notification name to a domain class.
    private final Optional<DomainModel> domainModel;
    // Receives "notification" status events and every non-data database notification.
    private final SystemState systemState;
    // Used by getRepository() to resolve Repository instances.
    private final ServiceLocator locator;
    // Configuration source for the revenj.* keys read by the constructor and the setup methods.
    private final Properties properties;
    // Bumped on every setup attempt (wrapped back to 30 once it passes 60) and reset to 0 on
    // success; multiplies the one-second sleep after a failed attempt.
    private int retryCount;
    // Upper bound, in milliseconds, of the sleep between two polls in Pooling.
    private final int maxTimeout;
    // Set by close() and by the shutdown hook on one thread and read in the loops of the
    // listener threads; volatile so that the write is guaranteed to become visible there.
    private volatile boolean isClosed;
    private final PublishSubject<DataChangeNotification.NotifyInfo> subject = PublishSubject.create();
    // Repositories resolved so far, keyed by the domain class they serve.
    private final ConcurrentMap<Class<?>, Repository> repositories = new ConcurrentHashMap<>();
    // Per notification name: the domain class plus its directly implemented interfaces.
    private final ConcurrentMap<String, HashSet<Class<?>>> targets = new ConcurrentHashMap<>();
    // Must be initialised after subject.
    private final Observable<DataChangeNotification.NotifyInfo> notifications = this.subject.asObservable();

    /* loaded from: input_file:org/revenj/PostgresDatabaseNotification$LazyResult.class */
    /**
     * Deferred repository lookup handed out with every tracked notification.
     * The repository is queried on the first {@link #call()} that yields a non-null list;
     * that list is then kept and returned by all later calls.
     */
    private class LazyResult<T> implements Callable<List<T>> {
        private final Class<T> manifest;
        private final String[] uris;
        private List<T> result;

        LazyResult(Class<T> manifest, String[] uris) {
            this.uris = uris;
            this.manifest = manifest;
        }

        /**
         * @return the entities found for the stored URIs
         * @throws Exception declared by {@link Callable}
         */
        @Override // java.util.concurrent.Callable
        public List<T> call() throws Exception {
            List<T> loaded = this.result;
            if (loaded != null) {
                return loaded;
            }
            Repository repository = PostgresDatabaseNotification.this.getRepository(this.manifest);
            loaded = repository.find(this.uris);
            this.result = loaded;
            return loaded;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/revenj/PostgresDatabaseNotification$Listening.class */
    /**
     * Listener that talks to PostgreSQL over a dedicated raw protocol stream.
     * The constructor subscribes to the notification channels; {@link #run()} then waits for
     * incoming messages and forwards every notification to
     * {@link PostgresDatabaseNotification#processNotification}.
     */
    public class Listening implements Runnable {
        private final PGStream stream;
        private final InputStream rawStream;

        /**
         * Sends the LISTEN statements as a single query message and consumes the replies.
         *
         * @param pGStream an already opened stream to the database
         * @throws IOException when the stream fails or the reply is not the expected
         *                     ReadyForQuery packet
         */
        Listening(PGStream pGStream) throws IOException {
            this.stream = pGStream;
            byte[] bytes = "LISTEN events; LISTEN aggregate_roots; LISTEN migration; LISTEN revenj".getBytes("UTF-8");
            this.rawStream = pGStream.getSocket().getInputStream();
            // 'Q' = Query message: type byte, int32 length (itself + text + terminator), text, NUL.
            pGStream.SendChar('Q');
            pGStream.SendInteger4(bytes.length + 5);
            pGStream.Send(bytes);
            pGStream.SendChar(0);
            pGStream.flush();
            // Four statements were sent; skip one reply message for each of them.
            for (int i = 0; i < 4; i++) {
                receiveCommand(pGStream);
            }
            // 'Z' = ReadyForQuery
            if (pGStream.ReceiveChar() != 'Z') {
                throw new IOException("Unable to setup Postgres listener");
            }
            if (pGStream.ReceiveInteger4() != 5) {
                throw new IOException("unexpected length of ReadyForQuery packet");
            }
            // The remaining byte of the packet is not needed.
            pGStream.ReceiveChar();
        }

        /** Reads one message header and skips the message body. */
        private void receiveCommand(PGStream pGStream) throws IOException {
            pGStream.ReceiveChar();
            // The length field counts its own four bytes.
            pGStream.Skip(pGStream.ReceiveInteger4() - 4);
        }

        /**
         * Waits for messages until the owner is closed. Any failure closes the stream, reports
         * an "error: ..." system event, waits one second and hands over to a fresh
         * {@link PostgresDatabaseNotification#setupListening()} attempt.
         */
        @Override // java.lang.Runnable
        public void run() {
            PostgresReader postgresReader = new PostgresReader();
            PGStream pGStream = this.stream;
            PostgresDatabaseNotification.this.systemState.notify(new SystemState.SystemEvent("notification", "started"));
            while (!PostgresDatabaseNotification.this.isClosed) {
                // The whole iteration is guarded: available() and the Receive* calls throw
                // IOException and must end up in the recovery path below as well.
                try {
                    while (!PostgresDatabaseNotification.this.isClosed && this.rawStream.available() == 0) {
                        Thread.sleep(1L);
                    }
                    if (PostgresDatabaseNotification.this.isClosed) {
                        break;
                    }
                    switch (pGStream.ReceiveChar()) {
                        case 'A': {
                            // NotificationResponse: int32 length, int32 pid, channel, payload.
                            // Read into locals so the wire order does not depend on the
                            // argument order of the Notification constructor.
                            pGStream.ReceiveInteger4();
                            int pid = pGStream.ReceiveInteger4();
                            String channel = pGStream.ReceiveString();
                            String payload = pGStream.ReceiveString();
                            PostgresDatabaseNotification.this.processNotification(postgresReader, new Notification(channel, pid, payload));
                            break;
                        }
                        case 'E':
                            // ErrorResponse: surface the message body as the failure text.
                            throw new IOException(pGStream.ReceiveString(pGStream.ReceiveInteger4() - 4));
                        default:
                            throw new IOException("Unexpected packet type");
                    }
                } catch (Exception e) {
                    try {
                        pGStream.close();
                        PostgresDatabaseNotification.this.systemState.notify(new SystemState.SystemEvent("notification", "error: " + e.getMessage()));
                        Thread.sleep(1000L);
                    } catch (Exception e2) {
                        e2.printStackTrace();
                    }
                    PostgresDatabaseNotification.this.setupListening();
                    return;
                }
            }
            try {
                pGStream.close();
            } catch (Exception ignored) {
                // Best effort: the listener is shutting down anyway.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/revenj/PostgresDatabaseNotification$Pooling.class */
    public class Pooling implements Runnable {
        private final BaseConnection connection;
        private final Statement ping;

        Pooling(BaseConnection baseConnection, Statement statement) {
            this.connection = baseConnection;
            this.ping = statement;
        }

        @Override // java.lang.Runnable
        public void run() {
            PostgresReader postgresReader = new PostgresReader();
            int i = PostgresDatabaseNotification.this.maxTimeout;
            PostgresDatabaseNotification.this.systemState.notify(new SystemState.SystemEvent("notification", "started"));
            while (!PostgresDatabaseNotification.this.isClosed) {
                try {
                    this.ping.execute("");
                    PGNotification[] notifications = this.connection.getNotifications();
                    if (notifications == null || notifications.length == 0) {
                        try {
                            Thread.sleep(i);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        if (i < PostgresDatabaseNotification.this.maxTimeout) {
                            i++;
                        }
                    } else {
                        i = 0;
                        for (PGNotification pGNotification : notifications) {
                            PostgresDatabaseNotification.this.processNotification(postgresReader, pGNotification);
                        }
                    }
                } catch (IOException | SQLException e2) {
                    try {
                        PostgresDatabaseNotification.this.systemState.notify(new SystemState.SystemEvent("notification", "error: " + e2.getMessage()));
                        Thread.sleep(1000L);
                    } catch (InterruptedException e3) {
                        e3.printStackTrace();
                    }
                    PostgresDatabaseNotification.this.cleanupConnection(this.connection);
                    PostgresDatabaseNotification.this.setupPooling();
                    return;
                }
            }
            PostgresDatabaseNotification.this.cleanupConnection(this.connection);
        }
    }

    /**
     * Stores the collaborators, reads the notification settings and starts the listener.
     *
     * <ul>
     * <li>{@code revenj.notifications.timeout} - upper bound of the polling sleep in
     *     milliseconds, 1000 when absent</li>
     * <li>{@code revenj.notifications.status=disabled} - no listener is started and the
     *     instance is marked closed</li>
     * <li>{@code revenj.notifications.type=pooling} - poll a JDBC connection; any other value
     *     uses the raw protocol listener</li>
     * </ul>
     *
     * @param dataSource     connection source for the polling listener
     * @param optional       domain model used by {@link #track(Class)}
     * @param properties     configuration
     * @param systemState    receiver of system events
     * @param serviceLocator used to resolve repositories
     * @throws RuntimeException when the timeout setting is not an integer (the parse failure
     *                          is attached as the cause)
     */
    public PostgresDatabaseNotification(DataSource dataSource, Optional<DomainModel> optional, Properties properties, SystemState systemState, ServiceLocator serviceLocator) {
        this.dataSource = dataSource;
        this.domainModel = optional;
        this.properties = properties;
        this.systemState = systemState;
        this.locator = serviceLocator;
        int timeout = 1000;
        String property = properties.getProperty("revenj.notifications.timeout");
        if (property != null) {
            try {
                timeout = Integer.parseInt(property);
            } catch (NumberFormatException e) {
                // Keep the cause so the offending value shows up in the stack trace.
                throw new RuntimeException("Error parsing notificationTimeout setting", e);
            }
        }
        this.maxTimeout = timeout;
        if ("disabled".equals(properties.getProperty("revenj.notifications.status"))) {
            this.isClosed = true;
            return;
        }
        if ("pooling".equals(properties.getProperty("revenj.notifications.type"))) {
            setupPooling();
        } else {
            setupListening();
        }
        // Both listener kinds stop once isClosed is set.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            this.isClosed = true;
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Obtains a PostgreSQL connection, subscribes it to the notification channels and starts
     * a daemon {@link Pooling} thread on it. Does nothing when no data source is configured.
     *
     * <p>The connection from the data source is used directly when it is, or wraps, a
     * {@link BaseConnection}. Otherwise, if {@code revenj.jdbcUrl} is configured, it is closed
     * and replaced by one opened through the PostgreSQL driver.
     *
     * <p>On failure the connection is released, an "issue: ..." system event is reported and
     * the calling thread sleeps for {@code retryCount} seconds; no further attempt is made here.
     */
    public void setupPooling() {
        if (this.dataSource == null) {
            return;
        }
        this.retryCount++;
        if (this.retryCount > 60) {
            this.retryCount = 30;
        }
        // Declared outside the try so the failure path can release it.
        Connection connection = null;
        try {
            connection = this.dataSource.getConnection();
            BaseConnection baseConnection = null;
            if (connection instanceof BaseConnection) {
                baseConnection = (BaseConnection) connection;
            } else {
                if (connection != null) {
                    try {
                        if (connection.isWrapperFor(BaseConnection.class)) {
                            baseConnection = (BaseConnection) connection.unwrap(BaseConnection.class);
                        }
                    } catch (AbstractMethodError ignored) {
                        // The wrapper does not implement isWrapperFor/unwrap; fall back to the driver.
                    }
                }
                if (baseConnection == null && this.properties.containsKey("revenj.jdbcUrl")) {
                    String user = this.properties.getProperty("revenj.user");
                    String password = this.properties.getProperty("revenj.password");
                    Driver driver = new Driver();
                    Properties driverProperties = new Properties(this.properties);
                    if (user != null && password != null) {
                        driverProperties.setProperty("user", user);
                        driverProperties.setProperty("password", password);
                    }
                    cleanupConnection(connection);
                    connection = driver.connect(this.properties.getProperty("revenj.jdbcUrl"), driverProperties);
                    if (connection instanceof BaseConnection) {
                        baseConnection = (BaseConnection) connection;
                    }
                }
            }
            if (baseConnection != null) {
                Statement createStatement = baseConnection.createStatement();
                createStatement.execute("LISTEN events; LISTEN aggregate_roots; LISTEN migration; LISTEN revenj");
                this.retryCount = 0;
                Thread thread = new Thread(new Pooling(baseConnection, createStatement));
                thread.setDaemon(true);
                thread.start();
            } else {
                cleanupConnection(connection);
            }
        } catch (Exception e2) {
            // No Pooling thread took ownership, so the connection would leak if left open.
            cleanupConnection(connection);
            try {
                this.systemState.notify(new SystemState.SystemEvent("notification", "issue: " + e2.getMessage()));
                Thread.sleep(1000 * this.retryCount);
            } catch (InterruptedException e3) {
                e3.printStackTrace();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens a dedicated protocol stream to the database described by {@code revenj.jdbcUrl}
     * and starts a daemon {@link Listening} thread on it.
     *
     * <p>Only the first host and port of the parsed URL are used. {@code revenj.user} and
     * {@code revenj.password} take precedence over credentials embedded in the URL.
     *
     * <p>On a connection or handshake failure the stream is closed, an "issue: ..." system
     * event is reported and the calling thread sleeps for {@code retryCount} seconds; no
     * further attempt is made here.
     *
     * @throws RuntimeException when {@code revenj.jdbcUrl} is missing, empty or unparsable
     */
    public void setupListening() {
        this.retryCount++;
        if (this.retryCount > 60) {
            this.retryCount = 30;
        }
        String property = this.properties.getProperty("revenj.jdbcUrl");
        if (property == null || property.isEmpty()) {
            throw new RuntimeException("Unable to read revenj.jdbcUrl from properties. Listening notification is not supported without it.\nEither disable notifications (revenj.notifications.status=disabled), change it to pooling (revenj.notifications.type=pooling) or provide revenj.jdbcUrl to properties.");
        }
        // Accept URLs with a different scheme prefix by rewriting them to the driver's scheme.
        if (!property.startsWith("jdbc:postgresql:") && property.contains("://")) {
            property = "jdbc:postgresql" + property.substring(property.indexOf("://"));
        }
        Properties parseURL = Driver.parseURL(property, this.properties);
        if (parseURL == null) {
            throw new RuntimeException("Unable to parse revenj.jdbcUrl");
        }
        // Declared outside the try so the failure path can close it.
        PGStream openConnection = null;
        try {
            String host = parseURL.getProperty("PGHOST").split(",")[0];
            int port = Integer.parseInt(parseURL.getProperty("PGPORT").split(",")[0]);
            String user = this.properties.containsKey("revenj.user") ? this.properties.getProperty("revenj.user") : parseURL.getProperty("user", "");
            String password = this.properties.containsKey("revenj.password") ? this.properties.getProperty("revenj.password") : parseURL.getProperty("password", "");
            openConnection = ConnectionFactory.openConnection(new HostSpec(host, port), user, password, parseURL.getProperty("PGDBNAME"), this.properties);
            this.retryCount = 0;
            Thread thread = new Thread(new Listening(openConnection));
            thread.setDaemon(true);
            thread.start();
        } catch (Exception e) {
            // The Listening handshake may fail after the socket was opened; no thread owns
            // the stream in that case, so release it here.
            if (openConnection != null) {
                try {
                    openConnection.close();
                } catch (IOException ignored) {
                    // Best effort: the stream is being discarded.
                }
            }
            try {
                this.systemState.notify(new SystemState.SystemEvent("notification", "issue: " + e.getMessage()));
                Thread.sleep(1000 * this.retryCount);
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Dispatches one database notification.
     *
     * <p>Notifications on channels other than {@code events} and {@code aggregate_roots} are
     * forwarded to the system state as-is. For those two channels the parameter is split at
     * its first two colons into {@code <name>:<operation>:<uris>}; the URI part is parsed as
     * a string collection and, unless it is empty, published as a
     * {@link DataChangeNotification.NotifyInfo}.
     *
     * <p>The operation text {@code Update}, {@code Change} or {@code Delete} selects the
     * matching operation; any other text is published as {@code Insert}.
     *
     * @param postgresReader reusable reader used to parse the URI collection
     * @param pGNotification the notification received from the database
     * @throws IOException declared for the parsing calls made here
     */
    public void processNotification(PostgresReader postgresReader, PGNotification pGNotification) throws IOException {
        String channel = pGNotification.getName();
        if (!"events".equals(channel) && !"aggregate_roots".equals(channel)) {
            this.systemState.notify(new SystemState.SystemEvent(channel, pGNotification.getParameter()));
            return;
        }
        String parameter = pGNotification.getParameter();
        String name = parameter.substring(0, parameter.indexOf(':'));
        String operationText = parameter.substring(name.length() + 1, parameter.indexOf(':', name.length() + 1));
        // Skip "<name>:<operation>:" - both texts plus the two separators.
        postgresReader.process(parameter.substring(name.length() + operationText.length() + 2));
        List<String> parseCollection = StringConverter.parseCollection(postgresReader, 0, false);
        if (parseCollection == null || parseCollection.isEmpty()) {
            return;
        }
        String[] uris = parseCollection.toArray(new String[parseCollection.size()]);
        DataChangeNotification.Operation operation;
        switch (operationText) {
            case "Update":
                operation = DataChangeNotification.Operation.Update;
                break;
            case "Change":
                operation = DataChangeNotification.Operation.Change;
                break;
            case "Delete":
                operation = DataChangeNotification.Operation.Delete;
                break;
            default:
                operation = DataChangeNotification.Operation.Insert;
                break;
        }
        this.subject.onNext(new DataChangeNotification.NotifyInfo(name, operation, uris));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the repository serving the given domain class, resolving it through the service
     * locator on first use and keeping it for later calls.
     *
     * @param cls the domain class
     * @return the repository for {@code cls}
     * @throws RuntimeException when the locator fails with a reflective error; that error is
     *                          attached as the cause
     */
    public Repository getRepository(Class<?> cls) {
        return this.repositories.computeIfAbsent(cls, manifest -> {
            try {
                Object resolved = this.locator.resolve(Utils.makeGenericType(Repository.class, manifest, new Type[0]));
                return (Repository) resolved;
            } catch (ReflectiveOperationException failure) {
                throw new RuntimeException("Repository is not registered for: " + manifest, failure);
            }
        });
    }

    /**
     * Publishes the given change to the subscribers of this instance's notification stream.
     *
     * @param notifyInfo the change to publish
     */
    @Override // org.revenj.patterns.EagerNotification
    public void notify(DataChangeNotification.NotifyInfo notifyInfo) {
        subject.onNext(notifyInfo);
    }

    @Override // org.revenj.patterns.DataChangeNotification
    public Observable<DataChangeNotification.NotifyInfo> getNotifications() {
        return this.notifications;
    }

    @Override // org.revenj.patterns.DataChangeNotification
    public <T> Observable<DataChangeNotification.TrackInfo<T>> track(Class<T> cls) {
        return this.notifications.filter(notifyInfo -> {
            HashSet<Class<?>> hashSet = this.targets.get(notifyInfo.name);
            if (hashSet == null) {
                hashSet = new HashSet<>();
                Optional<Class<?>> find = this.domainModel.get().find(notifyInfo.name);
                if (find.isPresent()) {
                    hashSet.add(find.get());
                    Collections.addAll(hashSet, find.get().getInterfaces());
                }
                this.targets.put(notifyInfo.name, hashSet);
            }
            return Boolean.valueOf(hashSet.contains(cls));
        }).map(notifyInfo2 -> {
            return new DataChangeNotification.TrackInfo(notifyInfo2.uris, new LazyResult(cls, notifyInfo2.uris));
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the given connection unless it is {@code null} or already closed.
     * An {@link SQLException} raised while doing so is printed and otherwise ignored.
     *
     * @param connection the connection to release, may be {@code null}
     */
    public synchronized void cleanupConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            if (connection.isClosed()) {
                return;
            }
            connection.close();
        } catch (SQLException failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Marks this instance as closed. The listener loops check the flag and release their
     * connection or stream once they observe it.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        isClosed = true;
    }
}
