package com.github.pgasync.impl;

import com.github.pgasync.Connection;
import com.github.pgasync.ConnectionPool;
import com.github.pgasync.DatabaseConfig;
import com.github.pgasync.ResultSet;
import com.github.pgasync.Row;
import com.github.pgasync.SqlException;
import com.github.pgasync.Transaction;
import com.github.pgasync.impl.conversion.DataConverter;
import com.github.pgasync.impl.protocol.ProtocolStream;
import io.netty.channel.EventLoopGroup;
import java.beans.ConstructorProperties;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.SingleSubscriber;
import rx.functions.Action0;
import rx.schedulers.Schedulers;

/* loaded from: input_file:com/github/pgasync/impl/PgConnectionPool.class */
public class PgConnectionPool implements ConnectionPool {
    private static final Logger LOG = LoggerFactory.getLogger(PgConnectionPool.class);
    private final Queue<SingleSubscriber<Connection>> subscribers = new LinkedList();
    private final Set<Connection> connections = new HashSet();
    private final Queue<Connection> availableConnections = new LinkedList();
    private final ConnectionPool delegate;
    private final DatabaseConfig config;
    private final DataConverter dataConverter;
    private final EventLoopGroup eventLoopGroup;
    private final NettyScheduler scheduler;
    private int currentSize;
    private volatile boolean closed;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/pgasync/impl/PgConnectionPool$ConnectionPoolWithTimeout.class */
    public class ConnectionPoolWithTimeout implements ConnectionPool {
        private final long timeout;

        /* loaded from: input_file:com/github/pgasync/impl/PgConnectionPool$ConnectionPoolWithTimeout$ReleaseEnforcer.class */
        class ReleaseEnforcer implements Action0 {
            final Connection connection;
            volatile boolean released;

            public void call() {
                if (this.released) {
                    return;
                }
                this.released = true;
                ConnectionPoolWithTimeout.this.releaseIfNotPipelining(this.connection);
            }

            @ConstructorProperties({"connection"})
            public ReleaseEnforcer(Connection connection) {
                this.connection = connection;
            }
        }

        @Override // com.github.pgasync.ConnectionPool
        public Single<Connection> getConnection() {
            return PgConnectionPool.this.getConnection().map(connection -> {
                return connection.withTimeout(this.timeout, TimeUnit.MILLISECONDS);
            });
        }

        @Override // com.github.pgasync.QueryExecutor
        public Single<Transaction> begin() {
            return getConnection().flatMap(connection -> {
                return connection.begin().onErrorResumeNext(th -> {
                    return release(connection).andThen(Single.error(th));
                }).map(transaction -> {
                    return new ReleasingTransaction(connection, transaction);
                });
            });
        }

        @Override // com.github.pgasync.QueryExecutor
        public Observable<Row> queryRows(String str, Object... objArr) {
            return getConnection().doOnSuccess(this::releaseIfPipelining).flatMapObservable(connection -> {
                ReleaseEnforcer releaseEnforcer = new ReleaseEnforcer(connection);
                return connection.queryRows(str, objArr).doOnTerminate(releaseEnforcer).doOnUnsubscribe(releaseEnforcer);
            });
        }

        @Override // com.github.pgasync.QueryExecutor
        public Single<ResultSet> querySet(String str, Object... objArr) {
            return getConnection().doOnSuccess(this::releaseIfPipelining).flatMap(connection -> {
                ReleaseEnforcer releaseEnforcer = new ReleaseEnforcer(connection);
                return connection.querySet(str, objArr).doAfterTerminate(releaseEnforcer).doOnUnsubscribe(releaseEnforcer);
            });
        }

        @Override // com.github.pgasync.Listenable
        public Observable<String> listen(String str) {
            return getConnection().flatMapObservable(connection -> {
                return connection.listen(str).doOnSubscribe(() -> {
                    release(connection).subscribe();
                });
            });
        }

        @Override // com.github.pgasync.ConnectionPool, com.github.pgasync.QueryExecutor
        public ConnectionPool withTimeout(long j, TimeUnit timeUnit) {
            return PgConnectionPool.this.withTimeout(j, timeUnit);
        }

        @Override // com.github.pgasync.ConnectionPool
        public Completable release(Connection connection) {
            return PgConnectionPool.this.release(connection.withTimeout(PgConnectionPool.this.config.statementTimeout(), TimeUnit.SECONDS));
        }

        @Override // com.github.pgasync.Db
        public Completable close() {
            return PgConnectionPool.this.close();
        }

        private void releaseIfPipelining(Connection connection) {
            if (PgConnectionPool.this.config.pipeline()) {
                release(connection).subscribe();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void releaseIfNotPipelining(Connection connection) {
            if (PgConnectionPool.this.config.pipeline()) {
                return;
            }
            release(connection).subscribe();
        }

        @ConstructorProperties({"timeout"})
        public ConnectionPoolWithTimeout(long j) {
            this.timeout = j;
        }
    }

    /* loaded from: input_file:com/github/pgasync/impl/PgConnectionPool$ReleasingTransaction.class */
    class ReleasingTransaction implements Transaction {
        final AtomicBoolean released = new AtomicBoolean();
        final Connection txConnection;
        final Transaction transaction;

        ReleasingTransaction(Connection connection, Transaction transaction) {
            this.txConnection = connection;
            this.transaction = transaction;
        }

        @Override // com.github.pgasync.QueryExecutor
        public Single<Transaction> begin() {
            return this.transaction.begin();
        }

        @Override // com.github.pgasync.Transaction
        public Completable rollback() {
            return this.transaction.rollback().doOnTerminate(this::releaseConnectionImmediately);
        }

        @Override // com.github.pgasync.Transaction
        public Completable commit() {
            return this.transaction.commit().doOnTerminate(this::releaseConnectionImmediately);
        }

        @Override // com.github.pgasync.QueryExecutor
        public Observable<Row> queryRows(String str, Object... objArr) {
            if (this.released.get()) {
                return Observable.error(new SqlException("Transaction is already completed"));
            }
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            return this.transaction.queryRows(str, objArr).onErrorResumeNext(th -> {
                return releaseConnection().andThen(Observable.error(th));
            }).doOnUnsubscribe(() -> {
                if (atomicBoolean.get()) {
                    return;
                }
                releaseConnectionImmediately();
            });
        }

        @Override // com.github.pgasync.QueryExecutor
        public Single<ResultSet> querySet(String str, Object... objArr) {
            if (this.released.get()) {
                return Single.error(new SqlException("Transaction is already completed"));
            }
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            return this.transaction.querySet(str, objArr).doOnSuccess(resultSet -> {
                atomicBoolean.set(true);
            }).onErrorResumeNext(th -> {
                return releaseConnection().andThen(Single.error(th));
            }).doOnUnsubscribe(() -> {
                if (atomicBoolean.get()) {
                    return;
                }
                releaseConnectionImmediately();
            });
        }

        @Override // com.github.pgasync.Transaction, com.github.pgasync.QueryExecutor
        public Transaction withTimeout(long j, TimeUnit timeUnit) {
            return this.transaction.withTimeout(j, timeUnit);
        }

        Completable releaseConnection() {
            return this.released.get() ? Completable.complete() : PgConnectionPool.this.release(this.txConnection).doOnCompleted(() -> {
                this.released.set(true);
            });
        }

        void releaseConnectionImmediately() {
            releaseConnection().subscribe();
        }
    }

    public PgConnectionPool(DatabaseConfig databaseConfig, DataConverter dataConverter, EventLoopGroup eventLoopGroup) {
        this.config = databaseConfig;
        this.dataConverter = dataConverter;
        this.eventLoopGroup = eventLoopGroup;
        this.delegate = new ConnectionPoolWithTimeout(databaseConfig.statementTimeout());
        this.scheduler = NettyScheduler.forEventExecutor(eventLoopGroup.next());
    }

    @Override // com.github.pgasync.QueryExecutor
    public Observable<Row> queryRows(String str, Object... objArr) {
        return this.delegate.queryRows(str, objArr);
    }

    @Override // com.github.pgasync.QueryExecutor
    public Single<ResultSet> querySet(String str, Object... objArr) {
        return this.delegate.querySet(str, objArr);
    }

    @Override // com.github.pgasync.QueryExecutor
    public Single<Transaction> begin() {
        return this.delegate.begin();
    }

    @Override // com.github.pgasync.Listenable
    public Observable<String> listen(String str) {
        return this.delegate.listen(str);
    }

    @Override // com.github.pgasync.ConnectionPool, com.github.pgasync.QueryExecutor
    public ConnectionPool withTimeout(long j, TimeUnit timeUnit) {
        return new ConnectionPoolWithTimeout(timeUnit.toMillis(j));
    }

    @Override // com.github.pgasync.Db
    public Completable close() {
        if (this.closed) {
            return Completable.complete();
        }
        this.closed = true;
        revokeSubscribers();
        return waitForConnectionsToBeReleased().observeOn(Schedulers.computation()).andThen(closeEventLoop()).doOnCompleted(() -> {
            LOG.info("Connection pool closed");
        });
    }

    private Completable closeEventLoop() {
        return Completable.create(completableSubscriber -> {
            LOG.debug("Closing event loop");
            this.eventLoopGroup.shutdownGracefully().addListener(future -> {
                if (future.isSuccess()) {
                    completableSubscriber.onCompleted();
                } else {
                    completableSubscriber.onError(future.cause());
                }
            });
        });
    }

    private Completable waitForConnectionsToBeReleased() {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        return Observable.interval(100L, 100L, TimeUnit.MILLISECONDS).doOnSubscribe(() -> {
            LOG.debug("Waiting for connections to be released: {}", Integer.valueOf(this.connections.size()));
        }).doOnNext(l -> {
            Connection poll;
            while (this.currentSize > 0 && (poll = this.availableConnections.poll()) != null) {
                this.currentSize--;
                poll.close();
                this.connections.remove(poll);
            }
            atomicBoolean.set(this.currentSize == 0);
        }).takeWhile(l2 -> {
            return Boolean.valueOf(!atomicBoolean.get());
        }).toCompletable().timeout(this.config.poolCloseTimeout(), TimeUnit.MILLISECONDS).onErrorResumeNext(th -> {
            return forceClose();
        }).doOnCompleted(() -> {
            this.connections.clear();
            this.availableConnections.clear();
        }).subscribeOn(this.scheduler);
    }

    private Completable forceClose() {
        return Completable.fromAction(() -> {
            LOG.warn("Forcing connections to close: {}", Integer.valueOf(this.connections.size()));
            this.connections.stream().map((v0) -> {
                return v0.close();
            }).forEach((v0) -> {
                v0.subscribe();
            });
        });
    }

    private void revokeSubscribers() {
        LOG.debug("Revoking subscribers: {}", Integer.valueOf(this.subscribers.size()));
        this.subscribers.forEach(singleSubscriber -> {
            singleSubscriber.onError(new SqlException("Connection pool is closing"));
        });
        this.subscribers.clear();
    }

    @Override // com.github.pgasync.ConnectionPool
    public Single<Connection> getConnection() {
        return Single.create(this::subscribeForConnection).subscribeOn(this.scheduler);
    }

    @Override // com.github.pgasync.ConnectionPool
    public Completable release(Connection connection) {
        return Completable.create(completableSubscriber -> {
            if (this.connections.contains(connection) && !this.availableConnections.contains(connection)) {
                this.availableConnections.add(connection);
            }
            managePool();
            completableSubscriber.onCompleted();
        }).subscribeOn(this.scheduler);
    }

    private void subscribeForConnection(SingleSubscriber<? super Connection> singleSubscriber) {
        if (this.closed) {
            singleSubscriber.onError(new SqlException("Connection pool is closed"));
        } else {
            this.subscribers.add(singleSubscriber);
            managePool();
        }
    }

    private void openConnectionsIfNecessary() {
        if (this.currentSize >= this.config.poolSize() || this.subscribers.size() <= this.availableConnections.size() || this.closed) {
            return;
        }
        int min = Math.min(this.subscribers.size(), this.config.poolSize() - this.currentSize);
        this.currentSize += min;
        IntStream.range(0, min).forEach(i -> {
            new PgConnection(new ProtocolStream(this.eventLoopGroup, this.config), this.dataConverter).connect(this.config.username(), this.config.password(), this.config.database()).observeOn(this.scheduler).doOnEach(notification -> {
                houseKeepSubscribers();
            }).doOnSuccess(connection -> {
                this.connections.add(connection);
                this.availableConnections.add(connection);
                LOG.info("New connection created [{}/{}]", Integer.valueOf(this.connections.size()), Integer.valueOf(this.config.poolSize()));
                serveAvailableConnections();
            }).doOnError(th -> {
                LOG.debug("Failed to create connection", th);
                this.currentSize--;
                Optional.ofNullable(this.subscribers.remove()).ifPresent(singleSubscriber -> {
                    singleSubscriber.onError(th);
                });
                openConnectionsIfNecessary();
            }).subscribe();
        });
    }

    private void managePool() {
        houseKeepSubscribers();
        houseKeepConnections();
        openConnectionsIfNecessary();
        serveAvailableConnections();
    }

    private void serveAvailableConnections() {
        while (!this.subscribers.isEmpty() && !this.availableConnections.isEmpty() && !this.closed) {
            this.subscribers.poll().onSuccess(this.availableConnections.poll());
        }
    }

    private void houseKeepConnections() {
        List list = (List) ((Map) this.availableConnections.stream().collect(Collectors.partitioningBy((v0) -> {
            return v0.isConnected();
        }))).get(false);
        list.forEach(this::closeConnectionQuietly);
        this.connections.removeAll(list);
        this.availableConnections.removeAll(list);
    }

    private void houseKeepSubscribers() {
        while (!this.subscribers.isEmpty() && this.subscribers.peek().isUnsubscribed()) {
            this.subscribers.remove();
        }
    }

    private void closeConnectionQuietly(Connection connection) {
        LOG.info("Removing dirty connection [{}/{}]", Integer.valueOf(this.currentSize), Integer.valueOf(this.config.poolSize()));
        this.currentSize--;
        try {
            connection.close();
        } catch (Exception e) {
            LOG.debug("Error occurred while closing connection", e);
        }
    }
}
