/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.dse.driver.internal.core.cql.continuous;

import com.datastax.dse.driver.DseSessionMetric;
import com.datastax.dse.driver.api.core.DseProtocolVersion;
import com.datastax.dse.driver.api.core.config.DseDriverOption;
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
import com.datastax.dse.driver.internal.core.DseProtocolFeature;
import com.datastax.dse.driver.internal.core.cql.DseConversions;
import com.datastax.dse.driver.internal.core.cql.continuous.DefaultContinuousAsyncResultSet;
import com.datastax.dse.protocol.internal.request.Revise;
import com.datastax.dse.protocol.internal.response.result.DseRowsMetadata;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.detach.AttachmentPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
import com.datastax.oss.driver.api.core.retry.RetryDecision;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.servererrors.BootstrappingException;
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
import com.datastax.oss.driver.api.core.servererrors.FunctionFailureException;
import com.datastax.oss.driver.api.core.servererrors.ProtocolError;
import com.datastax.oss.driver.api.core.servererrors.QueryValidationException;
import com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException;
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
import com.datastax.oss.driver.internal.core.ProtocolFeature;
import com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler;
import com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler;
import com.datastax.oss.driver.internal.core.adminrequest.UnexpectedResponseException;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.channel.ResponseCallback;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.cql.Conversions;
import com.datastax.oss.driver.internal.core.cql.DefaultExecutionInfo;
import com.datastax.oss.driver.internal.core.cql.DefaultRow;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.util.CountingIterator;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.request.Prepare;
import com.datastax.oss.protocol.internal.response.Error;
import com.datastax.oss.protocol.internal.response.Result;
import com.datastax.oss.protocol.internal.response.error.Unprepared;
import com.datastax.oss.protocol.internal.response.result.Rows;
import com.datastax.oss.protocol.internal.response.result.RowsMetadata;
import com.datastax.oss.protocol.internal.util.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class ContinuousCqlRequestHandler
implements ResponseCallback,
GenericFutureListener<Future<Void>>,
Throttled {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousCqlRequestHandler.class);
    private final String logPrefix;
    private final Statement<?> statement;
    private final DefaultSession session;
    private final InternalDriverContext context;
    private final DriverExecutionProfile executionProfile;
    private final Queue<Node> queryPlan;
    private final Set<Node> replicas;
    private final RetryPolicy retryPolicy;
    private final RequestThrottler throttler;
    private final int maxEnqueuedPages;
    private final int maxPages;
    private final boolean protocolBackpressureAvailable;
    private final boolean isIdempotent;
    private final Message message;
    private final Duration timeoutFirstPage;
    private final Duration timeoutOtherPages;
    private final Timer timer;
    private final List<Map.Entry<Node, Throwable>> errors = new CopyOnWriteArrayList<Map.Entry<Node, Throwable>>();
    private final ReentrantLock lock = new ReentrantLock();
    @GuardedBy(value="lock")
    private final Queue<Object> queue;
    @GuardedBy(value="lock")
    @VisibleForTesting
    CompletableFuture<ContinuousAsyncResultSet> pendingResult;
    @GuardedBy(value="lock")
    private int numPagesRequested;
    @GuardedBy(value="lock")
    @VisibleForTesting
    int state = 1;
    private static final int STATE_FINISHED = -1;
    private static final int STATE_FAILED = -2;
    private volatile long startTimeNanos;
    private volatile ColumnDefinitions columnDefinitions;
    private volatile Node node;
    private volatile DriverChannel channel;
    private volatile int streamId;
    private volatile long messageStartTimeNanos;
    private volatile Timeout timeout;
    private volatile int retryCount;

    public ContinuousCqlRequestHandler(@NonNull Statement<?> statement, @NonNull DefaultSession session, @NonNull InternalDriverContext context, @NonNull String sessionLogPrefix) {
        ProtocolVersion protocolVersion = context.getProtocolVersion();
        if (!context.getProtocolVersionRegistry().supports(protocolVersion, (ProtocolFeature)DseProtocolFeature.CONTINUOUS_PAGING)) {
            throw new IllegalStateException("Cannot execute continuous paging requests with protocol version " + protocolVersion);
        }
        this.logPrefix = sessionLogPrefix + "|" + this.hashCode();
        LOG.trace("[{}] Creating new continuous handler for request {}", (Object)this.logPrefix, statement);
        this.statement = statement;
        this.session = session;
        this.context = context;
        if (statement.getExecutionProfile() != null) {
            this.executionProfile = statement.getExecutionProfile();
        } else {
            DriverConfig config = context.getConfig();
            String profileName = statement.getExecutionProfileName();
            this.executionProfile = profileName == null || profileName.isEmpty() ? config.getDefaultProfile() : config.getProfile(profileName);
        }
        this.queryPlan = statement.getNode() != null ? new QueryPlan(new Object[]{statement.getNode()}) : context.getLoadBalancingPolicyWrapper().newQueryPlan(statement, this.executionProfile.getName(), (Session)session);
        this.retryPolicy = context.getRetryPolicy(this.executionProfile.getName());
        Boolean idempotent = statement.isIdempotent();
        this.isIdempotent = idempotent == null ? this.executionProfile.getBoolean((DriverOption)DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE) : idempotent.booleanValue();
        this.timeoutFirstPage = this.executionProfile.getDuration((DriverOption)DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE);
        this.timeoutOtherPages = this.executionProfile.getDuration((DriverOption)DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES);
        this.timer = context.getNettyOptions().getTimer();
        this.maxEnqueuedPages = this.executionProfile.getInt((DriverOption)DseDriverOption.CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES);
        this.queue = new ArrayDeque<Object>(this.maxEnqueuedPages);
        this.maxPages = this.executionProfile.getInt((DriverOption)DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES);
        this.protocolBackpressureAvailable = protocolVersion.getCode() >= DseProtocolVersion.DSE_V2.getCode();
        this.numPagesRequested = this.protocolBackpressureAvailable ? this.maxEnqueuedPages : 0;
        this.message = DseConversions.toContinuousPagingMessage(statement, this.executionProfile, context);
        this.replicas = this.getReplicas();
        this.throttler = context.getRequestThrottler();
        this.throttler.register((Throttled)this);
        this.startTimeNanos = System.nanoTime();
    }

    public void onStreamIdAssigned(int streamId) {
        LOG.trace("[{}] Assigned streamId {} on node {}", new Object[]{this.logPrefix, streamId, this.node});
        this.streamId = streamId;
    }

    public boolean isLastResponse(@NonNull Frame responseFrame) {
        Message message = responseFrame.message;
        if (message instanceof Rows) {
            Rows rows = (Rows)message;
            DseRowsMetadata metadata = (DseRowsMetadata)rows.getMetadata();
            return metadata.isLastContinuousPage;
        }
        return message instanceof Error;
    }

    public void onThrottleReady(boolean wasDelayed) {
        if (wasDelayed) {
            this.session.getMetricUpdater().updateTimer((Object)DefaultSessionMetric.THROTTLING_DELAY, this.executionProfile.getName(), System.nanoTime() - this.startTimeNanos, TimeUnit.NANOSECONDS);
        }
        this.sendRequest(null);
    }

    public CompletionStage<ContinuousAsyncResultSet> handle() {
        return this.dequeueOrCreatePending();
    }

    private void sendRequest(@Nullable Node node) {
        this.channel = null;
        if (node == null || (this.channel = this.session.getChannel(node, this.logPrefix)) == null) {
            while ((node = this.queryPlan.poll()) != null) {
                this.channel = this.session.getChannel(node, this.logPrefix);
                if (this.channel == null) continue;
            }
        }
        if (this.channel == null || node == null) {
            this.lock.lock();
            try {
                this.abort((Throwable)AllNodesFailedException.fromErrors(this.errors), false);
            }
            finally {
                this.lock.unlock();
            }
        } else {
            if (this.replicas.isEmpty()) {
                LOG.warn("[{}] Could not determine if the node is a replica, continuous paging may not be available: {}", (Object)this.logPrefix, (Object)node);
            } else if (!this.replicas.contains(node)) {
                LOG.warn("[{}] Contacting a node that is likely not a replica, continuous paging may not be available: {}", (Object)this.logPrefix, (Object)node);
            }
            this.node = node;
            this.streamId = -1;
            this.messageStartTimeNanos = System.nanoTime();
            this.channel.write(this.message, false, this.statement.getCustomPayload(), (ResponseCallback)this).addListener((GenericFutureListener)this);
        }
    }

    public void operationComplete(@NonNull Future<Void> future) {
        if (!future.isSuccess()) {
            Throwable error = future.cause();
            if (error instanceof EncoderException && error.getCause() instanceof FrameTooLongException) {
                this.trackNodeError(this.node, error.getCause());
                this.lock.lock();
                try {
                    this.abort(error.getCause(), false);
                }
                finally {
                    this.lock.unlock();
                }
            } else {
                LOG.trace("[{}] Failed to send request on {}, trying next node (cause: {})", new Object[]{this.logPrefix, this.channel, error});
                ((DefaultNode)this.node).getMetricUpdater().incrementCounter((Object)DefaultNodeMetric.UNSENT_REQUESTS, this.executionProfile.getName());
                this.recordError(this.node, error);
                this.trackNodeError(this.node, error.getCause());
                this.sendRequest(null);
            }
        } else {
            LOG.trace("[{}] Request sent on {}", (Object)this.logPrefix, (Object)this.channel);
            this.timeout = this.scheduleTimeout(1);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onResponse(@NonNull Frame response) {
        block9: {
            this.stopNodeMessageTimer();
            this.cancelTimeout();
            this.lock.lock();
            try {
                if (this.state < 0) {
                    LOG.trace("[{}] Got result but the request has been cancelled, ignoring", (Object)this.logPrefix);
                    return;
                }
                try {
                    Message responseMessage = response.message;
                    if (responseMessage instanceof Result) {
                        LOG.trace("[{}] Got result", (Object)this.logPrefix);
                        this.processResultResponse((Result)responseMessage, response);
                        break block9;
                    }
                    if (responseMessage instanceof Error) {
                        LOG.trace("[{}] Got error response", (Object)this.logPrefix);
                        this.processErrorResponse((Error)responseMessage);
                        break block9;
                    }
                    IllegalStateException error = new IllegalStateException("Unexpected response " + responseMessage);
                    this.trackNodeError(this.node, error);
                    this.abort(error, false);
                }
                catch (Throwable t) {
                    this.trackNodeError(this.node, t);
                    this.abort(t, false);
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    public void onFailure(@NonNull Throwable error) {
        this.cancelTimeout();
        LOG.trace(String.format("[%s] Request failure", this.logPrefix), error);
        RetryDecision decision = !this.isIdempotent || error instanceof FrameTooLongException ? RetryDecision.RETHROW : this.retryPolicy.onRequestAborted(this.statement, error, this.retryCount);
        this.updateErrorMetrics(((DefaultNode)this.node).getMetricUpdater(), decision, DefaultNodeMetric.ABORTED_REQUESTS, DefaultNodeMetric.RETRIES_ON_ABORTED, DefaultNodeMetric.IGNORES_ON_ABORTED);
        this.lock.lock();
        try {
            this.processRetryDecision(decision, error);
        }
        finally {
            this.lock.unlock();
        }
    }

    public void onThrottleFailure(@NonNull RequestThrottlingException error) {
        this.session.getMetricUpdater().incrementCounter((Object)DefaultSessionMetric.THROTTLING_ERRORS, this.executionProfile.getName());
        this.lock.lock();
        try {
            this.abort((Throwable)error, false);
        }
        finally {
            this.lock.unlock();
        }
    }

    private void processResultResponse(@NonNull Result result, @Nullable Frame frame) {
        assert (this.lock.isHeldByCurrentThread());
        try {
            DefaultExecutionInfo executionInfo = this.createExecutionInfo(result, frame);
            if (result instanceof Rows) {
                int currentPage;
                int pageNumber;
                DseRowsMetadata rowsMetadata = (DseRowsMetadata)((Rows)result).getMetadata();
                if (this.columnDefinitions == null) {
                    this.columnDefinitions = Conversions.toColumnDefinitions((RowsMetadata)rowsMetadata, (InternalDriverContext)this.context);
                }
                if ((pageNumber = rowsMetadata.continuousPageNumber) != (currentPage = this.state)) {
                    this.abort(new IllegalStateException(String.format("Received page %d but was expecting %d", pageNumber, currentPage)), false);
                } else {
                    DefaultContinuousAsyncResultSet resultSet = this.createResultSet((Rows)result, (ExecutionInfo)executionInfo);
                    if (rowsMetadata.isLastContinuousPage) {
                        LOG.trace("[{}] Received last page ({} - {} rows)", new Object[]{this.logPrefix, pageNumber, resultSet.remaining()});
                        this.state = -1;
                        this.reenableAutoReadIfNeeded();
                        this.enqueueOrCompletePending(resultSet);
                        this.stopGlobalRequestTimer();
                    } else {
                        LOG.trace("[{}] Received page {} ({} rows)", new Object[]{this.logPrefix, pageNumber, resultSet.remaining()});
                        if (currentPage > 0) {
                            this.state = currentPage + 1;
                        }
                        this.enqueueOrCompletePending(resultSet);
                    }
                }
            } else {
                assert (result instanceof com.datastax.oss.protocol.internal.response.result.Void);
                ContinuousAsyncResultSet resultSet = DefaultContinuousAsyncResultSet.empty((ExecutionInfo)executionInfo);
                LOG.trace("[{}] Continuous paging interrupted by retry policy decision to ignore error", (Object)this.logPrefix);
                this.state = -1;
                this.reenableAutoReadIfNeeded();
                this.enqueueOrCompletePending(resultSet);
                this.stopGlobalRequestTimer();
            }
        }
        catch (Throwable error) {
            this.abort(error, false);
        }
    }

    private void processErrorResponse(@NonNull Error errorMessage) {
        assert (this.lock.isHeldByCurrentThread());
        if (errorMessage instanceof Unprepared) {
            this.processUnprepared((Unprepared)errorMessage);
        } else {
            CoordinatorException error = DseConversions.toThrowable(this.node, errorMessage, this.context);
            if (error instanceof BootstrappingException) {
                LOG.trace("[{}] {} is bootstrapping, trying next node", (Object)this.logPrefix, (Object)this.node);
                this.recordError(this.node, (Throwable)error);
                this.trackNodeError(this.node, (Throwable)error);
                this.sendRequest(null);
            } else if (error instanceof QueryValidationException || error instanceof FunctionFailureException || error instanceof ProtocolError || this.state > 1) {
                LOG.trace("[{}] Unrecoverable error, rethrowing", (Object)this.logPrefix);
                NodeMetricUpdater metricUpdater = ((DefaultNode)this.node).getMetricUpdater();
                metricUpdater.incrementCounter((Object)DefaultNodeMetric.OTHER_ERRORS, this.executionProfile.getName());
                this.trackNodeError(this.node, (Throwable)error);
                this.abort((Throwable)error, true);
            } else {
                this.processRecoverableError(error);
            }
        }
    }

    private void processRecoverableError(@NonNull CoordinatorException error) {
        RetryDecision decision;
        assert (this.lock.isHeldByCurrentThread());
        NodeMetricUpdater metricUpdater = ((DefaultNode)this.node).getMetricUpdater();
        if (error instanceof ReadTimeoutException) {
            ReadTimeoutException readTimeout = (ReadTimeoutException)error;
            decision = this.retryPolicy.onReadTimeout(this.statement, readTimeout.getConsistencyLevel(), readTimeout.getBlockFor(), readTimeout.getReceived(), readTimeout.wasDataPresent(), this.retryCount);
            this.updateErrorMetrics(metricUpdater, decision, DefaultNodeMetric.READ_TIMEOUTS, DefaultNodeMetric.RETRIES_ON_READ_TIMEOUT, DefaultNodeMetric.IGNORES_ON_READ_TIMEOUT);
        } else if (error instanceof WriteTimeoutException) {
            WriteTimeoutException writeTimeout = (WriteTimeoutException)error;
            decision = this.isIdempotent ? this.retryPolicy.onWriteTimeout(this.statement, writeTimeout.getConsistencyLevel(), writeTimeout.getWriteType(), writeTimeout.getBlockFor(), writeTimeout.getReceived(), this.retryCount) : RetryDecision.RETHROW;
            this.updateErrorMetrics(metricUpdater, decision, DefaultNodeMetric.WRITE_TIMEOUTS, DefaultNodeMetric.RETRIES_ON_WRITE_TIMEOUT, DefaultNodeMetric.IGNORES_ON_WRITE_TIMEOUT);
        } else if (error instanceof UnavailableException) {
            UnavailableException unavailable = (UnavailableException)error;
            decision = this.retryPolicy.onUnavailable(this.statement, unavailable.getConsistencyLevel(), unavailable.getRequired(), unavailable.getAlive(), this.retryCount);
            this.updateErrorMetrics(metricUpdater, decision, DefaultNodeMetric.UNAVAILABLES, DefaultNodeMetric.RETRIES_ON_UNAVAILABLE, DefaultNodeMetric.IGNORES_ON_UNAVAILABLE);
        } else {
            decision = this.isIdempotent ? this.retryPolicy.onErrorResponse(this.statement, error, this.retryCount) : RetryDecision.RETHROW;
            this.updateErrorMetrics(metricUpdater, decision, DefaultNodeMetric.OTHER_ERRORS, DefaultNodeMetric.RETRIES_ON_OTHER_ERROR, DefaultNodeMetric.IGNORES_ON_OTHER_ERROR);
        }
        this.processRetryDecision(decision, (Throwable)error);
    }

    private void processUnprepared(@NonNull Unprepared errorMessage) {
        assert (this.lock.isHeldByCurrentThread());
        LOG.trace("[{}] Statement is not prepared on {}, re-preparing", (Object)this.logPrefix, (Object)this.node);
        ByteBuffer id = ByteBuffer.wrap(errorMessage.id);
        RepreparePayload repreparePayload = (RepreparePayload)this.session.getRepreparePayloads().get(id);
        if (repreparePayload == null) {
            throw new IllegalStateException(String.format("Tried to execute unprepared query %s but we don't have the data to re-prepare it", Bytes.toHexString((ByteBuffer)id)));
        }
        Prepare prepare = new Prepare(repreparePayload.query);
        Duration timeout = this.executionProfile.getDuration((DriverOption)DefaultDriverOption.REQUEST_TIMEOUT);
        new AdminRequestHandler(this.channel, (Message)prepare, repreparePayload.customPayload, timeout, this.logPrefix, "Re-prepare " + prepare.toString()).start().whenComplete((result, exception) -> this.processPrepared((Throwable)exception));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processPrepared(@Nullable Throwable exception) {
        if (exception == null) {
            LOG.trace("[{}] Re-prepare successful, retrying on the same node ({})", (Object)this.logPrefix, (Object)this.node);
            this.sendRequest(this.node);
        } else {
            Throwable fatalError = null;
            if (exception instanceof UnexpectedResponseException) {
                CoordinatorException prepareError;
                Message prepareErrorMessage = ((UnexpectedResponseException)exception).message;
                if (prepareErrorMessage instanceof Error && ((prepareError = DseConversions.toThrowable(this.node, (Error)prepareErrorMessage, this.context)) instanceof QueryValidationException || prepareError instanceof FunctionFailureException || prepareError instanceof ProtocolError)) {
                    LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", (Object)this.logPrefix);
                    this.trackNodeError(this.node, (Throwable)prepareError);
                    fatalError = prepareError;
                }
            } else if (exception instanceof RequestThrottlingException) {
                this.trackNodeError(this.node, exception);
                fatalError = exception;
            }
            if (fatalError != null) {
                this.lock.lock();
                try {
                    this.abort(fatalError, true);
                }
                finally {
                    this.lock.unlock();
                }
            } else {
                LOG.trace("[{}] Re-prepare failed, trying next node", (Object)this.logPrefix);
                this.recordError(this.node, exception);
                this.trackNodeError(this.node, exception);
                this.sendRequest(null);
            }
        }
    }

    private void processRetryDecision(@NonNull RetryDecision decision, @NonNull Throwable error) {
        assert (this.lock.isHeldByCurrentThread());
        LOG.trace("[{}] Processing retry decision {}", (Object)this.logPrefix, (Object)decision);
        switch (decision) {
            case RETRY_SAME: {
                this.recordError(this.node, error);
                this.trackNodeError(this.node, error);
                ++this.retryCount;
                this.sendRequest(this.node);
                break;
            }
            case RETRY_NEXT: {
                this.recordError(this.node, error);
                this.trackNodeError(this.node, error);
                ++this.retryCount;
                this.sendRequest(null);
                break;
            }
            case RETHROW: {
                this.trackNodeError(this.node, error);
                this.abort(error, true);
                break;
            }
            case IGNORE: {
                this.processResultResponse((Result)com.datastax.oss.protocol.internal.response.result.Void.INSTANCE, null);
            }
        }
    }

    private void enqueueOrCompletePending(@NonNull Object pageOrError) {
        assert (this.lock.isHeldByCurrentThread());
        if (this.pendingResult != null) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("[{}] Client was waiting on empty queue, completing with {}", (Object)this.logPrefix, (Object)ContinuousCqlRequestHandler.asTraceString(pageOrError));
            }
            CompletableFuture<ContinuousAsyncResultSet> tmp = this.pendingResult;
            this.pendingResult = null;
            this.completeResultSetFuture(tmp, pageOrError);
        } else {
            if (LOG.isTraceEnabled()) {
                LOG.trace("[{}] Enqueuing {}", (Object)this.logPrefix, (Object)ContinuousCqlRequestHandler.asTraceString(pageOrError));
            }
            this.queue.add(pageOrError);
            if (!this.protocolBackpressureAvailable && this.queue.size() == this.maxEnqueuedPages && this.state > 0) {
                LOG.trace("[{}] Exceeded {} queued response pages, disabling auto-read", (Object)this.logPrefix, (Object)this.queue.size());
                this.channel.config().setAutoRead(false);
            }
        }
    }

    @NonNull
    protected CompletableFuture<ContinuousAsyncResultSet> dequeueOrCreatePending() {
        this.lock.lock();
        try {
            assert (this.pendingResult == null);
            Object head = this.queue.poll();
            if (!this.protocolBackpressureAvailable && head != null && this.queue.size() == this.maxEnqueuedPages - 1) {
                LOG.trace("[{}] Back to {} queued response pages, re-enabling auto-read", (Object)this.logPrefix, (Object)this.queue.size());
                this.channel.config().setAutoRead(true);
            }
            this.maybeRequestMore();
            if (head != null) {
                if (this.state == -2 && !(head instanceof Throwable)) {
                    LOG.trace("[{}] Client requested next page on cancelled queue, discarding page and returning cancelled future", (Object)this.logPrefix);
                    CompletableFuture<ContinuousAsyncResultSet> completableFuture = this.cancelledResultSetFuture();
                    return completableFuture;
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace("[{}] Client requested next page on non-empty queue, returning immediate future of {}", (Object)this.logPrefix, (Object)ContinuousCqlRequestHandler.asTraceString(head));
                }
                CompletableFuture<ContinuousAsyncResultSet> completableFuture = this.immediateResultSetFuture(head);
                return completableFuture;
            }
            if (this.state == -2) {
                LOG.trace("[{}] Client requested next page on cancelled empty queue, returning cancelled future", (Object)this.logPrefix);
                CompletableFuture<ContinuousAsyncResultSet> completableFuture = this.cancelledResultSetFuture();
                return completableFuture;
            }
            LOG.trace("[{}] Client requested next page but queue is empty, installing future", (Object)this.logPrefix);
            this.pendingResult = this.createResultSetFuture();
            if (this.state > 1) {
                this.timeout = this.scheduleTimeout(this.state);
            }
            CompletableFuture<ContinuousAsyncResultSet> completableFuture = this.pendingResult;
            return completableFuture;
        }
        finally {
            this.lock.unlock();
        }
    }

    private void maybeRequestMore() {
        int inFlight;
        assert (this.lock.isHeldByCurrentThread());
        if (this.state < 2 || this.streamId == -1 || !this.protocolBackpressureAvailable) {
            return;
        }
        if (this.maxPages > 0 && this.numPagesRequested >= this.maxPages) {
            return;
        }
        int received = this.state - 1;
        int requested = this.numPagesRequested;
        int freeSpace = this.maxEnqueuedPages - this.queue.size();
        int numPagesFittingInQueue = freeSpace - (inFlight = requested - received);
        if (numPagesFittingInQueue >= this.maxEnqueuedPages / 2) {
            LOG.trace("[{}] Requesting more {} pages", (Object)this.logPrefix, (Object)numPagesFittingInQueue);
            this.numPagesRequested = requested + numPagesFittingInQueue;
            this.sendMorePagesRequest(numPagesFittingInQueue);
        }
    }

    private void sendMorePagesRequest(int nextPages) {
        assert (this.lock.isHeldByCurrentThread());
        assert (this.channel != null) : "expected valid connection in order to request more pages";
        assert (this.protocolBackpressureAvailable);
        assert (this.streamId != -1);
        LOG.trace("[{}] Sending request for more pages", (Object)this.logPrefix);
        new ThrottledAdminRequestHandler(this.channel, (Message)Revise.requestMoreContinuousPages((int)this.streamId, (int)nextPages), this.statement.getCustomPayload(), this.timeoutOtherPages, this.throttler, this.session.getMetricUpdater(), this.logPrefix, "request " + nextPages + " more pages for id " + this.streamId).start().handle((result, error) -> {
            if (error != null) {
                Loggers.warnWithException((Logger)LOG, (String)"[{}] Error requesting more pages, aborting.", (Object[])new Object[]{this.logPrefix, error});
                this.lock.lock();
                try {
                    this.abort((Throwable)error, false);
                }
                finally {
                    this.lock.unlock();
                }
            }
            return null;
        });
    }

    private Timeout scheduleTimeout(int expectedPage) {
        Duration timeout;
        if (expectedPage < 0) {
            return null;
        }
        Duration duration = timeout = expectedPage == 1 ? this.timeoutFirstPage : this.timeoutOtherPages;
        if (timeout.toNanos() <= 0L) {
            return null;
        }
        LOG.trace("[{}] Scheduling timeout for page {} in {}", new Object[]{this.logPrefix, expectedPage, timeout});
        return this.timer.newTimeout(timeout1 -> {
            this.lock.lock();
            try {
                if (this.state == expectedPage) {
                    this.abort((Throwable)new DriverTimeoutException(String.format("Timed out waiting for page %d", expectedPage)), false);
                } else {
                    LOG.trace("[{}] Timeout fired for page {} but query already at state {}, skipping", new Object[]{this.logPrefix, expectedPage, this.state});
                }
            }
            finally {
                this.lock.unlock();
            }
        }, timeout.toNanos(), TimeUnit.NANOSECONDS);
    }

    private void cancelTimeout() {
        Timeout timeout = this.timeout;
        if (timeout != null) {
            LOG.trace("[{}] Cancelling timeout", (Object)this.logPrefix);
            timeout.cancel();
        }
    }

    void cancel() {
        this.lock.lock();
        try {
            if (this.state < 0) {
                return;
            }
            LOG.trace("[{}] Cancelling continuous paging session with state {} on node {}", new Object[]{this.logPrefix, this.state, this.node});
            this.state = -2;
            if (this.pendingResult != null) {
                this.pendingResult.cancel(true);
            }
        }
        finally {
            this.lock.unlock();
        }
        if (this.channel != null) {
            if (!this.channel.closeFuture().isDone()) {
                this.channel.cancel((ResponseCallback)this);
            }
            this.sendCancelRequest();
        }
        this.reenableAutoReadIfNeeded();
    }

    private void sendCancelRequest() {
        LOG.trace("[{}] Sending cancel request", (Object)this.logPrefix);
        new ThrottledAdminRequestHandler(this.channel, (Message)Revise.cancelContinuousPaging((int)this.streamId), this.statement.getCustomPayload(), this.timeoutOtherPages, this.throttler, this.session.getMetricUpdater(), this.logPrefix, "cancel request").start().handle((result, error) -> {
            if (error != null) {
                Loggers.warnWithException((Logger)LOG, (String)"[{}] Error sending cancel request. This is not critical (the request will eventually time out server-side).", (Object[])new Object[]{this.logPrefix, error});
            } else {
                LOG.trace("[{}] Continuous paging session cancelled successfully", (Object)this.logPrefix);
            }
            return null;
        });
    }

    private void reenableAutoReadIfNeeded() {
        LOG.trace("[{}] Re-enabling auto-read", (Object)this.logPrefix);
        if (!this.protocolBackpressureAvailable) {
            this.channel.config().setAutoRead(true);
        }
    }

    private void recordError(@NonNull Node node, @NonNull Throwable error) {
        this.errors.add(new AbstractMap.SimpleEntry<Node, Throwable>(node, error));
    }

    private void trackNodeError(@NonNull Node node, @NonNull Throwable error) {
        long latencyNanos = System.nanoTime() - this.messageStartTimeNanos;
        this.context.getRequestTracker().onNodeError(this.statement, error, latencyNanos, this.executionProfile, node, this.logPrefix);
    }

    private void abort(@NonNull Throwable error, boolean fromServer) {
        assert (this.lock.isHeldByCurrentThread());
        LOG.trace("[{}] Aborting due to {} ({})", new Object[]{this.logPrefix, error.getClass().getSimpleName(), error.getMessage()});
        if (this.channel == null) {
            this.enqueueOrCompletePending(error);
            this.state = -2;
        } else if (this.state > 0) {
            this.enqueueOrCompletePending(error);
            if (fromServer) {
                this.state = -2;
                this.reenableAutoReadIfNeeded();
            } else {
                this.cancel();
            }
        }
        this.stopGlobalRequestTimer();
    }

    private void stopNodeMessageTimer() {
        ((DefaultNode)this.node).getMetricUpdater().updateTimer((Object)DefaultNodeMetric.CQL_MESSAGES, this.executionProfile.getName(), System.nanoTime() - this.messageStartTimeNanos, TimeUnit.NANOSECONDS);
    }

    private void stopGlobalRequestTimer() {
        this.session.getMetricUpdater().updateTimer((Object)DseSessionMetric.CONTINUOUS_CQL_REQUESTS, this.executionProfile.getName(), System.nanoTime() - this.startTimeNanos, TimeUnit.NANOSECONDS);
    }

    private void updateErrorMetrics(@NonNull NodeMetricUpdater metricUpdater, @NonNull RetryDecision decision, @NonNull DefaultNodeMetric error, @NonNull DefaultNodeMetric retriesOnError, @NonNull DefaultNodeMetric ignoresOnError) {
        metricUpdater.incrementCounter((Object)error, this.executionProfile.getName());
        switch (decision) {
            case RETRY_SAME: 
            case RETRY_NEXT: {
                metricUpdater.incrementCounter((Object)DefaultNodeMetric.RETRIES, this.executionProfile.getName());
                metricUpdater.incrementCounter((Object)retriesOnError, this.executionProfile.getName());
                break;
            }
            case IGNORE: {
                metricUpdater.incrementCounter((Object)DefaultNodeMetric.IGNORES, this.executionProfile.getName());
                metricUpdater.incrementCounter((Object)ignoresOnError, this.executionProfile.getName());
                break;
            }
        }
    }

    @NonNull
    private Set<Node> getReplicas() {
        if (this.session.getMetadata().getTokenMap().isPresent()) {
            CqlIdentifier keyspace = this.statement.getKeyspace();
            if (keyspace == null && (keyspace = this.statement.getRoutingKeyspace()) == null) {
                keyspace = this.session.getKeyspace().orElse(null);
            }
            if (keyspace != null) {
                TokenMap tokenMap = (TokenMap)this.session.getMetadata().getTokenMap().get();
                Token routingToken = this.statement.getRoutingToken();
                if (routingToken != null) {
                    return tokenMap.getReplicas(keyspace, routingToken);
                }
                ByteBuffer routingKey = this.statement.getRoutingKey();
                if (routingKey != null) {
                    return tokenMap.getReplicas(keyspace, routingKey);
                }
            }
        }
        return Collections.emptySet();
    }

    @NonNull
    private DefaultExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Frame response) {
        ByteBuffer pagingState = result instanceof Rows ? ((Rows)result).getMetadata().pagingState : null;
        return new DefaultExecutionInfo(this.statement, this.node, 0, 0, this.errors, pagingState, response, true, this.session, this.context, this.executionProfile);
    }

    @NonNull
    private DefaultContinuousAsyncResultSet createResultSet(@NonNull Rows rows, @NonNull ExecutionInfo executionInfo) {
        final Queue data = rows.getData();
        CountingIterator<Row> iterator = new CountingIterator<Row>(data.size()){

            protected Row computeNext() {
                List rowData = (List)data.poll();
                return rowData == null ? (Row)this.endOfData() : new DefaultRow(ContinuousCqlRequestHandler.this.columnDefinitions, rowData, (AttachmentPoint)ContinuousCqlRequestHandler.this.context);
            }
        };
        DseRowsMetadata metadata = (DseRowsMetadata)rows.getMetadata();
        return new DefaultContinuousAsyncResultSet(iterator, this.columnDefinitions, metadata.continuousPageNumber, !metadata.isLastContinuousPage, executionInfo, this);
    }

    @NonNull
    private CompletableFuture<ContinuousAsyncResultSet> createResultSetFuture() {
        CompletableFuture<ContinuousAsyncResultSet> future = new CompletableFuture<ContinuousAsyncResultSet>();
        future.whenComplete((rs, t) -> {
            if (t instanceof CancellationException) {
                this.cancel();
            }
        });
        return future;
    }

    @NonNull
    private CompletableFuture<ContinuousAsyncResultSet> immediateResultSetFuture(@NonNull Object pageOrError) {
        CompletableFuture<ContinuousAsyncResultSet> future = this.createResultSetFuture();
        this.completeResultSetFuture(future, pageOrError);
        return future;
    }

    @NonNull
    private CompletableFuture<ContinuousAsyncResultSet> cancelledResultSetFuture() {
        return this.immediateResultSetFuture(new CancellationException("Can't get more results because the continuous query has failed already. Most likely this is because the query was cancelled"));
    }

    private void completeResultSetFuture(@NonNull CompletableFuture<ContinuousAsyncResultSet> future, @NonNull Object pageOrError) {
        long now = System.nanoTime();
        long totalLatencyNanos = now - this.startTimeNanos;
        long nodeLatencyNanos = now - this.messageStartTimeNanos;
        if (pageOrError instanceof ContinuousAsyncResultSet) {
            if (future.complete((ContinuousAsyncResultSet)pageOrError)) {
                this.throttler.signalSuccess((Throttled)this);
                this.context.getRequestTracker().onNodeSuccess(this.statement, nodeLatencyNanos, this.executionProfile, this.node, this.logPrefix);
                this.context.getRequestTracker().onSuccess(this.statement, totalLatencyNanos, this.executionProfile, this.node, this.logPrefix);
            }
        } else {
            Throwable error = (Throwable)pageOrError;
            if (future.completeExceptionally(error)) {
                this.context.getRequestTracker().onError(this.statement, error, totalLatencyNanos, this.executionProfile, this.node, this.logPrefix);
                if (error instanceof DriverTimeoutException) {
                    this.throttler.signalTimeout((Throttled)this);
                    this.session.getMetricUpdater().incrementCounter((Object)DefaultSessionMetric.CQL_CLIENT_TIMEOUTS, this.executionProfile.getName());
                } else if (!(error instanceof RequestThrottlingException)) {
                    this.throttler.signalError((Throttled)this, error);
                }
            }
        }
    }

    @NonNull
    private static String asTraceString(@NonNull Object pageOrError) {
        return pageOrError instanceof ContinuousAsyncResultSet ? "page " + ((ContinuousAsyncResultSet)pageOrError).pageNumber() : ((Exception)pageOrError).getClass().getSimpleName();
    }
}

