package fr.maif.jooq.reactive;

import fr.maif.jooq.PgAsyncConnection;
import fr.maif.jooq.PgAsyncPool;
import fr.maif.jooq.PgAsyncTransaction;
import fr.maif.jooq.QueryResult;
import io.vertx.core.Future;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.TransactionRollbackException;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.jooq.Configuration;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.ResultQuery;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:fr/maif/jooq/reactive/ReactivePgAsyncPool.class */
public class ReactivePgAsyncPool extends AbstractReactivePgAsyncClient<PgPool> implements PgAsyncPool {
    public ReactivePgAsyncPool(PgPool pgPool, Configuration configuration) {
        super(pgPool, configuration);
    }

    public <T> CompletionStage<T> inTransaction(Function<PgAsyncTransaction, CompletionStage<T>> function) {
        return FutureConversions.fromVertx(this.client.getConnection().flatMap(sqlConnection -> {
            return sqlConnection.begin().flatMap(transaction -> {
                return FutureConversions.toVertx((CompletionStage) function.apply(new ReactivePgAsyncTransaction(sqlConnection, transaction, this.configuration))).compose(obj -> {
                    return transaction.commit().flatMap(r3 -> {
                        return Future.succeededFuture(obj);
                    });
                }, th -> {
                    return th instanceof TransactionRollbackException ? Future.failedFuture(th) : transaction.rollback().compose(r3 -> {
                        return Future.failedFuture(th);
                    }, th -> {
                        return Future.failedFuture(th);
                    });
                });
            }).onComplete(asyncResult -> {
                sqlConnection.close();
            });
        }));
    }

    public CompletionStage<PgAsyncConnection> connection() {
        return FutureConversions.fromVertx(this.client.getConnection()).thenApply(sqlConnection -> {
            return new ReactivePgAsyncConnection(sqlConnection, this.configuration);
        });
    }

    public CompletionStage<PgAsyncTransaction> begin() {
        return FutureConversions.fromVertx(this.client.getConnection()).thenCompose(sqlConnection -> {
            return FutureConversions.fromVertx(sqlConnection.begin()).thenApply(transaction -> {
                return new ReactivePgAsyncTransaction(sqlConnection, transaction, this.configuration);
            });
        });
    }

    public <Q extends Record> Publisher<QueryResult> stream(Integer num, Function<DSLContext, ? extends ResultQuery<Q>> function) {
        return Mono.fromCompletionStage(connection()).flux().concatMap(pgAsyncConnection -> {
            return Mono.fromCompletionStage(pgAsyncConnection.begin()).flux().concatMap(pgAsyncTransaction -> {
                return Flux.from(pgAsyncTransaction.stream(num, function)).doOnComplete(() -> {
                    LOGGER.debug("Stream terminated correctly");
                    pgAsyncTransaction.commit();
                }).doOnError(th -> {
                    LOGGER.error("Stream terminated with error", th);
                    pgAsyncTransaction.rollback();
                });
            });
        });
    }
}
