package io.pravega.client.connection.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.FailingReplyProcessor;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.Reply;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommands;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/connection/impl/FlowHandler.class */
public class FlowHandler extends FailingReplyProcessor implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FlowHandler.class);
    private static final int FLOW_DISABLED = 0;
    private final PravegaNodeUri location;
    private ClientConnection channel;
    private final MetricNotifier metricNotifier;
    private ScheduledFuture<?> keepAliveFuture;

    @VisibleForTesting
    private final KeepAliveTask keepAliveTask = new KeepAliveTask();
    private final AtomicBoolean recentMessage = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @VisibleForTesting
    private final ConcurrentHashMap<Integer, ReplyProcessor> flowIdReplyProcessorMap = new ConcurrentHashMap<>();
    private final AtomicBoolean disableFlow = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:io/pravega/client/connection/impl/FlowHandler$KeepAliveTask.class */
    public final class KeepAliveTask implements Runnable {
        private final AtomicInteger concurrentlyRunning = new AtomicInteger(FlowHandler.FLOW_DISABLED);

        KeepAliveTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (!FlowHandler.this.recentMessage.getAndSet(false) && !FlowHandler.this.closed.get()) {
                    if (this.concurrentlyRunning.getAndIncrement() > 0) {
                        handleError(new TimeoutException("KeepAliveTask: Connection write was blocked for too long."));
                    }
                    FlowHandler.this.channel.send((WireCommand) new WireCommands.KeepAlive());
                    this.concurrentlyRunning.decrementAndGet();
                }
            } catch (Exception e) {
                handleError(e);
            }
        }

        @VisibleForTesting
        void handleError(Exception exc) {
            FlowHandler.log.warn("Failed to send KeepAlive to {}. Closing this connection.", FlowHandler.this.location, exc);
            FlowHandler.this.close();
        }
    }

    private FlowHandler(PravegaNodeUri pravegaNodeUri, MetricNotifier metricNotifier) {
        this.location = pravegaNodeUri;
        this.metricNotifier = metricNotifier;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static CompletableFuture<FlowHandler> openConnection(PravegaNodeUri pravegaNodeUri, MetricNotifier metricNotifier, ConnectionFactory connectionFactory) {
        FlowHandler flowHandler = new FlowHandler(pravegaNodeUri, metricNotifier);
        return connectionFactory.establishConnection(pravegaNodeUri, flowHandler).thenApply(clientConnection -> {
            flowHandler.channel = clientConnection;
            flowHandler.keepAliveFuture = connectionFactory.getInternalExecutor().scheduleAtFixedRate(flowHandler.keepAliveTask, 20L, 10L, TimeUnit.SECONDS);
            try {
                clientConnection.send((WireCommand) new WireCommands.Hello(15, 5));
                return flowHandler;
            } catch (ConnectionFailedException e) {
                throw Exceptions.sneakyThrow(e);
            }
        });
    }

    public ClientConnection createFlow(Flow flow, ReplyProcessor replyProcessor) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkState(!this.disableFlow.get(), "Ensure flows are enabled.");
        int flowId = flow.getFlowId();
        log.debug("Creating Flow {} for endpoint {}. ", Integer.valueOf(flow.getFlowId()), this.location);
        if (this.flowIdReplyProcessorMap.put(Integer.valueOf(flowId), replyProcessor) != null) {
            throw new IllegalArgumentException("Multiple flows cannot be created with the same Flow id " + flowId);
        }
        return new FlowClientConnection(this.location.toString(), this.channel, flowId, this);
    }

    public ClientConnection createConnectionWithFlowDisabled(ReplyProcessor replyProcessor) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkState(!this.disableFlow.getAndSet(true), "Flows are disabled, incorrect usage pattern.");
        log.debug("Creating a new connection with flow disabled for endpoint {}.", this.location);
        this.flowIdReplyProcessorMap.put(Integer.valueOf(FLOW_DISABLED), replyProcessor);
        return new FlowClientConnection(this.location.toString(), this.channel, FLOW_DISABLED, this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeFlow(FlowClientConnection flowClientConnection) {
        int flowId = flowClientConnection.getFlowId();
        log.debug("Closing Flow {} for endpoint {}", Integer.valueOf(flowId), flowClientConnection.getConnectionName());
        this.flowIdReplyProcessorMap.remove(Integer.valueOf(flowId));
        if (flowId == 0) {
            close();
        }
    }

    public int getOpenFlowCount() {
        return this.flowIdReplyProcessorMap.size();
    }

    void setRecentMessage() {
        this.recentMessage.set(true);
    }

    public void process(Reply reply) {
        if (log.isDebugEnabled()) {
            log.debug("{} processing reply {} with flow {}", new Object[]{this.location, reply, Flow.from(reply.getRequestId())});
        }
        setRecentMessage();
        if (reply instanceof WireCommands.Hello) {
            this.flowIdReplyProcessorMap.forEach((num, replyProcessor) -> {
                try {
                    replyProcessor.hello((WireCommands.Hello) reply);
                } catch (Exception e) {
                    log.warn("Encountered exception invoking ReplyProcessor.hello for flow id {}", num, e);
                }
            });
            return;
        }
        if (reply instanceof WireCommands.KeepAlive) {
            return;
        }
        ReplyProcessor replyProcessor2 = getReplyProcessor(reply);
        if (replyProcessor2 == null) {
            if (reply instanceof WireCommands.ReleasableCommand) {
                ((WireCommands.ReleasableCommand) reply).release();
            }
        } else {
            try {
                replyProcessor2.process(reply);
            } catch (Exception e) {
                log.warn("ReplyProcessor.process failed for reply {} due to {}", reply, e.getMessage());
                replyProcessor2.processingFailure(e);
            }
        }
    }

    public void errorMessage(WireCommands.ErrorMessage errorMessage) {
        log.warn("Received an errorMessage containing an unhandled {} on segment {}", errorMessage.getErrorCode().getExceptionType().getSimpleName(), errorMessage.getSegment());
        processingFailure(errorMessage.getThrowableException());
    }

    public void processingFailure(Exception exc) {
        invokeProcessingFailureForAllFlows(exc);
    }

    private void invokeProcessingFailureForAllFlows(Throwable th) {
        this.flowIdReplyProcessorMap.forEach((num, replyProcessor) -> {
            try {
                log.debug("Exception observed for flow id {} due to {}", num, th.getMessage());
                replyProcessor.processingFailure(new ConnectionFailedException(th));
            } catch (Exception e) {
                log.warn("Encountered exception invoking ReplyProcessor.processingFailure for flow id {}", num, e);
            }
        });
    }

    public void connectionDropped() {
        close();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            if (this.keepAliveFuture != null) {
                this.keepAliveFuture.cancel(false);
            }
            log.debug("Connection closed observed with endpoint {}", this.location);
            this.flowIdReplyProcessorMap.forEach((num, replyProcessor) -> {
                try {
                    log.debug("Connection dropped for flow id {}", num);
                    replyProcessor.connectionDropped();
                } catch (Exception e) {
                    log.warn("Encountered exception invoking ReplyProcessor for flow id {}", num, e);
                }
            });
            this.channel.close();
        }
    }

    public final boolean isClosed() {
        return this.closed.get();
    }

    private ReplyProcessor getReplyProcessor(Reply reply) {
        int flowID = this.disableFlow.get() ? FLOW_DISABLED : Flow.toFlowID(reply.getRequestId());
        ReplyProcessor replyProcessor = this.flowIdReplyProcessorMap.get(Integer.valueOf(flowID));
        if (replyProcessor == null) {
            log.warn("No ReplyProcessor found for the provided flowId {}. Ignoring response", Integer.valueOf(flowID));
            if (reply instanceof WireCommands.ReleasableCommand) {
                ((WireCommands.ReleasableCommand) reply).release();
            }
        }
        return replyProcessor;
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public MetricNotifier getMetricNotifier() {
        return this.metricNotifier;
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    KeepAliveTask getKeepAliveTask() {
        return this.keepAliveTask;
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    ConcurrentHashMap<Integer, ReplyProcessor> getFlowIdReplyProcessorMap() {
        return this.flowIdReplyProcessorMap;
    }
}
