package io.trane.ndbc.postgres;

import io.trane.future.Future;
import io.trane.future.InterruptHandler;
import io.trane.future.Promise;
import io.trane.ndbc.PreparedStatement;
import io.trane.ndbc.Row;
import io.trane.ndbc.flow.Flow;
import io.trane.ndbc.postgres.proto.ExtendedQueryStreamExchange;
import io.trane.ndbc.postgres.proto.Message;
import io.trane.ndbc.postgres.proto.marshaller.Marshallers;
import io.trane.ndbc.proto.Channel;
import io.trane.ndbc.proto.Exchange;
import io.trane.ndbc.util.NonFatalException;
import io.trane.ndbc.value.Value;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/trane/ndbc/postgres/Connection.class */
/**
 * PostgreSQL implementation of {@link io.trane.ndbc.datasource.Connection}.
 *
 * <p>Every query/execute call is turned into an {@link Exchange} and run against the
 * connection's single {@link Channel}. Exchanges are chained through {@link #mutex} so that
 * a new exchange only starts after the previously submitted one has finished.
 */
public final class Connection implements io.trane.ndbc.datasource.Connection {
    private static final Logger log = LoggerFactory.getLogger(Connection.class);

    /** Statement issued by {@link #isValid()} to probe the connection. */
    private static final PreparedStatement isValidQuery = PreparedStatement.create("SELECT 1");

    private final Channel channel;
    private final Marshallers marshallers;
    /** When present, each exchange is bounded by this duration (see {@link #execute(Exchange)}). */
    private final Optional<Duration> queryTimeout;
    private final ScheduledExecutorService scheduler;
    /** Supplies an additional channel, used only to deliver a cancel request. */
    private final Supplier<? extends Future<? extends Channel>> channelSupplier;
    /** Process id / secret key pair needed to build a cancel request; cancellation is skipped if absent. */
    private final Optional<Message.BackendKeyData> backendKeyData;
    private final Function<String, Exchange<List<Row>>> simpleQueryExchange;
    private final Function<String, Exchange<Long>> simpleExecuteExchange;
    private final BiFunction<String, List<Value<?>>, Exchange<List<Row>>> extendedQueryExchange;
    private final BiFunction<String, List<Value<?>>, Exchange<Long>> extendedExecuteExchange;
    private final BiFunction<String, List<Value<?>>, Exchange<ExtendedQueryStreamExchange.Fetch>> extendedQueryStreamExchange;

    /** Holds the most recently submitted operation; {@code null} until the first one is submitted. */
    private final AtomicReference<Future<?>> mutex = new AtomicReference<>();

    /**
     * Creates a connection on top of an already established channel.
     *
     * @param channel                     channel every exchange of this connection runs on
     * @param marshallers                 marshallers used to encode the cancel request
     * @param queryTimeout                optional time limit applied to each exchange
     * @param scheduler                   scheduler used to enforce {@code queryTimeout}
     * @param channelSupplier             supplier of a separate channel for cancel requests
     * @param backendKeyData              optional backend key data used to build cancel requests
     * @param simpleQueryExchange         builds the exchange for a plain-string query
     * @param simpleExecuteExchange       builds the exchange for a plain-string execute
     * @param extendedQueryExchange       builds the exchange for a prepared-statement query
     * @param extendedQueryStreamExchange builds the exchange for a streamed prepared-statement query
     * @param extendedExecuteExchange     builds the exchange for a prepared-statement execute
     */
    public Connection(Channel channel, Marshallers marshallers, Optional<Duration> queryTimeout, ScheduledExecutorService scheduler, Supplier<? extends Future<? extends Channel>> channelSupplier, Optional<Message.BackendKeyData> backendKeyData, Function<String, Exchange<List<Row>>> simpleQueryExchange, Function<String, Exchange<Long>> simpleExecuteExchange, BiFunction<String, List<Value<?>>, Exchange<List<Row>>> extendedQueryExchange, BiFunction<String, List<Value<?>>, Exchange<ExtendedQueryStreamExchange.Fetch>> extendedQueryStreamExchange, BiFunction<String, List<Value<?>>, Exchange<Long>> extendedExecuteExchange) {
        this.channel = channel;
        this.marshallers = marshallers;
        this.queryTimeout = queryTimeout;
        this.scheduler = scheduler;
        this.channelSupplier = channelSupplier;
        this.backendKeyData = backendKeyData;
        this.simpleQueryExchange = simpleQueryExchange;
        this.simpleExecuteExchange = simpleExecuteExchange;
        this.extendedQueryExchange = extendedQueryExchange;
        this.extendedQueryStreamExchange = extendedQueryStreamExchange;
        this.extendedExecuteExchange = extendedExecuteExchange;
    }

    /**
     * Runs a plain-string query.
     *
     * @param str the SQL text
     * @return a future with the resulting rows
     */
    public final Future<List<Row>> query(String str) {
        return run(this.simpleQueryExchange.apply(str));
    }

    /**
     * Runs a plain-string statement.
     *
     * @param str the SQL text
     * @return a future with the {@code Long} produced by the execute exchange
     */
    public final Future<Long> execute(String str) {
        return run(this.simpleExecuteExchange.apply(str));
    }

    /**
     * Runs a prepared-statement query.
     *
     * @param preparedStatement statement providing the SQL text and its parameters
     * @return a future with the resulting rows
     */
    public final Future<List<Row>> query(PreparedStatement preparedStatement) {
        return run(this.extendedQueryExchange.apply(preparedStatement.query(), preparedStatement.params()));
    }

    /**
     * Runs a prepared-statement query and exposes the rows as a batched {@link Flow}.
     *
     * <p>The stream exchange is submitted immediately; each batch request then submits a
     * further fetch exchange for the requested number of rows.
     *
     * @param preparedStatement statement providing the SQL text and its parameters
     * @return a flow of the resulting rows
     */
    public final Flow<Row> stream(PreparedStatement preparedStatement) {
        // Typed (not raw) so that the lambda below sees `fetch` as a Fetch rather than Object.
        final Future<ExtendedQueryStreamExchange.Fetch> fetcher =
                run(this.extendedQueryStreamExchange.apply(preparedStatement.query(), preparedStatement.params()));
        return Flow.batched(batchSize ->
                Flow.from(fetcher.flatMap(fetch -> run(fetch.fetch(batchSize.intValue()))).map(Flow::from)));
    }

    /**
     * Runs a prepared statement.
     *
     * @param preparedStatement statement providing the SQL text and its parameters
     * @return a future with the {@code Long} produced by the execute exchange
     */
    public final Future<Long> execute(PreparedStatement preparedStatement) {
        return run(this.extendedExecuteExchange.apply(preparedStatement.query(), preparedStatement.params()));
    }

    /**
     * Probes the connection by running {@code SELECT 1}.
     *
     * @return a future of {@code true} if the query succeeds, {@code false} if it fails
     */
    public final Future<Boolean> isValid() {
        return query(isValidQuery).map(rows -> true).rescue(failure -> Future.FALSE);
    }

    /**
     * Runs the close exchange directly on the channel. Note that this does not go through
     * {@link #run(Exchange)}, so it is not queued behind pending operations.
     *
     * @return a future completed when the close exchange finishes
     */
    public final Future<Void> close() {
        return Exchange.CLOSE.run(this.channel);
    }

    /**
     * Submits an exchange so that it starts only after the previously submitted one finished.
     *
     * @param exchange the exchange to run
     * @return a promise that becomes the result of the exchange; interrupting it triggers
     *         the cancel logic in {@link #handler(Promise)}
     */
    private final <T> Future<T> run(Exchange<T> exchange) {
        final Promise<T> next = Promise.create(this::handler);
        // Atomically publish this operation as the tail of the chain and learn what preceded it.
        final Future<?> previous = this.mutex.getAndSet(next);
        if (previous == null) {
            // First operation on this connection: nothing to wait for.
            next.become(execute(exchange));
        } else {
            // Start only once the previous operation is done, regardless of its outcome.
            previous.ensure(() -> next.become(execute(exchange)));
        }
        return next;
    }

    /**
     * Runs the exchange on the channel, applying {@link #queryTimeout} when configured.
     * Non-fatal exceptions thrown synchronously are converted into a failed future.
     *
     * @param exchange the exchange to run
     * @return the future of the exchange result
     */
    private final <T> Future<T> execute(Exchange<T> exchange) {
        try {
            final Future<T> result = exchange.run(this.channel);
            return this.queryTimeout.map(limit -> result.within(limit, this.scheduler)).orElse(result);
        } catch (Throwable thrown) {
            // Lets NonFatalException decide whether this throwable may be captured.
            NonFatalException.verify(thrown);
            return Future.exception(thrown);
        }
    }

    /**
     * Builds the interrupt handler attached to every promise created by {@link #run(Exchange)}.
     *
     * <p>On interrupt, and only if backend key data is available, a channel is requested from
     * {@link #channelSupplier}, a cancel request carrying the backend process id and secret key
     * is sent on it, and that channel is closed. A failure of this sequence is logged and
     * otherwise ignored.
     *
     * @param promise the promise the handler is created for (not used by the handler)
     * @return the interrupt handler
     */
    private final <T> InterruptHandler handler(Promise<T> promise) {
        return interruption ->
            this.backendKeyData.ifPresent(keyData ->
                this.channelSupplier.get()
                    .flatMap(cancelChannel ->
                        Exchange.send(this.marshallers.cancelRequest, new Message.CancelRequest(keyData.processId, keyData.secretKey))
                            .then(Exchange.CLOSE)
                            .run(cancelChannel))
                    // Same message text as before; the throwable is also passed so its stack trace is kept.
                    .onFailure(failure -> log.warn("Can't cancel request. Reason: {}", failure.toString(), failure)));
    }

    /**
     * Executes {@code BEGIN}.
     *
     * @return a future completed when the statement finishes
     */
    public Future<Void> beginTransaction() {
        return execute("BEGIN").voided();
    }

    /**
     * Executes {@code COMMIT}.
     *
     * @return a future completed when the statement finishes
     */
    public Future<Void> commit() {
        return execute("COMMIT").voided();
    }

    /**
     * Executes {@code ROLLBACK}.
     *
     * @return a future completed when the statement finishes
     */
    public Future<Void> rollback() {
        return execute("ROLLBACK").voided();
    }
}
