package io.reactiverse.neo4j.impl;

import io.reactiverse.neo4j.Neo4jClient;
import io.reactiverse.neo4j.Neo4jRecordStream;
import io.reactiverse.neo4j.Neo4jTransaction;
import io.reactiverse.neo4j.Util;
import io.reactiverse.neo4j.VisibleForTesting;
import io.reactiverse.neo4j.options.Neo4jClientOptions;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.shareddata.LocalMap;
import io.vertx.core.shareddata.Shareable;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.internal.summary.InternalSummaryCounters;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.summary.SummaryCounters;

/* loaded from: input_file:io/reactiverse/neo4j/impl/Neo4jClientImpl.class */
public class Neo4jClientImpl implements Neo4jClient {
    /** Name of the Vert.x local map in which shared {@link Neo4jHolder} instances are registered. */
    private static final String NEO4J_CLIENT_MAP_NAME = "__vertx.Neo4jClient.datasources";

    /** Session configuration used for every operation that opens a session in WRITE access mode. */
    private static final SessionConfig DEFAULT_WRITE_SESSION_CONFIG = SessionConfig.builder().withDefaultAccessMode(AccessMode.WRITE).build();

    /** Session configuration used for every operation that opens a session in READ access mode. */
    private static final SessionConfig DEFAULT_READ_SESSION_CONFIG = SessionConfig.builder().withDefaultAccessMode(AccessMode.READ).build();

    /** Empty parameter set passed along by the overloads that take no query parameters. */
    private static final Value EMPTY = Values.parameters();

    // The instance fields are assigned exactly once, in the constructor; declaring them final
    // documents that and guarantees safe publication when the client is shared between threads.
    private final Vertx vertx;
    private final Neo4jHolder neo4jHolder;
    private final Driver driver;

    /**
     * Combines two {@link SummaryCounters} into a new instance by adding each counter of the
     * first operand to the matching counter of the second operand.
     */
    @VisibleForTesting
    static final BinaryOperator<SummaryCounters> AGGREGATE_COUNTERS = (left, right) -> new InternalSummaryCounters(
        left.nodesCreated() + right.nodesCreated(),
        left.nodesDeleted() + right.nodesDeleted(),
        left.relationshipsCreated() + right.relationshipsCreated(),
        left.relationshipsDeleted() + right.relationshipsDeleted(),
        left.propertiesSet() + right.propertiesSet(),
        left.labelsAdded() + right.labelsAdded(),
        left.labelsRemoved() + right.labelsRemoved(),
        left.indexesAdded() + right.indexesAdded(),
        left.indexesRemoved() + right.indexesRemoved(),
        left.constraintsAdded() + right.constraintsAdded(),
        left.constraintsRemoved() + right.constraintsRemoved(),
        left.systemUpdates() + right.systemUpdates());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/reactiverse/neo4j/impl/Neo4jClientImpl$Neo4jHolder.class */
    /**
     * Reference-counted wrapper around a lazily created {@link Driver}.
     *
     * <p>A holder starts with a reference count of one. Each additional user calls
     * {@link #incRefCount()} and every user eventually calls {@link #close()}; when the count
     * reaches zero the driver (if one was created) is closed and the close callback is run.
     */
    public static class Neo4jHolder implements Shareable {
        Driver driver;
        Neo4jClientOptions config;
        Runnable closeRunner;
        int refCount = 1;

        /**
         * @param neo4jClientOptions options handed to {@link DriverSupplier} when the driver is created
         * @param runnable callback run once the reference count drops to zero; may be {@code null}
         */
        Neo4jHolder(Neo4jClientOptions neo4jClientOptions, Runnable runnable) {
            this.config = neo4jClientOptions;
            this.closeRunner = runnable;
        }

        /**
         * Returns the shared driver, creating it and verifying connectivity on first use.
         *
         * @return the shared driver instance
         * @throws RuntimeException whatever {@code verifyConnectivity()} throws; in that case the
         *         freshly created driver is closed and not retained, so a later call retries
         */
        synchronized Driver neo4jDriver() {
            if (this.driver == null) {
                Driver candidate = new DriverSupplier(this.config).get();
                try {
                    candidate.verifyConnectivity();
                } catch (RuntimeException e) {
                    // The candidate is never published, so nobody else could close it: release it here.
                    try {
                        candidate.close();
                    } catch (RuntimeException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                    throw e;
                }
                this.driver = candidate;
            }
            return this.driver;
        }

        /** Registers one more user of this holder. */
        synchronized void incRefCount() {
            this.refCount++;
        }

        /**
         * Releases one reference. When the last reference is released the driver is closed and
         * the close callback is run, even if closing the driver throws.
         *
         * <p>NOTE(review): the callback runs while this holder's monitor is held. The callback
         * installed by {@code lookupHolder} synchronizes on the Vertx instance, while
         * {@code lookupHolder} calls {@link #incRefCount()} with the Vertx monitor held, i.e. the
         * two paths take the same locks in opposite order. Confirm whether concurrent
         * create/close of the same data source can occur.
         */
        synchronized void close() {
            this.refCount--;
            if (this.refCount != 0) {
                return;
            }
            try {
                if (this.driver != null) {
                    this.driver.close();
                }
            } finally {
                if (this.closeRunner != null) {
                    this.closeRunner.run();
                }
            }
        }
    }

    public Neo4jClientImpl(Vertx vertx, Neo4jClientOptions neo4jClientOptions, String str) {
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(neo4jClientOptions);
        Objects.requireNonNull(str);
        this.vertx = vertx;
        this.neo4jHolder = lookupHolder(neo4jClientOptions, str);
        this.driver = this.neo4jHolder.neo4jDriver();
    }

    /**
     * Runs a parameterless query; delegates to the three-argument overload with the empty
     * parameter set.
     *
     * @param str the query text
     * @param handler receives the result summary or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient execute(String str, Handler<AsyncResult<ResultSummary>> handler) {
        return execute(str, EMPTY, handler);
    }

    /**
     * Runs a query in a write transaction and reports the summary obtained by consuming the
     * result cursor.
     *
     * @param str the query text
     * @param value the query parameters
     * @param handler receives the result summary or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient execute(String str, Value value, Handler<AsyncResult<ResultSummary>> handler) {
        executeWriteTransaction(str, value, ResultCursor::consumeAsync, handler);
        return this;
    }

    /**
     * Runs a parameterless delete query; delegates to the three-argument overload with the
     * empty parameter set.
     *
     * @param str the query text
     * @param handler receives the list of returned records or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient delete(String str, Handler<AsyncResult<List<Record>>> handler) {
        delete(str, EMPTY, handler);
        return this;
    }

    /**
     * Runs a query in a write transaction and reports all records of its result as a list.
     *
     * @param str the query text
     * @param value the query parameters
     * @param handler receives the list of returned records or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient delete(String str, Value value, Handler<AsyncResult<List<Record>>> handler) {
        executeWriteTransaction(str, value, ResultCursor::listAsync, handler);
        return this;
    }

    /**
     * Runs a parameterless single-record query; delegates to the three-argument overload with
     * the empty parameter set.
     *
     * @param str the query text
     * @param handler receives the record or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient findOne(String str, Handler<AsyncResult<Record>> handler) {
        return findOne(str, EMPTY, handler);
    }

    /**
     * Runs a query in a read transaction and reports the outcome of
     * {@code ResultCursor.singleAsync()} on its result.
     *
     * @param str the query text
     * @param value the query parameters
     * @param handler receives the record or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient findOne(String str, Value value, Handler<AsyncResult<Record>> handler) {
        executeReadTransaction(str, value, ResultCursor::singleAsync, handler);
        return this;
    }

    /**
     * Runs a parameterless read query; delegates to the three-argument overload with the empty
     * parameter set.
     *
     * @param str the query text
     * @param handler receives the list of records or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient find(String str, Handler<AsyncResult<List<Record>>> handler) {
        return find(str, EMPTY, handler);
    }

    /**
     * Runs a query in a read transaction and reports all records of its result as a list.
     *
     * @param str the query text
     * @param value the query parameters
     * @param handler receives the list of records or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient find(String str, Value value, Handler<AsyncResult<List<Record>>> handler) {
        executeReadTransaction(str, value, ResultCursor::listAsync, handler);
        return this;
    }

    /**
     * Runs the given queries one after another inside a single write transaction and reports
     * the sum of their summary counters. An empty list yields
     * {@code InternalSummaryCounters.EMPTY_STATS}.
     *
     * @param list the queries to run, in order
     * @param handler receives the aggregated counters or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient bulkWrite(List<Query> list, Handler<AsyncResult<SummaryCounters>> handler) {
        AsyncSession asyncSession = this.driver.asyncSession(DEFAULT_WRITE_SESSION_CONFIG);
        asyncSession.writeTransactionAsync(asyncTransaction -> {
            CompletionStage<SummaryCounters> total = CompletableFuture.<SummaryCounters>completedFuture(InternalSummaryCounters.EMPTY_STATS);
            for (Query query : list) {
                // Chain each query behind the previous one so they run sequentially; the running
                // total and the counters of the current query need distinct lambda parameters.
                total = total.thenCompose(accumulated -> asyncTransaction.runAsync(query)
                    .thenCompose(ResultCursor::consumeAsync)
                    .thenApply(ResultSummary::counters)
                    .thenApply(current -> AGGREGATE_COUNTERS.apply(accumulated, current)));
            }
            return total;
        })
            .whenComplete(Util.wrapCallback(this.vertx.getOrCreateContext(), handler))
            // whenComplete (unlike thenCompose) also fires on failure, so the session is always closed.
            .whenComplete((ignoredResult, ignoredFailure) -> asyncSession.closeAsync());
        return this;
    }

    /**
     * Opens a write session, begins a transaction on it and hands a {@link Neo4jTransaction}
     * wrapping both to the handler on the caller's Vert.x context. If that stage chain fails,
     * the handler is failed on the same context and the session is closed.
     *
     * @param handler receives the transaction or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient begin(Handler<AsyncResult<Neo4jTransaction>> handler) {
        final AsyncSession session = this.driver.asyncSession(DEFAULT_WRITE_SESSION_CONFIG);
        final Context context = this.vertx.getOrCreateContext();
        session.beginTransactionAsync()
            .thenAccept(tx -> context.runOnContext(ignored -> {
                Neo4jTransaction transaction = new Neo4jTransactionImpl(this.vertx, tx, session);
                handler.handle(Future.succeededFuture(transaction));
            }))
            .exceptionally(failure -> {
                context.runOnContext(ignored -> handler.handle(Future.failedFuture(failure)));
                session.closeAsync();
                return null;
            });
        return this;
    }

    /**
     * Streams the result of a parameterless query; delegates to the three-argument overload
     * with the empty parameter set.
     *
     * @param str the query text
     * @param handler receives the record stream or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient queryStream(String str, Handler<AsyncResult<Neo4jRecordStream>> handler) {
        queryStream(str, EMPTY, handler);
        return this;
    }

    /**
     * Opens a read session, begins a transaction, runs the query and hands a
     * {@link Neo4jRecordStream} over the resulting cursor to the handler on the caller's
     * Vert.x context. If beginning the transaction or running the query fails, the handler is
     * failed on the same context and the session is closed.
     *
     * @param str the query text
     * @param value the query parameters
     * @param handler receives the record stream or the failure
     * @return this client
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public Neo4jClient queryStream(String str, Value value, Handler<AsyncResult<Neo4jRecordStream>> handler) {
        AsyncSession asyncSession = this.driver.asyncSession(DEFAULT_READ_SESSION_CONFIG);
        Context orCreateContext = this.vertx.getOrCreateContext();
        asyncSession.beginTransactionAsync()
            // thenCompose (not thenAccept) so that a failing runAsync stage is part of the chain
            // and reaches the exceptionally branch instead of being silently dropped.
            .thenCompose(asyncTransaction -> asyncTransaction.runAsync(str, value)
                .thenAccept(resultCursor -> orCreateContext.runOnContext(ignored ->
                    handler.handle(Future.succeededFuture(new Neo4jRecordStreamImpl(orCreateContext, asyncTransaction, asyncSession, new ResultCursorImpl(resultCursor, this.vertx)))))))
            .exceptionally(th -> {
                orCreateContext.runOnContext(ignored -> handler.handle(Future.failedFuture(th)));
                // NOTE(review): presumably closing the session also disposes of a transaction
                // that was begun before the query failed - confirm against the driver docs.
                asyncSession.closeAsync();
                return null;
            });
        return this;
    }

    /**
     * Runs a query in a write transaction on a fresh session, converts the cursor with the
     * given function, passes the outcome to the callback produced by {@code Util.wrapCallback}
     * and then closes the session, whether the transaction succeeded or failed.
     *
     * @param str the query text
     * @param value the query parameters
     * @param function turns the result cursor into the stage whose value is reported
     * @param handler receives the converted result or the failure
     * @param <T> type of the reported result
     */
    private <T> void executeWriteTransaction(String str, Value value, Function<ResultCursor, CompletionStage<T>> function, Handler<AsyncResult<T>> handler) {
        AsyncSession asyncSession = this.driver.asyncSession(DEFAULT_WRITE_SESSION_CONFIG);
        asyncSession.writeTransactionAsync(asyncTransaction -> asyncTransaction.runAsync(str, value).thenCompose(function))
            .whenComplete(Util.wrapCallback(this.vertx.getOrCreateContext(), handler))
            // whenComplete (unlike thenCompose) also fires on failure, so the session is always closed.
            .whenComplete((ignoredResult, ignoredFailure) -> asyncSession.closeAsync());
    }

    /**
     * Runs a query in a read transaction on a fresh session, converts the cursor with the
     * given function, passes the outcome to the callback produced by {@code Util.wrapCallback}
     * and then closes the session, whether the transaction succeeded or failed.
     *
     * @param str the query text
     * @param value the query parameters
     * @param function turns the result cursor into the stage whose value is reported
     * @param handler receives the converted result or the failure
     * @param <T> type of the reported result
     */
    private <T> void executeReadTransaction(String str, Value value, Function<ResultCursor, CompletionStage<T>> function, Handler<AsyncResult<T>> handler) {
        AsyncSession asyncSession = this.driver.asyncSession(DEFAULT_READ_SESSION_CONFIG);
        asyncSession.readTransactionAsync(asyncTransaction -> asyncTransaction.runAsync(str, value).thenCompose(function))
            .whenComplete(Util.wrapCallback(this.vertx.getOrCreateContext(), handler))
            // whenComplete (unlike thenCompose) also fires on failure, so the session is always closed.
            .whenComplete((ignoredResult, ignoredFailure) -> asyncSession.closeAsync());
    }

    /**
     * Releases this client's reference on the shared holder. The holder closes the driver once
     * its reference count reaches zero.
     */
    @Override // io.reactiverse.neo4j.Neo4jClient
    public void close() {
        final Neo4jHolder holder = this.neo4jHolder;
        holder.close();
    }

    /**
     * Returns the holder registered under the given data source name in the Vert.x local map,
     * incrementing its reference count, or creates and registers a new holder when none exists.
     * The whole lookup runs while synchronized on the Vertx instance.
     *
     * @param neo4jClientOptions options for a newly created holder; ignored when one already exists
     * @param str the data source name used as map key
     * @return the existing or newly created holder
     */
    private Neo4jHolder lookupHolder(Neo4jClientOptions neo4jClientOptions, String str) {
        synchronized (this.vertx) {
            LocalMap<String, Neo4jHolder> holders = this.vertx.sharedData().getLocalMap(NEO4J_CLIENT_MAP_NAME);
            Neo4jHolder existing = holders.get(str);
            if (existing != null) {
                existing.incRefCount();
                return existing;
            }
            // The close callback unregisters the holder once its last user has closed it.
            Neo4jHolder created = new Neo4jHolder(neo4jClientOptions, () -> removeFromMap(holders, str));
            holders.put(str, created);
            return created;
        }
    }

    /**
     * Removes the entry for the given data source name from the local map and closes the map
     * when that leaves it empty. Runs while synchronized on the Vertx instance.
     *
     * @param localMap the map of shared holders
     * @param str the data source name to remove
     */
    private void removeFromMap(LocalMap<String, Neo4jHolder> localMap, String str) {
        synchronized (this.vertx) {
            localMap.remove(str);
            if (!localMap.isEmpty()) {
                return;
            }
            localMap.close();
        }
    }
}
