package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.prometheus.client.Gauge;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.broker.lookup.TopicLookupBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandConsumerStats;
import org.apache.pulsar.common.api.proto.CommandEndTxn;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartition;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscription;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.api.proto.CommandGetLastMessageId;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandNewTxn;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
import org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/ServerCnx.class */
public class ServerCnx extends PulsarHandler implements TransportCnx {
    private final BrokerService service;
    private final SchemaRegistryService schemaService;
    private final String listenerName;
    private final HashMap<Long, Long> recentlyClosedProducers;
    private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
    private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
    private final boolean enableSubscriptionPatternEvaluation;
    private final int maxSubscriptionPatternLength;
    private final TopicListService topicListService;
    private final BrokerInterceptor brokerInterceptor;
    private State state;
    private volatile boolean isActive;
    private String authRole;
    private volatile AuthenticationDataSource authenticationData;
    private AuthenticationProvider authenticationProvider;
    private AuthenticationState authState;
    private AuthenticationState originalAuthState;
    private volatile AuthenticationDataSource originalAuthData;
    private AuthData originalAuthDataCopy;
    private boolean pendingAuthChallengeResponse;
    private ScheduledFuture<?> authRefreshTask;
    private final int maxPendingSendRequests;
    private final int resumeReadsThreshold;
    private int pendingSendRequest;
    private final String replicatorPrefix;
    private String clientVersion;
    private String proxyVersion;
    private int nonPersistentPendingMessages;
    private final int maxNonPersistentPendingMessages;
    private String originalPrincipal;
    private final boolean schemaValidationEnforced;
    private String authMethod;
    private final int maxMessageSize;
    private boolean preciseDispatcherFlowControl;
    private boolean preciseTopicPublishRateLimitingEnable;
    private boolean encryptionRequireOnProducer;
    private volatile boolean autoReadDisabledRateLimiting;
    private FeatureFlags features;
    private PulsarCommandSender commandSender;
    private final ConnectionController connectionController;
    private static final KeySharedMeta emptyKeySharedMeta;
    private boolean autoReadDisabledPublishBufferLimiting;
    private final long maxPendingBytesPerThread;
    private final long resumeThresholdPendingBytesPerThread;
    private final long connectionLivenessCheckTimeoutMillis;
    private static final FastThreadLocal<MutableLong> pendingBytesPerThread;
    private static final FastThreadLocal<Set<ServerCnx>> cnxsPerThread;
    private static final byte[] emptyArray;
    private static final Gauge throttledConnections;
    private static final Gauge throttledConnectionsGlobal;
    private static final Logger log;
    CompletableFuture<Boolean> connectionCheckInProgress;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/ServerCnx$State.class */
    public enum State {
        Start,
        Connected,
        Failed,
        Connecting
    }

    public ServerCnx(PulsarService pulsarService) {
        this(pulsarService, null);
    }

    public ServerCnx(PulsarService pulsarService, String str) {
        super(pulsarService.getBrokerService() != null ? pulsarService.getBrokerService().getKeepAliveIntervalSeconds() : 0, TimeUnit.SECONDS);
        this.isActive = true;
        this.authRole = null;
        this.pendingAuthChallengeResponse = false;
        this.pendingSendRequest = 0;
        this.clientVersion = null;
        this.proxyVersion = null;
        this.nonPersistentPendingMessages = 0;
        this.originalPrincipal = null;
        this.authMethod = "none";
        this.autoReadDisabledRateLimiting = false;
        this.autoReadDisabledPublishBufferLimiting = false;
        this.service = pulsarService.getBrokerService();
        this.schemaService = pulsarService.getSchemaRegistryService();
        this.listenerName = str;
        this.state = State.Start;
        ServiceConfiguration configuration = pulsarService.getConfiguration();
        this.connectionLivenessCheckTimeoutMillis = configuration.getConnectionLivenessCheckTimeoutMillis();
        this.producers = ConcurrentLongHashMap.newBuilder().expectedItems(8).concurrencyLevel(1).build();
        this.consumers = ConcurrentLongHashMap.newBuilder().expectedItems(8).concurrencyLevel(1).build();
        this.recentlyClosedProducers = new HashMap<>();
        this.replicatorPrefix = configuration.getReplicatorPrefix();
        this.maxNonPersistentPendingMessages = configuration.getMaxConcurrentNonPersistentMessagePerConnection();
        this.schemaValidationEnforced = configuration.isSchemaValidationEnforced();
        this.maxMessageSize = configuration.getMaxMessageSize();
        this.maxPendingSendRequests = configuration.getMaxPendingPublishRequestsPerConnection();
        this.resumeReadsThreshold = this.maxPendingSendRequests / 2;
        this.preciseDispatcherFlowControl = configuration.isPreciseDispatcherFlowControl();
        this.preciseTopicPublishRateLimitingEnable = configuration.isPreciseTopicPublishRateLimiterEnable();
        this.encryptionRequireOnProducer = configuration.isEncryptionRequireOnProducer();
        this.maxPendingBytesPerThread = ((configuration.getMaxMessagePublishBufferSizeInMB() * 1024) * 1024) / configuration.getNumIOThreads();
        this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2;
        this.connectionController = new ConnectionController.DefaultConnectionController(configuration.getBrokerMaxConnections(), configuration.getBrokerMaxConnectionsPerIp());
        this.enableSubscriptionPatternEvaluation = configuration.isEnableBrokerSideSubscriptionPatternEvaluation();
        this.maxSubscriptionPatternLength = configuration.getSubscriptionPatternMaxLength();
        this.topicListService = new TopicListService(pulsarService, this, this.enableSubscriptionPatternEvaluation, this.maxSubscriptionPatternLength);
        this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        ConnectionController.State increaseConnection = this.connectionController.increaseConnection(this.remoteAddress);
        if (!increaseConnection.equals(ConnectionController.State.OK)) {
            NettyChannelUtil.writeAndFlushWithClosePromise(channelHandlerContext, Commands.newError(-1L, ServerError.NotAllowedError, increaseConnection.equals(ConnectionController.State.REACH_MAX_CONNECTION) ? "Reached the maximum number of connections" : "Reached the maximum number of connections on address" + this.remoteAddress));
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("New connection from {}", this.remoteAddress);
        }
        this.ctx = channelHandlerContext;
        this.commandSender = new PulsarCommandSenderImpl(this.brokerInterceptor, this);
        this.service.getPulsarStats().recordConnectionCreate();
        ((Set) cnxsPerThread.get()).add(this);
        this.service.getPulsar().runWhenReadyForIncomingRequests(() -> {
            channelHandlerContext.channel().config().setAutoRead(true);
        });
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        this.connectionController.decreaseConnection(channelHandlerContext.channel().remoteAddress());
        this.isActive = false;
        log.info("Closed connection from {}", this.remoteAddress);
        if (this.brokerInterceptor != null) {
            this.brokerInterceptor.onConnectionClosed(this);
        }
        ((Set) cnxsPerThread.get()).remove(this);
        if (this.authRefreshTask != null) {
            this.authRefreshTask.cancel(false);
        }
        this.producers.forEach((j, completableFuture) -> {
            if ((completableFuture.isDone() || !completableFuture.completeExceptionally(new IllegalStateException("Connection closed."))) && completableFuture.isDone() && !completableFuture.isCompletedExceptionally()) {
                Producer producer = (Producer) completableFuture.getNow(null);
                producer.closeNow(true);
                if (this.brokerInterceptor != null) {
                    this.brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
                }
            }
        });
        this.consumers.forEach((j2, completableFuture2) -> {
            if ((completableFuture2.isDone() || !completableFuture2.completeExceptionally(new IllegalStateException("Connection closed."))) && completableFuture2.isDone() && !completableFuture2.isCompletedExceptionally()) {
                Consumer consumer = (Consumer) completableFuture2.getNow(null);
                try {
                    consumer.close();
                    if (this.brokerInterceptor != null) {
                        this.brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
                    }
                } catch (BrokerServiceException e) {
                    log.warn("Consumer {} was already closed: {}", consumer, e);
                }
            }
        });
        this.topicListService.inactivate();
        this.service.getPulsarStats().recordConnectionClose();
        if (this.connectionCheckInProgress == null || this.connectionCheckInProgress.isDone()) {
            return;
        }
        this.connectionCheckInProgress.complete(false);
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Channel writability has changed to: {}", Boolean.valueOf(channelHandlerContext.channel().isWritable()));
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        if (this.state != State.Failed) {
            log.warn("[{}] Got exception {}", this.remoteAddress, ClientCnx.isKnownException(th) ? th.toString() : ExceptionUtils.getStackTrace(th));
            this.state = State.Failed;
            if (log.isDebugEnabled()) {
                log.debug("[{}] connect state change to : [{}]", this.remoteAddress, State.Failed.name());
            }
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Got exception {}", this.remoteAddress, ClientCnx.isKnownException(th) ? th.toString() : ExceptionUtils.getStackTrace(th));
        }
        channelHandlerContext.close();
    }

    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation topicOperation, AuthenticationDataSource authenticationDataSource, AuthenticationDataSource authenticationDataSource2) {
        CompletableFuture completedFuture;
        if (!this.service.isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(true);
        }
        if (this.originalPrincipal != null) {
            completedFuture = this.service.getAuthorizationService().allowTopicOperationAsync(topicName, topicOperation, this.originalPrincipal, authenticationDataSource2 != null ? authenticationDataSource2 : authenticationDataSource);
        } else {
            completedFuture = CompletableFuture.completedFuture(true);
        }
        return completedFuture.thenCombine((CompletionStage) this.service.getAuthorizationService().allowTopicOperationAsync(topicName, topicOperation, this.authRole, authenticationDataSource), (bool, bool2) -> {
            if (!bool.booleanValue()) {
                log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}", new Object[]{this.originalPrincipal, topicOperation, topicName});
            }
            if (!bool2.booleanValue()) {
                log.warn("Role {} is not authorized to perform operation {} on topic {}", new Object[]{this.authRole, topicOperation, topicName});
            }
            return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
        });
    }

    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String str, TopicOperation topicOperation) {
        if (!this.service.isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(true);
        }
        AuthenticationDataSubscription authenticationDataSubscription = new AuthenticationDataSubscription(this.authenticationData, str);
        AuthenticationDataSubscription authenticationDataSubscription2 = null;
        if (this.originalAuthData != null) {
            authenticationDataSubscription2 = new AuthenticationDataSubscription(this.originalAuthData, str);
        }
        return isTopicOperationAllowed(topicName, topicOperation, authenticationDataSubscription, authenticationDataSubscription2);
    }

    protected void handleLookup(CommandLookupTopic commandLookupTopic) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandLookupTopic.getRequestId();
        boolean isAuthoritative = commandLookupTopic.isAuthoritative();
        String advertisedListenerName = (commandLookupTopic.hasAdvertisedListenerName() && StringUtils.isNotBlank(commandLookupTopic.getAdvertisedListenerName())) ? commandLookupTopic.getAdvertisedListenerName() : this.listenerName;
        if (log.isDebugEnabled()) {
            Logger logger = log;
            Object[] objArr = new Object[4];
            objArr[0] = commandLookupTopic.getTopic();
            objArr[1] = this.remoteAddress;
            objArr[2] = Long.valueOf(requestId);
            objArr[3] = StringUtils.isNotBlank(advertisedListenerName) ? advertisedListenerName : "(none)";
            logger.debug("[{}] Received Lookup from {} for {} requesting listener {}", objArr);
        }
        TopicName validateTopicName = validateTopicName(commandLookupTopic.getTopic(), requestId, commandLookupTopic);
        if (validateTopicName == null) {
            return;
        }
        if (!this.service.getPulsar().isRunning()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed lookup topic {} due to pulsar service is not ready: {} state", new Object[]{this.remoteAddress, validateTopicName, this.service.getPulsar().getState().toString()});
            }
            writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, "Failed due to pulsar service is not ready", requestId));
            return;
        }
        Semaphore lookupRequestSemaphore = this.service.getLookupRequestSemaphore();
        if (lookupRequestSemaphore.tryAcquire()) {
            isTopicOperationAllowed(validateTopicName, TopicOperation.LOOKUP, this.authenticationData, this.originalAuthData).thenApply(bool -> {
                if (bool.booleanValue()) {
                    TopicLookupBase.lookupTopicAsync(getBrokerService().pulsar(), validateTopicName, isAuthoritative, getPrincipal(), getAuthenticationData(), requestId, advertisedListenerName).handle((byteBuf, th) -> {
                        if (th == null) {
                            writeAndFlush(byteBuf);
                        } else {
                            log.warn("[{}] lookup failed with error {}, {}", new Object[]{this.remoteAddress, validateTopicName, th.getMessage(), th});
                            writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, th.getMessage(), requestId));
                        }
                        lookupRequestSemaphore.release();
                        return null;
                    });
                    return null;
                }
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Client is not authorized to Lookup", getPrincipal(), validateTopicName});
                writeAndFlush(Commands.newLookupErrorResponse(ServerError.AuthorizationError, "Client is not authorized to Lookup", requestId));
                lookupRequestSemaphore.release();
                return null;
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                logAuthException(this.remoteAddress, "lookup", getPrincipal(), Optional.of(validateTopicName), th);
                writeAndFlush(Commands.newLookupErrorResponse(ServerError.AuthorizationError, "Exception occurred while trying to authorize lookup", requestId));
                lookupRequestSemaphore.release();
                return null;
            });
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Failed lookup due to too many lookup-requests {}", this.remoteAddress, validateTopicName);
        }
        writeAndFlush(Commands.newLookupErrorResponse(ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId));
    }

    private void writeAndFlush(ByteBuf byteBuf) {
        NettyChannelUtil.writeAndFlushWithVoidPromise(this.ctx, byteBuf);
    }

    protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata commandPartitionedTopicMetadata) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandPartitionedTopicMetadata.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", new Object[]{commandPartitionedTopicMetadata.getTopic(), this.remoteAddress, Long.valueOf(requestId)});
        }
        TopicName validateTopicName = validateTopicName(commandPartitionedTopicMetadata.getTopic(), requestId, commandPartitionedTopicMetadata);
        if (validateTopicName == null) {
            return;
        }
        if (!this.service.getPulsar().isRunning()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed PartitionMetadataLookup from {} for {} due to pulsar service is not ready: {} state", new Object[]{commandPartitionedTopicMetadata.getTopic(), this.remoteAddress, Long.valueOf(requestId), this.service.getPulsar().getState().toString()});
            }
            writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, "Failed due to pulsar service is not ready", requestId));
            return;
        }
        Semaphore lookupRequestSemaphore = this.service.getLookupRequestSemaphore();
        if (lookupRequestSemaphore.tryAcquire()) {
            isTopicOperationAllowed(validateTopicName, TopicOperation.LOOKUP, this.authenticationData, this.originalAuthData).thenApply(bool -> {
                if (bool.booleanValue()) {
                    getBrokerService().isAllowAutoTopicCreationAsync(validateTopicName).thenAccept(bool -> {
                        if (commandPartitionedTopicMetadata.isMetadataAutoCreationEnabled() && bool.booleanValue()) {
                            PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), validateTopicName).whenComplete((partitionedTopicMetadata, th) -> {
                                lookupRequestSemaphore.release();
                                if (th == 0) {
                                    this.commandSender.sendPartitionMetadataResponse(partitionedTopicMetadata.partitions, requestId);
                                    return;
                                }
                                if (th instanceof PulsarClientException) {
                                    log.warn("Failed to authorize {} at [{}] on topic {} : {}", new Object[]{getRole(), this.remoteAddress, validateTopicName, th.getMessage()});
                                    this.commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, th.getMessage(), requestId);
                                    return;
                                }
                                ServerError serverError = ServerError.ServiceNotReady;
                                if (th instanceof MetadataStoreException) {
                                    serverError = ServerError.MetadataError;
                                } else if (th instanceof RestException) {
                                    int status = ((RestException) th).getResponse().getStatus();
                                    if (status == Response.Status.NOT_FOUND.getStatusCode()) {
                                        serverError = ServerError.TopicNotFound;
                                    } else if (status < Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
                                        serverError = ServerError.MetadataError;
                                    }
                                }
                                if (serverError == ServerError.TopicNotFound) {
                                    log.info("Trying to get Partitioned Metadata for a resource not exist[{}] {}: {}", new Object[]{this.remoteAddress, validateTopicName, th.getMessage()});
                                } else {
                                    log.warn("Failed to get Partitioned Metadata [{}] {}: {}", new Object[]{this.remoteAddress, validateTopicName, th.getMessage(), th});
                                }
                                this.commandSender.sendPartitionMetadataResponse(serverError, th.getMessage(), requestId);
                            });
                        } else {
                            getBrokerService().getPulsar().getNamespaceService().checkTopicExists(validateTopicName).thenAccept(topicExistsInfo -> {
                                lookupRequestSemaphore.release();
                                if (!topicExistsInfo.isExists()) {
                                    writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound, "", requestId));
                                } else if (topicExistsInfo.getTopicType().equals(TopicType.PARTITIONED)) {
                                    this.commandSender.sendPartitionMetadataResponse(topicExistsInfo.getPartitions(), requestId);
                                } else {
                                    this.commandSender.sendPartitionMetadataResponse(0, requestId);
                                }
                                topicExistsInfo.recycle();
                            }).exceptionally(th2 -> {
                                lookupRequestSemaphore.release();
                                log.error("{} {} Failed to get partition metadata", new Object[]{validateTopicName, toString(), th2});
                                writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError, "Failed to get partition metadata", requestId));
                                return null;
                            });
                        }
                    });
                    return null;
                }
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Client is not authorized to Get Partition Metadata", getPrincipal(), validateTopicName});
                writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, "Client is not authorized to Get Partition Metadata", requestId));
                lookupRequestSemaphore.release();
                return null;
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                logAuthException(this.remoteAddress, "partition-metadata", getPrincipal(), Optional.of(validateTopicName), th);
                WebApplicationException unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
                if ((unwrapCompletionException instanceof WebApplicationException) && unwrapCompletionException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
                    writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound, "Tenant or namespace or topic does not exist: " + validateTopicName.getNamespace(), requestId));
                    lookupRequestSemaphore.release();
                    return null;
                }
                writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, "Exception occurred while trying to authorize get Partition Metadata", requestId));
                lookupRequestSemaphore.release();
                return null;
            });
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", this.remoteAddress, validateTopicName);
        }
        this.commandSender.sendPartitionMetadataResponse(ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId);
    }

    protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
        ByteBuf createConsumerStatsResponse;
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("Received CommandConsumerStats call from {}", this.remoteAddress);
        }
        long requestId = commandConsumerStats.getRequestId();
        long consumerId = commandConsumerStats.getConsumerId();
        Consumer consumer = (Consumer) ((CompletableFuture) this.consumers.get(consumerId)).getNow(null);
        if (consumer == null) {
            log.error("Failed to get consumer-stats response - Consumer not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, consumerId = {}]", new Object[]{this.remoteAddress, Long.valueOf(requestId), Long.valueOf(consumerId)});
            createConsumerStatsResponse = Commands.newConsumerStatsResponse(ServerError.ConsumerNotFound, "Consumer " + consumerId + " not found", requestId);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", Long.valueOf(requestId), consumer);
            }
            createConsumerStatsResponse = createConsumerStatsResponse(consumer, requestId);
        }
        writeAndFlush(createConsumerStatsResponse);
    }

    ByteBuf createConsumerStatsResponse(Consumer consumer, long j) {
        ConsumerStatsImpl stats = consumer.getStats();
        Subscription subscription = consumer.getSubscription();
        BaseCommand newConsumerStatsResponseCommand = Commands.newConsumerStatsResponseCommand(ServerError.UnknownError, (String) null, j);
        newConsumerStatsResponseCommand.getConsumerStatsResponse().clearErrorCode().setRequestId(j).setMsgRateOut(stats.msgRateOut).setMsgThroughputOut(stats.msgThroughputOut).setMsgRateRedeliver(stats.msgRateRedeliver).setConsumerName(stats.consumerName).setAvailablePermits(stats.availablePermits).setUnackedMessages(stats.unackedMessages).setBlockedConsumerOnUnackedMsgs(stats.blockedConsumerOnUnackedMsgs).setAddress(stats.getAddress()).setConnectedSince(stats.getConnectedSince()).setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false)).setMsgRateExpired(subscription.getExpiredMessageRate()).setMessageAckRate(stats.messageAckRate).setType(subscription.getTypeString());
        return Commands.serializeWithSize(newConsumerStatsResponseCommand);
    }

    private void completeConnect(int i, String str) {
        if (this.service.isAuthenticationEnabled()) {
            if (this.service.isAuthorizationEnabled()) {
                if (!this.service.getAuthorizationService().isValidOriginalPrincipal(this.authRole, this.originalPrincipal, this.remoteAddress, false)) {
                    this.state = State.Failed;
                    this.service.getPulsarStats().recordConnectionCreateFail();
                    NettyChannelUtil.writeAndFlushWithClosePromise(this.ctx, Commands.newError(-1L, ServerError.AuthorizationError, "Invalid roles."));
                    return;
                } else if (this.proxyVersion != null && !this.service.getAuthorizationService().isProxyRole(this.authRole)) {
                    this.state = State.Failed;
                    this.service.getPulsarStats().recordConnectionCreateFail();
                    NettyChannelUtil.writeAndFlushWithClosePromise(this.ctx, Commands.newError(-1L, ServerError.AuthorizationError, "Must not set proxyVersion without connecting as a ProxyRole."));
                    return;
                }
            }
            maybeScheduleAuthenticationCredentialsRefresh();
        }
        writeAndFlush(Commands.newConnected(i, this.maxMessageSize, this.enableSubscriptionPatternEvaluation));
        this.state = State.Connected;
        this.service.getPulsarStats().recordConnectionCreateSuccess();
        if (log.isDebugEnabled()) {
            log.debug("[{}] connect state change to : [{}]", this.remoteAddress, State.Connected.name());
        }
        setRemoteEndpointProtocolVersion(i);
        if (StringUtils.isNotBlank(str)) {
            this.clientVersion = str.intern();
        }
        if (!this.service.isAuthenticationEnabled()) {
            log.info("[{}] connected with clientVersion={}, clientProtocolVersion={}, proxyVersion={}", new Object[]{this.remoteAddress, str, Integer.valueOf(i), this.proxyVersion});
        } else if (this.originalPrincipal != null) {
            log.info("[{}] connected role={} and originalAuthRole={} using authMethod={}, clientVersion={}, clientProtocolVersion={}, proxyVersion={}", new Object[]{this.remoteAddress, this.authRole, this.originalPrincipal, this.authMethod, str, Integer.valueOf(i), this.proxyVersion});
        } else {
            log.info("[{}] connected with role={} using authMethod={}, clientVersion={}, clientProtocolVersion={}, proxyVersion={}", new Object[]{this.remoteAddress, this.authRole, this.authMethod, str, Integer.valueOf(i), this.proxyVersion});
        }
        if (this.brokerInterceptor != null) {
            this.brokerInterceptor.onConnectionCreated(this);
        }
    }

    private void doAuthentication(AuthData authData, boolean z, int i, String str) {
        AuthenticationState authenticationState = z ? this.originalAuthState : this.authState;
        String str2 = z ? this.originalPrincipal : this.authRole;
        if (log.isDebugEnabled()) {
            log.debug("Authenticate using original auth state : {}, role = {}", Boolean.valueOf(z), str2);
        }
        authenticationState.authenticateAsync(authData).whenCompleteAsync((authData2, th) -> {
            if (th == null) {
                authChallengeSuccessCallback(authData2, z, str2, i, str);
            } else {
                authenticationFailed(th);
            }
        }, (Executor) this.ctx.executor());
    }

    public void authChallengeSuccessCallback(AuthData authData, boolean z, String str, int i, String str2) {
        try {
            if (authData == null) {
                AuthenticationState authenticationState = z ? this.originalAuthState : this.authState;
                String authRole = authenticationState.getAuthRole();
                AuthenticationDataSource authDataSource = authenticationState.getAuthDataSource();
                if (this.state != State.Connected) {
                    if (!z) {
                        this.authRole = authRole;
                        this.authenticationData = authDataSource;
                    }
                    if (this.originalAuthState != null) {
                        authenticateOriginalData(i, str2);
                    } else {
                        completeConnect(i, str2);
                    }
                } else {
                    if (z) {
                        this.originalAuthData = authDataSource;
                    } else {
                        this.authenticationData = authDataSource;
                    }
                    if (!StringUtils.isEmpty(str)) {
                        if (str.equals(authRole)) {
                            log.info("[{}] Refreshed authentication credentials for role {}", this.remoteAddress, str);
                        } else {
                            log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}", new Object[]{this.remoteAddress, str, authRole});
                            this.ctx.close();
                        }
                    }
                }
            } else {
                this.ctx.writeAndFlush(Commands.newAuthChallenge(this.authMethod, authData, i));
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Authentication in progress client by method {}.", this.remoteAddress, this.authMethod);
                }
            }
        } catch (AssertionError | Exception e) {
            authenticationFailed(e);
        }
    }

    private void authenticateOriginalData(int i, String str) {
        this.originalAuthState.authenticateAsync(this.originalAuthDataCopy).whenCompleteAsync((authData, th) -> {
            if (th != null) {
                authenticationFailed(th);
                return;
            }
            if (authData != null) {
                authenticationFailed(new AuthenticationException("Failed to authenticate original auth data due to unsupported authChallenge."));
                return;
            }
            try {
                this.originalAuthDataCopy = null;
                this.originalAuthData = this.originalAuthState.getAuthDataSource();
                this.originalPrincipal = this.originalAuthState.getAuthRole();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Authenticated original role (forwarded from proxy): {}", this.remoteAddress, this.originalPrincipal);
                }
                completeConnect(i, str);
            } catch (AssertionError | Exception e) {
                authenticationFailed(e);
            }
        }, (Executor) this.ctx.executor());
    }

    private void authenticationFailed(Throwable th) {
        String str;
        if (this.state == State.Connecting) {
            this.service.getPulsarStats().recordConnectionCreateFail();
            str = "connect";
        } else {
            str = "authentication-refresh";
        }
        this.state = State.Failed;
        logAuthException(this.remoteAddress, str, getPrincipal(), Optional.empty(), th);
        NettyChannelUtil.writeAndFlushWithClosePromise(this.ctx, Commands.newError(-1L, ServerError.AuthenticationError, "Failed to authenticate"));
    }

    private void maybeScheduleAuthenticationCredentialsRefresh() {
        if (!$assertionsDisabled && !this.ctx.executor().inEventLoop()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.authRefreshTask != null) {
            throw new AssertionError();
        }
        if (this.authState == null) {
            return;
        }
        this.authRefreshTask = this.ctx.executor().scheduleAtFixedRate(this::refreshAuthenticationCredentials, this.service.getPulsar().getConfig().getAuthenticationRefreshCheckSeconds(), this.service.getPulsar().getConfig().getAuthenticationRefreshCheckSeconds(), TimeUnit.SECONDS);
    }

    private void refreshAuthenticationCredentials() {
        if (!$assertionsDisabled && !this.ctx.executor().inEventLoop()) {
            throw new AssertionError();
        }
        AuthenticationState authenticationState = this.originalAuthState != null ? this.originalAuthState : this.authState;
        if (getState() != State.Failed && authenticationState.isExpired()) {
            if (this.originalPrincipal != null && this.originalAuthState == null) {
                log.info("[{}] Cannot revalidate user credential when using proxy and not forwarding the credentials. Closing connection", this.remoteAddress);
                this.ctx.close();
                return;
            }
            if (!supportsAuthenticationRefresh()) {
                log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", this.remoteAddress);
                this.ctx.close();
                return;
            }
            if (this.pendingAuthChallengeResponse) {
                log.warn("[{}] Closing connection after timeout on refreshing auth credentials", this.remoteAddress);
                this.ctx.close();
                return;
            }
            log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", new Object[]{this.remoteAddress, this.originalPrincipal, this.authRole});
            try {
                writeAndFlush(Commands.newAuthChallenge(this.authMethod, authenticationState.refreshAuthentication(), getRemoteEndpointProtocolVersion()));
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", this.remoteAddress, this.authMethod);
                }
                this.pendingAuthChallengeResponse = true;
            } catch (AuthenticationException e) {
                log.warn("[{}] Failed to refresh authentication: {}", this.remoteAddress, e);
                this.ctx.close();
            }
        }
    }

    protected void handleConnect(CommandConnect commandConnect) {
        Preconditions.checkArgument(this.state == State.Start);
        if (log.isDebugEnabled()) {
            Logger logger = log;
            Object[] objArr = new Object[4];
            objArr[0] = this.remoteAddress;
            objArr[1] = Boolean.valueOf(this.service.isAuthenticationEnabled());
            objArr[2] = Boolean.valueOf(commandConnect.hasOriginalPrincipal());
            objArr[3] = commandConnect.hasOriginalPrincipal() ? commandConnect.getOriginalPrincipal() : null;
            logger.debug("Received CONNECT from {}, auth enabled: {}: has original principal = {}, original principal = {}", objArr);
        }
        if (!this.service.getPulsar().isRunning()) {
            if (log.isDebugEnabled()) {
                log.debug("Failed CONNECT from {} due to pulsar service is not ready: {} state", this.remoteAddress, this.service.getPulsar().getState().toString());
            }
            writeAndFlush(Commands.newError(-1L, ServerError.ServiceNotReady, "Failed due to pulsar service is not ready"));
            close();
            return;
        }
        String clientVersion = commandConnect.getClientVersion();
        int protocolVersion = commandConnect.getProtocolVersion();
        this.features = new FeatureFlags();
        if (commandConnect.hasFeatureFlags()) {
            this.features.copyFrom(commandConnect.getFeatureFlags());
        }
        if (commandConnect.hasProxyVersion()) {
            this.proxyVersion = commandConnect.getProxyVersion();
        }
        if (!this.service.isAuthenticationEnabled()) {
            completeConnect(protocolVersion, clientVersion);
            return;
        }
        this.state = State.Connecting;
        try {
            AuthData of = AuthData.of(commandConnect.hasAuthData() ? commandConnect.getAuthData() : emptyArray);
            if (commandConnect.hasAuthMethodName()) {
                this.authMethod = commandConnect.getAuthMethodName();
            } else if (commandConnect.hasAuthMethod()) {
                this.authMethod = commandConnect.getAuthMethod().name().substring(10).toLowerCase();
            } else {
                this.authMethod = "none";
            }
            this.authenticationProvider = getBrokerService().getAuthenticationService().getAuthenticationProvider(this.authMethod);
            if (this.authenticationProvider == null) {
                this.authRole = (String) getBrokerService().getAuthenticationService().getAnonymousUserRole().orElseThrow(() -> {
                    return new AuthenticationException("No anonymous role, and no authentication provider configured");
                });
                completeConnect(protocolVersion, clientVersion);
                return;
            }
            SslHandler sslHandler = this.ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
            SSLSession sSLSession = null;
            if (sslHandler != null) {
                sSLSession = sslHandler.engine().getSession();
            }
            this.authState = this.authenticationProvider.newAuthState(of, this.remoteAddress, sSLSession);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Authenticate role : {}", this.remoteAddress, (this.authState == null || !this.authState.isComplete()) ? "authentication incomplete or null" : this.authState.getAuthRole());
            }
            if (commandConnect.hasOriginalPrincipal() && this.service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
                String originalAuthMethod = commandConnect.hasOriginalAuthMethod() ? commandConnect.getOriginalAuthMethod() : "none";
                AuthenticationProvider authenticationProvider = getBrokerService().getAuthenticationService().getAuthenticationProvider(originalAuthMethod);
                if (authenticationProvider == null) {
                    String str = originalAuthMethod;
                    this.authRole = (String) getBrokerService().getAuthenticationService().getAnonymousUserRole().orElseThrow(() -> {
                        return new AuthenticationException("No anonymous role, and can't find AuthenticationProvider for original role using auth method [" + str + "] is not available");
                    });
                    this.originalPrincipal = this.authRole;
                    completeConnect(protocolVersion, clientVersion);
                    return;
                }
                this.originalAuthDataCopy = AuthData.of(commandConnect.getOriginalAuthData().getBytes());
                this.originalAuthState = authenticationProvider.newAuthState(this.originalAuthDataCopy, this.remoteAddress, sSLSession);
            } else if (commandConnect.hasOriginalPrincipal()) {
                this.originalPrincipal = commandConnect.getOriginalPrincipal();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Setting original role (forwarded from proxy): {}", this.remoteAddress, this.originalPrincipal);
                }
            }
            doAuthentication(of, false, protocolVersion, clientVersion);
        } catch (Exception e) {
            authenticationFailed(e);
        }
    }

    protected void handleAuthResponse(CommandAuthResponse commandAuthResponse) {
        Preconditions.checkArgument(commandAuthResponse.hasResponse());
        Preconditions.checkArgument(commandAuthResponse.getResponse().hasAuthData() && commandAuthResponse.getResponse().hasAuthMethodName());
        this.pendingAuthChallengeResponse = false;
        if (log.isDebugEnabled()) {
            log.debug("Received AuthResponse from {}, auth method: {}", this.remoteAddress, commandAuthResponse.getResponse().getAuthMethodName());
        }
        try {
            doAuthentication(AuthData.of(commandAuthResponse.getResponse().getAuthData()), this.originalAuthState != null, commandAuthResponse.getProtocolVersion(), commandAuthResponse.hasClientVersion() ? commandAuthResponse.getClientVersion() : "");
        } catch (Exception e) {
            authenticationFailed(e);
        }
    }

    protected void handleSubscribe(CommandSubscribe commandSubscribe) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandSubscribe.getRequestId();
        long consumerId = commandSubscribe.getConsumerId();
        TopicName validateTopicName = validateTopicName(commandSubscribe.getTopic(), requestId, commandSubscribe);
        if (validateTopicName == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}", new Object[]{this.remoteAddress, this.authRole, this.originalPrincipal});
        }
        String subscription = commandSubscribe.getSubscription();
        CommandSubscribe.SubType subType = commandSubscribe.getSubType();
        String consumerName = commandSubscribe.hasConsumerName() ? commandSubscribe.getConsumerName() : "";
        boolean isDurable = commandSubscribe.isDurable();
        BatchMessageIdImpl batchMessageIdImpl = commandSubscribe.hasStartMessageId() ? new BatchMessageIdImpl(commandSubscribe.getStartMessageId().getLedgerId(), commandSubscribe.getStartMessageId().getEntryId(), commandSubscribe.getStartMessageId().getPartition(), commandSubscribe.getStartMessageId().getBatchIndex()) : null;
        int priorityLevel = commandSubscribe.hasPriorityLevel() ? commandSubscribe.getPriorityLevel() : 0;
        boolean z = commandSubscribe.hasReadCompacted() && commandSubscribe.isReadCompacted();
        Map metadataFromCommand = CommandUtils.metadataFromCommand(commandSubscribe);
        CommandSubscribe.InitialPosition initialPosition = commandSubscribe.getInitialPosition();
        long startMessageRollbackDurationSec = commandSubscribe.hasStartMessageRollbackDurationSec() ? commandSubscribe.getStartMessageRollbackDurationSec() : -1L;
        SchemaData schema = commandSubscribe.hasSchema() ? getSchema(commandSubscribe.getSchema()) : null;
        boolean z2 = commandSubscribe.hasReplicateSubscriptionState() && commandSubscribe.isReplicateSubscriptionState();
        boolean isForceTopicCreation = commandSubscribe.isForceTopicCreation();
        KeySharedMeta copyFrom = commandSubscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(commandSubscribe.getKeySharedMeta()) : emptyKeySharedMeta;
        long consumerEpoch = commandSubscribe.hasConsumerEpoch() ? commandSubscribe.getConsumerEpoch() : -1L;
        Optional<Map<String, String>> propertiesMap = SubscriptionOption.getPropertiesMap(commandSubscribe.getSubscriptionPropertiesList());
        if (log.isDebugEnabled()) {
            Logger logger = log;
            Object[] objArr = new Object[3];
            objArr[0] = validateTopicName;
            objArr[1] = subscription;
            objArr[2] = schema == null ? "absent" : "present";
            logger.debug("Topic name = {}, subscription name = {}, schema is {}", objArr);
        }
        CompletableFuture<Boolean> isTopicOperationAllowed = isTopicOperationAllowed(validateTopicName, subscription, TopicOperation.CONSUME);
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = (CompletableFuture) this.consumers.putIfAbsent(consumerId, completableFuture);
        isTopicOperationAllowed.thenApply(bool -> {
            if (!bool.booleanValue()) {
                log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, "Client is not authorized to subscribe", getPrincipal()});
                this.consumers.remove(consumerId, completableFuture);
                writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, "Client is not authorized to subscribe"));
                return null;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Client is authorized to subscribe with role {}", this.remoteAddress, getPrincipal());
            }
            log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", new Object[]{toString(), validateTopicName, subscription, Long.valueOf(consumerId)});
            try {
                Metadata.validateMetadata(metadataFromCommand, this.service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
                if (completableFuture2 == null) {
                    this.service.isAllowAutoTopicCreationAsync(validateTopicName.toString()).thenApply(bool -> {
                        return Boolean.valueOf(isForceTopicCreation && bool.booleanValue());
                    }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) bool2 -> {
                        return this.service.getTopic(validateTopicName.toString(), bool2.booleanValue());
                    }).thenCompose(optional -> {
                        if (!optional.isPresent()) {
                            return FutureUtil.failedFuture(new BrokerServiceException.TopicNotFoundException("Topic " + validateTopicName + " does not exist"));
                        }
                        Topic topic = (Topic) optional.get();
                        if (!((AbstractTopic) topic).isConsumersExceededOnTopic()) {
                            return this.service.isAllowAutoSubscriptionCreationAsync(validateTopicName).thenCompose(bool3 -> {
                                if (isDurable && !bool3.booleanValue() && !topic.getSubscriptions().containsKey(subscription) && topic.isPersistent()) {
                                    return FutureUtil.failedFuture(new BrokerServiceException.SubscriptionNotFoundException("Subscription does not exist"));
                                }
                                SubscriptionOption build = SubscriptionOption.builder().cnx(this).subscriptionName(subscription).consumerId(consumerId).subType(subType).priorityLevel(priorityLevel).consumerName(consumerName).isDurable(isDurable).startMessageId(batchMessageIdImpl).metadata(metadataFromCommand).readCompacted(z).initialPosition(initialPosition).startMessageRollbackDurationSec(startMessageRollbackDurationSec).replicatedSubscriptionStateArg(z2).keySharedMeta(copyFrom).subscriptionProperties(propertiesMap).consumerEpoch(consumerEpoch).schemaType(schema == null ? null : schema.getType()).build();
                                return (schema == null || schema.getType() == SchemaType.AUTO_CONSUME) ? topic.subscribe(build) : topic.addSchemaIfIdleOrCheckCompatible(schema).thenCompose(r5 -> {
                                    return topic.subscribe(build);
                                });
                            });
                        }
                        log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", topic);
                        return FutureUtil.failedFuture(new BrokerServiceException.ConsumerBusyException("Topic reached max consumers limit"));
                    }).thenAccept(consumer -> {
                        if (!completableFuture.complete(consumer)) {
                            try {
                                consumer.close();
                                log.info("[{}] Cleared consumer created after timeout on client side {}", this.remoteAddress, consumer);
                            } catch (BrokerServiceException e) {
                                log.warn("[{}] Error closing consumer created after timeout on client side {}: {}", new Object[]{this.remoteAddress, consumer, e.getMessage()});
                            }
                            this.consumers.remove(consumerId, completableFuture);
                            return;
                        }
                        log.info("[{}] Created subscription on topic {} / {}", new Object[]{this.remoteAddress, validateTopicName, subscription});
                        this.commandSender.sendSuccessResponse(requestId);
                        if (this.brokerInterceptor != null) {
                            try {
                                this.brokerInterceptor.consumerCreated(this, consumer, metadataFromCommand);
                            } catch (Throwable th) {
                                log.error("Exception occur when intercept consumer created.", th);
                            }
                        }
                    }).exceptionally(th -> {
                        if (th.getCause() instanceof BrokerServiceException.ConsumerBusyException) {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}", new Object[]{this.remoteAddress, validateTopicName, subscription, th.getCause().getMessage()});
                            }
                        } else if (th.getCause() instanceof BrokerServiceException) {
                            log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", new Object[]{this.remoteAddress, validateTopicName, subscription, Long.valueOf(consumerId), th.getCause().getMessage()});
                        } else {
                            log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", new Object[]{this.remoteAddress, validateTopicName, subscription, Long.valueOf(consumerId), th.getCause().getMessage(), th});
                        }
                        if (completableFuture.completeExceptionally(th)) {
                            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(th.getCause()), th.getCause().getMessage());
                        }
                        this.consumers.remove(consumerId, completableFuture);
                        return null;
                    });
                    return null;
                }
                if (!completableFuture2.isDone()) {
                    log.warn("[{}][{}][{}] Consumer with id is already present on the connection, consumerId={}", new Object[]{this.remoteAddress, validateTopicName, subscription, Long.valueOf(consumerId)});
                    this.commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady, "Consumer is already present on the connection");
                    return null;
                }
                if (completableFuture2.isCompletedExceptionally()) {
                    log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection, consumerId={}", new Object[]{this.remoteAddress, validateTopicName, subscription, Long.valueOf(consumerId)});
                    this.commandSender.sendErrorResponse(requestId, getErrorCodeWithErrorLog(completableFuture2, true, String.format("A failed consumer with id is already present on the connection. consumerId: %s, remoteAddress: %s, subscription: %s", Long.valueOf(consumerId), this.remoteAddress, subscription)), "Consumer that failed is already present on the connection");
                    return null;
                }
                log.warn("[{}] Consumer with the same id is already created: consumerId={}, consumer={}", new Object[]{this.remoteAddress, Long.valueOf(consumerId), (Consumer) completableFuture2.getNow(null)});
                this.commandSender.sendSuccessResponse(requestId);
                return null;
            } catch (IllegalArgumentException e) {
                String message = e.getMessage();
                this.consumers.remove(consumerId, completableFuture);
                this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, message);
                return null;
            }
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            logAuthException(this.remoteAddress, "subscribe", getPrincipal(), Optional.of(validateTopicName), th);
            this.consumers.remove(consumerId, completableFuture);
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, th.getMessage());
            return null;
        });
    }

    private SchemaData getSchema(Schema schema) {
        return SchemaData.builder().data(schema.getSchemaData()).isDeleted(false).timestamp(System.currentTimeMillis()).user(Strings.nullToEmpty(this.originalPrincipal)).type(Commands.getSchemaType(schema.getType())).props((Map) schema.getPropertiesList().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).build();
    }

    protected void handleProducer(CommandProducer commandProducer) {
        Preconditions.checkArgument(this.state == State.Connected);
        long producerId = commandProducer.getProducerId();
        long requestId = commandProducer.getRequestId();
        String producerName = commandProducer.hasProducerName() ? commandProducer.getProducerName() : this.service.generateUniqueProducerName();
        long epoch = commandProducer.getEpoch();
        boolean isUserProvidedProducerName = commandProducer.isUserProvidedProducerName();
        boolean isEncrypted = commandProducer.isEncrypted();
        Map metadataFromCommand = CommandUtils.metadataFromCommand(commandProducer);
        SchemaData schema = commandProducer.hasSchema() ? getSchema(commandProducer.getSchema()) : null;
        ProducerAccessMode producerAccessMode = commandProducer.getProducerAccessMode();
        Optional of = commandProducer.hasTopicEpoch() ? Optional.of(Long.valueOf(commandProducer.getTopicEpoch())) : Optional.empty();
        boolean isTxnEnabled = commandProducer.isTxnEnabled();
        String initialSubscriptionName = commandProducer.hasInitialSubscriptionName() ? commandProducer.getInitialSubscriptionName() : null;
        boolean supportsPartialProducer = supportsPartialProducer();
        TopicName validateTopicName = validateTopicName(commandProducer.getTopic(), requestId, commandProducer);
        if (validateTopicName == null) {
            return;
        }
        CompletableFuture<Boolean> isTopicOperationAllowed = isTopicOperationAllowed(validateTopicName, TopicOperation.PRODUCE, this.authenticationData, this.originalAuthData);
        if (!Strings.isNullOrEmpty(initialSubscriptionName)) {
            isTopicOperationAllowed = isTopicOperationAllowed.thenCombine((CompletionStage) isTopicOperationAllowed(validateTopicName, initialSubscriptionName, TopicOperation.SUBSCRIBE), (bool, bool2) -> {
                return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
            });
        }
        isTopicOperationAllowed.thenApply(bool3 -> {
            if (!bool3.booleanValue()) {
                log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, "Client is not authorized to Produce", getPrincipal()});
                writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, "Client is not authorized to Produce"));
                return null;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Client is authorized to Produce with role {}", this.remoteAddress, getPrincipal());
            }
            CompletableFuture completableFuture = new CompletableFuture();
            CompletableFuture completableFuture2 = (CompletableFuture) this.producers.putIfAbsent(producerId, completableFuture);
            if (completableFuture2 == null) {
                if (log.isDebugEnabled()) {
                    Logger logger = log;
                    Object[] objArr = new Object[5];
                    objArr[0] = this.remoteAddress;
                    objArr[1] = validateTopicName;
                    objArr[2] = Long.valueOf(producerId);
                    objArr[3] = producerName;
                    objArr[4] = schema == null ? "absent" : "present";
                    logger.debug("[{}][{}] Creating producer. producerId={}, producerName={}, schema is {}", objArr);
                }
                this.service.getOrCreateTopic(validateTopicName.toString()).thenCompose(topic -> {
                    if (((AbstractTopic) topic).isProducersExceeded(producerName)) {
                        log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
                        return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException("Topic '" + validateTopicName.toString() + "' reached max producers limit"));
                    }
                    CompletableFuture<Void> allOf = CompletableFuture.allOf(topic.checkBacklogQuotaExceeded(producerName, BacklogQuota.BacklogQuotaType.destination_storage), topic.checkBacklogQuotaExceeded(producerName, BacklogQuota.BacklogQuotaType.message_age));
                    allOf.thenRun(() -> {
                        if ((!topic.isEncryptionRequired() && !this.encryptionRequireOnProducer) || isEncrypted || SystemTopicNames.isSystemTopic(validateTopicName)) {
                            disableTcpNoDelayIfNeeded(validateTopicName.toString(), producerName);
                            CompletableFuture<SchemaVersion> tryAddSchema = tryAddSchema(topic, schema);
                            tryAddSchema.exceptionally(th -> {
                                if (completableFuture.completeExceptionally(th)) {
                                    String message = th.getMessage();
                                    if (th.getCause() != null) {
                                        message = message + " caused by " + th.getCause();
                                    }
                                    this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(th), message);
                                }
                                log.error("Try add schema failed, remote address {}, topic {}, producerId {}", new Object[]{this.remoteAddress, validateTopicName, Long.valueOf(producerId), th});
                                this.producers.remove(producerId, completableFuture);
                                return null;
                            });
                            tryAddSchema.thenAccept(schemaVersion -> {
                                topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(r40 -> {
                                    ((Strings.isNullOrEmpty(initialSubscriptionName) || !topic.isPersistent() || topic.getSubscriptions().containsKey(initialSubscriptionName)) ? CompletableFuture.completedFuture(null) : this.service.isAllowAutoSubscriptionCreationAsync(validateTopicName).thenCompose(bool3 -> {
                                        return !bool3.booleanValue() ? CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException("Could not create the initial subscription due to the auto subscription creation is not allowed.")) : topic.createSubscription(initialSubscriptionName, CommandSubscribe.InitialPosition.Earliest, false, null);
                                    })).whenComplete((subscription, th2) -> {
                                        if (th2 == null) {
                                            buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, metadataFromCommand, schemaVersion, epoch, isUserProvidedProducerName, validateTopicName, producerAccessMode, of, supportsPartialProducer, completableFuture);
                                            return;
                                        }
                                        Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th2);
                                        if (unwrapCompletionException instanceof BrokerServiceException.NotAllowedException) {
                                            log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", new Object[]{this.remoteAddress, unwrapCompletionException.getMessage(), initialSubscriptionName, validateTopicName});
                                            if (completableFuture.completeExceptionally(unwrapCompletionException)) {
                                                this.commandSender.sendErrorResponse(requestId, ServerError.NotAllowedError, unwrapCompletionException.getMessage());
                                            }
                                            this.producers.remove(producerId, completableFuture);
                                            return;
                                        }
                                        String str = "Failed to create the initial subscription: " + th2.getCause().getMessage();
                                        log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", new Object[]{this.remoteAddress, str, initialSubscriptionName, validateTopicName});
                                        if (completableFuture.completeExceptionally(th2)) {
                                            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(th2), str);
                                        }
                                        this.producers.remove(producerId, completableFuture);
                                    });
                                }).exceptionally(th2 -> {
                                    Throwable cause = th2.getCause();
                                    log.error("producerId {}, requestId {} : TransactionBuffer recover failed", new Object[]{Long.valueOf(producerId), Long.valueOf(requestId), th2});
                                    if (completableFuture.completeExceptionally(th2)) {
                                        this.commandSender.sendErrorResponse(requestId, BrokerServiceException.ServiceUnitNotReadyException.getClientErrorCode(cause), cause.getMessage());
                                    }
                                    this.producers.remove(producerId, completableFuture);
                                    return null;
                                });
                            });
                            return;
                        }
                        String format = String.format("Encryption is required in %s", validateTopicName);
                        log.warn("[{}] {}", this.remoteAddress, format);
                        if (completableFuture.completeExceptionally(new BrokerServiceException.ServerMetadataException(format))) {
                            this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, format);
                        }
                        this.producers.remove(producerId, completableFuture);
                    });
                    return allOf;
                }).exceptionally((Function<Throwable, ? extends U>) th -> {
                    Throwable cause = th.getCause();
                    if (!(cause instanceof BrokerServiceException.TopicBacklogQuotaExceededException)) {
                        if (cause instanceof NoSuchElementException) {
                            cause = new BrokerServiceException.TopicNotFoundException(String.format("Topic not found %s", validateTopicName.toString()));
                            log.warn("[{}] Failed to load topic {}, producerId={}: Topic not found", new Object[]{this.remoteAddress, validateTopicName, Long.valueOf(producerId)});
                        } else if (!Exceptions.areExceptionsPresentInChain(cause, new Class[]{BrokerServiceException.ServiceUnitNotReadyException.class, ManagedLedgerException.class})) {
                            log.error("[{}] Failed to create topic {}, producerId={}", new Object[]{this.remoteAddress, validateTopicName, Long.valueOf(producerId), th});
                        }
                        if (completableFuture.completeExceptionally(th)) {
                            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
                        }
                        this.producers.remove(producerId, completableFuture);
                        return null;
                    }
                    BrokerServiceException.TopicBacklogQuotaExceededException topicBacklogQuotaExceededException = (BrokerServiceException.TopicBacklogQuotaExceededException) cause;
                    IllegalStateException illegalStateException = new IllegalStateException(topicBacklogQuotaExceededException);
                    BacklogQuota.RetentionPolicy retentionPolicy = topicBacklogQuotaExceededException.getRetentionPolicy();
                    if (completableFuture.completeExceptionally(illegalStateException)) {
                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
                            this.commandSender.sendErrorResponse(requestId, ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage());
                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
                            this.commandSender.sendErrorResponse(requestId, ServerError.ProducerBlockedQuotaExceededException, illegalStateException.getMessage());
                        }
                    }
                    this.producers.remove(producerId, completableFuture);
                    return null;
                });
                return null;
            }
            if (!completableFuture2.isDone()) {
                log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}", new Object[]{this.remoteAddress, validateTopicName, Long.valueOf(producerId)});
                this.commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady, "Producer is already present on the connection");
                return null;
            }
            if (!completableFuture2.isCompletedExceptionally()) {
                Producer producer = (Producer) completableFuture2.getNow(null);
                log.info("[{}] [{}] Producer with the same id is already created: producerId={}, producer={}", new Object[]{this.remoteAddress, validateTopicName, Long.valueOf(producerId), producer});
                this.commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(), producer.getSchemaVersion());
                return null;
            }
            log.warn("[{}][{}] Producer with id is failed to register present on the connection, producerId={}", new Object[]{this.remoteAddress, validateTopicName, Long.valueOf(producerId)});
            ServerError errorCode = getErrorCode(completableFuture2);
            this.producers.remove(producerId, completableFuture2);
            this.commandSender.sendErrorResponse(requestId, errorCode, "Producer is already failed to register present on the connection");
            return null;
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            logAuthException(this.remoteAddress, "producer", getPrincipal(), Optional.of(validateTopicName), th);
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, th.getMessage());
            return null;
        });
    }

    private void buildProducerAndAddTopic(Topic topic, long j, String str, long j2, boolean z, Map<String, String> map, SchemaVersion schemaVersion, long j3, boolean z2, TopicName topicName, ProducerAccessMode producerAccessMode, Optional<Long> optional, boolean z3, CompletableFuture<Producer> completableFuture) {
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        Producer producer = new Producer(topic, this, j, str, getPrincipal(), z, map, schemaVersion, j3, z2, producerAccessMode, optional, z3);
        topic.addProducer(producer, completableFuture2).thenAccept(optional2 -> {
            if (!isActive()) {
                producer.closeNow(true);
                log.info("[{}] Cleared producer created after connection was closed: {}", this.remoteAddress, producer);
                completableFuture.completeExceptionally(new IllegalStateException("Producer created after connection was closed"));
            } else {
                if (completableFuture.complete(producer)) {
                    log.info("[{}] Created new producer: {}", this.remoteAddress, producer);
                    this.commandSender.sendProducerSuccessResponse(j2, str, producer.getLastSequenceId(), producer.getSchemaVersion(), optional2, true);
                    if (this.brokerInterceptor != null) {
                        try {
                            this.brokerInterceptor.producerCreated(this, producer, map);
                            return;
                        } catch (Throwable th) {
                            log.error("Exception occur when intercept producer created.", th);
                            return;
                        }
                    }
                    return;
                }
                producer.closeNow(true);
                log.info("[{}] Cleared producer created after timeout on client side {}", this.remoteAddress, producer);
            }
            this.producers.remove(j, completableFuture);
        }).exceptionallyAsync(th -> {
            if (th.getCause() instanceof BrokerServiceException.TopicMigratedException) {
                Optional<ClusterData.ClusterUrl> migratedClusterUrl = PersistentTopic.getMigratedClusterUrl(this.service.getPulsar());
                if (!migratedClusterUrl.isPresent()) {
                    log.warn("[{}] failed producer because migration url not configured topic {}: producerId={}, {}", new Object[]{this.remoteAddress, topicName, Long.valueOf(j), th.getCause().getMessage()});
                } else {
                    if (!topic.isReplicationBacklogExist()) {
                        log.info("[{}] redirect migrated producer to topic {}: producerId={}, producerName = {}, {}", new Object[]{this.remoteAddress, topicName, Long.valueOf(j), str, th.getCause().getMessage()});
                        if (!this.commandSender.sendTopicMigrated(CommandTopicMigrated.ResourceType.Producer, j, migratedClusterUrl.get().getBrokerServiceUrl(), migratedClusterUrl.get().getBrokerServiceUrlTls())) {
                            log.info("client doesn't support topic migration handling {}-{}-{}", new Object[]{topic, this.remoteAddress, Long.valueOf(j)});
                        }
                        closeProducer(producer);
                        return null;
                    }
                    log.info("Topic {} is migrated but replication backlog exist: producerId = {}, producerName = {}, {}", new Object[]{topicName, Long.valueOf(j), str, th.getCause().getMessage()});
                }
            } else if (!(th.getCause() instanceof BrokerServiceException.ProducerFencedException)) {
                log.warn("[{}] Failed to add producer to topic {}: producerId={}, {}", new Object[]{this.remoteAddress, topicName, Long.valueOf(j), th.getCause().getMessage()});
            } else if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to add producer to topic {}: producerId={}, {}", new Object[]{this.remoteAddress, topicName, Long.valueOf(j), th.getCause().getMessage()});
            }
            producer.closeNow(true);
            if (!completableFuture.completeExceptionally(th)) {
                return null;
            }
            this.commandSender.sendErrorResponse(j2, BrokerServiceException.getClientErrorCode(th), th.getMessage());
            return null;
        }, (Executor) this.ctx.executor());
        completableFuture2.thenRun(() -> {
            if (isActive()) {
                log.info("[{}] Producer is waiting in queue: {}", this.remoteAddress, producer);
                this.commandSender.sendProducerSuccessResponse(j2, str, producer.getLastSequenceId(), producer.getSchemaVersion(), Optional.empty(), false);
                if (this.brokerInterceptor != null) {
                    this.brokerInterceptor.producerCreated(this, producer, map);
                }
            }
        });
    }

    protected void handleSend(CommandSend commandSend, ByteBuf byteBuf) {
        Preconditions.checkArgument(this.state == State.Connected);
        CompletableFuture completableFuture = (CompletableFuture) this.producers.get(commandSend.getProducerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            if (!this.recentlyClosedProducers.containsKey(Long.valueOf(commandSend.getProducerId()))) {
                log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.", this.remoteAddress, Long.valueOf(commandSend.getProducerId()));
                close();
                return;
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Received message, but the producer was recently closed : {}. Ignoring message.", this.remoteAddress, Long.valueOf(commandSend.getProducerId()));
                    return;
                }
                return;
            }
        }
        Producer producer = (Producer) completableFuture.getNow(null);
        if (log.isDebugEnabled()) {
            printSendCommandDebug(commandSend, byteBuf);
        }
        if (producer.isNonPersistentTopic()) {
            if (this.nonPersistentPendingMessages > this.maxNonPersistentPendingMessages) {
                long producerId = commandSend.getProducerId();
                long sequenceId = commandSend.getSequenceId();
                long highestSequenceId = commandSend.getHighestSequenceId();
                this.service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), () -> {
                    this.commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1L, -1L);
                });
                producer.recordMessageDrop(commandSend.getNumMessages());
                return;
            }
            this.nonPersistentPendingMessages++;
        }
        startSendOperation(producer, byteBuf.readableBytes(), commandSend.getNumMessages());
        if (commandSend.hasTxnidMostBits() && commandSend.hasTxnidLeastBits()) {
            producer.publishTxnMessage(new TxnID(commandSend.getTxnidMostBits(), commandSend.getTxnidLeastBits()), producer.getProducerId(), commandSend.getSequenceId(), commandSend.getHighestSequenceId(), byteBuf, commandSend.getNumMessages(), commandSend.isIsChunk(), commandSend.isMarker());
            return;
        }
        PositionImpl positionImpl = commandSend.hasMessageId() ? PositionImpl.get(commandSend.getMessageId().getLedgerId(), commandSend.getMessageId().getEntryId()) : null;
        if (!commandSend.hasHighestSequenceId() || commandSend.getSequenceId() > commandSend.getHighestSequenceId()) {
            producer.publishMessage(commandSend.getProducerId(), commandSend.getSequenceId(), byteBuf, commandSend.getNumMessages(), commandSend.isIsChunk(), commandSend.isMarker(), positionImpl);
        } else {
            producer.publishMessage(commandSend.getProducerId(), commandSend.getSequenceId(), commandSend.getHighestSequenceId(), byteBuf, commandSend.getNumMessages(), commandSend.isIsChunk(), commandSend.isMarker(), positionImpl);
        }
    }

    private void printSendCommandDebug(CommandSend commandSend, ByteBuf byteBuf) {
        byteBuf.markReaderIndex();
        MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(byteBuf);
        byteBuf.resetReaderIndex();
        if (log.isDebugEnabled()) {
            Logger logger = log;
            Object[] objArr = new Object[9];
            objArr[0] = this.remoteAddress;
            objArr[1] = Long.valueOf(commandSend.getProducerId());
            objArr[2] = Long.valueOf(commandSend.getSequenceId());
            objArr[3] = parseMessageMetadata.getProducerName();
            objArr[4] = Long.valueOf(parseMessageMetadata.getSequenceId());
            objArr[5] = Integer.valueOf(byteBuf.readableBytes());
            objArr[6] = parseMessageMetadata.hasPartitionKey() ? parseMessageMetadata.getPartitionKey() : null;
            objArr[7] = parseMessageMetadata.hasOrderingKey() ? parseMessageMetadata.getOrderingKey() : null;
            objArr[8] = Integer.valueOf(parseMessageMetadata.getUncompressedSize());
            logger.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}, partition key is: {}, ordering key is {}, uncompressedSize is {}", objArr);
        }
    }

    protected void handleAck(CommandAck commandAck) {
        Preconditions.checkArgument(this.state == State.Connected);
        CompletableFuture completableFuture = (CompletableFuture) this.consumers.get(commandAck.getConsumerId());
        boolean hasRequestId = commandAck.hasRequestId();
        long requestId = hasRequestId ? commandAck.getRequestId() : 0L;
        long consumerId = commandAck.getConsumerId();
        CommandAck copyFrom = this.brokerInterceptor != null ? new CommandAck().copyFrom(commandAck) : null;
        if (completableFuture != null && completableFuture.isDone() && !completableFuture.isCompletedExceptionally()) {
            Consumer consumer = (Consumer) completableFuture.getNow(null);
            consumer.messageAcked(commandAck).thenRun(() -> {
                if (hasRequestId) {
                    writeAndFlush(Commands.newAckResponse(requestId, (ServerError) null, (String) null, consumerId));
                }
                if (this.brokerInterceptor != null) {
                    try {
                        this.brokerInterceptor.messageAcked(this, consumer, copyFrom);
                    } catch (Throwable th) {
                        log.error("Exception occur when intercept message acked.", th);
                    }
                }
            }).exceptionally(th -> {
                if (!hasRequestId) {
                    return null;
                }
                writeAndFlush(Commands.newAckResponse(requestId, BrokerServiceException.getClientErrorCode(th), th.getMessage(), consumerId));
                return null;
            });
        } else if (log.isDebugEnabled()) {
            log.debug("Consumer future is not complete(not complete or error), but received command ack. so discard this command. consumerId: {}, cnx: {}, messageIdCount: {}", new Object[]{Long.valueOf(commandAck.getConsumerId()), toString(), Integer.valueOf(commandAck.getMessageIdsCount())});
        }
    }

    protected void handleFlow(CommandFlow commandFlow) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received flow from consumer {} permits: {}", new Object[]{this.remoteAddress, Long.valueOf(commandFlow.getConsumerId()), Integer.valueOf(commandFlow.getMessagePermits())});
        }
        CompletableFuture completableFuture = (CompletableFuture) this.consumers.get(commandFlow.getConsumerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            return;
        }
        Consumer consumer = (Consumer) completableFuture.getNow(null);
        if (consumer != null) {
            consumer.flowPermits(commandFlow.getMessagePermits());
        } else {
            log.info("[{}] Couldn't find consumer {}", this.remoteAddress, Long.valueOf(commandFlow.getConsumerId()));
        }
    }

    protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages commandRedeliverUnacknowledgedMessages) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            Logger logger = log;
            Object[] objArr = new Object[3];
            objArr[0] = this.remoteAddress;
            objArr[1] = Long.valueOf(commandRedeliverUnacknowledgedMessages.getConsumerId());
            objArr[2] = commandRedeliverUnacknowledgedMessages.hasConsumerEpoch() ? Long.valueOf(commandRedeliverUnacknowledgedMessages.getConsumerEpoch()) : null;
            logger.debug("[{}] redeliverUnacknowledged from consumer {}, consumerEpoch {}", objArr);
        }
        CompletableFuture completableFuture = (CompletableFuture) this.consumers.get(commandRedeliverUnacknowledgedMessages.getConsumerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            return;
        }
        Consumer consumer = (Consumer) completableFuture.getNow(null);
        if (commandRedeliverUnacknowledgedMessages.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
            consumer.redeliverUnacknowledgedMessages(commandRedeliverUnacknowledgedMessages.getMessageIdsList());
        } else if (commandRedeliverUnacknowledgedMessages.hasConsumerEpoch()) {
            consumer.redeliverUnacknowledgedMessages(commandRedeliverUnacknowledgedMessages.getConsumerEpoch());
        } else {
            consumer.redeliverUnacknowledgedMessages(-1L);
        }
    }

    protected void handleUnsubscribe(CommandUnsubscribe commandUnsubscribe) {
        Preconditions.checkArgument(this.state == State.Connected);
        CompletableFuture completableFuture = (CompletableFuture) this.consumers.get(commandUnsubscribe.getConsumerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            this.commandSender.sendErrorResponse(commandUnsubscribe.getRequestId(), ServerError.MetadataError, "Consumer not found");
        } else {
            ((Consumer) completableFuture.getNow(null)).doUnsubscribe(commandUnsubscribe.getRequestId());
        }
    }

    protected void handleSeek(CommandSeek commandSeek) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandSeek.getRequestId();
        CompletableFuture completableFuture = (CompletableFuture) this.consumers.get(commandSeek.getConsumerId());
        if (!commandSeek.hasMessageId() && !commandSeek.hasMessagePublishTime()) {
            this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Message id and message publish time were not present");
            return;
        }
        boolean z = (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) ? false : true;
        if (!z || !commandSeek.hasMessageId()) {
            if (!z || !commandSeek.hasMessagePublishTime()) {
                this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
                return;
            }
            Subscription subscription = ((Consumer) completableFuture.getNow(null)).getSubscription();
            long messagePublishTime = commandSeek.getMessagePublishTime();
            subscription.resetCursor(messagePublishTime).thenRun(() -> {
                log.info("[{}] [{}][{}] Reset subscription to publish time {}", new Object[]{this.remoteAddress, subscription.getTopic().getName(), subscription.getName(), Long.valueOf(messagePublishTime)});
                this.commandSender.sendSuccessResponse(requestId);
            }).exceptionally(th -> {
                log.warn("[{}][{}] Failed to reset subscription: {}", new Object[]{this.remoteAddress, subscription, th.getMessage(), th});
                this.commandSender.sendErrorResponse(requestId, ServerError.UnknownError, "Reset subscription to publish time error: " + th.getCause().getMessage());
                return null;
            });
            return;
        }
        Subscription subscription2 = ((Consumer) completableFuture.getNow(null)).getSubscription();
        MessageIdData messageId = commandSeek.getMessageId();
        long[] jArr = null;
        if (messageId.getAckSetsCount() > 0) {
            jArr = new long[messageId.getAckSetsCount()];
            for (int i = 0; i < jArr.length; i++) {
                jArr[i] = messageId.getAckSetAt(i);
            }
        }
        PositionImpl positionImpl = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId(), jArr);
        subscription2.resetCursor((Position) positionImpl).thenRun(() -> {
            log.info("[{}] [{}][{}] Reset subscription to message id {}", new Object[]{this.remoteAddress, subscription2.getTopic().getName(), subscription2.getName(), positionImpl});
            this.commandSender.sendSuccessResponse(requestId);
        }).exceptionally(th2 -> {
            log.warn("[{}][{}] Failed to reset subscription: {}", new Object[]{this.remoteAddress, subscription2, th2.getMessage(), th2});
            this.commandSender.sendErrorResponse(requestId, ServerError.UnknownError, "Error when resetting subscription: " + th2.getCause().getMessage());
            return null;
        });
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Objects.equals(ctx().channel().id(), ((ServerCnx) obj).ctx().channel().id());
    }

    public int hashCode() {
        return Objects.hash(ctx().channel().id());
    }

    protected void handleCloseProducer(CommandCloseProducer commandCloseProducer) {
        Preconditions.checkArgument(this.state == State.Connected);
        long producerId = commandCloseProducer.getProducerId();
        long requestId = commandCloseProducer.getRequestId();
        CompletableFuture completableFuture = (CompletableFuture) this.producers.get(producerId);
        if (completableFuture == null) {
            log.info("[{}] Producer {} was not registered on the connection", this.remoteAddress, Long.valueOf(producerId));
            writeAndFlush(Commands.newSuccess(requestId));
            return;
        }
        if (!completableFuture.isDone() && completableFuture.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
            log.info("[{}] Closed producer before its creation was completed. producerId={}", this.remoteAddress, Long.valueOf(producerId));
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, completableFuture);
        } else if (completableFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed producer that already failed to be created. producerId={}", this.remoteAddress, Long.valueOf(producerId));
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, completableFuture);
        } else {
            Producer producer = (Producer) completableFuture.getNow(null);
            log.info("[{}][{}] Closing producer on cnx {}. producerId={}", new Object[]{producer.getTopic(), producer.getProducerName(), this.remoteAddress, Long.valueOf(producerId)});
            producer.close(true).thenAccept(r15 -> {
                log.info("[{}][{}] Closed producer on cnx {}. producerId={}", new Object[]{producer.getTopic(), producer.getProducerName(), this.remoteAddress, Long.valueOf(producerId)});
                this.commandSender.sendSuccessResponse(requestId);
                this.producers.remove(producerId, completableFuture);
                if (this.brokerInterceptor != null) {
                    this.brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
                }
            });
        }
    }

    protected void handleCloseConsumer(CommandCloseConsumer commandCloseConsumer) {
        Preconditions.checkArgument(this.state == State.Connected);
        log.info("[{}] Closing consumer: consumerId={}", this.remoteAddress, Long.valueOf(commandCloseConsumer.getConsumerId()));
        long requestId = commandCloseConsumer.getRequestId();
        long consumerId = commandCloseConsumer.getConsumerId();
        CompletableFuture completableFuture = (CompletableFuture) this.consumers.get(consumerId);
        if (completableFuture == null) {
            log.info("[{}] Consumer was not registered on the connection: {}", Long.valueOf(consumerId), this.remoteAddress);
            writeAndFlush(Commands.newSuccess(requestId));
            return;
        }
        if (!completableFuture.isDone() && completableFuture.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
            log.info("[{}] Closed consumer before its creation was completed. consumerId={}", this.remoteAddress, Long.valueOf(consumerId));
            this.commandSender.sendSuccessResponse(requestId);
            return;
        }
        if (completableFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed consumer that already failed to be created. consumerId={}", this.remoteAddress, Long.valueOf(consumerId));
            this.commandSender.sendSuccessResponse(requestId);
            return;
        }
        Consumer consumer = (Consumer) completableFuture.getNow(null);
        try {
            consumer.close();
            this.consumers.remove(consumerId, completableFuture);
            this.commandSender.sendSuccessResponse(requestId);
            log.info("[{}] Closed consumer, consumerId={}", this.remoteAddress, Long.valueOf(consumerId));
            if (this.brokerInterceptor != null) {
                this.brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
            }
        } catch (BrokerServiceException e) {
            log.warn("[{]] Error closing consumer {} : {}", new Object[]{this.remoteAddress, consumer, e});
            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
        }
    }

    protected void handleGetLastMessageId(CommandGetLastMessageId commandGetLastMessageId) {
        Preconditions.checkArgument(this.state == State.Connected);
        CompletableFuture completableFuture = (CompletableFuture) this.consumers.get(commandGetLastMessageId.getConsumerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            writeAndFlush(Commands.newError(commandGetLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
            return;
        }
        Consumer consumer = (Consumer) completableFuture.getNow(null);
        long requestId = commandGetLastMessageId.getRequestId();
        Topic topic = consumer.getSubscription().getTopic();
        topic.checkIfTransactionBufferRecoverCompletely(true).thenCompose(r3 -> {
            return topic.getLastDispatchablePosition();
        }).thenApply((Function<? super U, ? extends U>) position -> {
            int partitionIndex = TopicName.getPartitionIndex(topic.getName());
            Position position = null;
            if (consumer.getSubscription() instanceof PersistentSubscription) {
                position = ((PersistentSubscription) consumer.getSubscription()).getCursor().getMarkDeletedPosition();
            }
            getLargestBatchIndexWhenPossible(topic, (PositionImpl) position, (PositionImpl) position, partitionIndex, requestId, consumer.getSubscription().getName(), consumer.readCompacted());
            return null;
        }).exceptionally(th -> {
            writeAndFlush(Commands.newError(commandGetLastMessageId.getRequestId(), ServerError.UnknownError, "Failed to recover Transaction Buffer."));
            return null;
        });
    }

    private void getLargestBatchIndexWhenPossible(Topic topic, PositionImpl positionImpl, PositionImpl positionImpl2, int i, long j, String str, boolean z) {
        PersistentTopic persistentTopic = (PersistentTopic) topic;
        ManagedLedgerImpl managedLedger = persistentTopic.getManagedLedger();
        Optional<Position> compactionHorizon = z ? persistentTopic.getCompactedTopic().getCompactionHorizon() : Optional.empty();
        if (positionImpl.getEntryId() == -1 || !managedLedger.ledgerExists(positionImpl.getLedgerId())) {
            if (compactionHorizon == null || !compactionHorizon.isPresent()) {
                writeAndFlush(Commands.newGetLastMessageIdResponse(j, -1L, -1L, i, -1, positionImpl2 != null ? positionImpl2.getLedgerId() : -1L, positionImpl2 != null ? positionImpl2.getEntryId() : -1L));
                return;
            } else {
                handleLastMessageIdFromCompactedLedger(persistentTopic, j, i, positionImpl2);
                return;
            }
        }
        if (compactionHorizon != null && compactionHorizon.isPresent() && positionImpl.compareTo(compactionHorizon.get()) <= 0) {
            handleLastMessageIdFromCompactedLedger(persistentTopic, j, i, positionImpl2);
            return;
        }
        final CompletableFuture completableFuture = new CompletableFuture();
        managedLedger.asyncReadEntry(positionImpl, new AsyncCallbacks.ReadEntryCallback() { // from class: org.apache.pulsar.broker.service.ServerCnx.3
            public void readEntryComplete(Entry entry, Object obj) {
                completableFuture.complete(entry);
            }

            public void readEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
                completableFuture.completeExceptionally(managedLedgerException);
            }
        }, (Object) null);
        completableFuture.thenApply(entry -> {
            MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
            int numMessagesInBatch = parseMessageMetadata.getNumMessagesInBatch();
            entry.release();
            return Integer.valueOf(parseMessageMetadata.hasNumMessagesInBatch() ? numMessagesInBatch : -1);
        }).whenComplete((num, th) -> {
            if (th == null) {
                int intValue = num.intValue() > 0 ? num.intValue() - 1 : -1;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", new Object[]{toString(), topic.getName(), str, positionImpl, Integer.valueOf(i)});
                }
                writeAndFlush(Commands.newGetLastMessageIdResponse(j, positionImpl.getLedgerId(), positionImpl.getEntryId(), i, intValue, positionImpl2 != null ? positionImpl2.getLedgerId() : -1L, positionImpl2 != null ? positionImpl2.getEntryId() : -1L));
                return;
            }
            if ((th.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) && z) {
                handleLastMessageIdFromCompactedLedger(persistentTopic, j, i, positionImpl2);
            } else {
                writeAndFlush(Commands.newError(j, ServerError.MetadataError, "Failed to get batch size for entry " + th.getMessage()));
            }
        });
    }

    private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTopic, long j, int i, PositionImpl positionImpl) {
        persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> {
            if (entry == null) {
                writeAndFlush(Commands.newGetLastMessageIdResponse(j, -1L, -1L, i, -1, positionImpl != null ? positionImpl.getLedgerId() : -1L, positionImpl != null ? positionImpl.getEntryId() : -1L));
                return;
            }
            try {
                ByteBuf dataBuffer = entry.getDataBuffer();
                try {
                    writeAndFlush(Commands.newGetLastMessageIdResponse(j, entry.getLedgerId(), entry.getEntryId(), i, calculateTheLastBatchIndexInBatch(Commands.parseMessageMetadata(dataBuffer), dataBuffer), positionImpl != null ? positionImpl.getLedgerId() : -1L, positionImpl != null ? positionImpl.getEntryId() : -1L));
                    entry.release();
                } catch (IOException e) {
                    writeAndFlush(Commands.newError(j, ServerError.MetadataError, "Failed to deserialize batched message from the last entry of the compacted Ledger: " + e.getMessage()));
                    entry.release();
                }
            } catch (Throwable th) {
                entry.release();
                throw th;
            }
        }).exceptionally(th -> {
            writeAndFlush(Commands.newError(j, ServerError.MetadataError, "Failed to read last entry of the compacted Ledger " + th.getCause().getMessage()));
            return null;
        });
    }

    private int calculateTheLastBatchIndexInBatch(MessageMetadata messageMetadata, ByteBuf byteBuf) throws IOException {
        int numMessagesInBatch = messageMetadata.getNumMessagesInBatch();
        if (numMessagesInBatch <= 1) {
            return -1;
        }
        if (messageMetadata.hasCompression()) {
            byteBuf = CompressionCodecProvider.getCompressionCodec(messageMetadata.getCompression()).decode(byteBuf, messageMetadata.getUncompressedSize());
            byteBuf.release();
        }
        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
        int i = -1;
        for (int i2 = 0; i2 < numMessagesInBatch; i2++) {
            Commands.deSerializeSingleMessageInBatch(byteBuf, singleMessageMetadata, i2, numMessagesInBatch).release();
            if (!singleMessageMetadata.isCompactedOut()) {
                i = i2;
            }
        }
        return i;
    }

    private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName, NamespaceOperation namespaceOperation) {
        if (this.service.isAuthorizationEnabled()) {
            return (this.originalPrincipal != null ? this.service.getAuthorizationService().allowNamespaceOperationAsync(namespaceName, namespaceOperation, this.originalPrincipal, this.originalAuthData) : CompletableFuture.completedFuture(true)).thenCombine((CompletionStage) this.service.getAuthorizationService().allowNamespaceOperationAsync(namespaceName, namespaceOperation, this.authRole, this.authenticationData), (bool, bool2) -> {
                if (!bool.booleanValue()) {
                    log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}", new Object[]{this.originalPrincipal, namespaceOperation, namespaceName});
                }
                if (!bool2.booleanValue()) {
                    log.warn("Role {} is not authorized to perform operation {} on namespace {}", new Object[]{this.authRole, namespaceOperation, namespaceName});
                }
                return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
            });
        }
        return CompletableFuture.completedFuture(true);
    }

    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandGetTopicsOfNamespace.getRequestId();
        String namespace = commandGetTopicsOfNamespace.getNamespace();
        CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
        Optional ofNullable = Optional.ofNullable(commandGetTopicsOfNamespace.hasTopicsPattern() ? commandGetTopicsOfNamespace.getTopicsPattern() : null);
        Optional ofNullable2 = Optional.ofNullable(commandGetTopicsOfNamespace.hasTopicsHash() ? commandGetTopicsOfNamespace.getTopicsHash() : null);
        NamespaceName namespaceName = NamespaceName.get(namespace);
        Semaphore lookupRequestSemaphore = this.service.getLookupRequestSemaphore();
        if (lookupRequestSemaphore.tryAcquire()) {
            isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(bool -> {
                if (bool.booleanValue()) {
                    getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode).thenAccept(list -> {
                        boolean z = false;
                        List list = list;
                        if (this.enableSubscriptionPatternEvaluation && ofNullable.isPresent()) {
                            if (((String) ofNullable.get()).length() <= this.maxSubscriptionPatternLength) {
                                z = true;
                                list = TopicList.filterTopics(list, (String) ofNullable.get());
                            } else {
                                log.info("[{}] Subscription pattern provided [{}] was longer than maximum {}.", new Object[]{this.remoteAddress, ofNullable.get(), Integer.valueOf(this.maxSubscriptionPatternLength)});
                            }
                        }
                        String calculateHash = TopicList.calculateHash(list);
                        boolean z2 = ofNullable2.isPresent() && ((String) ofNullable2.get()).equals(calculateHash);
                        if (z2) {
                            list = Collections.emptyList();
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", new Object[]{this.remoteAddress, namespace, Long.valueOf(requestId), Integer.valueOf(list.size())});
                        }
                        this.commandSender.sendGetTopicsOfNamespaceResponse(list, calculateHash, z, !z2, requestId);
                        lookupRequestSemaphore.release();
                    }).exceptionally(th -> {
                        log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", new Object[]{this.remoteAddress, namespace, Long.valueOf(requestId)});
                        this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(new BrokerServiceException.ServerMetadataException(th)), th.getMessage());
                        lookupRequestSemaphore.release();
                        return null;
                    });
                    return null;
                }
                log.warn("[{}] {} with role {} on namespace {}", new Object[]{this.remoteAddress, "Client is not authorized to GetTopicsOfNamespace", getPrincipal(), namespaceName});
                this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, "Client is not authorized to GetTopicsOfNamespace");
                lookupRequestSemaphore.release();
                return null;
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                logNamespaceNameAuthException(this.remoteAddress, "GetTopicsOfNamespace", getPrincipal(), Optional.of(namespaceName), th);
                this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, "Exception occurred while trying to authorize GetTopicsOfNamespace");
                lookupRequestSemaphore.release();
                return null;
            });
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Failed GetTopicsOfNamespace lookup due to too many lookup-requests {}", this.remoteAddress, namespaceName);
        }
        this.commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests, "Failed due to too many pending lookup requests");
    }

    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            if (commandGetSchema.hasSchemaVersion()) {
                log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}", new Object[]{this.remoteAddress, new String(commandGetSchema.getSchemaVersion()), commandGetSchema.getTopic(), Long.valueOf(commandGetSchema.getRequestId())});
            } else {
                log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}", new Object[]{this.remoteAddress, null, commandGetSchema.getTopic(), Long.valueOf(commandGetSchema.getRequestId())});
            }
        }
        long requestId = commandGetSchema.getRequestId();
        SchemaVersion schemaVersion = SchemaVersion.Latest;
        if (commandGetSchema.hasSchemaVersion()) {
            if (commandGetSchema.getSchemaVersion().length == 0) {
                this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.IncompatibleSchema, "Empty schema version");
                return;
            }
            schemaVersion = this.schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
        }
        String topic = commandGetSchema.getTopic();
        try {
            String schemaName = TopicName.get(topic).getSchemaName();
            this.schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
                if (schemaAndMetadata == null) {
                    this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound, String.format("Topic not found or no-schema %s", topic));
                } else {
                    this.commandSender.sendGetSchemaResponse(requestId, SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
                }
            }).exceptionally(th -> {
                this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.UnknownError, th.getMessage());
                return null;
            });
        } catch (Throwable th2) {
            this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, th2.getMessage());
        }
    }

    protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("Received CommandGetOrCreateSchema call from {}", this.remoteAddress);
        }
        long requestId = commandGetOrCreateSchema.getRequestId();
        String topic = commandGetOrCreateSchema.getTopic();
        SchemaData schema = getSchema(commandGetOrCreateSchema.getSchema());
        SchemaData schemaData = schema.getType() == SchemaType.NONE ? null : schema;
        this.service.getTopicIfExists(topic).thenAccept(optional -> {
            if (optional.isPresent()) {
                tryAddSchema((Topic) optional.get(), schemaData).exceptionally(th -> {
                    ServerError clientErrorCode = BrokerServiceException.getClientErrorCode(th);
                    String message = th.getMessage();
                    if (th.getCause() != null) {
                        message = message + " caused by " + th.getCause();
                    }
                    this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, clientErrorCode, message);
                    return null;
                }).thenAccept(schemaVersion -> {
                    this.commandSender.sendGetOrCreateSchemaResponse(requestId, schemaVersion);
                });
            } else {
                this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, ServerError.TopicNotFound, String.format("Topic not found %s", topic));
            }
        }).exceptionally(th -> {
            this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, BrokerServiceException.getClientErrorCode(th), th.getMessage());
            return null;
        });
    }

    protected void handleTcClientConnectRequest(CommandTcClientConnectRequest commandTcClientConnectRequest) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandTcClientConnectRequest.getRequestId();
        TransactionCoordinatorID transactionCoordinatorID = TransactionCoordinatorID.get(commandTcClientConnectRequest.getTcId());
        if (log.isDebugEnabled()) {
            log.debug("Receive tc client connect request {} to transaction meta store {} from {}.", new Object[]{Long.valueOf(requestId), transactionCoordinatorID, this.remoteAddress});
        }
        if (checkTransactionEnableAndSendError(requestId)) {
            this.service.pulsar().getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorID).thenAccept(r12 -> {
                if (log.isDebugEnabled()) {
                    log.debug("Handle tc client connect request {} to transaction meta store {} from {} success.", new Object[]{Long.valueOf(requestId), transactionCoordinatorID, this.remoteAddress});
                }
                this.commandSender.sendTcClientConnectResponse(requestId);
            }).exceptionally(th -> {
                log.error("Handle tc client connect request {} to transaction meta store {} from {} fail.", new Object[]{Long.valueOf(requestId), transactionCoordinatorID, this.remoteAddress, th.getCause()});
                this.commandSender.sendTcClientConnectResponse(requestId, BrokerServiceException.getClientErrorCode(th), th.getMessage());
                return null;
            });
        }
    }

    private boolean checkTransactionEnableAndSendError(long j) {
        if (this.service.getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
            return true;
        }
        BrokerServiceException.NotAllowedException notAllowedException = new BrokerServiceException.NotAllowedException("Transactions are not enabled.");
        this.commandSender.sendErrorResponse(j, BrokerServiceException.getClientErrorCode(notAllowedException), notAllowedException.getMessage());
        return false;
    }

    private Throwable handleTxnException(Throwable th, String str, long j) {
        Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
        if (unwrapCompletionException instanceof CoordinatorException.CoordinatorNotFoundException) {
            if (log.isDebugEnabled()) {
                log.debug("The Coordinator was not found for the request {}", str);
            }
            return unwrapCompletionException;
        }
        if (!(unwrapCompletionException instanceof ManagedLedgerException.ManagedLedgerFencedException)) {
            log.error("Send response error for {} request {}.", new Object[]{str, Long.valueOf(j), unwrapCompletionException});
            return unwrapCompletionException;
        }
        if (log.isDebugEnabled()) {
            log.debug("Throw a CoordinatorNotFoundException to client with the message got from a ManagedLedgerFencedException for the request {}", str);
        }
        return new CoordinatorException.CoordinatorNotFoundException(unwrapCompletionException.getMessage());
    }

    protected void handleNewTxn(CommandNewTxn commandNewTxn) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandNewTxn.getRequestId();
        TransactionCoordinatorID transactionCoordinatorID = TransactionCoordinatorID.get(commandNewTxn.getTcId());
        if (log.isDebugEnabled()) {
            log.debug("Receive new txn request {} to transaction meta store {} from {}.", new Object[]{Long.valueOf(requestId), transactionCoordinatorID, this.remoteAddress});
        }
        if (checkTransactionEnableAndSendError(requestId)) {
            TransactionMetadataStoreService transactionMetadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
            transactionMetadataStoreService.newTransaction(transactionCoordinatorID, commandNewTxn.getTxnTtlSeconds(), getPrincipal()).whenComplete((txnID, th) -> {
                if (th == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Send response {} for new txn request {}", Long.valueOf(transactionCoordinatorID.getId()), Long.valueOf(requestId));
                    }
                    this.commandSender.sendNewTxnResponse(requestId, txnID, transactionCoordinatorID.getId());
                } else {
                    if (th instanceof CoordinatorException.ReachMaxActiveTxnException) {
                        log.warn("New txn op reach max active transactions! tcId : {}, requestId : {}", new Object[]{Long.valueOf(transactionCoordinatorID.getId()), Long.valueOf(requestId), th});
                        return;
                    }
                    Throwable handleTxnException = handleTxnException(th, BaseCommand.Type.NEW_TXN.name(), requestId);
                    this.commandSender.sendNewTxnErrorResponse(requestId, transactionCoordinatorID.getId(), BrokerServiceException.getClientErrorCode(handleTxnException), handleTxnException.getMessage());
                    transactionMetadataStoreService.handleOpFail(handleTxnException, transactionCoordinatorID);
                }
            });
        }
    }

    protected void handleAddPartitionToTxn(CommandAddPartitionToTxn commandAddPartitionToTxn) {
        Preconditions.checkArgument(this.state == State.Connected);
        TxnID txnID = new TxnID(commandAddPartitionToTxn.getTxnidMostBits(), commandAddPartitionToTxn.getTxnidLeastBits());
        TransactionCoordinatorID transactionCoordinatorID = TransactionCoordinatorID.get(commandAddPartitionToTxn.getTxnidMostBits());
        long requestId = commandAddPartitionToTxn.getRequestId();
        List partitionsList = commandAddPartitionToTxn.getPartitionsList();
        if (log.isDebugEnabled()) {
            partitionsList.forEach(str -> {
                log.debug("Receive add published partition to txn request {} from {} with txnId {}, topic: [{}]", new Object[]{Long.valueOf(requestId), this.remoteAddress, txnID, str});
            });
        }
        if (checkTransactionEnableAndSendError(requestId)) {
            TransactionMetadataStoreService transactionMetadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
            verifyTxnOwnership(txnID).thenCompose(bool -> {
                return !bool.booleanValue() ? failedFutureTxnNotOwned(txnID) : transactionMetadataStoreService.addProducedPartitionToTxn(txnID, partitionsList);
            }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r16, th) -> {
                if (th == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Send response success for add published partition to txn request {}", Long.valueOf(requestId));
                    }
                    writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                } else {
                    Throwable handleTxnException = handleTxnException(th, BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
                    writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(handleTxnException), handleTxnException.getMessage()));
                    transactionMetadataStoreService.handleOpFail(handleTxnException, transactionCoordinatorID);
                }
            });
        }
    }

    private CompletableFuture<Void> failedFutureTxnNotOwned(TxnID txnID) {
        String format = String.format("Client (%s) is neither the owner of the transaction %s nor a super user", getPrincipal(), txnID);
        log.warn("[{}] {}", this.remoteAddress, format);
        return CompletableFuture.failedFuture(new CoordinatorException.TransactionNotFoundException(format));
    }

    private CompletableFuture<Void> failedFutureTxnTcNotAllowed(TxnID txnID) {
        String format = String.format("TC client (%s) is not a super user, and is not allowed to operate on transaction %s", getPrincipal(), txnID);
        log.warn("[{}] {}", this.remoteAddress, format);
        return CompletableFuture.failedFuture(new CoordinatorException.TransactionNotFoundException(format));
    }

    protected void handleEndTxn(CommandEndTxn commandEndTxn) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandEndTxn.getRequestId();
        int value = commandEndTxn.getTxnAction().getValue();
        TxnID txnID = new TxnID(commandEndTxn.getTxnidMostBits(), commandEndTxn.getTxnidLeastBits());
        TransactionCoordinatorID transactionCoordinatorID = TransactionCoordinatorID.get(commandEndTxn.getTxnidMostBits());
        if (checkTransactionEnableAndSendError(requestId)) {
            TransactionMetadataStoreService transactionMetadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
            verifyTxnOwnership(txnID).thenCompose(bool -> {
                return !bool.booleanValue() ? failedFutureTxnNotOwned(txnID) : transactionMetadataStoreService.endTransaction(txnID, value, false);
            }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r14, th) -> {
                if (th == null) {
                    this.commandSender.sendEndTxnResponse(requestId, txnID, value);
                    return;
                }
                Throwable handleTxnException = handleTxnException(th, BaseCommand.Type.END_TXN.name(), requestId);
                this.commandSender.sendEndTxnErrorResponse(requestId, txnID, BrokerServiceException.getClientErrorCode(handleTxnException), handleTxnException.getMessage());
                transactionMetadataStoreService.handleOpFail(handleTxnException, transactionCoordinatorID);
            });
        }
    }

    private CompletableFuture<Boolean> isSuperUser() {
        if (!$assertionsDisabled && !this.ctx.executor().inEventLoop()) {
            throw new AssertionError();
        }
        if (!this.service.isAuthenticationEnabled() || !this.service.isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(true);
        }
        CompletableFuture<Boolean> isSuperUser = this.service.getAuthorizationService().isSuperUser(this.authRole, this.authenticationData);
        if (this.originalPrincipal != null) {
            return this.service.getAuthorizationService().isSuperUser(this.originalPrincipal, this.originalAuthData != null ? this.originalAuthData : this.authenticationData).thenCombine((CompletionStage) isSuperUser, (bool, bool2) -> {
                return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
            });
        }
        return isSuperUser;
    }

    private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
        if ($assertionsDisabled || this.ctx.executor().inEventLoop()) {
            return this.service.pulsar().getTransactionMetadataStoreService().verifyTxnOwnership(txnID, getPrincipal()).thenComposeAsync(bool -> {
                return bool.booleanValue() ? CompletableFuture.completedFuture(true) : (this.service.isAuthenticationEnabled() && this.service.isAuthorizationEnabled()) ? isSuperUser() : CompletableFuture.completedFuture(false);
            }, (Executor) this.ctx.executor());
        }
        throw new AssertionError();
    }

    protected void handleEndTxnOnPartition(CommandEndTxnOnPartition commandEndTxnOnPartition) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandEndTxnOnPartition.getRequestId();
        String topic = commandEndTxnOnPartition.getTopic();
        int value = commandEndTxnOnPartition.getTxnAction().getValue();
        TxnID txnID = new TxnID(commandEndTxnOnPartition.getTxnidMostBits(), commandEndTxnOnPartition.getTxnidLeastBits());
        long txnidLeastBitsOfLowWatermark = commandEndTxnOnPartition.getTxnidLeastBitsOfLowWatermark();
        if (log.isDebugEnabled()) {
            log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, Integer.valueOf(value)});
        }
        this.service.getTopicIfExists(TopicName.get(topic).toString()).thenAcceptAsync(optional -> {
            if (optional.isPresent()) {
                isSuperUser().thenCompose(bool -> {
                    return !bool.booleanValue() ? failedFutureTxnTcNotAllowed(txnID) : ((Topic) optional.get()).endTxn(txnID, value, txnidLeastBitsOfLowWatermark);
                }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r16, th) -> {
                    if (th == null) {
                        writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                        return;
                    }
                    Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
                    log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(value), unwrapCompletionException});
                    writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, BrokerServiceException.getClientErrorCode(unwrapCompletionException), unwrapCompletionException.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
                });
            } else {
                getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()).thenAccept(bool2 -> {
                    if (bool2.booleanValue()) {
                        log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(value)});
                        writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.ServiceNotReady, "The topic " + topic + " does not exist in broker.", txnID.getLeastSigBits(), txnID.getMostSigBits()));
                    } else {
                        log.warn("handleEndTxnOnPartition fail ! The topic {} has not been created, txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(value)});
                        writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                    }
                }).exceptionally(th2 -> {
                    log.error("handleEndTxnOnPartition fail ! topic {}, txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(value), th2.getCause()});
                    writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.ServiceNotReady, th2.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
                    return null;
                });
            }
        }, (Executor) this.ctx.executor()).exceptionally(th -> {
            log.error("handleEndTxnOnPartition fail ! topic {}, txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(value), th.getCause()});
            writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.ServiceNotReady, th.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
            return null;
        });
    }

    protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription commandEndTxnOnSubscription) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandEndTxnOnSubscription.getRequestId();
        long txnidMostBits = commandEndTxnOnSubscription.getTxnidMostBits();
        long txnidLeastBits = commandEndTxnOnSubscription.getTxnidLeastBits();
        String topic = commandEndTxnOnSubscription.getSubscription().getTopic();
        String subscription = commandEndTxnOnSubscription.getSubscription().getSubscription();
        int value = commandEndTxnOnSubscription.getTxnAction().getValue();
        TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
        long txnidLeastBitsOfLowWatermark = commandEndTxnOnSubscription.getTxnidLeastBitsOfLowWatermark();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", new Object[]{topic, subscription, new TxnID(txnidMostBits, txnidLeastBits), Integer.valueOf(value)});
        }
        this.service.getTopicIfExists(TopicName.get(topic).toString()).thenAcceptAsync(optional -> {
            if (!optional.isPresent()) {
                getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()).thenAccept(bool -> {
                    if (bool.booleanValue()) {
                        log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, subscription: {}, txnId: [{}], txnAction: [{}]", new Object[]{topic, subscription, txnID, TxnAction.valueOf(value)});
                        writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), ServerError.ServiceNotReady, "The topic " + topic + " does not exist in broker."));
                    } else {
                        log.warn("handleEndTxnOnSubscription fail ! The topic {} has not been created, subscription: {} txnId: [{}], txnAction: [{}]", new Object[]{topic, subscription, txnID, TxnAction.valueOf(value)});
                        writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                    }
                }).exceptionally(th -> {
                    log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}txnId: [{}], txnAction: [{}]", new Object[]{topic, subscription, txnID, TxnAction.valueOf(value), th.getCause()});
                    writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), ServerError.ServiceNotReady, th.getMessage()));
                    return null;
                });
                return;
            }
            Subscription subscription2 = ((Topic) optional.get()).getSubscription(subscription);
            if (subscription2 != null) {
                isSuperUser().thenCompose(bool2 -> {
                    return !bool2.booleanValue() ? failedFutureTxnTcNotAllowed(txnID) : subscription2.endTxn(txnidMostBits, txnidLeastBits, value, txnidLeastBitsOfLowWatermark);
                }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r21, th2) -> {
                    if (th2 == null) {
                        writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
                        return;
                    }
                    Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th2);
                    log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}txnId: [{}], txnAction: [{}]", new Object[]{topic, subscription, txnID, TxnAction.valueOf(value), unwrapCompletionException.getCause()});
                    writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits, BrokerServiceException.getClientErrorCode(unwrapCompletionException), "Handle end txn on subscription failed: " + unwrapCompletionException.getMessage()));
                });
            } else {
                log.warn("handleEndTxnOnSubscription fail! topic {} subscription {} does not exist. txnId: [{}], txnAction: [{}]", new Object[]{((Topic) optional.get()).getName(), subscription, txnID, TxnAction.valueOf(value)});
                writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
            }
        }, (Executor) this.ctx.executor()).exceptionally(th -> {
            log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}txnId: [{}], txnAction: [{}]", new Object[]{topic, subscription, txnID, TxnAction.valueOf(value), th.getCause()});
            writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits, ServerError.ServiceNotReady, "Handle end txn on subscription failed: " + th.getMessage()));
            return null;
        });
    }

    private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schemaData) {
        return schemaData != null ? topic.addSchema(schemaData) : topic.hasSchema().thenCompose(bool -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] {} configured with schema {}", new Object[]{this.remoteAddress, topic.getName(), bool});
            }
            CompletableFuture completableFuture = new CompletableFuture();
            if (bool.booleanValue() && (this.schemaValidationEnforced || topic.getSchemaValidationEnforced())) {
                completableFuture.completeExceptionally(new IncompatibleSchemaException("Producers cannot connect or send message without a schema to topics with a schema"));
            } else {
                completableFuture.complete(SchemaVersion.Empty);
            }
            return completableFuture;
        });
    }

    protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn commandAddSubscriptionToTxn) {
        Preconditions.checkArgument(this.state == State.Connected);
        TxnID txnID = new TxnID(commandAddSubscriptionToTxn.getTxnidMostBits(), commandAddSubscriptionToTxn.getTxnidLeastBits());
        long requestId = commandAddSubscriptionToTxn.getRequestId();
        ArrayList arrayList = new ArrayList();
        Iterator it = commandAddSubscriptionToTxn.getSubscriptionsList().iterator();
        while (it.hasNext()) {
            arrayList.add(new org.apache.pulsar.common.api.proto.Subscription().copyFrom((org.apache.pulsar.common.api.proto.Subscription) it.next()));
        }
        if (log.isDebugEnabled()) {
            log.debug("Receive add published partition to txn request {} from {} with txnId {}", new Object[]{Long.valueOf(requestId), this.remoteAddress, txnID});
        }
        TransactionCoordinatorID transactionCoordinatorID = TransactionCoordinatorID.get(commandAddSubscriptionToTxn.getTxnidMostBits());
        if (checkTransactionEnableAndSendError(requestId)) {
            TransactionMetadataStoreService transactionMetadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
            verifyTxnOwnership(txnID).thenCompose(bool -> {
                return !bool.booleanValue() ? failedFutureTxnNotOwned(txnID) : transactionMetadataStoreService.addAckedPartitionToTxn(txnID, MLTransactionMetadataStore.subscriptionToTxnSubscription(arrayList));
            }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r16, th) -> {
                if (th == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Send response success for add published partition to txn request {}", Long.valueOf(requestId));
                    }
                    writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                } else {
                    Throwable handleTxnException = handleTxnException(th, BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
                    writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(handleTxnException), handleTxnException.getMessage()));
                    transactionMetadataStoreService.handleOpFail(handleTxnException, transactionCoordinatorID);
                }
            });
        }
    }

    protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandWatchTopicList.getRequestId();
        long watcherId = commandWatchTopicList.getWatcherId();
        NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
        Pattern compile = Pattern.compile(commandWatchTopicList.hasTopicsPattern() ? commandWatchTopicList.getTopicsPattern() : ".*");
        String topicsHash = commandWatchTopicList.hasTopicsHash() ? commandWatchTopicList.getTopicsHash() : null;
        Semaphore lookupRequestSemaphore = this.service.getLookupRequestSemaphore();
        if (lookupRequestSemaphore.tryAcquire()) {
            isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(bool -> {
                if (bool.booleanValue()) {
                    this.topicListService.handleWatchTopicList(namespaceName, watcherId, requestId, compile, topicsHash, lookupRequestSemaphore);
                    return null;
                }
                log.warn("[{}] {} with role {} on namespace {}", new Object[]{this.remoteAddress, "Proxy Client is not authorized to watchTopicList", getPrincipal(), namespaceName});
                this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, "Proxy Client is not authorized to watchTopicList");
                lookupRequestSemaphore.release();
                return null;
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                logNamespaceNameAuthException(this.remoteAddress, "watchTopicList", getPrincipal(), Optional.of(namespaceName), th);
                this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, "Exception occurred while trying to handle command WatchTopicList");
                lookupRequestSemaphore.release();
                return null;
            });
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Failed WatchTopicList due to too many lookup-requests {}", this.remoteAddress, namespaceName);
        }
        this.commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests, "Failed due to too many pending lookup requests");
    }

    protected void handleCommandWatchTopicListClose(CommandWatchTopicListClose commandWatchTopicListClose) {
        Preconditions.checkArgument(this.state == State.Connected);
        this.topicListService.handleWatchTopicListClose(commandWatchTopicListClose);
    }

    protected boolean isHandshakeCompleted() {
        return this.state == State.Connected;
    }

    public ChannelHandlerContext ctx() {
        return this.ctx;
    }

    protected void interceptCommand(BaseCommand baseCommand) throws InterceptException {
        if (this.brokerInterceptor != null) {
            this.brokerInterceptor.onPulsarCommand(baseCommand, this);
        }
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void closeProducer(Producer producer) {
        if (!$assertionsDisabled && !this.ctx.executor().inEventLoop()) {
            throw new AssertionError();
        }
        safelyRemoveProducer(producer);
        if (getRemoteEndpointProtocolVersion() < ProtocolVersion.v5.getValue()) {
            close();
            return;
        }
        writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
        long epoch = producer.getEpoch();
        long producerId = producer.getProducerId();
        this.recentlyClosedProducers.put(Long.valueOf(producerId), Long.valueOf(epoch));
        this.ctx.executor().schedule(() -> {
            this.recentlyClosedProducers.remove(Long.valueOf(producerId), Long.valueOf(epoch));
        }, this.service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void closeConsumer(Consumer consumer) {
        safelyRemoveConsumer(consumer);
        if (getRemoteEndpointProtocolVersion() >= ProtocolVersion.v5.getValue()) {
            writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
        } else {
            close();
        }
    }

    protected void close() {
        if (this.ctx != null) {
            this.ctx.close();
        }
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void removedConsumer(Consumer consumer) {
        safelyRemoveConsumer(consumer);
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void removedProducer(Producer producer) {
        safelyRemoveProducer(producer);
    }

    private void safelyRemoveProducer(Producer producer) {
        long producerId = producer.getProducerId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: producerId={}, producer={}", new Object[]{this.remoteAddress, Long.valueOf(producerId), producer});
        }
        CompletableFuture completableFuture = (CompletableFuture) this.producers.get(producerId);
        if (completableFuture != null) {
            completableFuture.whenComplete((producer2, th) -> {
                if (th != null || producer2 == producer) {
                    this.producers.remove(producerId, completableFuture);
                }
            });
        }
    }

    private void safelyRemoveConsumer(Consumer consumer) {
        long consumerId = consumer.consumerId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: consumerId={}, consumer={}", new Object[]{this.remoteAddress, Long.valueOf(consumerId), consumer});
        }
        CompletableFuture completableFuture = (CompletableFuture) this.consumers.get(consumerId);
        if (completableFuture != null) {
            completableFuture.whenComplete((consumer2, th) -> {
                if (th != null || consumer2 == consumer) {
                    this.consumers.remove(consumerId, completableFuture);
                }
            });
        }
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public boolean isActive() {
        return this.isActive;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public boolean isWritable() {
        return this.ctx.channel().isWritable();
    }

    public void startSendOperation(Producer producer, int i, int i2) {
        boolean isPublishRateExceeded;
        if (this.preciseTopicPublishRateLimitingEnable) {
            if (producer.getTopic().isTopicPublishRateExceeded(i2, i)) {
                producer.getTopic().disableCnxAutoRead();
                return;
            }
            isPublishRateExceeded = producer.getTopic().isBrokerPublishRateExceeded();
        } else {
            if (producer.getTopic().isResourceGroupRateLimitingEnabled() && producer.getTopic().isResourceGroupPublishRateExceeded(i2, i)) {
                producer.getTopic().disableCnxAutoRead();
                return;
            }
            isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
        }
        int i3 = this.pendingSendRequest + 1;
        this.pendingSendRequest = i3;
        if (i3 == this.maxPendingSendRequests || isPublishRateExceeded) {
            disableCnxAutoRead();
            this.autoReadDisabledRateLimiting = isPublishRateExceeded;
            throttledConnections.inc();
        }
        if (((MutableLong) pendingBytesPerThread.get()).addAndGet(i) < this.maxPendingBytesPerThread || this.autoReadDisabledPublishBufferLimiting || this.maxPendingBytesPerThread <= 0) {
            return;
        }
        MutableInt mutableInt = new MutableInt();
        ((Set) cnxsPerThread.get()).forEach(serverCnx -> {
            if (!serverCnx.hasProducers() || serverCnx.autoReadDisabledPublishBufferLimiting) {
                return;
            }
            serverCnx.disableCnxAutoRead();
            serverCnx.autoReadDisabledPublishBufferLimiting = true;
            mutableInt.increment();
        });
        getBrokerService().pausedConnections(mutableInt.intValue());
    }

    private void recordRateLimitMetrics(ConcurrentLongHashMap<CompletableFuture<Producer>> concurrentLongHashMap) {
        concurrentLongHashMap.forEach((j, completableFuture) -> {
            Producer producer;
            if (completableFuture == null || !completableFuture.isDone() || (producer = (Producer) completableFuture.getNow(null)) == null || producer.getTopic() == null) {
                return;
            }
            producer.getTopic().increasePublishLimitedTimes();
        });
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void completedSendOperation(boolean z, int i) {
        if (((MutableLong) pendingBytesPerThread.get()).addAndGet(-i) < this.resumeThresholdPendingBytesPerThread && this.autoReadDisabledPublishBufferLimiting) {
            MutableInt mutableInt = new MutableInt();
            ((Set) cnxsPerThread.get()).forEach(serverCnx -> {
                if (serverCnx.autoReadDisabledPublishBufferLimiting) {
                    serverCnx.autoReadDisabledPublishBufferLimiting = false;
                    serverCnx.enableCnxAutoRead();
                    mutableInt.increment();
                }
            });
            getBrokerService().resumedConnections(mutableInt.intValue());
        }
        int i2 = this.pendingSendRequest - 1;
        this.pendingSendRequest = i2;
        if (i2 == this.resumeReadsThreshold) {
            enableCnxAutoRead();
        }
        if (z) {
            this.nonPersistentPendingMessages--;
        }
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void enableCnxAutoRead() {
        if (this.ctx == null || this.ctx.channel().config().isAutoRead() || this.autoReadDisabledRateLimiting || this.autoReadDisabledPublishBufferLimiting) {
            return;
        }
        this.ctx.channel().config().setAutoRead(true);
        throttledConnections.dec();
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void disableCnxAutoRead() {
        if (this.ctx == null || !this.ctx.channel().config().isAutoRead()) {
            return;
        }
        this.ctx.channel().config().setAutoRead(false);
        recordRateLimitMetrics(this.producers);
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void cancelPublishRateLimiting() {
        if (this.autoReadDisabledRateLimiting) {
            this.autoReadDisabledRateLimiting = false;
        }
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void cancelPublishBufferLimiting() {
        if (this.autoReadDisabledPublishBufferLimiting) {
            this.autoReadDisabledPublishBufferLimiting = false;
            throttledConnectionsGlobal.dec();
        }
    }

    private <T> ServerError getErrorCode(CompletableFuture<T> completableFuture) {
        return getErrorCodeWithErrorLog(completableFuture, false, null);
    }

    private <T> ServerError getErrorCodeWithErrorLog(CompletableFuture<T> completableFuture, boolean z, String str) {
        ServerError serverError = ServerError.UnknownError;
        try {
            completableFuture.getNow(null);
        } catch (Exception e) {
            if (e.getCause() instanceof BrokerServiceException) {
                serverError = BrokerServiceException.getClientErrorCode(e.getCause());
            }
            if (z) {
                log.error(StringUtils.isNotBlank(str) ? str : "Unknown Error", e);
            }
        }
        return serverError;
    }

    private void disableTcpNoDelayIfNeeded(String str, String str2) {
        if (str2 == null || !str2.startsWith(this.replicatorPrefix)) {
            return;
        }
        try {
            if (((Boolean) this.ctx.channel().config().getOption(ChannelOption.TCP_NODELAY)).booleanValue()) {
                this.ctx.channel().config().setOption(ChannelOption.TCP_NODELAY, false);
            }
        } catch (Throwable th) {
            log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", new Object[]{str, str2, toString()});
        }
    }

    private TopicName validateTopicName(String str, long j, Object obj) {
        try {
            return TopicName.get(str);
        } catch (Throwable th) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to parse topic name '{}'", new Object[]{this.remoteAddress, str, th});
            }
            if (obj instanceof CommandLookupTopic) {
                writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName, "Invalid topic name: " + th.getMessage(), j));
                return null;
            }
            if (obj instanceof CommandPartitionedTopicMetadata) {
                writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName, "Invalid topic name: " + th.getMessage(), j));
                return null;
            }
            writeAndFlush(Commands.newError(j, ServerError.InvalidTopicName, "Invalid topic name: " + th.getMessage()));
            return null;
        }
    }

    public ByteBufPair newMessageAndIntercept(long j, long j2, long j3, int i, int i2, ByteBuf byteBuf, long[] jArr, String str, long j4) {
        BaseCommand newMessageCommand = Commands.newMessageCommand(j, j2, j3, i, i2, jArr, j4);
        ByteBufPair serializeCommandMessageWithSize = Commands.serializeCommandMessageWithSize(newMessageCommand, byteBuf);
        if (this.brokerInterceptor != null) {
            try {
                this.brokerInterceptor.onPulsarCommand(newMessageCommand, this);
                CompletableFuture completableFuture = (CompletableFuture) this.consumers.get(j);
                if (completableFuture != null && completableFuture.isDone() && !completableFuture.isCompletedExceptionally()) {
                    this.brokerInterceptor.messageDispatched(this, (Consumer) completableFuture.getNow(null), j2, j3, byteBuf);
                }
            } catch (Exception e) {
                log.error("Exception occur when intercept messages.", e);
            }
        }
        return serializeCommandMessageWithSize;
    }

    public State getState() {
        return this.state;
    }

    public SocketAddress getRemoteAddress() {
        return this.remoteAddress;
    }

    public String toString() {
        ChannelHandlerContext ctx = ctx();
        StringBuilder sb = new StringBuilder(166);
        if (ctx == null) {
            sb.append("[ctx: null]");
        } else {
            sb.append(ctx.channel().toString());
        }
        String clientSourceAddress = clientSourceAddress();
        sb.append(" [SR:").append(clientSourceAddress == null ? "-" : clientSourceAddress).append(", state:").append(this.state).append("]");
        return sb.toString();
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public BrokerService getBrokerService() {
        return this.service;
    }

    public String getRole() {
        return this.authRole;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public Promise<Void> newPromise() {
        return this.ctx.newPromise();
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public HAProxyMessage getHAProxyMessage() {
        return this.proxyMessage;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public boolean hasHAProxyMessage() {
        return this.proxyMessage != null;
    }

    boolean hasConsumer(long j) {
        return this.consumers.containsKey(j);
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public boolean isBatchMessageCompatibleVersion() {
        return getRemoteEndpointProtocolVersion() >= ProtocolVersion.v4.getValue();
    }

    boolean supportsAuthenticationRefresh() {
        return this.features != null && this.features.isSupportsAuthRefresh();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean supportBrokerMetadata() {
        return this.features != null && this.features.isSupportsBrokerEntryMetadata();
    }

    boolean supportsPartialProducer() {
        return this.features != null && this.features.isSupportsPartialProducer();
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public String getClientVersion() {
        return this.clientVersion;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public String getProxyVersion() {
        return this.proxyVersion;
    }

    @VisibleForTesting
    void setAutoReadDisabledRateLimiting(boolean z) {
        this.autoReadDisabledRateLimiting = z;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public boolean isPreciseDispatcherFlowControl() {
        return this.preciseDispatcherFlowControl;
    }

    public AuthenticationState getAuthState() {
        return this.authState;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public AuthenticationDataSource getAuthenticationData() {
        return this.originalAuthData != null ? this.originalAuthData : this.authenticationData;
    }

    public String getPrincipal() {
        return this.originalPrincipal != null ? this.originalPrincipal : this.authRole;
    }

    public AuthenticationProvider getAuthenticationProvider() {
        return this.authenticationProvider;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public String getAuthRole() {
        return this.authRole;
    }

    public String getAuthMethod() {
        return this.authMethod;
    }

    public ConcurrentLongHashMap<CompletableFuture<Consumer>> getConsumers() {
        return this.consumers;
    }

    public ConcurrentLongHashMap<CompletableFuture<Producer>> getProducers() {
        return this.producers;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public PulsarCommandSender getCommandSender() {
        return this.commandSender;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public void execute(Runnable runnable) {
        ctx().channel().eventLoop().execute(runnable);
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public String clientSourceAddress() {
        getAuthData();
        if (this.proxyMessage != null) {
            return this.proxyMessage.sourceAddress();
        }
        if (this.remoteAddress instanceof InetSocketAddress) {
            return ((InetSocketAddress) this.remoteAddress).getAddress().getHostAddress();
        }
        return null;
    }

    @Override // org.apache.pulsar.broker.service.TransportCnx
    public CompletableFuture<Boolean> checkConnectionLiveness() {
        return !isActive() ? CompletableFuture.completedFuture(false) : this.connectionLivenessCheckTimeoutMillis > 0 ? NettyFutureUtil.toCompletableFuture(this.ctx.executor().submit(() -> {
            if (!isActive()) {
                return CompletableFuture.completedFuture(false);
            }
            if (this.connectionCheckInProgress != null) {
                return this.connectionCheckInProgress;
            }
            CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
            this.connectionCheckInProgress = completableFuture;
            this.ctx.executor().schedule(() -> {
                if (!isActive()) {
                    completableFuture.complete(false);
                    return;
                }
                if (completableFuture.isDone()) {
                    return;
                }
                if (completableFuture == this.connectionCheckInProgress) {
                    log.warn("[{}] Connection check timed out. Closing connection.", toString());
                    this.ctx.close();
                } else {
                    log.error("[{}] Reached unexpected code block. Completing connection check.", toString());
                    completableFuture.complete(true);
                }
            }, this.connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS);
            sendPing();
            return completableFuture;
        })).thenCompose(Function.identity()) : CompletableFuture.completedFuture((Boolean) null);
    }

    protected void messageReceived() {
        super.messageReceived();
        if (this.connectionCheckInProgress == null || this.connectionCheckInProgress.isDone()) {
            return;
        }
        this.connectionCheckInProgress.complete(true);
        this.connectionCheckInProgress = null;
    }

    private static void logAuthException(SocketAddress socketAddress, String str, String str2, Optional<TopicName> optional, Throwable th) {
        String str3 = (String) optional.map(topicName -> {
            return ", topic=" + topicName.toString();
        }).orElse("");
        WebApplicationException unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
        if (unwrapCompletionException instanceof AuthenticationException) {
            log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}", new Object[]{socketAddress, str, str2, str3, unwrapCompletionException.getMessage()});
        } else if ((unwrapCompletionException instanceof WebApplicationException) && unwrapCompletionException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
            log.info("[{}] Trying to authenticate for a topic which under a namespace not exists: operation={}, principal={}{}, reason: {}", new Object[]{socketAddress, str, str2, str3, unwrapCompletionException.getMessage()});
        } else {
            log.error("[{}] Error trying to authenticate: operation={}, principal={}{}", new Object[]{socketAddress, str, str2, str3, th});
        }
    }

    private static void logNamespaceNameAuthException(SocketAddress socketAddress, String str, String str2, Optional<NamespaceName> optional, Throwable th) {
        String str3 = (String) optional.map(namespaceName -> {
            return ", namespace=" + namespaceName.toString();
        }).orElse("");
        if (th instanceof AuthenticationException) {
            log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}", new Object[]{socketAddress, str, str2, str3, th.getMessage()});
        } else {
            log.error("[{}] Error trying to authenticate: operation={}, principal={}{}", new Object[]{socketAddress, str, str2, str3, th});
        }
    }

    public boolean hasProducers() {
        return !this.producers.isEmpty();
    }

    @VisibleForTesting
    protected String getOriginalPrincipal() {
        return this.originalPrincipal;
    }

    @VisibleForTesting
    protected AuthenticationDataSource getAuthData() {
        return this.authenticationData;
    }

    @VisibleForTesting
    protected AuthenticationDataSource getOriginalAuthData() {
        return this.originalAuthData;
    }

    @VisibleForTesting
    protected AuthenticationState getOriginalAuthState() {
        return this.originalAuthState;
    }

    @VisibleForTesting
    protected void setAuthRole(String str) {
        this.authRole = str;
    }

    static {
        $assertionsDisabled = !ServerCnx.class.desiredAssertionStatus();
        emptyKeySharedMeta = new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT);
        pendingBytesPerThread = new FastThreadLocal<MutableLong>() { // from class: org.apache.pulsar.broker.service.ServerCnx.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: initialValue, reason: merged with bridge method [inline-methods] */
            public MutableLong m288initialValue() throws Exception {
                return new MutableLong();
            }
        };
        cnxsPerThread = new FastThreadLocal<Set<ServerCnx>>() { // from class: org.apache.pulsar.broker.service.ServerCnx.2
            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: initialValue, reason: merged with bridge method [inline-methods] */
            public Set<ServerCnx> m289initialValue() throws Exception {
                return Collections.newSetFromMap(new IdentityHashMap());
            }
        };
        emptyArray = new byte[0];
        throttledConnections = Gauge.build().name("pulsar_broker_throttled_connections").help("Counter of connections throttled because of per-connection limit").register();
        throttledConnectionsGlobal = Gauge.build().name("pulsar_broker_throttled_connections_global_limit").help("Counter of connections throttled because of per-connection limit").register();
        log = LoggerFactory.getLogger(ServerCnx.class);
    }
}
