package io.pravega.client.netty.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ScheduledFuture;
import io.pravega.client.stream.impl.ConnectionClosedException;
import io.pravega.common.Exceptions;
import io.pravega.common.util.ReusableFutureLatch;
import io.pravega.shared.NameUtils;
import io.pravega.shared.metrics.ClientMetricKeys;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.AppendBatchSizeTracker;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.Reply;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommands;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/netty/impl/FlowHandler.class */
public class FlowHandler extends ChannelInboundHandlerAdapter implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(FlowHandler.class);
    private static final int KEEP_ALIVE_TIMEOUT_SECONDS = 20;
    private static final int FLOW_DISABLED = 0;
    private final String connectionName;
    private final MetricNotifier metricNotifier;
    private final AtomicReference<Channel> channel;
    private final AtomicReference<ScheduledFuture<?>> keepAliveFuture;
    private final AtomicBoolean recentMessage;
    private final AtomicBoolean closed;

    @VisibleForTesting
    private final KeepAliveTask keepAlive;
    private final ReusableFutureLatch<Void> registeredFutureLatch;

    @VisibleForTesting
    private final ConcurrentHashMap<Integer, ReplyProcessor> flowIdReplyProcessorMap;
    private final ConcurrentHashMap<Integer, AppendBatchSizeTracker> flowIDBatchSizeTrackerMap;
    private final AtomicBoolean disableFlow;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/client/netty/impl/FlowHandler$KeepAliveTask.class */
    public final class KeepAliveTask implements Runnable {

        @VisibleForTesting
        private final ChannelFutureListener listener = new ChannelFutureListener() { // from class: io.pravega.client.netty.impl.FlowHandler.KeepAliveTask.1
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                FlowHandler.this.recentMessage.set(true);
                if (channelFuture.isSuccess()) {
                    return;
                }
                FlowHandler.log.warn("Keepalive failed for connection {}", FlowHandler.this.connectionName);
                FlowHandler.this.close();
            }
        };

        KeepAliveTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (!FlowHandler.this.recentMessage.getAndSet(false)) {
                FlowHandler.log.error("Connection {} stalled for more than {} seconds. Closing.", FlowHandler.this.connectionName, Integer.valueOf(FlowHandler.KEEP_ALIVE_TIMEOUT_SECONDS));
                FlowHandler.this.close();
                return;
            }
            try {
                FlowHandler.this.getChannel().writeAndFlush(new WireCommands.KeepAlive()).addListener(this.listener);
            } catch (Exception e) {
                FlowHandler.log.warn("Failed to send KeepAlive to {}. Closing this connection.", FlowHandler.this.connectionName, e);
                FlowHandler.this.close();
            }
        }

        @SuppressFBWarnings(justification = "generated code")
        ChannelFutureListener getListener() {
            return this.listener;
        }
    }

    public FlowHandler(String str) {
        this(str, MetricNotifier.NO_OP_METRIC_NOTIFIER);
    }

    public FlowHandler(String str, MetricNotifier metricNotifier) {
        this.channel = new AtomicReference<>();
        this.keepAliveFuture = new AtomicReference<>();
        this.recentMessage = new AtomicBoolean(true);
        this.closed = new AtomicBoolean(false);
        this.keepAlive = new KeepAliveTask();
        this.registeredFutureLatch = new ReusableFutureLatch<>();
        this.flowIdReplyProcessorMap = new ConcurrentHashMap<>();
        this.flowIDBatchSizeTrackerMap = new ConcurrentHashMap<>();
        this.disableFlow = new AtomicBoolean(false);
        this.connectionName = str;
        this.metricNotifier = metricNotifier;
    }

    public ClientConnection createFlow(Flow flow, ReplyProcessor replyProcessor) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkState(!this.disableFlow.get(), "Ensure flows are enabled.");
        int flowId = flow.getFlowId();
        log.info("Creating Flow {} for endpoint {}. The current Channel is {}.", new Object[]{Integer.valueOf(flow.getFlowId()), this.connectionName, this.channel.get()});
        if (this.flowIdReplyProcessorMap.put(Integer.valueOf(flowId), replyProcessor) != null) {
            throw new IllegalArgumentException("Multiple flows cannot be created with the same Flow id " + flowId);
        }
        createAppendBatchSizeTrackerIfNeeded(flowId);
        return new ClientConnectionImpl(this.connectionName, flowId, this);
    }

    public ClientConnection createConnectionWithFlowDisabled(ReplyProcessor replyProcessor) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkState(!this.disableFlow.getAndSet(true), "Flows are disabled, incorrect usage pattern.");
        log.info("Creating a new connection with flow disabled for endpoint {}. The current Channel is {}.", this.connectionName, this.channel.get());
        this.flowIdReplyProcessorMap.put(Integer.valueOf(FLOW_DISABLED), replyProcessor);
        createAppendBatchSizeTrackerIfNeeded(FLOW_DISABLED);
        return new ClientConnectionImpl(this.connectionName, FLOW_DISABLED, this);
    }

    public void closeFlow(ClientConnection clientConnection) {
        ClientConnectionImpl clientConnectionImpl = (ClientConnectionImpl) clientConnection;
        int flowId = clientConnectionImpl.getFlowId();
        log.info("Closing Flow {} for endpoint {}", Integer.valueOf(flowId), clientConnectionImpl.getConnectionName());
        this.flowIdReplyProcessorMap.remove(Integer.valueOf(flowId));
        this.flowIDBatchSizeTrackerMap.remove(Integer.valueOf(flowId));
        if (flowId == 0) {
            close();
        }
    }

    private void createAppendBatchSizeTrackerIfNeeded(int i) {
        if (this.flowIDBatchSizeTrackerMap.containsKey(Integer.valueOf(i))) {
            log.debug("Reusing Batch size tracker for Flow ID {}.", Integer.valueOf(i));
        } else {
            log.debug("Creating Batch size tracker for flow ID {}.", Integer.valueOf(i));
            this.flowIDBatchSizeTrackerMap.put(Integer.valueOf(i), new AppendBatchSizeTrackerImpl());
        }
    }

    public AppendBatchSizeTracker getAppendBatchSizeTracker(long j) {
        return this.flowIDBatchSizeTrackerMap.get(Integer.valueOf(this.disableFlow.get() ? FLOW_DISABLED : Flow.toFlowID(j)));
    }

    public int getOpenFlowCount() {
        return this.flowIdReplyProcessorMap.size();
    }

    public boolean isConnectionEstablished() {
        return this.channel.get() != null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Channel getChannel() throws ConnectionFailedException {
        Channel channel = this.channel.get();
        if (channel == null) {
            throw new ConnectionFailedException("Connection to " + this.connectionName + " is not established.");
        }
        return channel;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setRecentMessage() {
        this.recentMessage.set(true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeWhenReady(CompletableFuture<Void> completableFuture) {
        Preconditions.checkNotNull(completableFuture, "future");
        this.registeredFutureLatch.register(completableFuture);
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        Channel channel = channelHandlerContext.channel();
        this.channel.set(channel);
        log.info("Connection established with endpoint {} on channel {}", this.connectionName, channel);
        channel.writeAndFlush(new WireCommands.Hello(9, 5), channel.voidPromise());
        this.registeredFutureLatch.release((Object) null);
        ScheduledFuture<?> andSet = this.keepAliveFuture.getAndSet(channel.eventLoop().scheduleWithFixedDelay(this.keepAlive, 20L, 20L, TimeUnit.SECONDS));
        if (andSet != null) {
            andSet.cancel(false);
        }
    }

    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        ScheduledFuture<?> scheduledFuture = this.keepAliveFuture.get();
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
        }
        this.channel.set(null);
        log.info("Connection drop observed with endpoint {}", this.connectionName);
        this.flowIdReplyProcessorMap.forEach((num, replyProcessor) -> {
            try {
                log.debug("Connection dropped for flow id {}", num);
                replyProcessor.connectionDropped();
            } catch (Exception e) {
                log.warn("Encountered exception invoking ReplyProcessor for flow id {}", num, e);
            }
        });
        this.registeredFutureLatch.releaseExceptionally(new ConnectionClosedException());
        super.channelUnregistered(channelHandlerContext);
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        WireCommands.DataAppended dataAppended = (Reply) obj;
        log.debug(this.connectionName + " processing reply {} with flow {}", dataAppended, Flow.from(dataAppended.getRequestId()));
        this.recentMessage.set(true);
        if (dataAppended instanceof WireCommands.Hello) {
            this.flowIdReplyProcessorMap.forEach((num, replyProcessor) -> {
                try {
                    replyProcessor.hello((WireCommands.Hello) dataAppended);
                } catch (Exception e) {
                    log.warn("Encountered exception invoking ReplyProcessor.hello for flow id {}", num, e);
                }
            });
            return;
        }
        if (dataAppended instanceof WireCommands.DataAppended) {
            WireCommands.DataAppended dataAppended2 = dataAppended;
            AppendBatchSizeTracker appendBatchSizeTracker = getAppendBatchSizeTracker(dataAppended2.getRequestId());
            if (appendBatchSizeTracker != null) {
                long recordAck = appendBatchSizeTracker.recordAck(dataAppended2.getEventNumber());
                if (!this.metricNotifier.equals(MetricNotifier.NO_OP_METRIC_NOTIFIER)) {
                    this.metricNotifier.updateSuccessMetric(ClientMetricKeys.CLIENT_OUTSTANDING_APPEND_COUNT, NameUtils.writerTags(dataAppended2.getWriterId().toString()), recordAck);
                }
            }
        }
        getReplyProcessor(dataAppended).ifPresent(replyProcessor2 -> {
            try {
                replyProcessor2.process(dataAppended);
            } catch (Exception e) {
                log.warn("ReplyProcessor.process failed for reply {} due to {}", dataAppended, e.getMessage());
                replyProcessor2.processingFailure(e);
            }
        });
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        invokeProcessingFailureForAllFlows(th);
    }

    private void invokeProcessingFailureForAllFlows(Throwable th) {
        this.flowIdReplyProcessorMap.forEach((num, replyProcessor) -> {
            try {
                log.debug("Exception observed for flow id {} due to {}", num, th.getMessage());
                replyProcessor.processingFailure(new ConnectionFailedException(th));
            } catch (Exception e) {
                log.warn("Encountered exception invoking ReplyProcessor.processingFailure for flow id {}", num, e);
            }
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        Channel andSet;
        if (!this.closed.compareAndSet(false, true) || (andSet = this.channel.getAndSet(null)) == null) {
            return;
        }
        log.info("Closing connection with endpoint {} on channel {}", this.connectionName, andSet);
        int size = this.flowIdReplyProcessorMap.size();
        if (size != 0) {
            log.debug("{} flows are not closed", Integer.valueOf(size));
            invokeProcessingFailureForAllFlows(new ConnectionClosedException());
        }
        int size2 = this.flowIDBatchSizeTrackerMap.size();
        if (size2 != 0) {
            log.warn("{} AppendBatchSizeTrackers are not closed", Integer.valueOf(size2));
        }
        andSet.close();
    }

    private Optional<ReplyProcessor> getReplyProcessor(Reply reply) {
        int flowID = this.disableFlow.get() ? FLOW_DISABLED : Flow.toFlowID(reply.getRequestId());
        ReplyProcessor replyProcessor = this.flowIdReplyProcessorMap.get(Integer.valueOf(flowID));
        if (replyProcessor == null) {
            log.warn("No ReplyProcessor found for the provided flowId {}. Ignoring response", Integer.valueOf(flowID));
        }
        return Optional.ofNullable(replyProcessor);
    }

    @SuppressFBWarnings(justification = "generated code")
    public MetricNotifier getMetricNotifier() {
        return this.metricNotifier;
    }

    @SuppressFBWarnings(justification = "generated code")
    KeepAliveTask getKeepAlive() {
        return this.keepAlive;
    }

    @SuppressFBWarnings(justification = "generated code")
    public ReusableFutureLatch<Void> getRegisteredFutureLatch() {
        return this.registeredFutureLatch;
    }

    @SuppressFBWarnings(justification = "generated code")
    ConcurrentHashMap<Integer, ReplyProcessor> getFlowIdReplyProcessorMap() {
        return this.flowIdReplyProcessorMap;
    }
}
