package com.github.davidmoten.rx.jdbc;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

/* loaded from: input_file:com/github/davidmoten/rx/jdbc/QuerySelectOnSubscribe.class */
/**
 * {@link Observable.OnSubscribe} that, on subscription, obtains a connection, prepares and
 * executes the select query, and hands the resulting {@link ResultSet} to a
 * {@link QuerySelectProducer} which emits rows mapped by the supplied {@link ResultSetMapper}.
 *
 * <p>
 * When the query's sql equals the marker {@code "RETURN_GENERATED_KEYS?"}, no connection or
 * statement is created here; instead an already populated {@link State} is read from the
 * value of the first parameter.
 *
 * @param <T>
 *            type of the items emitted to the subscriber
 */
final class QuerySelectOnSubscribe<T> implements Observable.OnSubscribe<T> {
    private static final Logger log = LoggerFactory.getLogger(QuerySelectOnSubscribe.class);

    /** Maps the result set to emitted items; passed straight to the producer. */
    private final ResultSetMapper<? extends T> function;

    /** The query providing sql, parameter names, context and result set transform. */
    private final QuerySelect query;

    /** Query parameters; element 0 carries the {@link State} when {@link #stateProvided}. */
    private final List<Parameter> parameters;

    /** True when the sql is the marker indicating the first parameter holds the state. */
    private final boolean stateProvided;

    /**
     * Creates an {@link Observable} that runs the query for each subscriber.
     *
     * @param querySelect
     *            the query to run
     * @param list
     *            the parameters for the query
     * @param resultSetMapper
     *            maps the result set to emitted items
     * @param <T>
     *            type of the emitted items
     * @return observable backed by a new {@code QuerySelectOnSubscribe}
     */
    public static <T> Observable<T> execute(QuerySelect querySelect, List<Parameter> list, ResultSetMapper<? extends T> resultSetMapper) {
        // Explicit type argument instead of a raw type so the call is checked by the compiler.
        return Observable.create(new QuerySelectOnSubscribe<T>(querySelect, list, resultSetMapper));
    }

    private QuerySelectOnSubscribe(QuerySelect querySelect, List<Parameter> list, ResultSetMapper<? extends T> resultSetMapper) {
        this.query = querySelect;
        this.parameters = list;
        this.function = resultSetMapper;
        // NOTE(review): the marker looks like a generated-keys sentinel set by another query
        // type in this package - confirm against the code that builds such queries.
        this.stateProvided = querySelect.sql().equals("RETURN_GENERATED_KEYS?");
    }

    /**
     * Sets up the query state for the subscriber and installs a {@link QuerySelectProducer}.
     *
     * <p>
     * On any failure both transaction-end hooks of the query context are invoked, the state
     * (if one exists) is closed, and the error is always passed to
     * {@link #handleException(Throwable, Subscriber)}.
     *
     * @param subscriber
     *            the subscriber to emit to
     */
    public void call(Subscriber<? super T> subscriber) {
        State state = null;
        try {
            if (this.stateProvided) {
                // State was prepared elsewhere and travels as the value of the first parameter.
                state = (State) this.parameters.get(0).value();
                setupUnsubscription(subscriber, state);
            } else {
                state = new State();
                connectAndPrepareStatement(subscriber, state);
                setupUnsubscription(subscriber, state);
                executeQuery(subscriber, state);
            }
            subscriber.setProducer(new QuerySelectProducer(this.function, subscriber, state.con, state.ps, state.rs));
        } catch (Throwable th) {
            try {
                this.query.context().endTransactionObserve();
                this.query.context().endTransactionSubscribe();
                // state is null when the failure happened before it could be obtained.
                if (state != null) {
                    closeQuietly(state);
                }
            } finally {
                // Always report: previously the error was silently dropped when state was
                // null, leaving the subscriber without any terminal event.
                handleException(th, subscriber);
            }
        }
    }

    /**
     * Registers a subscription on the subscriber that closes the given state when the
     * subscriber is unsubscribed.
     *
     * @param subscriber
     *            subscriber to attach the closing action to
     * @param state
     *            the state to close on unsubscription
     * @param <T>
     *            subscriber item type
     */
    private static <T> void setupUnsubscription(Subscriber<T> subscriber, final State state) {
        subscriber.add(Subscriptions.create(new Action0() {
            public void call() {
                // Refers to the captured parameter; this anonymous class is not an inner
                // class of State, so a qualified State.this would not compile.
                QuerySelectOnSubscribe.closeQuietly(state);
            }
        }));
    }

    /**
     * Obtains a connection from the query context's connection provider, prepares a
     * forward-only, read-only statement for the query sql and sets its parameters. Does
     * nothing beyond logging the provider if the subscriber is already unsubscribed.
     *
     * @param subscriber
     *            checked for unsubscription before any resource is acquired
     * @param state
     *            receives the connection ({@code con}) and prepared statement ({@code ps})
     * @throws SQLException
     *             if preparing the statement or setting parameters fails
     */
    private void connectAndPrepareStatement(Subscriber<? super T> subscriber, State state) throws SQLException {
        log.debug("connectionProvider={}", this.query.context().connectionProvider());
        if (subscriber.isUnsubscribed()) {
            return;
        }
        log.debug("getting connection");
        state.con = this.query.context().connectionProvider().get();
        log.debug("preparing statement,sql={}", this.query.sql());
        // Named constants replace the literals 1003 and 1007 (same values).
        state.ps = state.con.prepareStatement(this.query.sql(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        log.debug("setting parameters");
        Util.setParameters(state.ps, this.parameters, this.query.names());
    }

    /**
     * Executes the prepared statement and stores the result set in the state after applying
     * the context-level and then the query-level result set transforms. Does nothing if the
     * subscriber is already unsubscribed.
     *
     * @param subscriber
     *            checked for unsubscription before executing
     * @param state
     *            supplies the prepared statement and receives the result set ({@code rs})
     * @throws SQLException
     *             wrapping the original failure together with the sql that was run
     */
    private void executeQuery(Subscriber<? super T> subscriber, State state) throws SQLException {
        if (subscriber.isUnsubscribed()) {
            return;
        }
        try {
            log.debug("executing sql={}, parameters {}", this.query.sql(), this.parameters);
            state.rs = (ResultSet) this.query.resultSetTransform().call(this.query.context().resultSetTransform().call(state.ps.executeQuery()));
            log.debug("executed ps={}", state.ps);
        } catch (SQLException e) {
            // Keep the cause and add the sql for diagnosis.
            throw new SQLException("failed to run sql=" + this.query.sql(), e);
        }
    }

    /**
     * Logs the error message at debug level and delegates to
     * {@link Exceptions#throwOrReport(Throwable, rx.Observer)}.
     *
     * @param th
     *            the failure
     * @param subscriber
     *            the subscriber to report the failure to
     */
    private void handleException(Throwable th, Subscriber<? super T> subscriber) {
        log.debug("onError: {}", th.getMessage());
        Exceptions.throwOrReport(th, subscriber);
    }

    /**
     * Closes the result set, statement and connection held by the state, at most once per
     * state: the {@code closed} flag is flipped with compare-and-set so later calls are
     * no-ops. The connection is passed to {@code Util.closeQuietlyIfAutoCommit}.
     *
     * @param state
     *            the state whose resources are to be closed; must not be null
     */
    public static void closeQuietly(State state) {
        if (state.closed.compareAndSet(false, true)) {
            log.debug("closing rs");
            Util.closeQuietly(state.rs);
            log.debug("closing ps");
            Util.closeQuietly(state.ps);
            log.debug("closing con");
            Util.closeQuietlyIfAutoCommit(state.con);
            log.debug("closed");
        }
    }
}
