package org.springframework.data.r2dbc.core;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils;
import org.springframework.data.r2dbc.connectionfactory.ReactiveTransactionSynchronization;
import org.springframework.data.r2dbc.connectionfactory.TransactionResources;
import org.springframework.data.r2dbc.core.TransactionalDatabaseClient;
import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator;
import org.springframework.transaction.NoTransactionException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/* loaded from: input_file:BOOT-INF/lib/spring-data-r2dbc-1.0.0.M2.jar:org/springframework/data/r2dbc/core/DefaultTransactionalDatabaseClient.class */
class DefaultTransactionalDatabaseClient extends DefaultDatabaseClient implements TransactionalDatabaseClient {
    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultTransactionalDatabaseClient(ConnectionFactory connectionFactory, R2dbcExceptionTranslator r2dbcExceptionTranslator, ReactiveDataAccessStrategy reactiveDataAccessStrategy, NamedParameterExpander namedParameterExpander, DefaultDatabaseClientBuilder defaultDatabaseClientBuilder) {
        super(connectionFactory, r2dbcExceptionTranslator, reactiveDataAccessStrategy, namedParameterExpander, defaultDatabaseClientBuilder);
    }

    @Override // org.springframework.data.r2dbc.core.DefaultDatabaseClient, org.springframework.data.r2dbc.core.DatabaseClient
    public TransactionalDatabaseClient.Builder mutate() {
        return (TransactionalDatabaseClient.Builder) super.mutate();
    }

    @Override // org.springframework.data.r2dbc.core.TransactionalDatabaseClient
    public Mono<Void> beginTransaction() {
        return ConnectionFactoryUtils.currentReactiveTransactionSynchronization().map(reactiveTransactionSynchronization -> {
            TransactionResources create = TransactionResources.create();
            reactiveTransactionSynchronization.registerTransaction(create);
            return create;
        }).flatMap(transactionResources -> {
            return ConnectionFactoryUtils.doGetConnection(obtainConnectionFactory());
        }).flatMap(tuple2 -> {
            return Mono.from(((Connection) tuple2.getT1()).beginTransaction());
        });
    }

    @Override // org.springframework.data.r2dbc.core.TransactionalDatabaseClient
    public Mono<Void> commitTransaction() {
        return cleanup((v0) -> {
            return v0.commitTransaction();
        });
    }

    @Override // org.springframework.data.r2dbc.core.TransactionalDatabaseClient
    public Mono<Void> rollbackTransaction() {
        return cleanup((v0) -> {
            return v0.rollbackTransaction();
        });
    }

    @Override // org.springframework.data.r2dbc.core.TransactionalDatabaseClient
    public <T> Flux<T> inTransaction(Function<DatabaseClient, ? extends Publisher<? extends T>> function) {
        return Flux.usingWhen(beginTransaction().thenReturn(this), function, (v0) -> {
            return v0.commitTransaction();
        }, (v0) -> {
            return v0.rollbackTransaction();
        }, (v0) -> {
            return v0.rollbackTransaction();
        }).subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
    }

    @Override // org.springframework.data.r2dbc.core.DefaultDatabaseClient
    protected Mono<Connection> getConnection() {
        return ConnectionFactoryUtils.getConnection(obtainConnectionFactory()).map((v0) -> {
            return v0.getT1();
        });
    }

    private static Mono<Void> cleanup(Function<Connection, ? extends Publisher<Void>> function) {
        return ConnectionFactoryUtils.currentActiveReactiveTransactionSynchronization().flatMap(reactiveTransactionSynchronization -> {
            TransactionResources currentTransaction = reactiveTransactionSynchronization.getCurrentTransaction();
            ConnectionFactory connectionFactory = (ConnectionFactory) currentTransaction.getResource(ConnectionFactory.class);
            if (connectionFactory == null) {
                throw new NoTransactionException("No ConnectionFactory attached");
            }
            return Mono.from(connectionFactory.create()).flatMap(connection -> {
                return Mono.from((Publisher) function.apply(connection)).then(ConnectionFactoryUtils.releaseConnection(connection, connectionFactory)).then(ConnectionFactoryUtils.closeConnection(connection, connectionFactory));
            }).doFinally(signalType -> {
                reactiveTransactionSynchronization.unregisterTransaction(currentTransaction);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Context withTransactionSynchronization(Context context) {
        return context.put(ReactiveTransactionSynchronization.class, context.getOrDefault(ReactiveTransactionSynchronization.class, new ReactiveTransactionSynchronization()));
    }
}
