/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx.jdbc;

import com.github.davidmoten.rx.jdbc.Parameter;
import com.github.davidmoten.rx.jdbc.QuerySelect;
import com.github.davidmoten.rx.jdbc.QuerySelectProducer;
import com.github.davidmoten.rx.jdbc.ResultSetMapper;
import com.github.davidmoten.rx.jdbc.State;
import com.github.davidmoten.rx.jdbc.Util;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

/**
 * {@link Observable.OnSubscribe} that runs a {@link QuerySelect} with one list of
 * parameters and installs a {@link QuerySelectProducer} on the subscriber to emit
 * the rows mapped by a {@link ResultSetMapper}.
 *
 * <p>JDBC resources are tracked in a {@link State} and are released either when
 * the subscriber unsubscribes or when setting up the query fails.
 *
 * @param <T> type of the items emitted to the subscriber
 */
final class QuerySelectOnSubscribe<T> implements Observable.OnSubscribe<T> {
    private static final Logger log = LoggerFactory.getLogger(QuerySelectOnSubscribe.class);

    /**
     * Marker SQL text. When the query's SQL equals this value the first parameter
     * is read as an already populated {@link State} instead of the query being
     * prepared and executed here.
     * NOTE(review): presumably used to stream generated keys of a prior update - confirm against callers.
     */
    private static final String STATE_PROVIDED_SQL = "RETURN_GENERATED_KEYS?";

    private final ResultSetMapper<? extends T> function;
    private final QuerySelect query;
    private final List<Parameter> parameters;
    /** True when the {@link State} is supplied via the first parameter (see {@link #STATE_PROVIDED_SQL}). */
    private final boolean stateProvided;

    /**
     * Creates an {@link Observable} backed by a new {@code QuerySelectOnSubscribe}.
     *
     * @param query the select query to run
     * @param parameters the parameters used for this run of the query
     * @param function mapper handed to the {@link QuerySelectProducer}
     * @param <T> type of the emitted items
     * @return an observable that performs the query work on subscription
     */
    static <T> Observable<T> execute(QuerySelect query, List<Parameter> parameters, ResultSetMapper<? extends T> function) {
        return Observable.create(new QuerySelectOnSubscribe<T>(query, parameters, function));
    }

    private QuerySelectOnSubscribe(QuerySelect query, List<Parameter> parameters, ResultSetMapper<? extends T> function) {
        this.query = query;
        this.parameters = parameters;
        this.function = function;
        this.stateProvided = query.sql().equals(STATE_PROVIDED_SQL);
    }

    /**
     * Prepares the JDBC state for the subscriber and installs the producer.
     *
     * <p>When the state is provided it is taken from the first parameter; otherwise a
     * connection is obtained, the statement prepared and the query executed (each of
     * those steps is skipped if the subscriber is already unsubscribed). On any
     * failure both transaction flags on the query context are ended, the state (if
     * any) is closed and the error is passed to {@link #handleException}.
     *
     * @param subscriber the subscriber to serve
     */
    @Override
    public void call(Subscriber<? super T> subscriber) {
        State state = null;
        try {
            if (this.stateProvided) {
                // State was built elsewhere: only register cleanup, nothing to connect or execute.
                state = (State)this.parameters.get(0).value();
                QuerySelectOnSubscribe.setupUnsubscription(subscriber, state);
            } else {
                state = new State();
                this.connectAndPrepareStatement(subscriber, state);
                QuerySelectOnSubscribe.setupUnsubscription(subscriber, state);
                this.executeQuery(subscriber, state);
            }
            subscriber.setProducer(new QuerySelectProducer<T>(this.function, subscriber, state.con, state.ps, state.rs));
        }
        catch (Throwable e) {
            this.query.context().endTransactionObserve();
            this.query.context().endTransactionSubscribe();
            try {
                // state is still null if reading it from the parameters failed
                if (state != null) {
                    QuerySelectOnSubscribe.closeQuietly(state);
                }
            }
            finally {
                // always report the original failure, even if closing misbehaves
                this.handleException(e, subscriber);
            }
        }
    }

    /**
     * Registers a subscription on the subscriber that closes the given state when
     * the subscriber is unsubscribed.
     */
    private static <T> void setupUnsubscription(Subscriber<T> subscriber, final State state) {
        subscriber.add(Subscriptions.create(new Action0(){

            @Override
            public void call() {
                QuerySelectOnSubscribe.closeQuietly(state);
            }
        }));
    }

    /**
     * Obtains a connection from the query context's connection provider, prepares a
     * forward-only, read-only statement for the query SQL and binds the parameters.
     * Does nothing beyond logging if the subscriber is already unsubscribed.
     *
     * @throws SQLException if preparing the statement or setting parameters fails
     */
    private void connectAndPrepareStatement(Subscriber<? super T> subscriber, State state) throws SQLException {
        log.debug("connectionProvider={}", this.query.context().connectionProvider());
        if (!subscriber.isUnsubscribed()) {
            log.debug("getting connection");
            state.con = this.query.context().connectionProvider().get();
            log.debug("preparing statement,sql={}", this.query.sql());
            state.ps = state.con.prepareStatement(this.query.sql(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            log.debug("setting parameters");
            Util.setParameters(state.ps, this.parameters, this.query.names());
        }
    }

    /**
     * Executes the prepared statement and stores the result set in the state after
     * passing it through the context-level and then the query-level result set
     * transforms. Skipped if the subscriber is already unsubscribed.
     *
     * @throws SQLException wrapping the underlying failure with the SQL text in the
     *         message; the SQLState and vendor error code of the cause are carried
     *         over so they remain visible on the thrown exception
     */
    private void executeQuery(Subscriber<? super T> subscriber, State state) throws SQLException {
        if (!subscriber.isUnsubscribed()) {
            try {
                log.debug("executing sql={}, parameters {}", this.query.sql(), this.parameters);
                state.rs = (ResultSet)this.query.resultSetTransform().call(
                        this.query.context().resultSetTransform().call(state.ps.executeQuery()));
                log.debug("executed ps={}", state.ps);
            }
            catch (SQLException e) {
                // keep SQLState/vendor code on the wrapper, not only on the cause
                throw new SQLException("failed to run sql=" + this.query.sql(), e.getSQLState(), e.getErrorCode(), e);
            }
        }
    }

    /**
     * Logs the error message at debug level and delegates to
     * {@link Exceptions#throwOrReport(Throwable, rx.Observer)} for the subscriber.
     */
    private void handleException(Throwable e, Subscriber<? super T> subscriber) {
        log.debug("onError: {}", e.getMessage());
        Exceptions.throwOrReport(e, subscriber);
    }

    /**
     * Closes the result set, statement and connection held by the state. The
     * {@code closed} flag is flipped with compare-and-set so only the first call
     * performs the closing; later calls are no-ops.
     */
    private static void closeQuietly(State state) {
        if (state.closed.compareAndSet(false, true)) {
            log.debug("closing rs");
            Util.closeQuietly(state.rs);
            log.debug("closing ps");
            Util.closeQuietly(state.ps);
            log.debug("closing con");
            // NOTE(review): helper name suggests the connection is only closed when in auto-commit mode - confirm in Util
            Util.closeQuietlyIfAutoCommit(state.con);
            log.debug("closed");
        }
    }
}

