package io.pravega.client.segment.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.auth.AuthenticationException;
import io.pravega.client.netty.impl.ClientConnection;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.netty.impl.Flow;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.stream.impl.ConnectionClosedException;
import io.pravega.client.stream.impl.Controller;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.ByteBufferUtils;
import io.pravega.common.util.Retry;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.FailingReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommands;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/client/segment/impl/AsyncSegmentInputStreamImpl.class */
/**
 * {@link AsyncSegmentInputStream} implementation that reads one segment over a {@link ClientConnection}.
 *
 * <p>For every read the stream retrieves a delegation token, resolves the segment's endpoint through the
 * {@link Controller}, (re)uses a connection obtained from the {@link ConnectionFactory} and sends a
 * {@link WireCommands.ReadSegment}. Replies arrive on the inner {@link ResponseProcessor}, which completes
 * the future registered for the reply's offset in {@link #outstandingRequests}.
 *
 * <p>{@link #connection} and {@link #outstandingRequests} are only touched while holding {@link #lock}.
 * Futures are always completed after the lock has been released.
 */
public class AsyncSegmentInputStreamImpl extends AsyncSegmentInputStream {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(AsyncSegmentInputStreamImpl.class);

    // NOTE(review): arguments are presumably (initialMillis, multiplier, attempts, maxDelayMillis) -
    // confirm against Retry.withExpBackoff.
    private final Retry.RetryWithBackoff backoffSchedule = Retry.withExpBackoff(1L, 10, 9, 30000L);
    private final ConnectionFactory connectionFactory;
    private final Object lock = new Object();

    /** Current connection attempt, or {@code null} when none exists and one must be established. */
    @GuardedBy("lock")
    private CompletableFuture<ClientConnection> connection = null;

    /**
     * In-flight reads keyed by the offset they were issued for.
     *
     * <p>NOTE(review): because the key is the offset, a second read registered for the same offset while
     * the first is still in flight replaces the first entry in this map.
     */
    @GuardedBy("lock")
    private final Map<Long, CompletableFuture<WireCommands.SegmentRead>> outstandingRequests = new HashMap<>();
    private final ResponseProcessor responseProcessor = new ResponseProcessor();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Controller controller;
    private final DelegationTokenProvider tokenProvider;

    @VisibleForTesting
    private final long requestId = Flow.create().asLong();

    /**
     * Receives replies from the server and routes them to the future of the read they answer.
     * Connection-level problems close the connection, which in turn fails every in-flight read.
     */
    /* loaded from: input_file:io/pravega/client/segment/impl/AsyncSegmentInputStreamImpl$ResponseProcessor.class */
    private final class ResponseProcessor extends FailingReplyProcessor {
        private ResponseProcessor() {
        }

        /** Closes the connection with a {@link ConnectionFailedException}. */
        @Override // io.pravega.shared.protocol.netty.ReplyProcessor
        public void connectionDropped() {
            AsyncSegmentInputStreamImpl.this.closeConnection(new ConnectionFailedException());
        }

        /** Closes the connection with a {@link ConnectionFailedException} describing the reply. */
        @Override // io.pravega.shared.protocol.netty.FailingReplyProcessor, io.pravega.shared.protocol.netty.ReplyProcessor
        public void wrongHost(WireCommands.WrongHost wrongHost) {
            AsyncSegmentInputStreamImpl.this.closeConnection(new ConnectionFailedException(wrongHost.toString()));
        }

        /** Fails the matching read with a {@link SegmentTruncatedException}. */
        @Override // io.pravega.shared.protocol.netty.FailingReplyProcessor, io.pravega.shared.protocol.netty.ReplyProcessor
        public void noSuchSegment(WireCommands.NoSuchSegment noSuchSegment) {
            AsyncSegmentInputStreamImpl.log.info("Received noSuchSegment {}", noSuchSegment);
            CompletableFuture<WireCommands.SegmentRead> pending = grabFuture(noSuchSegment.getSegment(), noSuchSegment.getOffset());
            if (pending != null) {
                pending.completeExceptionally(new SegmentTruncatedException("Segment no longer exists."));
            }
        }

        /** Fails the matching read with a {@link SegmentTruncatedException}. */
        @Override // io.pravega.shared.protocol.netty.FailingReplyProcessor, io.pravega.shared.protocol.netty.ReplyProcessor
        public void segmentIsTruncated(WireCommands.SegmentIsTruncated segmentIsTruncated) {
            AsyncSegmentInputStreamImpl.log.info("Received segmentIsTruncated {}", segmentIsTruncated);
            CompletableFuture<WireCommands.SegmentRead> pending = grabFuture(segmentIsTruncated.getSegment(), segmentIsTruncated.getOffset());
            if (pending != null) {
                pending.completeExceptionally(new SegmentTruncatedException());
            }
        }

        /**
         * Completes the matching read normally with an empty {@link WireCommands.SegmentRead}.
         */
        @Override // io.pravega.shared.protocol.netty.FailingReplyProcessor, io.pravega.shared.protocol.netty.ReplyProcessor
        public void segmentIsSealed(WireCommands.SegmentIsSealed segmentIsSealed) {
            AsyncSegmentInputStreamImpl.log.info("Received segmentSealed {}", segmentIsSealed);
            CompletableFuture<WireCommands.SegmentRead> pending = grabFuture(segmentIsSealed.getSegment(), segmentIsSealed.getOffset());
            if (pending != null) {
                // NOTE(review): the two 'true' flags presumably mark "at tail" and "end of segment" - confirm
                // against the WireCommands.SegmentRead constructor.
                pending.complete(new WireCommands.SegmentRead(segmentIsSealed.getSegment(), segmentIsSealed.getOffset(), true, true, ByteBufferUtils.EMPTY, segmentIsSealed.getRequestId()));
            }
        }

        /** Completes the matching read with the received data. */
        @Override // io.pravega.shared.protocol.netty.FailingReplyProcessor, io.pravega.shared.protocol.netty.ReplyProcessor
        public void segmentRead(WireCommands.SegmentRead segmentRead) {
            AsyncSegmentInputStreamImpl.log.trace("Received read result {}", segmentRead);
            CompletableFuture<WireCommands.SegmentRead> pending = grabFuture(segmentRead.getSegment(), segmentRead.getOffset());
            if (pending != null) {
                pending.complete(segmentRead);
            }
        }

        /**
         * Removes and returns the in-flight read registered for {@code offset}.
         *
         * @param segmentName segment named in the reply; must be the segment this stream reads.
         * @param offset      offset named in the reply.
         * @return the registered future, or {@code null} if none is registered for that offset.
         */
        private CompletableFuture<WireCommands.SegmentRead> grabFuture(String segmentName, long offset) {
            checkSegment(segmentName);
            synchronized (AsyncSegmentInputStreamImpl.this.lock) {
                return AsyncSegmentInputStreamImpl.this.outstandingRequests.remove(offset);
            }
        }

        /** Logs the failure and closes the connection with it. */
        @Override // io.pravega.shared.protocol.netty.ReplyProcessor
        public void processingFailure(Exception exc) {
            AsyncSegmentInputStreamImpl.log.warn("Processing failure: ", exc);
            AsyncSegmentInputStreamImpl.this.closeConnection(exc);
        }

        /** Closes the connection with an {@link AuthenticationException} describing the reply. */
        @Override // io.pravega.shared.protocol.netty.ReplyProcessor
        public void authTokenCheckFailed(WireCommands.AuthTokenCheckFailed authTokenCheckFailed) {
            AsyncSegmentInputStreamImpl.log.warn("Auth failed {}", authTokenCheckFailed);
            AsyncSegmentInputStreamImpl.this.closeConnection(new AuthenticationException(authTokenCheckFailed.toString()));
        }

        /**
         * Verifies that a reply refers to the segment this stream is reading.
         *
         * @throws IllegalStateException if {@code segmentName} differs from this stream's scoped segment name.
         */
        private void checkSegment(String segmentName) {
            // Preconditions templates substitute "%s" only; "{}" would be printed literally.
            Preconditions.checkState(AsyncSegmentInputStreamImpl.this.segmentId.getScopedName().equals(segmentName),
                    "Operating on segmentId %s but received a reply for segment %s",
                    AsyncSegmentInputStreamImpl.this.segmentId, segmentName);
        }
    }

    /**
     * Creates a reader for {@code segment}. No connection is opened until the first read.
     *
     * @param controller              used to look up the endpoint hosting the segment; must not be null.
     * @param connectionFactory       used to establish connections and to run callbacks; must not be null.
     * @param segment                 the segment to read; must not be null.
     * @param delegationTokenProvider supplies the token sent with each read request.
     */
    public AsyncSegmentInputStreamImpl(Controller controller, ConnectionFactory connectionFactory, Segment segment, DelegationTokenProvider delegationTokenProvider) {
        super(segment);
        // Validate before storing anything so a bad argument fails fast.
        Preconditions.checkNotNull(controller);
        Preconditions.checkNotNull(connectionFactory);
        Preconditions.checkNotNull(segment);
        this.controller = controller;
        this.connectionFactory = connectionFactory;
        this.tokenProvider = delegationTokenProvider;
    }

    /**
     * Marks this stream closed and, on the first call only, closes the connection and fails every
     * in-flight read with a {@link ConnectionClosedException}.
     */
    @Override // io.pravega.client.segment.impl.AsyncSegmentInputStream, java.lang.AutoCloseable
    public void close() {
        log.info("Closing reader for {}", this.segmentId);
        if (this.closed.compareAndSet(false, true)) {
            closeConnection(new ConnectionClosedException());
        }
    }

    /** @return {@code true} once {@link #close()} has been called. */
    @Override // io.pravega.client.segment.impl.AsyncSegmentInputStream
    public boolean isClosed() {
        return this.closed.get();
    }

    /**
     * Issues a read request and retries it according to the backoff schedule while
     * {@link #shouldRetry(Throwable)} accepts the failure.
     *
     * @param j offset passed to the {@link WireCommands.ReadSegment} request.
     * @param i length passed to the {@link WireCommands.ReadSegment} request.
     * @return a future completed with the server's reply, or exceptionally if the read ultimately fails.
     */
    @Override // io.pravega.client.segment.impl.AsyncSegmentInputStream
    public CompletableFuture<WireCommands.SegmentRead> read(long j, int i) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        return this.tokenProvider.retrieveToken().thenComposeAsync(token -> {
            WireCommands.ReadSegment request = new WireCommands.ReadSegment(this.segmentId.getScopedName(), j, i, token, this.requestId);
            return this.backoffSchedule.retryWhen(this::shouldRetry)
                    .runAsync(() -> attemptRead(request), this.connectionFactory.getInternalExecutor());
        }, this.connectionFactory.getInternalExecutor());
    }

    /**
     * Logs a read failure and decides whether it should be retried.
     *
     * @return {@code true} for any {@link Exception} other than {@link ConnectionClosedException} and
     *         {@link SegmentTruncatedException}; {@code false} otherwise (including non-Exception throwables).
     */
    private boolean shouldRetry(Throwable failure) {
        Throwable cause = Exceptions.unwrap(failure);
        if (this.closed.get()) {
            // Failures after close() are expected, so they are logged quietly.
            log.debug("Exception while reading from Segment : {}", this.segmentId, cause);
        } else {
            log.warn("Exception while reading from Segment : {}", this.segmentId, cause);
        }
        return cause instanceof Exception
                && !(cause instanceof ConnectionClosedException)
                && !(cause instanceof SegmentTruncatedException);
    }

    /**
     * Performs a single read attempt: obtains a connection and sends {@code request} over it. A failure
     * to connect, or a {@link ConnectionFailedException} from the send, closes the current connection.
     */
    private CompletableFuture<WireCommands.SegmentRead> attemptRead(WireCommands.ReadSegment request) {
        return getConnection()
                .whenComplete((ignoredConnection, connectError) -> {
                    if (connectError != null) {
                        log.warn("Exception while establishing connection with Pravega node", connectError);
                        closeConnection(new ConnectionFailedException(connectError));
                    }
                })
                .thenCompose(established -> sendRequestOverConnection(request, established)
                        .whenComplete((ignoredReply, sendError) -> {
                            if (sendError instanceof ConnectionFailedException) {
                                log.debug("ConnectionFailedException observed when sending request {}", request, sendError);
                                closeConnection((ConnectionFailedException) sendError);
                            }
                        }));
    }

    /**
     * Registers a future for {@code readSegment}'s offset and sends the request.
     *
     * @return the registered future; already failed with {@link ConnectionClosedException} if this stream
     *         is closed, and failed with the send error if the send callback reports one.
     */
    private CompletableFuture<WireCommands.SegmentRead> sendRequestOverConnection(WireCommands.ReadSegment readSegment, ClientConnection clientConnection) {
        CompletableFuture<WireCommands.SegmentRead> result = new CompletableFuture<>();
        if (this.closed.get()) {
            result.completeExceptionally(new ConnectionClosedException());
            return result;
        }
        // Register before sending so that a reply can always find its future.
        synchronized (this.lock) {
            this.outstandingRequests.put(readSegment.getOffset(), result);
        }
        log.trace("Sending read request {}", readSegment);
        clientConnection.sendAsync(readSegment, connectionFailedException -> {
            if (connectionFailedException != null) {
                log.error("Error while sending request {}", readSegment, connectionFailedException);
                synchronized (this.lock) {
                    this.outstandingRequests.remove(readSegment.getOffset());
                }
                result.completeExceptionally(connectionFailedException);
            }
        });
        return result;
    }

    /**
     * Forgets the current connection, closes it if it had been established successfully, and fails every
     * in-flight read with {@code exc}.
     *
     * @param exc the reason; used to fail the in-flight reads.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void closeConnection(Exception exc) {
        if (this.closed.get()) {
            log.info("Closing connection to segment: {}", this.segmentId);
        } else {
            log.warn("Closing connection to segment {} with exception: {}", this.segmentId, exc.toString());
        }
        CompletableFuture<ClientConnection> previous;
        synchronized (this.lock) {
            previous = this.connection;
            this.connection = null;
        }
        // Only a successfully completed future holds a connection that can be closed.
        if (previous != null && Futures.isSuccessful(previous)) {
            try {
                previous.getNow(null).close();
            } catch (Exception e) {
                log.warn("Exception tearing down connection: ", e);
            }
        }
        failAllInflight(exc);
    }

    /**
     * Returns the current connection future, establishing a new connection if there is none.
     *
     * <p>The endpoint lookup is started outside {@link #lock}; the lock is re-acquired in the continuation
     * and {@link #connection} is re-checked there, so an attempt stored in the meantime is reused.
     *
     * @return a future for the connection to the node hosting the segment.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<ClientConnection> getConnection() {
        synchronized (this.lock) {
            if (this.connection != null) {
                return this.connection;
            }
        }
        return this.controller.getEndpointForSegment(this.segmentId.getScopedName()).thenCompose(pravegaNodeUri -> {
            synchronized (this.lock) {
                if (this.connection == null) {
                    this.connection = this.connectionFactory.establishConnection(Flow.from(this.requestId), pravegaNodeUri, this.responseProcessor);
                }
                return this.connection;
            }
        });
    }

    /**
     * Removes every in-flight read and fails it with {@code exc}. The futures are completed after the lock
     * has been released.
     */
    private void failAllInflight(Exception exc) {
        log.info("Connection failed due to a {}. Read requests will be retransmitted.", exc.toString());
        ArrayList<CompletableFuture<WireCommands.SegmentRead>> inflight;
        synchronized (this.lock) {
            inflight = new ArrayList<>(this.outstandingRequests.values());
            this.outstandingRequests.clear();
        }
        for (CompletableFuture<WireCommands.SegmentRead> pending : inflight) {
            pending.completeExceptionally(exc);
        }
    }

    /** @return the flow/request id generated for this stream at construction. */
    @SuppressFBWarnings(justification = "generated code")
    public long getRequestId() {
        return this.requestId;
    }
}
