package org.hswebframework.ezorm.rdb.executor.reactive.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Date;
import java.util.stream.Collectors;
import org.hswebframework.ezorm.core.CastUtil;
import org.hswebframework.ezorm.rdb.executor.BatchSqlRequest;
import org.hswebframework.ezorm.rdb.executor.DefaultColumnWrapperContext;
import org.hswebframework.ezorm.rdb.executor.NullValue;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.hswebframework.ezorm.rdb.utils.SqlUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:org/hswebframework/ezorm/rdb/executor/reactive/r2dbc/R2dbcReactiveSqlExecutor.class */
public abstract class R2dbcReactiveSqlExecutor implements ReactiveSqlExecutor {
    private static final Logger log = LoggerFactory.getLogger(R2dbcReactiveSqlExecutor.class);
    private Logger logger = log;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/hswebframework/ezorm/rdb/executor/reactive/r2dbc/R2dbcReactiveSqlExecutor$Interrupted.class */
    public enum Interrupted {
        instance;

        static boolean nonInterrupted(Object obj) {
            return obj != instance;
        }
    }

    protected abstract Mono<Connection> getConnection();

    protected abstract void releaseConnection(SignalType signalType, Connection connection);

    protected Flux<Result> doExecute(Connection connection, SqlRequest sqlRequest) {
        Flux from = Flux.from(prepareStatement(connection.createStatement(sqlRequest.getSql()), sqlRequest).execute());
        Class<Result> cls = Result.class;
        Result.class.getClass();
        return from.map((v1) -> {
            return r1.cast(v1);
        }).doOnSubscribe(subscription -> {
            SqlUtils.printSql(this.logger, sqlRequest);
        }).doOnError(th -> {
            this.logger.error("==>      Error: {}", sqlRequest.toNativeSql(), th);
        });
    }

    private Flux<Result> doExecute(Flux<SqlRequest> flux) {
        return getConnection().flatMapMany(connection -> {
            return flux.flatMap(sqlRequest -> {
                return doExecute(connection, sqlRequest);
            }).doFinally(signalType -> {
                releaseConnection(signalType, connection);
            });
        });
    }

    @Override // org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor
    public Mono<Integer> update(Publisher<SqlRequest> publisher) {
        return doExecute(toFlux(publisher)).flatMap(result -> {
            return Mono.from(result.getRowsUpdated()).defaultIfEmpty(0);
        }).doOnNext(num -> {
            this.logger.debug("==>    Updated: {}", num);
        }).collect(Collectors.summingInt((v0) -> {
            return v0.intValue();
        })).defaultIfEmpty(0);
    }

    @Override // org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor
    public Mono<Void> execute(Publisher<SqlRequest> publisher) {
        return doExecute(toFlux(publisher)).flatMap((v0) -> {
            return v0.getRowsUpdated();
        }).then();
    }

    @Override // org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor
    public <E> Flux<E> select(Publisher<SqlRequest> publisher, ResultWrapper<E, ?> resultWrapper) {
        Flux map = ((Flux) toFlux(publisher).as(this::doExecute)).flatMap(result -> {
            return result.map((row, rowMetadata) -> {
                ArrayList arrayList = new ArrayList(rowMetadata.getColumnNames());
                resultWrapper.beforeWrap(() -> {
                    return arrayList;
                });
                Object newRowInstance = resultWrapper.newRowInstance();
                int size = arrayList.size();
                for (int i = 0; i < size; i++) {
                    String str = (String) arrayList.get(i);
                    DefaultColumnWrapperContext defaultColumnWrapperContext = new DefaultColumnWrapperContext(i, str, row.get(str), newRowInstance);
                    resultWrapper.wrapColumn(defaultColumnWrapperContext);
                    newRowInstance = defaultColumnWrapperContext.getRowInstance();
                }
                return !resultWrapper.completedWrapRow(newRowInstance) ? Interrupted.instance : newRowInstance;
            });
        }).takeWhile(Interrupted::nonInterrupted).map(CastUtil::cast);
        resultWrapper.getClass();
        Flux doOnCancel = map.doOnCancel(resultWrapper::completedWrap);
        resultWrapper.getClass();
        return doOnCancel.doOnComplete(resultWrapper::completedWrap);
    }

    protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> publisher) {
        return Flux.from(publisher).flatMap(sqlRequest -> {
            return sqlRequest instanceof BatchSqlRequest ? Flux.concat(new Publisher[]{Flux.just(sqlRequest), Flux.fromIterable(((BatchSqlRequest) sqlRequest).getBatch())}) : Flux.just(sqlRequest);
        }).filter((v0) -> {
            return v0.isNotEmpty();
        }).map(this::convertRequest);
    }

    protected SqlRequest convertRequest(SqlRequest sqlRequest) {
        return R2dbcSqlRequest.of(getBindFirstIndex(), getBindSymbol(), sqlRequest);
    }

    protected String getBindSymbol() {
        return "$";
    }

    protected int getBindFirstIndex() {
        return 1;
    }

    protected void bindNull(Statement statement, int i, Class<?> cls) {
        if (cls == Date.class) {
            cls = LocalDateTime.class;
        }
        statement.bindNull(getBindSymbol() + (i + getBindFirstIndex()), cls);
    }

    protected void bind(Statement statement, int i, Object obj) {
        if (obj instanceof Date) {
            obj = ((Date) obj).toInstant().atZone(ZoneOffset.systemDefault()).toLocalDateTime();
        }
        statement.bind(getBindSymbol() + (i + getBindFirstIndex()), obj);
    }

    protected Statement prepareStatement(Statement statement, SqlRequest sqlRequest) {
        if (sqlRequest.isEmpty() || sqlRequest.getParameters() == null) {
            return statement;
        }
        int i = 0;
        for (Object obj : sqlRequest.getParameters()) {
            if (obj == null) {
                bindNull(statement, i, String.class);
            } else if (obj instanceof NullValue) {
                bindNull(statement, i, ((NullValue) obj).getDataType().getJavaType());
            } else {
                bind(statement, i, obj);
            }
            i++;
        }
        return statement;
    }

    public Logger getLogger() {
        return this.logger;
    }

    public void setLogger(Logger logger) {
        this.logger = logger;
    }
}
