package io.pravega.client.segment.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.netty.impl.ClientConnection;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.impl.ConnectionClosedException;
import io.pravega.client.stream.impl.Controller;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.FutureHelpers;
import io.pravega.common.util.Retry;
import io.pravega.connectors.flink.util.FlinkPravegaParams;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.FailingReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommands;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/client/segment/impl/SegmentMetadataClientImpl.class */
/**
 * {@link SegmentMetadataClient} implementation that queries and updates the metadata of a
 * single segment over a lazily established {@link ClientConnection}.
 *
 * <p>Every outgoing request is registered under a generated request id in one of three maps
 * (one per reply type). The nested {@link ResponseProcessor} completes the matching future
 * when a reply arrives. Any connection-level failure drops the cached connection and fails
 * every request that is still in flight.
 *
 * <p>All mutable state ({@code connection} and the three request maps) is guarded by
 * {@code lock}; futures are always completed outside the lock.
 */
public class SegmentMetadataClientImpl implements SegmentMetadataClient {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(SegmentMetadataClientImpl.class);

    // NOTE(review): arguments are presumably (initial delay ms, multiplier, attempts,
    // max delay ms) -- confirm against Retry.withExpBackoff.
    private static final Retry.RetryWithBackoff RETRY_SCHEDULE = Retry.withExpBackoff(1, 10, 9, 30000);

    private final Segment segmentId;
    private final Controller controller;
    private final ConnectionFactory connectionFactory;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Object lock = new Object();

    /** Cached connection future; {@code null} until first use and after any failure. */
    @GuardedBy("lock")
    private CompletableFuture<ClientConnection> connection = null;

    /** In-flight segment-info requests, keyed by request id. */
    @GuardedBy("lock")
    private final Map<Long, CompletableFuture<WireCommands.StreamSegmentInfo>> infoRequests = new HashMap<>();

    /** In-flight attribute reads, keyed by request id. */
    @GuardedBy("lock")
    private final Map<Long, CompletableFuture<WireCommands.SegmentAttribute>> getAttributeRequests = new HashMap<>();

    /** In-flight attribute updates, keyed by request id. */
    @GuardedBy("lock")
    private final Map<Long, CompletableFuture<WireCommands.SegmentAttributeUpdated>> setAttributeRequests = new HashMap<>();

    /** Produces request ids from a per-instance counter. */
    private final Supplier<Long> requestIdGenerator = new AtomicLong()::incrementAndGet;

    private final ResponseProcessor responseProcessor = new ResponseProcessor();

    /**
     * Handles replies and failure notifications for the connection owned by the enclosing
     * client. Successful replies complete the matching in-flight future; every failure
     * callback closes the connection, which in turn fails all in-flight futures.
     */
    private final class ResponseProcessor extends FailingReplyProcessor {

        @Override
        public void streamSegmentInfo(WireCommands.StreamSegmentInfo streamSegmentInfo) {
            log.debug("Received stream segment info {}", streamSegmentInfo);
            completeInflight(infoRequests, streamSegmentInfo.getRequestId(), streamSegmentInfo);
        }

        @Override
        public void segmentAttribute(WireCommands.SegmentAttribute segmentAttribute) {
            log.debug("Received stream segment attribute {}", segmentAttribute);
            completeInflight(getAttributeRequests, segmentAttribute.getRequestId(), segmentAttribute);
        }

        @Override
        public void segmentAttributeUpdated(WireCommands.SegmentAttributeUpdated segmentAttributeUpdated) {
            log.debug("Received stream segment attribute update result {}", segmentAttributeUpdated);
            completeInflight(setAttributeRequests, segmentAttributeUpdated.getRequestId(), segmentAttributeUpdated);
        }

        @Override
        public void connectionDropped() {
            closeConnection(new ConnectionFailedException());
        }

        @Override
        public void wrongHost(WireCommands.WrongHost wrongHost) {
            closeConnection(new ConnectionFailedException(wrongHost.toString()));
        }

        @Override
        public void noSuchSegment(WireCommands.NoSuchSegment noSuchSegment) {
            closeConnection(new InvalidStreamException(noSuchSegment.toString()));
        }

        @Override
        public void processingFailure(Exception exc) {
            log.warn("Processing failure: ", exc);
            closeConnection(exc);
        }
    }

    /**
     * Removes the future registered under {@code requestId} from {@code pending} and, if one
     * was present, completes it with {@code reply}. Replies with an unknown id are ignored.
     */
    private <T> void completeInflight(Map<Long, CompletableFuture<T>> pending, long requestId, T reply) {
        CompletableFuture<T> waiter;
        synchronized (this.lock) {
            waiter = pending.remove(requestId);
        }
        // Completed outside the lock so callbacks attached to the future never run under it.
        if (waiter != null) {
            waiter.complete(reply);
        }
    }

    /**
     * Drops the cached connection (closing it if it had been successfully established) and
     * fails every in-flight request with {@code th}.
     *
     * <p>Visibility is kept as found in this file; the method is only invoked from within
     * this class.
     *
     * @param th the failure propagated to all in-flight requests
     */
    public void closeConnection(Throwable th) {
        log.info("Closing connection with exception: {}", th.getMessage());
        CompletableFuture<ClientConnection> previous;
        synchronized (this.lock) {
            previous = this.connection;
            this.connection = null;
        }
        // Only a connection that actually came up needs tearing down.
        if (previous != null && FutureHelpers.isSuccessful(previous)) {
            try {
                previous.getNow(null).close();
            } catch (Exception e) {
                log.warn("Exception tearing down connection: ", e);
            }
        }
        failAllInflight(th);
    }

    /**
     * Atomically empties all three request maps and completes every future that was in them
     * exceptionally with {@code th}.
     */
    private void failAllInflight(Throwable th) {
        log.info("SegmentMetadata connection failed due to a {}.", th.getMessage());
        List<CompletableFuture<?>> abandoned = new ArrayList<>();
        synchronized (this.lock) {
            abandoned.addAll(this.infoRequests.values());
            abandoned.addAll(this.getAttributeRequests.values());
            abandoned.addAll(this.setAttributeRequests.values());
            this.infoRequests.clear();
            this.getAttributeRequests.clear();
            this.setAttributeRequests.clear();
        }
        // Futures are failed outside the lock, in the order info / get / set.
        for (CompletableFuture<?> waiter : abandoned) {
            waiter.completeExceptionally(th);
        }
    }

    /**
     * Returns the cached connection future, or resolves the segment's endpoint through the
     * controller and establishes a new connection when none is cached.
     *
     * <p>The controller lookup is started without holding {@code lock}. If several callers
     * race, each performs its own lookup, but the re-check inside the continuation ensures
     * only one connection is established and all callers share it.
     */
    CompletableFuture<ClientConnection> getConnection() {
        synchronized (this.lock) {
            if (this.connection != null) {
                return this.connection;
            }
        }
        return this.controller.getEndpointForSegment(this.segmentId.getScopedName()).thenCompose(endpoint -> {
            log.info("Connecting to {}", endpoint);
            synchronized (this.lock) {
                if (this.connection == null) {
                    this.connection = this.connectionFactory.establishConnection(endpoint, this.responseProcessor);
                }
                return this.connection;
            }
        });
    }

    /**
     * Registers a new in-flight request in {@code pending}, then sends the command produced
     * by {@code commandFor} once a connection is available.
     *
     * <p>If obtaining the connection or sending fails, the connection is closed, which fails
     * this request together with every other in-flight request.
     *
     * @param pending    the map tracking requests of this reply type
     * @param commandFor builds the wire command for the generated request id; invoked only
     *                   after the connection is available
     * @return a future completed by the matching reply or failed on connection failure
     */
    private <T> CompletableFuture<T> issueRequest(Map<Long, CompletableFuture<T>> pending,
                                                  LongFunction<WireCommand> commandFor) {
        CompletableFuture<T> reply = new CompletableFuture<>();
        long requestId = this.requestIdGenerator.get();
        // Register before sending so a fast reply always finds its future.
        synchronized (this.lock) {
            pending.put(requestId, reply);
        }
        getConnection()
                .thenAccept(clientConnection -> send(clientConnection, commandFor.apply(requestId)))
                .exceptionally(failure -> {
                    closeConnection(failure);
                    return null;
                });
        return reply;
    }

    /** Requests the segment's info record. */
    private CompletableFuture<WireCommands.StreamSegmentInfo> getSegmentInfo() {
        return issueRequest(this.infoRequests, requestId -> {
            log.debug("Getting segment info for segment: {}", this.segmentId);
            return new WireCommands.GetStreamSegmentInfo(requestId, this.segmentId.getScopedName());
        });
    }

    /**
     * Sends {@code wireCommand} on {@code clientConnection}.
     *
     * <p>This is only called from {@code CompletableFuture} stages. A
     * {@link ConnectionFailedException} is rethrown wrapped in a {@link CompletionException},
     * which is exactly what {@code CompletableFuture} would produce for any exception thrown
     * by a stage function, so downstream handlers observe the same failure. Rethrowing the
     * exception unwrapped from a method without a {@code throws} clause would not compile if
     * it is a checked exception.
     */
    private void send(ClientConnection clientConnection, WireCommand wireCommand) {
        try {
            clientConnection.send(wireCommand);
        } catch (ConnectionFailedException e) {
            throw new CompletionException(e);
        }
    }

    /** Requests the current value of the attribute identified by {@code uuid}. */
    private CompletableFuture<WireCommands.SegmentAttribute> getPropertyAsync(UUID uuid) {
        return issueRequest(this.getAttributeRequests, requestId -> {
            log.debug("Getting segment attribute: {}", uuid);
            return new WireCommands.GetSegmentAttribute(requestId, this.segmentId.getScopedName(), uuid);
        });
    }

    /**
     * Requests an update of the attribute identified by {@code uuid}.
     *
     * <p>Note the argument order: {@code j2} is passed to the wire command before {@code j}.
     * NOTE(review): by compare-and-set convention {@code j} is presumably the expected value
     * and {@code j2} the new value -- confirm against WireCommands.UpdateSegmentAttribute.
     */
    private CompletableFuture<WireCommands.SegmentAttributeUpdated> updatePropertyAsync(UUID uuid, long j, long j2) {
        return issueRequest(this.setAttributeRequests, requestId -> {
            log.trace("Updating segment attribute: {}", uuid);
            return new WireCommands.UpdateSegmentAttribute(requestId, this.segmentId.getScopedName(), uuid, j2, j);
        });
    }

    /**
     * Fetches the segment length reported by the server.
     *
     * <p>The request is run under {@code RETRY_SCHEDULE}, configured to retry on
     * {@link ConnectionFailedException} and to throw on {@link InvalidStreamException}.
     * The closed flag is checked once before the first attempt.
     */
    @Override
    public long fetchCurrentStreamLength() {
        Exceptions.checkNotClosed(this.closed.get(), this);
        return ((Long) RETRY_SCHEDULE.retryingOn(ConnectionFailedException.class).throwingOn(InvalidStreamException.class).run(() -> {
            WireCommands.StreamSegmentInfo info = (WireCommands.StreamSegmentInfo) FutureHelpers.getThrowingException(getSegmentInfo());
            return Long.valueOf(info.getSegmentLength());
        })).longValue();
    }

    /**
     * Fetches the value of {@code segmentAttribute} from the server, using the same retry
     * configuration as {@link #fetchCurrentStreamLength()}.
     */
    @Override
    public long fetchProperty(SegmentAttribute segmentAttribute) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        return ((Long) RETRY_SCHEDULE.retryingOn(ConnectionFailedException.class).throwingOn(InvalidStreamException.class).run(() -> {
            WireCommands.SegmentAttribute attribute = (WireCommands.SegmentAttribute) FutureHelpers.getThrowingException(getPropertyAsync(segmentAttribute.getValue()));
            return Long.valueOf(attribute.getValue());
        })).longValue();
    }

    /**
     * Asks the server to update {@code segmentAttribute} and reports whether the server
     * flagged the update as successful. Uses the same retry configuration as
     * {@link #fetchCurrentStreamLength()}.
     *
     * <p>NOTE(review): {@code j} is presumably the expected value and {@code j2} the new
     * value -- confirm against the SegmentMetadataClient interface.
     */
    @Override
    public boolean compareAndSetAttribute(SegmentAttribute segmentAttribute, long j, long j2) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        return ((Boolean) RETRY_SCHEDULE.retryingOn(ConnectionFailedException.class).throwingOn(InvalidStreamException.class).run(() -> {
            WireCommands.SegmentAttributeUpdated outcome = (WireCommands.SegmentAttributeUpdated) FutureHelpers.getThrowingException(updatePropertyAsync(segmentAttribute.getValue(), j, j2));
            return Boolean.valueOf(outcome.isSuccess());
        })).booleanValue();
    }

    /**
     * Marks this client closed and, on the first call only, closes the connection and fails
     * all in-flight requests with a {@link ConnectionClosedException}.
     */
    @Override
    public void close() {
        log.info("Closing segment metadata connection for {}", this.segmentId);
        if (this.closed.compareAndSet(false, true)) {
            closeConnection(new ConnectionClosedException());
        }
    }

    /**
     * Creates a client for {@code segment}. No connection is opened until the first request.
     *
     * @param segment           the segment whose metadata is accessed
     * @param controller        used to resolve the endpoint hosting the segment
     * @param connectionFactory used to establish the connection to that endpoint
     */
    @SuppressFBWarnings(justification = "generated code")
    // The property name is spelled out as a literal: it names this constructor's own
    // parameter and must not depend on a constant from an unrelated module.
    @ConstructorProperties({"segmentId", "controller", "connectionFactory"})
    public SegmentMetadataClientImpl(Segment segment, Controller controller, ConnectionFactory connectionFactory) {
        this.segmentId = segment;
        this.controller = controller;
        this.connectionFactory = connectionFactory;
    }
}
