package io.reactiverse.neo4j.impl;

import io.reactiverse.neo4j.Neo4jRecordStream;
import io.reactiverse.neo4j.ResultCursor;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.impl.InboundBuffer;
import org.neo4j.driver.Record;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.async.AsyncTransaction;

/* loaded from: input_file:io/reactiverse/neo4j/impl/Neo4jRecordStreamImpl.class */
public class Neo4jRecordStreamImpl implements Neo4jRecordStream {
    private final Context context;
    private final AsyncTransaction tx;
    private final AsyncSession session;
    private final ResultCursor cursor;
    private final InboundBuffer<Record> internalQueue;
    private State state = State.IDLE;
    private int inFlight;
    private Handler<Record> handler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/reactiverse/neo4j/impl/Neo4jRecordStreamImpl$State.class */
    public enum State {
        IDLE,
        STARTED,
        EXHAUSTED,
        STOPPED
    }

    public Neo4jRecordStreamImpl(Context context, AsyncTransaction asyncTransaction, AsyncSession asyncSession, ResultCursor resultCursor) {
        this.context = context;
        this.tx = asyncTransaction;
        this.session = asyncSession;
        this.cursor = resultCursor;
        this.internalQueue = new InboundBuffer(context).exceptionHandler(this::handleException).drainHandler(r3 -> {
            fetchRecord();
        });
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    public synchronized Neo4jRecordStream exceptionHandler(Handler<Throwable> handler) {
        if (this.state != State.STOPPED) {
            this.exceptionHandler = handler;
        }
        return this;
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    public synchronized Neo4jRecordStream handler(Handler<Record> handler) {
        if (this.state == State.STOPPED) {
            return this;
        }
        if (handler == null) {
            stop();
            if (this.context != Vertx.currentContext()) {
                this.context.runOnContext(r3 -> {
                    handleEnd();
                });
            } else {
                handleEnd();
            }
        } else {
            this.handler = handler;
            this.internalQueue.handler(this::handleRecord);
            if (this.state == State.IDLE) {
                this.state = State.STARTED;
                if (this.context != Vertx.currentContext()) {
                    this.context.runOnContext(r32 -> {
                        fetchRecord();
                    });
                } else {
                    fetchRecord();
                }
            }
        }
        return this;
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    /* renamed from: pause */
    public synchronized Neo4jRecordStream mo3pause() {
        if (this.state != State.STOPPED) {
            this.internalQueue.pause();
        }
        return this;
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    /* renamed from: resume */
    public synchronized Neo4jRecordStream mo2resume() {
        if (this.state != State.STOPPED) {
            this.internalQueue.resume();
        }
        return this;
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    public synchronized Neo4jRecordStream endHandler(Handler<Void> handler) {
        if (this.state != State.STOPPED) {
            this.endHandler = handler;
        }
        return this;
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    /* renamed from: fetch */
    public Neo4jRecordStream mo1fetch(long j) {
        if (this.state != State.STOPPED) {
            this.internalQueue.fetch(j);
        }
        return this;
    }

    private synchronized void fetchRecord() {
        if (this.state == State.STOPPED) {
            return;
        }
        this.cursor.one(asyncResult -> {
            if (asyncResult.succeeded()) {
                handleFetched((Record) asyncResult.result());
            } else {
                handleException(asyncResult.cause());
            }
        });
    }

    private synchronized void handleFetched(Record record) {
        if (this.state == State.STOPPED) {
            return;
        }
        if (record != null) {
            this.inFlight++;
            if (this.internalQueue.write(record)) {
                fetchRecord();
                return;
            }
            return;
        }
        this.state = State.EXHAUSTED;
        if (this.inFlight == 0) {
            stop();
            handleEnd();
        }
    }

    private void handleRecord(Record record) {
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return;
            }
            this.inFlight--;
            this.handler.handle(record);
            synchronized (this) {
                if (this.state == State.EXHAUSTED && this.inFlight == 0) {
                    stop();
                    handleEnd();
                }
            }
        }
    }

    private void handleException(Throwable th) {
        Handler<Throwable> handler;
        synchronized (this) {
            if (this.state != State.STOPPED) {
                stop();
                handler = this.exceptionHandler;
            } else {
                handler = null;
            }
        }
        if (handler != null) {
            handler.handle(th);
        }
    }

    private synchronized void handleEnd() {
        Handler<Void> handler;
        handler = this.endHandler;
        if (handler != null) {
            handler.handle((Object) null);
        }
    }

    private synchronized void stop() {
        this.state = State.STOPPED;
        this.internalQueue.handler((Handler) null).drainHandler((Handler) null);
        this.tx.commitAsync().thenCompose(r3 -> {
            return this.session.closeAsync();
        });
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo0endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    /* renamed from: handler */
    public /* bridge */ /* synthetic */ ReadStream mo4handler(Handler handler) {
        return handler((Handler<Record>) handler);
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo5exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.reactiverse.neo4j.Neo4jRecordStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo6exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
