package com.aerospike.client.proxy;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Log;
import com.aerospike.client.Value;
import com.aerospike.client.command.Command;
import com.aerospike.client.lua.LuaCache;
import com.aerospike.client.lua.LuaInputStream;
import com.aerospike.client.lua.LuaInstance;
import com.aerospike.client.lua.LuaOutputStream;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.proxy.grpc.GrpcCallExecutor;
import com.aerospike.client.proxy.grpc.GrpcConversions;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;
import com.aerospike.proxy.client.Kvs;
import com.aerospike.proxy.client.QueryGrpc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.luaj.vm2.LuaInteger;
import org.luaj.vm2.LuaValue;

/* loaded from: input_file:com/aerospike/client/proxy/QueryAggregateCommandProxy.class */
/**
 * Proxy command that runs an aggregation query over the gRPC query-streaming method.
 *
 * <p>Aggregate values parsed from the response stream are placed on a bounded input queue.
 * A separate thread (created in the constructor) feeds that queue to the Lua
 * {@code apply_stream} function and writes its output to the {@link ResultSet} returned by
 * {@link #getResultSet()}.
 */
public final class QueryAggregateCommandProxy extends MultiCommandProxy implements Runnable {
    /** Capacity of the queue between the response parser and the Lua thread. */
    private static final int INPUT_QUEUE_CAPACITY = 500;

    private final BlockingQueue<LuaValue> inputQueue;
    private final ResultSetProxy resultSet;
    private final LuaInstance lua;
    private final Statement statement;
    /** Set once by {@link #stop(Exception)}; later calls to {@code stop} are no-ops. */
    private final AtomicBoolean done;
    private final long taskId;
    /** Failure recorded by the first {@link #stop(Exception)} call; may be {@code null}. */
    private volatile Exception exception;

    /**
     * Creates the command, borrows a Lua instance from {@link LuaCache} and starts the
     * aggregation thread.
     *
     * @param grpcCallExecutor executor passed to the superclass
     * @param threadFactory    factory used to create the thread that runs {@link #run()}
     * @param queryPolicy      query policy; its {@code recordQueueSize} sizes the result set
     * @param statement        query statement holding the Lua package, function name and arguments
     * @param j                task id sent with the statement
     * @throws RuntimeException if the thread cannot be created or started; the Lua instance is
     *                          returned to the cache before rethrowing
     */
    public QueryAggregateCommandProxy(GrpcCallExecutor grpcCallExecutor, ThreadFactory threadFactory, QueryPolicy queryPolicy, Statement statement, long j) {
        super(QueryGrpc.getQueryStreamingMethod(), grpcCallExecutor, queryPolicy);
        this.statement = statement;
        this.taskId = j;
        this.inputQueue = new ArrayBlockingQueue<>(INPUT_QUEUE_CAPACITY);
        this.resultSet = new ResultSetProxy(this, queryPolicy.recordQueueSize);
        this.done = new AtomicBoolean();
        // NOTE(review): the result is discarded on purpose. This presumably forces LuaValue's
        // static initialization before LuaInteger is first used in run() -- confirm before removing.
        LuaValue.valueOf(0);
        this.lua = LuaCache.getInstance();
        try {
            threadFactory.newThread(this).start();
        } catch (RuntimeException e) {
            // The thread never ran, so run() will not return the Lua instance; do it here.
            LuaCache.putInstance(this.lua);
            throw e;
        }
    }

    /**
     * Intentionally empty.
     * NOTE(review): presumably the request is carried entirely by {@link #getRequestBuilder()} -- confirm.
     */
    @Override // com.aerospike.client.proxy.CommandProxy
    void writeCommand(Command command) {
    }

    /**
     * Parses one streamed response and queues its aggregate value for the Lua thread.
     *
     * @param parser parser positioned at the response header
     * @throws AerospikeException if the result code is neither 0 nor 2, if the record does not
     *                            contain exactly one bin, or (as {@code QueryTerminated}) if the
     *                            command has already been stopped
     */
    @Override // com.aerospike.client.proxy.MultiCommandProxy
    void parseResult(Parser parser) {
        final int resultCode = parser.parseHeader();
        parser.skipKey();

        if (resultCode != 0) {
            // NOTE(review): result code 2 is silently ignored -- presumably "key not found"; confirm.
            if (resultCode != 2) {
                throw new AerospikeException(resultCode);
            }
            return;
        }

        if (!this.hasNext) {
            // Last response of the stream: signal end-of-input to the Lua thread.
            sendCompleted();
            return;
        }

        if (parser.opCount != 1) {
            throw new AerospikeException("Query aggregate expected exactly one bin.  Received " + parser.opCount);
        }

        final LuaValue aggregateValue = parser.getLuaAggregateValue(this.lua);

        if (this.done.get()) {
            throw new AerospikeException.QueryTerminated();
        }

        if (aggregateValue == null) {
            return;
        }

        try {
            this.inputQueue.put(aggregateValue);
        } catch (InterruptedException ignored) {
            // Deliberately best-effort: the value is dropped and parsing continues.
            // Make the drop observable instead of completely silent.
            if (Log.debugEnabled()) {
                Log.debug("Lua input queue " + this.taskId + " put interrupted; aggregate value dropped");
            }
        }
    }

    /**
     * Stops the command, recording the failure for {@link #checkForException()}.
     *
     * @param aerospikeException failure reported for this command
     */
    @Override // com.aerospike.client.proxy.CommandProxy
    void onFailure(AerospikeException aerospikeException) {
        stop(aerospikeException);
    }

    /**
     * Builds the gRPC request payload from the query policy, statement and task id.
     *
     * @return a payload builder with the query request set
     */
    @Override // com.aerospike.client.proxy.CommandProxy
    Kvs.AerospikeRequestPayload.Builder getRequestBuilder() {
        Kvs.QueryRequest.Builder queryRequest = Kvs.QueryRequest.newBuilder();
        queryRequest.setQueryPolicy(GrpcConversions.toGrpc(this.policy));
        // NOTE(review): the meaning of the trailing 0L is not visible here -- presumably a
        // record limit where 0 means "no limit"; confirm against GrpcConversions.toGrpc.
        queryRequest.setStatement(GrpcConversions.toGrpc(this.statement, this.taskId, 0L));

        Kvs.AerospikeRequestPayload.Builder payload = Kvs.AerospikeRequestPayload.newBuilder();
        payload.setQueryRequest(queryRequest.build());
        return payload;
    }

    /**
     * Marks the command as done and cancels the aggregation. Only the first call has any
     * effect; later calls neither overwrite the recorded exception nor cancel again.
     *
     * @param exc failure to record; surfaced later by {@link #checkForException()}
     */
    public void stop(Exception exc) {
        if (!this.done.compareAndSet(false, true)) {
            return;
        }
        this.exception = exc;
        sendCancel();
    }

    /**
     * Queues the {@code NIL} end-of-input marker, retrying until the put succeeds even if the
     * thread is interrupted while waiting.
     */
    private void sendCompleted() {
        while (true) {
            try {
                this.inputQueue.put(LuaValue.NIL);
                return;
            } catch (InterruptedException e) {
                if (Log.debugEnabled()) {
                    Log.debug("Lua input queue " + this.taskId + " put interrupted");
                }
            }
        }
    }

    /**
     * Discards pending input, aborts the result set and queues the {@code NIL} marker without
     * blocking. If the queue is full, one element is removed to make room and the offer is
     * retried; if both offer and poll fail, the attempt is abandoned.
     */
    private void sendCancel() {
        this.inputQueue.clear();
        this.resultSet.abort();

        while (!this.inputQueue.offer(LuaValue.NIL)) {
            if (this.inputQueue.poll() == null) {
                // Queue reported full on offer yet empty on poll; give up rather than spin.
                if (Log.debugEnabled()) {
                    Log.debug("Lua input queue " + this.taskId + " both offer and poll failed on abort");
                }
                return;
            }
        }
    }

    /**
     * Rethrows the failure recorded by {@link #stop(Exception)}, if any.
     *
     * @throws AerospikeException the recorded exception itself when it already is an
     *                            {@code AerospikeException}, otherwise a new one wrapping it
     */
    public void checkForException() {
        // Read the volatile field once so the null check, type check and throw agree.
        final Exception failure = this.exception;
        if (failure == null) {
            return;
        }
        if (failure instanceof AerospikeException) {
            throw (AerospikeException) failure;
        }
        throw new AerospikeException(failure);
    }

    /**
     * Body of the aggregation thread: loads the statement's Lua package, calls
     * {@code apply_stream} with the input and output streams plus the statement's function
     * arguments, then always terminates the result set and returns the Lua instance to the cache.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            this.lua.loadPackage(this.statement);

            final Value[] functionArgs = this.statement.getFunctionArgs();
            final LuaValue[] callArgs = new LuaValue[4 + functionArgs.length];
            callArgs[0] = this.lua.getFunction(this.statement.getFunctionName());
            // NOTE(review): meaning of the constant 2 is not visible in this file -- confirm
            // against the apply_stream Lua definition.
            callArgs[1] = LuaInteger.valueOf(2);
            callArgs[2] = new LuaInputStream(this.inputQueue);
            callArgs[3] = new LuaOutputStream(this.resultSet);

            int next = 4;
            for (Value arg : functionArgs) {
                callArgs[next++] = arg.getLuaValue(this.lua);
            }

            this.lua.call("apply_stream", callArgs);
        } catch (Exception e) {
            stop(e);
        } finally {
            // Always unblock result-set readers, and return the Lua instance even if that fails.
            try {
                this.resultSet.put(ResultSet.END);
            } finally {
                LuaCache.putInstance(this.lua);
            }
        }
    }

    /**
     * Returns the task id supplied at construction.
     *
     * @return the task id
     */
    public long getTaskId() {
        return this.taskId;
    }

    /**
     * Returns the result set that receives the aggregation output.
     *
     * @return the result set backing this command
     */
    public ResultSet getResultSet() {
        return this.resultSet;
    }
}
