package org.apache.pulsar.broker.admin.v2;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/json"})
@Api(value = "/persistent", description = "Persistent topic admin apis", tags = {"persistent topic"})
@Path("/persistent")
/* loaded from: input_file:org/apache/pulsar/broker/admin/v2/PersistentTopics.class */
public class PersistentTopics extends PersistentTopicsBase {
    // Class-wide SLF4J logger, used by the endpoints below to report failed admin operations.
    private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);

    /**
     * Lists the topics under a namespace, optionally restricted to a single bundle.
     *
     * <p>The namespace name is validated first; the value returned by {@code internalGetList} is then
     * handed to the suspended response. Failures are delivered through {@code asyncResponse} instead of
     * being thrown to the container.
     *
     * @param asyncResponse suspended JAX-RS response that receives the result or the failure
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 value of the optional {@code bundle} query parameter; may be {@code null}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespacee"), @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), @ApiResponse(code = 412, message = "Namespace name is not valid"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}")
    @ApiOperation(value = "Get the list of topics under a namespace.", response = String.class, responseContainer = "List")
    public void getList(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @QueryParam("bundle") @ApiParam(value = "Specify the bundle name", required = false) String str3) {
        try {
            validateNamespaceName(str, str2);
            asyncResponse.resume(internalGetList(Optional.ofNullable(str3)));
        } catch (WebApplicationException e) {
            // The specific handler must come first: WebApplicationException is a subtype of Exception,
            // so listing it after the generic clause makes it unreachable (a compile-time error).
            asyncResponse.resume(e);
        } catch (Exception e) {
            // Anything else is wrapped so the client still receives a REST-style error.
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Returns the partitioned topics of a namespace.
     *
     * <p>Validates the namespace name and then delegates to {@code internalGetPartitionedTopicList}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @return the list produced by {@code internalGetPartitionedTopicList}
     */
    @GET
    @Path("/{tenant}/{namespace}/partitioned")
    @ApiOperation(value = "Get the list of partitioned topics under a namespace.", response = String.class, responseContainer = "List")
    @ApiResponses({@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), @ApiResponse(code = 412, message = "Namespace name is not valid"), @ApiResponse(code = 500, message = "Internal server error")})
    public List<String> getPartitionedTopicList(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2) {
        // Validation runs before the lookup and is expected to reject bad names by throwing.
        validateNamespaceName(str, str2);
        final List<String> partitionedTopics = internalGetPartitionedTopicList();
        return partitionedTopics;
    }

    /**
     * Returns the permissions in effect for a topic.
     *
     * <p>Validates the topic name and then delegates to {@code internalGetPermissionsOnTopic}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @return the map produced by {@code internalGetPermissionsOnTopic}
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/permissions")
    @ApiOperation(value = "Get permissions on a topic.", notes = "Retrieve the effective permissions for a topic. These permissions are defined by the permissions set at thenamespace level combined (union) with any eventual specific permission set on the topic.")
    @ApiResponses({@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error")})
    public Map<String, Set<AuthAction>> getPermissionsOnTopic(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3) {
        // The topic must be validated before the permission lookup runs.
        validateTopicName(str, str2, str3);
        final Map<String, Set<AuthAction>> permissions = internalGetPermissionsOnTopic();
        return permissions;
    }

    /**
     * Grants a set of actions to a role on a single topic.
     *
     * <p>Validates the topic name and then delegates to {@code internalGrantPermissionsOnTopic}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param str4 role that receives the permissions
     * @param set actions to grant, taken from the request body
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/permissions/{role}")
    @ApiOperation("Grant a new permission to a role on a single topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error")})
    public void grantPermissionsOnTopic(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @PathParam("role") @ApiParam(value = "Client role to which grant permissions", required = true) String str4, @ApiParam(value = "Actions to be granted (produce,functions,consume)", allowableValues = "produce,functions,consume") Set<AuthAction> set) {
        // Step 1: reject malformed topic coordinates before touching any permissions.
        validateTopicName(str, str2, str3);

        // Step 2: apply the grant for the requested role.
        internalGrantPermissionsOnTopic(str4, set);
    }

    /**
     * Revokes the permissions of a role on a single topic.
     *
     * <p>The topic name is validated and the work is handed to {@code internalRevokePermissionsOnTopic},
     * which receives the suspended response. Failures raised synchronously here are delivered through
     * {@code asyncResponse} instead of being thrown to the container.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param str4 role whose permissions are revoked
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), @ApiResponse(code = 412, message = "Permissions are not set at the topic level"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/permissions/{role}")
    @DELETE
    @ApiOperation(value = "Revoke permissions on a topic.", notes = "Revoke permissions to a role on a single topic. If the permission was not set at the topiclevel, but rather at the namespace level, this operation will return an error (HTTP status code 412).")
    public void revokePermissionsOnTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @PathParam("role") @ApiParam(value = "Client role to which grant permissions", required = true) String str4) {
        try {
            validateTopicName(str, str2, str3);
            internalRevokePermissionsOnTopic(asyncResponse, str4);
        } catch (WebApplicationException e) {
            // The specific handler must come first: WebApplicationException is a subtype of Exception,
            // so listing it after the generic clause makes it unreachable (a compile-time error).
            asyncResponse.resume(e);
        } catch (Exception e) {
            // Anything else is wrapped so the client still receives a REST-style error.
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Creates a partitioned topic from a bare partition count sent as the request body.
     *
     * <p>Runs the namespace, ownership, topic-name, policy and creation checks in sequence and then calls
     * {@code internalCreatePartitionedTopic}. Any exception raised synchronously is logged and passed to
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param i requested number of partitions
     * @param z value of the {@code createLocalTopicOnly} query parameter (defaults to {@code false})
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant does not exist"), @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code = 409, message = "Partitioned topic already exist"), @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void createPartitionedTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0") int i, @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean z) {
        try {
            // Pre-flight checks; the order is significant and mirrors the original sequence.
            validateNamespaceName(str, str2);
            validateGlobalNamespaceOwnership();
            validatePartitionedTopicName(str, str2, str3);
            validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
            validateCreateTopic(topicName);

            internalCreatePartitionedTopic(asyncResponse, i, z);
        } catch (Exception failure) {
            // Trailing throwable is passed as the last vararg so SLF4J records the stack trace.
            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, failure);
            resumeAsyncResponseExceptionally(asyncResponse, failure);
        }
    }

    /**
     * Creates a partitioned topic from a {@code PartitionedTopicMetadata} request body.
     *
     * <p>Selected for the {@code application/vnd.partitioned-topic-metadata+json} content type. Runs the
     * same validation sequence as the int-bodied overload and then calls
     * {@code internalCreatePartitionedTopic} with the partition count and properties read from the
     * metadata. Any exception raised synchronously is logged and passed to
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param partitionedTopicMetadata request body carrying {@code partitions} and {@code properties}
     * @param z value of the {@code createLocalTopicOnly} query parameter (defaults to {@code false})
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @Consumes({"application/vnd.partitioned-topic-metadata+json"})
    @ApiOperation(value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant does not exist"), @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code = 409, message = "Partitioned topic already exist"), @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void createPartitionedTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @ApiParam(value = "The metadata for the topic", required = true, type = "PartitionedTopicMetadata") PartitionedTopicMetadata partitionedTopicMetadata, @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean z) {
        try {
            // Pre-flight checks; the order is significant and mirrors the original sequence.
            validateNamespaceName(str, str2);
            validateGlobalNamespaceOwnership();
            validatePartitionedTopicName(str, str2, str3);
            validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
            validateCreateTopic(topicName);

            // The metadata is dereferenced inside the try block, so a missing body is reported
            // through the async response like any other failure.
            internalCreatePartitionedTopic(
                    asyncResponse,
                    partitionedTopicMetadata.partitions,
                    z,
                    partitionedTopicMetadata.properties);
        } catch (Exception failure) {
            // Trailing throwable is passed as the last vararg so SLF4J records the stack trace.
            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, failure);
            resumeAsyncResponseExceptionally(asyncResponse, failure);
        }
    }

    /**
     * Creates a non-partitioned topic.
     *
     * <p>Runs the namespace, ownership, topic-name and creation checks in sequence and then delegates to
     * {@code internalCreateNonPartitionedTopic}. This endpoint is synchronous: failures propagate as
     * exceptions to the caller.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code authoritative} query parameter (defaults to {@code false})
     * @param map topic properties taken from the request body
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}")
    @ApiOperation(value = "Create a non-partitioned topic.", notes = "This is the only REST endpoint from which non-partitioned topics could be created.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 409, message = "Partitioned topic already exist"), @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void createNonPartitionedTopic(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @ApiParam("Key value pair properties for the topic metadata") Map<String, String> map) {
        // Pre-flight checks; the order is significant and mirrors the original sequence.
        validateNamespaceName(str, str2);
        validateGlobalNamespaceOwnership();
        validateTopicName(str, str2, str3);
        validateCreateTopic(topicName);

        internalCreateNonPartitionedTopic(z, map);
    }

    /**
     * Fetches the offload policies of a topic and resumes the response with them.
     *
     * <p>After validating the topic name, the call chains {@code preValidation} and
     * {@code internalGetOffloadPolicies}; the resulting value is passed to
     * {@link AsyncResponse#resume(Object)}. A failed stage is routed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the result or the failure
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code applied} query parameter (defaults to {@code false})
     * @param z2 value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z3 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
    @ApiOperation("Get offload policies on a topic.")
    public void getOffloadPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("applied") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(ignored -> internalGetOffloadPolicies(z, z2))
                // Bound method reference replaces the decompiler residue (undeclared type variable
                // and undefined receiver) and keeps the eager null check on asyncResponse.
                .thenApply(asyncResponse::resume)
                .exceptionally(th -> {
                    handleTopicPolicyException("getOffloadPolicies", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores offload policies for a topic.
     *
     * <p>After validating the topic name, chains {@code preValidation} and
     * {@code internalSetOffloadPolicies}; on success the response is resumed with an empty
     * "no content" reply, on failure the error goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code authoritative} query parameter (defaults to {@code false})
     * @param z2 value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param offloadPoliciesImpl policies taken from the request body
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
    @ApiOperation("Set offload policies on a topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    public void setOffloadPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @ApiParam("Offload policies for the specified topic") OffloadPoliciesImpl offloadPoliciesImpl) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetOffloadPolicies(offloadPoliciesImpl, z2))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setOffloadPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the offload policies of a topic.
     *
     * <p>Implemented as a "set" with a {@code null} policy: after validating the topic name, chains
     * {@code preValidation} and {@code internalSetOffloadPolicies(null, ...)}; on success the response
     * is resumed with an empty "no content" reply, on failure the error goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z2 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
    @ApiOperation("Delete offload policies on a topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    public void removeOffloadPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetOffloadPolicies(null, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeOffloadPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Fetches the max-unacked-messages-per-consumer setting of a topic and resumes the response with it.
     *
     * <p>After validating the topic name, the call chains {@code preValidation} and
     * {@code internalGetMaxUnackedMessagesOnConsumer}; the resulting value is passed to
     * {@link AsyncResponse#resume(Object)}. A failed stage is routed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the result or the failure
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code applied} query parameter (defaults to {@code false})
     * @param z2 value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z3 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
    @ApiOperation("Get max unacked messages per consumer config on a topic.")
    public void getMaxUnackedMessagesOnConsumer(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("applied") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(ignored -> internalGetMaxUnackedMessagesOnConsumer(z, z2))
                // Bound method reference replaces the decompiler residue (undeclared type variable
                // and undefined receiver) and keeps the eager null check on asyncResponse.
                .thenApply(asyncResponse::resume)
                .exceptionally(th -> {
                    handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores the max-unacked-messages-per-consumer setting for a topic.
     *
     * <p>After validating the topic name, chains {@code preValidation} and
     * {@code internalSetMaxUnackedMessagesOnConsumer}; on success the response is resumed with an empty
     * "no content" reply, on failure the error goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z2 value of the {@code authoritative} query parameter (defaults to {@code false})
     * @param num limit taken from the request body
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
    @ApiOperation("Set max unacked messages per consumer config on a topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    public void setMaxUnackedMessagesOnConsumer(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @ApiParam("Max unacked messages on consumer policies for the specified topic") Integer num) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetMaxUnackedMessagesOnConsumer(num, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxUnackedMessagesOnConsumer", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the max-unacked-messages-per-consumer setting of a topic.
     *
     * <p>Implemented as a "set" with a {@code null} value: after validating the topic name, chains
     * {@code preValidation} and {@code internalSetMaxUnackedMessagesOnConsumer(null, ...)}; on success
     * the response is resumed with an empty "no content" reply, on failure the error goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z2 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
    @ApiOperation("Delete max unacked messages per consumer config on a topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    public void deleteMaxUnackedMessagesOnConsumer(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetMaxUnackedMessagesOnConsumer(null, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("deleteMaxUnackedMessagesOnConsumer", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Fetches the deduplication snapshot interval of a topic and resumes the response with it.
     *
     * <p>After validating the topic name, the call chains {@code preValidation} and
     * {@code getTopicPoliciesAsyncWithRetry}. When no policies are present a fresh {@code TopicPolicies}
     * instance is used, and its {@code getDeduplicationSnapshotIntervalSeconds()} value is sent back.
     * A failed stage is routed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the result or the failure
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z2 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/deduplicationSnapshotInterval")
    @ApiOperation("Get deduplicationSnapshotInterval config on a topic.")
    public void getDeduplicationSnapshotInterval(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> getTopicPoliciesAsyncWithRetry(this.topicName, z))
                // No cast needed: the lambda parameter type is inferred from the previous stage
                // (assumes that stage yields Optional<TopicPolicies>, as the original cast implied).
                .thenAccept(optional -> {
                    TopicPolicies policies = optional.orElseGet(TopicPolicies::new);
                    asyncResponse.resume(policies.getDeduplicationSnapshotIntervalSeconds());
                })
                .exceptionally(th -> {
                    handleTopicPolicyException("getDeduplicationSnapshotInterval", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores the deduplication snapshot interval for a topic.
     *
     * <p>After validating the topic name, chains {@code preValidation} and
     * {@code internalSetDeduplicationSnapshotInterval}; on success the response is resumed with an empty
     * "no content" reply, on failure the error goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param num interval taken from the request body
     * @param z value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z2 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/deduplicationSnapshotInterval")
    @ApiOperation("Set deduplicationSnapshotInterval config on a topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    public void setDeduplicationSnapshotInterval(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @ApiParam("Interval to take deduplication snapshot for the specified topic") Integer num, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetDeduplicationSnapshotInterval(num, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setDeduplicationSnapshotInterval", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the deduplication snapshot interval of a topic.
     *
     * <p>Implemented as a "set" with a {@code null} value: after validating the topic name, chains
     * {@code preValidation} and {@code internalSetDeduplicationSnapshotInterval(null, ...)}; on success
     * the response is resumed with an empty "no content" reply, on failure the error goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z2 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/deduplicationSnapshotInterval")
    @ApiOperation("Delete deduplicationSnapshotInterval config on a topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    public void deleteDeduplicationSnapshotInterval(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetDeduplicationSnapshotInterval(null, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("deleteDeduplicationSnapshotInterval", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Fetches the inactive-topic policies of a topic and resumes the response with them.
     *
     * <p>After validating the topic name, the call chains {@code preValidation} and
     * {@code internalGetInactiveTopicPolicies}; the resulting value is passed to
     * {@link AsyncResponse#resume(Object)}. A failed stage is routed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the result or the failure
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code applied} query parameter (defaults to {@code false})
     * @param z2 value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z3 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
    @ApiOperation("Get inactive topic policies on a topic.")
    public void getInactiveTopicPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("applied") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(ignored -> internalGetInactiveTopicPolicies(z, z2))
                // Bound method reference replaces the decompiler residue (undeclared type variable
                // and undefined receiver) and keeps the eager null check on asyncResponse.
                .thenApply(asyncResponse::resume)
                .exceptionally(th -> {
                    handleTopicPolicyException("getInactiveTopicPolicies", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores inactive-topic policies for a topic.
     *
     * <p>After validating the topic name, chains {@code preValidation} and
     * {@code internalSetInactiveTopicPolicies}; on success the response is resumed with an empty
     * "no content" reply, on failure the error goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code authoritative} query parameter (defaults to {@code false})
     * @param z2 value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param inactiveTopicPolicies policies taken from the request body
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
    @ApiOperation("Set inactive topic policies on a topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    public void setInactiveTopicPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @ApiParam("inactive topic policies for the specified topic") InactiveTopicPolicies inactiveTopicPolicies) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, z2))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setInactiveTopicPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the inactive-topic policies of a topic.
     *
     * <p>Implemented as a "set" with a {@code null} policy: after validating the topic name, chains
     * {@code preValidation} and {@code internalSetInactiveTopicPolicies(null, ...)}; on success the
     * response is resumed with an empty "no content" reply, on failure the error goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z2 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
    @ApiOperation("Delete inactive topic policies on a topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    public void deleteInactiveTopicPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetInactiveTopicPolicies(null, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("deleteInactiveTopicPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Fetches the max-unacked-messages-per-subscription setting of a topic and resumes the response
     * with it.
     *
     * <p>After validating the topic name, the call chains {@code preValidation} and
     * {@code internalGetMaxUnackedMessagesOnSubscription}; the resulting value is passed to
     * {@link AsyncResponse#resume(Object)}. A failed stage is routed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the result or the failure
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code applied} query parameter (defaults to {@code false})
     * @param z2 value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z3 value of the {@code authoritative} query parameter (defaults to {@code false})
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
    @ApiOperation("Get max unacked messages per subscription config on a topic.")
    public void getMaxUnackedMessagesOnSubscription(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("applied") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(ignored -> internalGetMaxUnackedMessagesOnSubscription(z, z2))
                // Bound method reference replaces the decompiler residue (undeclared type variable
                // and undefined receiver) and keeps the eager null check on asyncResponse.
                .thenApply(asyncResponse::resume)
                .exceptionally(th -> {
                    handleTopicPolicyException("getMaxUnackedMessagesOnSubscription", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores the max-unacked-messages-per-subscription setting for a topic.
     *
     * <p>Unlike the sibling setters, this endpoint also calls {@code validateTopicPolicyOperation}
     * synchronously (MAX_UNACKED / WRITE) before starting the asynchronous chain, so that check can
     * throw directly to the caller. The chain then runs {@code preValidation} and
     * {@code internalSetMaxUnackedMessagesOnSubscription}; on success the response is resumed with an
     * empty "no content" reply, on failure the error goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 topic path segment, received still URL-encoded ({@code @Encoded})
     * @param z value of the {@code isGlobal} query parameter (defaults to {@code false})
     * @param z2 value of the {@code authoritative} query parameter (defaults to {@code false})
     * @param num limit taken from the request body
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
    @ApiOperation("Set max unacked messages per subscription config on a topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    public void setMaxUnackedMessagesOnSubscription(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @ApiParam("Max unacked messages on subscription policies for the specified topic") Integer num) {
        // Synchronous checks first; both may throw before any async work is scheduled.
        validateTopicName(str, str2, str3);
        validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);

        preValidation(z2)
                .thenCompose(ignored -> internalSetMaxUnackedMessagesOnSubscription(num, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxUnackedMessagesOnSubscription", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Clears the max-unacked-messages-per-subscription value of a topic by storing {@code null}.
     *
     * <p>Topic-name and policy-operation validation run synchronously. The update is asynchronous:
     * success resumes the response with 204 No Content, failure goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response to resume once the update finishes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
    @DELETE
    @ApiOperation("Delete max unacked messages per subscription config on a topic.")
    public void deleteMaxUnackedMessagesOnSubscription(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        validateTopicPolicyOperation(this.topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
        final boolean isGlobal = z;
        final boolean authoritative = z2;
        preValidation(authoritative)
                // A null value is what removes the topic-level setting.
                .thenCompose(ignored -> internalSetMaxUnackedMessagesOnSubscription(null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(ex -> {
                    handleTopicPolicyException("deleteMaxUnackedMessagesOnSubscription", ex, asyncResponse);
                    return null;
                });
    }

    /**
     * Reports the delayed-delivery policies of a topic.
     *
     * <p>The topic name is validated synchronously. The lookup runs asynchronously and its result is
     * handed to {@code asyncResponse}; any failure in the asynchronous chain is passed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the lookup result
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter
     * @param z2 value of the {@code applied} query parameter
     * @param z3 value of the {@code authoritative} query parameter
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
    @ApiOperation("Get delayed delivery messages config on a topic.")
    public void getDelayedDeliveryPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("applied") @DefaultValue("false") boolean z2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                // Argument order is (applied, isGlobal), the reverse of the declaration order.
                .thenCompose(ignored -> internalGetDelayedDeliveryPolicies(z2, z))
                // Bound method reference: the looked-up value becomes the response entity.
                .thenApply(asyncResponse::resume)
                .exceptionally(ex -> {
                    handleTopicPolicyException("getDelayedDeliveryPolicies", ex, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores delayed-delivery policies for a topic.
     *
     * <p>Topic-name, read-only-access and policy-operation validation run synchronously, so anything
     * they throw leaves this method directly. The update is asynchronous: success resumes the
     * response with 204 No Content, failure goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response to resume once the update finishes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     * @param delayedDeliveryPolicies request body carrying the policies to store
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
    @ApiOperation("Set delayed delivery messages config on a topic.")
    @POST
    public void setDelayedDeliveryPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @ApiParam("Delayed delivery policies for the specified topic") DelayedDeliveryPolicies delayedDeliveryPolicies) {
        validateTopicName(str, str2, str3);
        validatePoliciesReadOnlyAccess();
        validateTopicPolicyOperation(this.topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
        final boolean isGlobal = z;
        final boolean authoritative = z2;
        preValidation(authoritative)
                .thenCompose(ignored -> internalSetDelayedDeliveryPolicies(delayedDeliveryPolicies, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(ex -> {
                    handleTopicPolicyException("setDelayedDeliveryPolicies", ex, asyncResponse);
                    return null;
                });
    }

    /**
     * Clears the delayed-delivery policies of a topic by storing {@code null}.
     *
     * <p>Topic-name, read-only-access and policy-operation validation run synchronously, so anything
     * they throw leaves this method directly. The update is asynchronous: success resumes the
     * response with 204 No Content, failure goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response to resume once the update finishes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")})
    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
    @DELETE
    @ApiOperation("Delete delayed delivery messages config on a topic.")
    public void deleteDelayedDeliveryPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        validatePoliciesReadOnlyAccess();
        validateTopicPolicyOperation(this.topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
        preValidation(z2)
                // A null value is what removes the topic-level policies.
                .thenCompose(ignored -> internalSetDelayedDeliveryPolicies(null, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(ex -> {
                    handleTopicPolicyException("deleteDelayedDeliveryPolicies", ex, asyncResponse);
                    return null;
                });
    }

    /**
     * Changes the partition count of an existing partitioned topic.
     *
     * <p>Runs synchronously: the partitioned topic name and its metadata are validated first, then
     * the update is delegated to {@code internalUpdatePartitionedTopic}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code updateLocalTopicOnly} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     * @param z3 value of the {@code force} query parameter
     * @param i request body: the number of partitions for the topic
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant does not exist"), @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code = 409, message = "Partitioned topic does not exist"), @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(value = "Increment partitions of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
    @POST
    public void updatePartitionedTopic(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @QueryParam("force") @DefaultValue("false") boolean z3, @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0") int i) {
        final int numPartitions = i;
        final boolean updateLocalTopicOnly = z;
        final boolean authoritative = z2;
        final boolean force = z3;

        validatePartitionedTopicName(str, str2, str3);
        validatePartitionedTopicMetadata(str, str2, str3);
        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
    }

    /**
     * Creates the partitions that are missing from an existing partitioned topic.
     *
     * <p>Any exception thrown while validating the name or starting the operation is reported
     * through {@code resumeAsyncResponseExceptionally} instead of propagating to the caller.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant does not exist"), @ApiResponse(code = 409, message = "Partitioned topic does not exist"), @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
    @ApiOperation("Create missed partitions of an existing partitioned topic.")
    @POST
    public void createMissedPartitions(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3) {
        try {
            validatePartitionedTopicName(str, str2, str3);
            internalCreateMissedPartitions(asyncResponse);
        } catch (Exception failure) {
            // Synchronous failures still have to complete the suspended response.
            resumeAsyncResponseExceptionally(asyncResponse, failure);
        }
    }

    /**
     * Returns the partitioned-topic metadata of a topic.
     *
     * <p>Runs synchronously: validates the topic name and then delegates to
     * {@code internalGetPartitionedMetadata}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     * @param z2 value of the {@code checkAllowAutoCreation} query parameter
     * @return the metadata produced by {@code internalGetPartitionedMetadata}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Partitioned topic does not exist"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation("Get partitioned topic metadata.")
    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("checkAllowAutoCreation") @ApiParam("Is check configuration required to automatically create topic") @DefaultValue("false") boolean z2) {
        final boolean authoritative = z;
        final boolean checkAllowAutoCreation = z2;

        validateTopicName(str, str2, str3);
        final PartitionedTopicMetadata metadata =
                internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
        return metadata;
    }

    /**
     * Deletes a partitioned topic.
     *
     * <p>Exceptions thrown while validating the name or starting the deletion are delivered through
     * {@code asyncResponse}: a {@link WebApplicationException} is passed on unchanged, anything else
     * is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code force} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     * @param z3 value of the {@code deleteSchema} query parameter
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Partitioned topic does not exist"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @DELETE
    @ApiOperation(value = "Delete a partitioned topic.", notes = "It will also delete all the partitions of the topic if it exists.")
    public void deletePartitionedTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("force") @ApiParam(value = "Stop all producer/consumer/replicator and delete topic forcefully", defaultValue = "false", type = "boolean") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @QueryParam("deleteSchema") @ApiParam("Delete the topic's schema storage") @DefaultValue("false") boolean z3) {
        final boolean force = z;
        final boolean authoritative = z2;
        final boolean deleteSchema = z3;
        try {
            validatePartitionedTopicName(str, str2, str3);
            // Note the argument order: authoritative comes before force.
            internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema);
        } catch (WebApplicationException webFailure) {
            asyncResponse.resume(webFailure);
        } catch (Exception otherFailure) {
            asyncResponse.resume(new RestException(otherFailure));
        }
    }

    /**
     * Unloads a topic.
     *
     * <p>Exceptions thrown while validating the name or starting the unload are delivered through
     * {@code asyncResponse}: a {@link WebApplicationException} is passed on unchanged, anything else
     * is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Topic name is not valid or can't find owner for topic"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/unload")
    @ApiOperation("Unload a topic")
    @PUT
    public void unloadTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalUnloadTopic(asyncResponse, z);
        } catch (WebApplicationException wae) {
            // The more specific type must be caught first or this handler is unreachable.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Deletes a topic.
     *
     * <p>Runs synchronously: validates the topic name and then delegates to
     * {@code internalDeleteTopic}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code force} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     * @param z3 value of the {@code deleteSchema} query parameter
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Topic has active producers/subscriptions"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}")
    @DELETE
    @ApiOperation(value = "Delete a topic.", notes = "The topic cannot be deleted if delete is not forcefully and there's any active subscription or producer connected to the it. Force delete ignores connected clients and deletes topic by explicitly closing them.")
    public void deleteTopic(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("force") @ApiParam(value = "Stop all producer/consumer/replicator and delete topic forcefully", defaultValue = "false", type = "boolean") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @QueryParam("deleteSchema") @ApiParam("Delete the topic's schema storage") @DefaultValue("false") boolean z3) {
        final boolean force = z;
        final boolean authoritative = z2;
        final boolean deleteSchema = z3;

        validateTopicName(str, str2, str3);
        // Note the argument order: authoritative comes before force.
        internalDeleteTopic(authoritative, force, deleteSchema);
    }

    /**
     * Lists the subscriptions of a topic.
     *
     * <p>Exceptions thrown while validating the name or starting the lookup are delivered through
     * {@code asyncResponse}: a {@link WebApplicationException} is passed on unchanged, anything else
     * is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscriptions")
    @ApiOperation("Get the list of persistent subscriptions for a given topic.")
    public void getSubscriptions(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalGetSubscriptions(asyncResponse, z);
        } catch (WebApplicationException wae) {
            // The more specific type must be caught first or this handler is unreachable.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Returns the stats of a topic.
     *
     * <p>Runs synchronously: validates the topic name and then delegates to
     * {@code internalGetStats}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     * @param z2 value of the {@code getPreciseBacklog} query parameter
     * @param z3 value of the {@code subscriptionBacklogSize} query parameter
     * @param z4 value of the {@code getEarliestTimeInBacklog} query parameter
     * @return the stats produced by {@code internalGetStats}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("{tenant}/{namespace}/{topic}/stats")
    @ApiOperation("Get the stats for the topic.")
    /* renamed from: getStats */
    public TopicStats mo31getStats(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("getPreciseBacklog") @ApiParam("If return precise backlog or imprecise backlog") @DefaultValue("false") boolean z2, @QueryParam("subscriptionBacklogSize") @ApiParam("If return backlog size for each subscription, require locking on ledger so be careful not to use when there's heavy traffic.") @DefaultValue("false") boolean z3, @QueryParam("getEarliestTimeInBacklog") @ApiParam("If return time of the earliest message in backlog") @DefaultValue("false") boolean z4) {
        final boolean authoritative = z;
        final boolean getPreciseBacklog = z2;
        final boolean subscriptionBacklogSize = z3;
        final boolean getEarliestTimeInBacklog = z4;

        validateTopicName(str, str2, str3);
        final TopicStats stats = internalGetStats(
                authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);
        return stats;
    }

    /**
     * Returns the internal stats of a topic.
     *
     * <p>Runs synchronously: validates the topic name and then delegates to
     * {@code internalGetInternalStats}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     * @param z2 value of the {@code metadata} query parameter
     * @return the stats produced by {@code internalGetInternalStats}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("{tenant}/{namespace}/{topic}/internalStats")
    @ApiOperation("Get the internal stats for the topic.")
    public PersistentTopicInternalStats getInternalStats(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("metadata") @DefaultValue("false") boolean z2) {
        final boolean authoritative = z;
        final boolean metadata = z2;

        validateTopicName(str, str2, str3);
        final PersistentTopicInternalStats internalStats = internalGetInternalStats(authoritative, metadata);
        return internalStats;
    }

    /**
     * Fetches the stored metadata of a topic.
     *
     * <p>The topic name is validated synchronously and outside any try block, so a validation
     * failure leaves this method directly. The lookup is then delegated to
     * {@code internalGetManagedLedgerInfo}, which receives {@code asyncResponse}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param z value of the {@code authoritative} query parameter
     * @param str3 encoded topic path segment
     * @param asyncResponse suspended response handed to the internal operation
     */
    @GET
    @ApiResponses({@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("{tenant}/{namespace}/{topic}/internal-info")
    @ApiOperation("Get the stored topic metadata.")
    public void getManagedLedgerInfo(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @Suspended AsyncResponse asyncResponse) {
        final boolean authoritative = z;

        validateTopicName(str, str2, str3);
        internalGetManagedLedgerInfo(asyncResponse, authoritative);
    }

    /**
     * Fetches the stats of a partitioned topic.
     *
     * <p>Exceptions thrown while validating the name or starting the lookup are delivered through
     * {@code asyncResponse}: a {@link WebApplicationException} is passed on unchanged, anything else
     * is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code perPartition} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     * @param z3 value of the {@code getPreciseBacklog} query parameter
     * @param z4 value of the {@code subscriptionBacklogSize} query parameter
     * @param z5 value of the {@code getEarliestTimeInBacklog} query parameter
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("{tenant}/{namespace}/{topic}/partitioned-stats")
    @ApiOperation("Get the stats for the partitioned topic.")
    public void getPartitionedStats(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("perPartition") @ApiParam("Get per partition stats") @DefaultValue("true") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @QueryParam("getPreciseBacklog") @ApiParam("If return precise backlog or imprecise backlog") @DefaultValue("false") boolean z3, @QueryParam("subscriptionBacklogSize") @ApiParam("If return backlog size for each subscription, require locking on ledger so be careful not to use when there's heavy traffic.") @DefaultValue("false") boolean z4, @QueryParam("getEarliestTimeInBacklog") @ApiParam("If return the earliest time in backlog") @DefaultValue("false") boolean z5) {
        try {
            validatePartitionedTopicName(str, str2, str3);
            // Note the argument order: authoritative (z2) comes before perPartition (z).
            internalGetPartitionedStats(asyncResponse, z2, z, z3, z4, z5);
        } catch (WebApplicationException wae) {
            // The more specific type must be caught first or this handler is unreachable.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Fetches the internal stats of a partitioned topic.
     *
     * <p>Exceptions thrown while validating the name or starting the lookup are delivered through
     * {@code asyncResponse}: a {@link WebApplicationException} is passed on unchanged, anything else
     * is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("{tenant}/{namespace}/{topic}/partitioned-internalStats")
    @ApiOperation(hidden = true, value = "Get the stats-internal for the partitioned topic.")
    public void getPartitionedStatsInternal(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalGetPartitionedStatsInternal(asyncResponse, z);
        } catch (WebApplicationException wae) {
            // The more specific type must be caught first or this handler is unreachable.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Deletes a subscription from a topic.
     *
     * <p>The subscription name is passed through {@code Codec.decode} before it is used. Exceptions
     * thrown while validating or starting the deletion are delivered through {@code asyncResponse}:
     * a {@link WebApplicationException} is passed on unchanged, anything else is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 subscription name path segment, decoded before use
     * @param z value of the {@code force} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Subscription has active consumers"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
    @DELETE
    @ApiOperation(value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. Force delete ignores connected consumers and deletes subscription by explicitly closing them.")
    public void deleteSubscription(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @PathParam("subName") @ApiParam("Subscription to be deleted") String str4, @QueryParam("force") @ApiParam(value = "Disconnect and close all consumers and delete subscription forcefully", defaultValue = "false", type = "boolean") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        final boolean force = z;
        final boolean authoritative = z2;
        try {
            validateTopicName(str, str2, str3);
            validateTopicOwnership(this.topicName, authoritative);
            // Note the argument order: authoritative comes before force.
            internalDeleteSubscription(asyncResponse, Codec.decode(str4), authoritative, force);
        } catch (WebApplicationException webFailure) {
            asyncResponse.resume(webFailure);
        } catch (Exception otherFailure) {
            asyncResponse.resume(new RestException(otherFailure));
        }
    }

    /**
     * Skips all messages on a topic subscription.
     *
     * <p>The subscription name is passed through {@code Codec.decode} before it is used. Exceptions
     * thrown while validating or starting the operation are delivered through {@code asyncResponse}:
     * a {@link WebApplicationException} is passed on unchanged, anything else is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 subscription name path segment, decoded before use
     * @param z value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist"), @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"), @ApiResponse(code = 412, message = "Can't find owner for topic"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/skip_all")
    @ApiOperation(value = "Skip all messages on a topic subscription.", notes = "Completely clears the backlog on the subscription.")
    @POST
    public void skipAllMessages(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @PathParam("subName") @ApiParam("Name of subscription") String str4, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalSkipAllMessages(asyncResponse, Codec.decode(str4), z);
        } catch (WebApplicationException wae) {
            // The more specific type must be caught first or this handler is unreachable.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Skips a given number of messages on a topic subscription.
     *
     * <p>The subscription name is passed through {@code Codec.decode} before it is used. Exceptions
     * thrown while validating or starting the operation are delivered through {@code asyncResponse}:
     * a {@link WebApplicationException} is passed on unchanged, anything else is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 subscription name path segment, decoded before use
     * @param i value of the {@code numMessages} path segment: the number of messages to skip
     * @param z value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist"), @ApiResponse(code = 405, message = "Skipping messages on a partitioned topic is not allowed"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}")
    @ApiOperation("Skipping messages on a topic subscription.")
    @POST
    public void skipMessages(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @PathParam("subName") @ApiParam("Name of subscription") String str4, @PathParam("numMessages") @ApiParam(value = "The number of messages to skip", defaultValue = "0") int i, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalSkipMessages(asyncResponse, Codec.decode(str4), i, z);
        } catch (WebApplicationException wae) {
            // The more specific type must be caught first or this handler is unreachable.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Expires messages on a topic subscription, selected by an age given in seconds.
     *
     * <p>The subscription name is passed through {@code Codec.decode} before it is used. Exceptions
     * thrown while validating or starting the operation are delivered through {@code asyncResponse}:
     * a {@link WebApplicationException} is passed on unchanged, anything else is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response handed to the internal operation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 subscription name path segment, decoded before use
     * @param i value of the {@code expireTimeInSeconds} path segment
     * @param z value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist"), @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages/{expireTimeInSeconds}")
    @ApiOperation("Expiry messages on a topic subscription.")
    @POST
    public void expireTopicMessages(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @PathParam("subName") @ApiParam("Subscription to be Expiry messages on") String str4, @PathParam("expireTimeInSeconds") @ApiParam(value = "Expires beyond the specified number of seconds", defaultValue = "0") int i, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalExpireMessagesByTimestamp(asyncResponse, Codec.decode(str4), i, z);
        } catch (WebApplicationException wae) {
            // The more specific type must be caught first or this handler is unreachable.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Expires messages on one subscription of a topic up to a message position supplied
     * in the request body.
     *
     * <p>Validates the topic coordinates, builds a {@link MessageIdImpl} from the ledger id,
     * entry id and partition index of {@code resetCursorData}, and delegates to
     * {@code internalExpireMessagesByPosition}. A {@link WebApplicationException} raised
     * synchronously is forwarded unchanged; any other exception (including one caused by
     * a missing request body) is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response that receives a synchronous failure
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 encoded subscription name
     * @param z value of the {@code authoritative} query parameter
     * @param resetCursorData request body describing the target position
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist"), @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages")
    @ApiOperation("Expiry messages on a topic subscription.")
    @POST
    public void expireTopicMessages(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @PathParam("subName") @ApiParam("Subscription to be Expiry messages on") String str4,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)") ResetCursorData resetCursorData) {
        try {
            validateTopicName(str, str2, str3);
            internalExpireMessagesByPosition(asyncResponse, Codec.decode(str4), z, new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(), resetCursorData.getPartitionIndex()), resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
        } catch (WebApplicationException e) {
            // Specific handler first; after catch (Exception) it would be unreachable.
            asyncResponse.resume(e);
        } catch (Exception e2) {
            asyncResponse.resume(new RestException(e2));
        }
    }

    /**
     * Expires messages on every subscription of a topic, based on an age given in seconds.
     *
     * <p>After validating the topic coordinates the work is handed to
     * {@code internalExpireMessagesForAllSubscriptions}. Synchronous failures are reported
     * through {@code asyncResponse}: a {@link WebApplicationException} as-is, anything
     * else wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response passed to the internal implementation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param i value of the {@code expireTimeInSeconds} path segment
     * @param z value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist"), @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"), @ApiResponse(code = 412, message = "Can't find owner for topic"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/all_subscription/expireMessages/{expireTimeInSeconds}")
    @ApiOperation("Expiry messages on all subscriptions of topic.")
    @POST
    public void expireMessagesForAllSubscriptions(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @PathParam("expireTimeInSeconds") @ApiParam(value = "Expires beyond the specified number of seconds", defaultValue = "0") int i,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalExpireMessagesForAllSubscriptions(asyncResponse, i, z);
        } catch (WebApplicationException webFailure) {
            // Already a REST-level error: forward untouched.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            RestException wrapped = new RestException(unexpected);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * Creates a subscription on a persistent topic, optionally at a given message id.
     *
     * <p>Rejects non-persistent topics with {@code 400 Bad Request}. When a request body
     * is present, its ledger id, entry id and partition index are turned into a
     * {@link MessageIdImpl}; when it is absent, {@code null} is passed on to
     * {@code internalCreateSubscription}. Synchronous failures are reported through
     * {@code asyncResponse}.
     *
     * @param asyncResponse suspended response passed to the internal implementation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 encoded subscription name
     * @param z value of the {@code authoritative} query parameter
     * @param resetCursorData optional request body holding the start position
     * @param z2 value of the {@code replicated} query parameter
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 400, message = "Create subscription on non persistent topic is not supported"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), @ApiResponse(code = 405, message = "Not supported for partitioned topics"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subscriptionName}")
    @ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id")
    @PUT
    public void createSubscription(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @PathParam("subscriptionName") @ApiParam(value = "Subscription to create position on", required = true) String str4,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            @ApiParam(name = "messageId", value = "messageId where to create the subscription. It can be 'latest', 'earliest' or (ledgerId:entryId)", defaultValue = "latest", allowableValues = "latest, earliest, ledgerId:entryId") ResetCursorData resetCursorData,
            @QueryParam("replicated") @ApiParam("Is replicated required to perform this operation") boolean z2) {
        try {
            validateTopicName(str, str2, str3);
            if (!this.topicName.isPersistent()) {
                throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic can only be done through client");
            }
            String subscriptionName = Codec.decode(str4);
            MessageIdImpl startPosition = null;
            if (resetCursorData != null) {
                startPosition = new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(), resetCursorData.getPartitionIndex());
            }
            internalCreateSubscription(asyncResponse, subscriptionName, startPosition, z, z2);
        } catch (WebApplicationException webFailure) {
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            asyncResponse.resume(new RestException(unexpected));
        }
    }

    /**
     * Resets a subscription to the position closest to an absolute timestamp.
     *
     * <p>Validates the topic coordinates, decodes the subscription name and delegates to
     * {@code internalResetCursor}, passing it the suspended response. A
     * {@link WebApplicationException} raised synchronously is forwarded unchanged; any
     * other exception is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response that receives a synchronous failure
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 encoded subscription name
     * @param j value of the {@code timestamp} path segment
     * @param z value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), @ApiResponse(code = 405, message = "Method Not Allowed"), @ApiResponse(code = 412, message = "Failed to reset cursor on subscription or Unable to find position for timestamp specified"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor/{timestamp}")
    @ApiOperation(value = "Reset subscription to message position closest to absolute timestamp (in ms).", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
    @POST
    public void resetCursor(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @PathParam("subName") @ApiParam(value = "Subscription to reset position on", required = true) String str4,
            @PathParam("timestamp") @ApiParam("the timestamp to reset back") long j,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalResetCursor(asyncResponse, Codec.decode(str4), j, z);
        } catch (WebApplicationException e) {
            // Specific handler first; after catch (Exception) it would be unreachable.
            asyncResponse.resume(e);
        } catch (Exception e2) {
            asyncResponse.resume(new RestException(e2));
        }
    }

    /**
     * Resets a subscription to the position described by the request body.
     *
     * <p>Builds a {@link MessageIdImpl} from the body's ledger id, entry id and partition
     * index and delegates to {@code internalResetCursorOnPosition}. Any exception thrown
     * synchronously is handed to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response passed to the internal implementation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 encoded subscription name
     * @param z value of the {@code authoritative} query parameter
     * @param resetCursorData request body describing the target position
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), @ApiResponse(code = 405, message = "Not supported for partitioned topics"), @ApiResponse(code = 412, message = "Unable to find position for position specified"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
    @ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
    @POST
    public void resetCursorOnPosition(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @PathParam("subName") @ApiParam(name = "subName", value = "Subscription to reset position on", required = true) String str4,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)") ResetCursorData resetCursorData) {
        try {
            validateTopicName(str, str2, str3);
            String subscriptionName = Codec.decode(str4);
            MessageIdImpl targetPosition = new MessageIdImpl(
                    resetCursorData.getLedgerId(),
                    resetCursorData.getEntryId(),
                    resetCursorData.getPartitionIndex());
            internalResetCursorOnPosition(asyncResponse, subscriptionName, z, targetPosition,
                    resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
        } catch (Exception failure) {
            resumeAsyncResponseExceptionally(asyncResponse, failure);
        }
    }

    /**
     * Peeks the message at a given position on a topic subscription.
     *
     * <p>Validates the topic coordinates, decodes the subscription name and returns the
     * result of {@code internalPeekNthMessage}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 encoded subscription name
     * @param i value of the {@code messagePosition} path segment
     * @param z value of the {@code authoritative} query parameter
     * @return the response produced by the internal implementation
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist"), @ApiResponse(code = 405, message = "Skipping messages on a non-persistent topic is not allowed"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/position/{messagePosition}")
    @ApiOperation("Peek nth message on a topic subscription.")
    public Response peekNthMessage(
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @PathParam("subName") @ApiParam(name = "subName", value = "Subscribed message expired", required = true) String str4,
            @PathParam("messagePosition") @ApiParam(value = "The number of messages (default 1)", defaultValue = "1") int i,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3);
        String subscriptionName = Codec.decode(str4);
        return internalPeekNthMessage(subscriptionName, i, z);
    }

    /**
     * Examines a message on a topic by its position relative to the earliest or latest
     * message.
     *
     * <p>Validates the topic coordinates and returns the result of
     * {@code internalExamineMessage}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 value of the {@code initialPosition} query parameter
     * @param j value of the {@code messagePosition} query parameter
     * @param z value of the {@code authoritative} query parameter
     * @return the response produced by the internal implementation
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic, the message position does not exist"), @ApiResponse(code = 405, message = "If given partitioned topic"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/examinemessage")
    @ApiOperation("Examine a specific message on a topic by position relative to the earliest or the latest message.")
    public Response examineMessage(
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("initialPosition") @ApiParam(name = "initialPosition", value = "Relative start position to examine message.It can be 'latest' or 'earliest'", defaultValue = "latest", allowableValues = "latest, earliest") String str4,
            @QueryParam("messagePosition") @ApiParam(value = "The position of messages (default 1)", defaultValue = "1") long j,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3);
        Response examined = internalExamineMessage(str4, j, z);
        return examined;
    }

    /**
     * Fetches a single message identified by ledger id and entry id.
     *
     * <p>Validates the topic coordinates and delegates to {@code internalGetMessageById},
     * passing it the suspended response. Synchronous failures are reported through
     * {@code asyncResponse}: a {@link WebApplicationException} as-is, anything else
     * wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response passed to the internal implementation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param j value of the {@code ledgerId} path segment
     * @param j2 value of the {@code entryId} path segment
     * @param z value of the {@code authoritative} query parameter
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist"), @ApiResponse(code = 405, message = "Skipping messages on a non-persistent topic is not allowed"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
    @ApiOperation("Get message by its messageId.")
    public void getMessageById(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @PathParam("ledgerId") @ApiParam(value = "The ledger id", required = true) long j,
            @PathParam("entryId") @ApiParam(value = "The entry id", required = true) long j2,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalGetMessageById(asyncResponse, j, j2, z);
        } catch (WebApplicationException webFailure) {
            // Already a REST-level error: forward untouched.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            RestException wrapped = new RestException(unexpected);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * Looks up the id of the message published at or just after an absolute timestamp.
     *
     * <p>Validates the topic coordinates, then resumes {@code asyncResponse} with the id
     * yielded by {@code internalGetMessageIdByTimestamp}, or with a {@code 404}
     * {@link RestException} when that id is {@code null}. A failed future is logged and
     * handed to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response resumed with the lookup outcome
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param j value of the {@code timestamp} path segment
     * @param z value of the {@code authoritative} query parameter
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic is not non-partitioned and persistent"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/messageid/{timestamp}")
    @ApiOperation("Get message ID published at or just after this absolute timestamp (in ms).")
    public void getMessageIdByTimestamp(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @PathParam("timestamp") @ApiParam(value = "Specify the timestamp", required = true) long j,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3);
        internalGetMessageIdByTimestamp(j, z)
                .thenAccept(foundId -> {
                    if (foundId != null) {
                        asyncResponse.resume(foundId);
                    } else {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Message ID not found"));
                    }
                })
                .exceptionally(failure -> {
                    log.error("[{}] Failed to get message ID by timestamp {} from {}", clientAppId(), j, this.topicName, failure);
                    resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * Returns the estimated backlog for an offline topic.
     *
     * <p>Validates the topic coordinates and returns the result of
     * {@code internalGetBacklog}.
     *
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     * @return the statistics produced by the internal implementation
     */
    @GET
    @ApiResponses({@ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("{tenant}/{namespace}/{topic}/backlog")
    @ApiOperation("Get estimated backlog for offline topic.")
    public PersistentOfflineTopicStats getBacklog(
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3);
        PersistentOfflineTopicStats offlineStats = internalGetBacklog(z);
        return offlineStats;
    }

    /**
     * Calculates the backlog size of a topic starting from the message id in the request
     * body.
     *
     * <p>Validates the topic coordinates and delegates to
     * {@code internalGetBacklogSizeByMessageId}, passing it the suspended response.
     *
     * @param asyncResponse suspended response passed to the internal implementation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     * @param messageIdImpl message id taken from the request body
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/backlogSize")
    @ApiOperation("Calculate backlog size by a message ID (in bytes).")
    @PUT
    public void getBacklogSizeByMessageId(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            MessageIdImpl messageIdImpl) {
        // Topic validation happens first; it is not wrapped in a try/catch here.
        validateTopicName(str, str2, str3);
        internalGetBacklogSizeByMessageId(asyncResponse, messageIdImpl, z);
    }

    /**
     * Returns the backlog quota map configured for a topic.
     *
     * <p>Validates the topic coordinates, runs {@code preValidation}, then resumes
     * {@code asyncResponse} with the value yielded by {@code internalGetBacklogQuota}.
     * Any failure in the chain is routed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the quota map
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code applied} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     * @param z3 value of the {@code isGlobal} query parameter
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic policy does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
    @Path("/{tenant}/{namespace}/{topic}/backlogQuotaMap")
    @ApiOperation("Get backlog quota map on a topic.")
    public void getBacklogQuotaMap(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("applied") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        // One fluent chain: no intermediate variable typed with an undeclared type
        // parameter, and the response is resumed through a bound method reference.
        preValidation(z2)
                .thenCompose(ignored -> internalGetBacklogQuota(z, z3))
                .thenAccept(asyncResponse::resume)
                .exceptionally(th -> {
                    handleTopicPolicyException("getBacklogQuotaMap", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets a backlog quota on a topic.
     *
     * <p>Validates the topic coordinates, runs {@code preValidation}, applies the quota
     * through {@code internalSetBacklogQuota} and answers {@code 204 No Content} on
     * success. Any failure in the chain is routed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the update finishes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     * @param z2 value of the {@code isGlobal} query parameter
     * @param backlogQuotaType value of the {@code backlogQuotaType} query parameter
     * @param backlogQuotaImpl quota taken from the request body
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 412, message = "Specified backlog quota exceeds retention quota. Increase retention quota and retry request")})
    @Path("/{tenant}/{namespace}/{topic}/backlogQuota")
    @ApiOperation("Set a backlog quota for a topic.")
    @POST
    public void setBacklogQuota(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2,
            @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType,
            BacklogQuotaImpl backlogQuotaImpl) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetBacklogQuota(backlogQuotaType, backlogQuotaImpl, z2))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setBacklogQuota", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes a backlog quota policy from a topic.
     *
     * <p>Implemented by calling {@code internalSetBacklogQuota} with a {@code null} quota
     * after {@code preValidation}; answers {@code 204 No Content} on success. Any failure
     * in the chain is routed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the removal finishes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param backlogQuotaType value of the {@code backlogQuotaType} query parameter
     * @param z value of the {@code authoritative} query parameter
     * @param z2 value of the {@code isGlobal} query parameter
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/backlogQuota")
    @DELETE
    @ApiOperation("Remove a backlog quota policy from a topic.")
    public void removeBacklogQuota(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetBacklogQuota(backlogQuotaType, null, z2))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeBacklogQuota", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the replication clusters configured for a topic.
     *
     * <p>After {@code preValidation}, the topic policies are loaded with
     * {@code getTopicPoliciesAsyncWithRetry}. When they are present, their replication
     * cluster set is returned. When they are absent, the namespace-level
     * {@code replication_clusters} are returned if {@code z} (the {@code applied} flag)
     * is set, otherwise {@code null}. Any failure in the chain is routed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the cluster set
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code applied} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry")})
    @Path("/{tenant}/{namespace}/{topic}/replication")
    @ApiOperation("Get the replication clusters for a topic")
    public void getReplicationClusters(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("applied") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> getTopicPoliciesAsyncWithRetry(this.topicName))
                // No cast on the lambda: its parameter type is inferred from the future.
                .thenAccept(optional -> {
                    asyncResponse.resume(optional.map(policies -> policies.getReplicationClustersSet()).orElseGet(() -> {
                        if (z) {
                            // Fall back to the namespace policy only when "applied" is requested.
                            return getNamespacePolicies(this.namespaceName).replication_clusters;
                        }
                        return null;
                    }));
                })
                .exceptionally(th -> {
                    handleTopicPolicyException("getReplicationClusters", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the replication clusters of a topic.
     *
     * <p>Validates the topic coordinates, runs {@code preValidation}, applies the list
     * through {@code internalSetReplicationClusters} and answers {@code 204 No Content}
     * on success. Any failure in the chain is routed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the update finishes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     * @param list cluster names taken from the request body
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 412, message = "Topic is not global or invalid cluster ids")})
    @Path("/{tenant}/{namespace}/{topic}/replication")
    @ApiOperation("Set the replication clusters for a topic.")
    @POST
    public void setReplicationClusters(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            @ApiParam(value = "List of replication clusters", required = true) List<String> list) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetReplicationClusters(list))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setReplicationClusters", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the replication clusters setting from a topic.
     *
     * <p>Validates the topic coordinates, runs {@code preValidation}, calls
     * {@code internalRemoveReplicationClusters} and answers {@code 204 No Content} on
     * success. Any failure in the chain is routed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the removal finishes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param backlogQuotaType value of the {@code backlogQuotaType} query parameter; not
     *        read by this method's body
     * @param z value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/replication")
    @DELETE
    @ApiOperation("Remove the replication clusters from a topic.")
    public void removeReplicationClusters(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalRemoveReplicationClusters())
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeReplicationClusters", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the message TTL (in seconds) configured for a topic.
     *
     * <p>After {@code preValidation}, the topic policies are loaded with
     * {@code getTopicPoliciesAsyncWithRetry}. When they are present, their TTL is
     * returned. When they are absent and {@code z} (the {@code applied} flag) is set,
     * the namespace-level {@code message_ttl_in_seconds} is used, falling back to the
     * broker's {@code getTtlDurationDefaultInSeconds()} when that is {@code null};
     * otherwise {@code null} is returned. Any failure in the chain is routed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the TTL value
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code applied} query parameter
     * @param z2 value of the {@code isGlobal} query parameter
     * @param z3 value of the {@code authoritative} query parameter
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry")})
    @Path("/{tenant}/{namespace}/{topic}/messageTTL")
    @ApiOperation("Get message TTL in seconds for a topic")
    public void getMessageTTL(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("applied") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(ignored -> getTopicPoliciesAsyncWithRetry(this.topicName, z2))
                // No cast on the lambda: its parameter type is inferred from the future.
                .thenAccept(optional -> {
                    asyncResponse.resume(optional.map(policies -> policies.getMessageTTLInSeconds()).orElseGet(() -> {
                        if (!z) {
                            return null;
                        }
                        // "applied" requested: namespace value first, broker default last.
                        Integer num = getNamespacePolicies(this.namespaceName).message_ttl_in_seconds;
                        return Integer.valueOf(num == null ? pulsar().getConfiguration().getTtlDurationDefaultInSeconds() : num.intValue());
                    }));
                })
                .exceptionally(th -> {
                    handleTopicPolicyException("getMessageTTL", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the message TTL (in seconds) of a topic.
     *
     * <p>Validates the topic coordinates, runs {@code preValidation}, applies the value
     * through {@code internalSetMessageTTL} and answers {@code 204 No Content} on
     * success. Any failure in the chain is routed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the update finishes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param num value of the {@code messageTTL} query parameter
     * @param z value of the {@code isGlobal} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"), @ApiResponse(code = 412, message = "Invalid message TTL value")})
    @Path("/{tenant}/{namespace}/{topic}/messageTTL")
    @ApiOperation("Set message TTL in seconds for a topic")
    @POST
    public void setMessageTTL(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("messageTTL") @ApiParam(value = "TTL in seconds for the specified namespace", required = true) Integer num,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetMessageTTL(num, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMessageTTL", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the message TTL setting from a topic.
     *
     * <p>Implemented by calling {@code internalSetMessageTTL} with a {@code null} TTL
     * after {@code preValidation}; answers {@code 204 No Content} on success. Any failure
     * in the chain is routed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the removal finishes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"), @ApiResponse(code = 412, message = "Invalid message TTL value")})
    @Path("/{tenant}/{namespace}/{topic}/messageTTL")
    @DELETE
    @ApiOperation("Remove message TTL in seconds for a topic")
    public void removeMessageTTL(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            // The description belongs to "authoritative", as on every sibling endpoint;
            // it was previously attached to "isGlobal".
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2).thenCompose(r6 -> {
            return internalSetMessageTTL(null, z);
        }).thenRun(() -> {
            asyncResponse.resume(Response.noContent().build());
        }).exceptionally(th -> {
            handleTopicPolicyException("removeMessageTTL", th, asyncResponse);
            return null;
        });
    }

    /**
     * Handles {@code GET} on the {@code deduplicationEnabled} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then resumes the response with
     * whatever {@code internalGetDeduplication} produces. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the lookup result
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code applied} query parameter (default {@code false})
     * @param z2 {@code isGlobal} query parameter (default {@code false})
     * @param z3 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @GET
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")
    })
    @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
    @ApiOperation("Get deduplication configuration of a topic.")
    public void getDeduplication(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("applied") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        // A bound method reference replaces the decompiled lambda that referenced an
        // undefined receiver and an undeclared type variable.
        preValidation(z3)
                .thenCompose(ignored -> internalGetDeduplication(z, z2))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getDeduplication", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code POST} on the {@code deduplicationEnabled} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalSetDeduplication}. The response is resumed with 204 No Content on success;
     * failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     * @param bool request entity, forwarded to {@code internalSetDeduplication}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")
    })
    @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
    @ApiOperation("Set deduplication enabled on a topic.")
    @POST
    public void setDeduplication(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            // The description belongs to "authoritative"; it was previously attached to "isGlobal".
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2,
            @ApiParam("DeduplicationEnabled policies for the specified topic") Boolean bool) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetDeduplication(bool, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setDeduplication", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code DELETE} on the {@code deduplicationEnabled} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then calls
     * {@code internalSetDeduplication} with a {@code null} value. The response is resumed with
     * 204 No Content on success; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
    @DELETE
    @ApiOperation("Remove deduplication configuration for specified topic.")
    public void removeDeduplication(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            // The description belongs to "authoritative"; it was previously attached to "isGlobal".
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetDeduplication(null, z))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeDeduplication", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code GET} on the {@code retention} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then resumes the response with
     * whatever {@code internalGetRetention} produces. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the lookup result
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code applied} query parameter (default {@code false})
     * @param z3 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @GET
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/retention")
    @ApiOperation("Get retention configuration for specified topic.")
    public void getRetention(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("applied") @DefaultValue("false") boolean z2,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        // A bound method reference replaces the decompiled lambda that referenced an
        // undefined receiver and an undeclared type variable.
        preValidation(z3)
                .thenCompose(ignored -> internalGetRetention(z2, z))
                .thenAccept(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getRetention", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code POST} on the {@code retention} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalSetRetention}. On success the new policy is logged as JSON and the response
     * is resumed with 204 No Content; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code authoritative} query parameter (default {@code false}), passed to
     *          {@code preValidation}
     * @param z2 {@code isGlobal} query parameter (default {@code false})
     * @param retentionPolicies request entity, forwarded to {@code internalSetRetention}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota")
    })
    @Path("/{tenant}/{namespace}/{topic}/retention")
    @ApiOperation("Set retention configuration for specified topic.")
    @POST
    public void setRetention(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2,
            @ApiParam("Retention policies for the specified namespace") RetentionPolicies retentionPolicies) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetRetention(retentionPolicies, z2))
                .thenRun(() -> {
                    try {
                        log.info("[{}] Successfully updated retention: namespace={}, topic={}, retention={}",
                                clientAppId(), namespaceName, topicName.getLocalName(),
                                jsonMapper().writeValueAsString(retentionPolicies));
                    } catch (JsonProcessingException e) {
                        // Logging is best-effort: the update already succeeded, so record the
                        // serialization failure instead of dropping it and still answer the client.
                        log.warn("[{}] Updated retention but failed to serialize it for logging:"
                                        + " namespace={}, topic={}",
                                clientAppId(), namespaceName, topicName.getLocalName(), e);
                    }
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setRetention", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code DELETE} on the {@code retention} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalRemoveRetention}. On success the removal is logged and the response is
     * resumed with 204 No Content; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota")
    })
    @Path("/{tenant}/{namespace}/{topic}/retention")
    @DELETE
    @ApiOperation("Remove retention configuration for specified topic.")
    public void removeRetention(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            // The description belongs to "authoritative"; it was previously attached to "isGlobal".
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalRemoveRetention(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove retention: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeRetention", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code GET} on the {@code persistence} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then resumes the response with
     * whatever {@code internalGetPersistence} produces. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the lookup result
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code applied} query parameter (default {@code false})
     * @param z2 {@code isGlobal} query parameter (default {@code false})
     * @param z3 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @GET
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/persistence")
    @ApiOperation("Get configuration of persistence policies for specified topic.")
    public void getPersistence(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("applied") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        // A bound method reference replaces the decompiled lambda that referenced an
        // undefined receiver and an undeclared type variable.
        preValidation(z3)
                .thenCompose(ignored -> internalGetPersistence(z, z2))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getPersistence", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code POST} on the {@code persistence} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalSetPersistence}. On success the new policy is logged as JSON and the
     * response is resumed with 204 No Content; failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code authoritative} query parameter (default {@code false}), passed to
     *          {@code preValidation}
     * @param z2 {@code isGlobal} query parameter (default {@code false})
     * @param persistencePolicies request entity, forwarded to {@code internalSetPersistence}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 400, message = "Invalid persistence policies")
    })
    @Path("/{tenant}/{namespace}/{topic}/persistence")
    @ApiOperation("Set configuration of persistence policies for specified topic.")
    @POST
    public void setPersistence(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2,
            @ApiParam("Bookkeeper persistence policies for specified topic") PersistencePolicies persistencePolicies) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetPersistence(persistencePolicies, z2))
                .thenRun(() -> {
                    try {
                        log.info("[{}] Successfully updated persistence policies: namespace={}, topic={}, persistencePolicies={}",
                                clientAppId(), namespaceName, topicName.getLocalName(),
                                jsonMapper().writeValueAsString(persistencePolicies));
                    } catch (JsonProcessingException e) {
                        // Logging is best-effort: the update already succeeded, so record the
                        // serialization failure instead of dropping it and still answer the client.
                        log.warn("[{}] Updated persistence policies but failed to serialize them for logging:"
                                        + " namespace={}, topic={}",
                                clientAppId(), namespaceName, topicName.getLocalName(), e);
                    }
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setPersistence", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code DELETE} on the {@code persistence} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalRemovePersistence}. On success the removal is logged and the response is
     * resumed with 204 No Content; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/persistence")
    @DELETE
    @ApiOperation("Remove configuration of persistence policies for specified topic.")
    public void removePersistence(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalRemovePersistence(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removePersistence", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code GET} on the {@code maxSubscriptionsPerTopic} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then queries
     * {@code internalGetMaxSubscriptionsPerTopic}. The response is resumed with the contained
     * value when one is present, otherwise with 204 No Content. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the lookup result
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @GET
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
    @ApiOperation("Get maxSubscriptionsPerTopic config for specified topic.")
    public void getMaxSubscriptionsPerTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        // The lambda type is inferred; the decompiled cast named an undeclared type variable.
        preValidation(z2)
                .thenCompose(ignored -> internalGetMaxSubscriptionsPerTopic(z))
                .thenAccept(maxSubscriptions -> asyncResponse.resume(
                        maxSubscriptions.isPresent() ? maxSubscriptions.get() : Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxSubscriptions", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code POST} on the {@code maxSubscriptionsPerTopic} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalSetMaxSubscriptionsPerTopic}. On success the new value is logged and the
     * response is resumed with 204 No Content; failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     * @param i request entity, forwarded to {@code internalSetMaxSubscriptionsPerTopic}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Invalid value of maxSubscriptionsPerTopic")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
    @ApiOperation("Set maxSubscriptionsPerTopic config for specified topic.")
    @POST
    public void setMaxSubscriptionsPerTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2,
            @ApiParam("The max subscriptions of the topic") int i) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetMaxSubscriptionsPerTopic(Integer.valueOf(i), z))
                .thenRun(() -> {
                    log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}, maxSubscriptions={}, isGlobal={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), i, z);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxSubscriptions", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code DELETE} on the {@code maxSubscriptionsPerTopic} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then calls
     * {@code internalSetMaxSubscriptionsPerTopic} with a {@code null} value. On success the
     * removal is logged and the response is resumed with 204 No Content; failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
    @DELETE
    @ApiOperation("Remove maxSubscriptionsPerTopic config for specified topic.")
    public void removeMaxSubscriptionsPerTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetMaxSubscriptionsPerTopic(null, z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeMaxSubscriptions", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code GET} on the {@code replicatorDispatchRate} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then resumes the response with
     * whatever {@code internalGetReplicatorDispatchRate} produces. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the lookup result
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code applied} query parameter (default {@code false})
     * @param z3 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @GET
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
    @ApiOperation("Get replicatorDispatchRate config for specified topic.")
    public void getReplicatorDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("applied") @DefaultValue("false") boolean z2,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        // A bound method reference replaces the decompiled lambda that referenced an
        // undefined receiver and an undeclared type variable.
        preValidation(z3)
                .thenCompose(ignored -> internalGetReplicatorDispatchRate(z2, z))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getReplicatorDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code POST} on the {@code replicatorDispatchRate} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalSetReplicatorDispatchRate}. On success the new rate is logged and the
     * response is resumed with 204 No Content; failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     * @param dispatchRateImpl request entity, forwarded to
     *                         {@code internalSetReplicatorDispatchRate}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Invalid value of replicatorDispatchRate")
    })
    @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
    @ApiOperation("Set replicatorDispatchRate config for specified topic.")
    @POST
    public void setReplicatorDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2,
            @ApiParam("Replicator dispatch rate of the topic") DispatchRateImpl dispatchRateImpl) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetReplicatorDispatchRate(dispatchRateImpl, z))
                .thenRun(() -> {
                    log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}, replicatorDispatchRate={}, isGlobal={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), dispatchRateImpl, z);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setReplicatorDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code DELETE} on the {@code replicatorDispatchRate} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then calls
     * {@code internalSetReplicatorDispatchRate} with a {@code null} rate. On success the removal
     * is logged and the response is resumed with 204 No Content; failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
    @DELETE
    @ApiOperation("Remove replicatorDispatchRate config for specified topic.")
    public void removeReplicatorDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetReplicatorDispatchRate(null, z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeReplicatorDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code GET} on the {@code maxProducers} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then resumes the response with
     * whatever {@code internalGetMaxProducers} produces. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the lookup result
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code applied} query parameter (default {@code false})
     * @param z2 {@code isGlobal} query parameter (default {@code false})
     * @param z3 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @GET
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxProducers")
    @ApiOperation("Get maxProducers config for specified topic.")
    public void getMaxProducers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("applied") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        // A bound method reference replaces the decompiled lambda that referenced an
        // undefined receiver and an undeclared type variable.
        preValidation(z3)
                .thenCompose(ignored -> internalGetMaxProducers(z, z2))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxProducers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code POST} on the {@code maxProducers} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalSetMaxProducers}. On success the new value is logged and the response is
     * resumed with 204 No Content; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code authoritative} query parameter (default {@code false}), passed to
     *          {@code preValidation}
     * @param z2 {@code isGlobal} query parameter (default {@code false})
     * @param i request entity, forwarded to {@code internalSetMaxProducers}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Invalid value of maxProducers")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxProducers")
    @ApiOperation("Set maxProducers config for specified topic.")
    @POST
    public void setMaxProducers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2,
            @ApiParam("The max producers of the topic") int i) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetMaxProducers(Integer.valueOf(i), z2))
                .thenRun(() -> {
                    log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), i);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxProducers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code DELETE} on the {@code maxProducers} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalRemoveMaxProducers}. On success the removal is logged and the response is
     * resumed with 204 No Content; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxProducers")
    @DELETE
    @ApiOperation("Remove maxProducers config for specified topic.")
    public void removeMaxProducers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalRemoveMaxProducers(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove max producers: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeMaxProducers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code GET} on the {@code maxConsumers} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then resumes the response with
     * whatever {@code internalGetMaxConsumers} produces. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the lookup result
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code applied} query parameter (default {@code false})
     * @param z3 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @GET
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxConsumers")
    @ApiOperation("Get maxConsumers config for specified topic.")
    public void getMaxConsumers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("applied") @DefaultValue("false") boolean z2,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        // A bound method reference replaces the decompiled lambda that referenced an
        // undefined receiver and an undeclared type variable.
        preValidation(z3)
                .thenCompose(ignored -> internalGetMaxConsumers(z2, z))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxConsumers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code POST} on the {@code maxConsumers} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalSetMaxConsumers}. On success the new value is logged and the response is
     * resumed with 204 No Content; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     * @param i request entity, forwarded to {@code internalSetMaxConsumers}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Invalid value of maxConsumers")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxConsumers")
    @ApiOperation("Set maxConsumers config for specified topic.")
    @POST
    public void setMaxConsumers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2,
            @ApiParam("The max consumers of the topic") int i) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetMaxConsumers(Integer.valueOf(i), z))
                .thenRun(() -> {
                    log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), i);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxConsumers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code DELETE} on the {@code maxConsumers} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then delegates to
     * {@code internalRemoveMaxConsumers}. On success the removal is logged and the response is
     * resumed with 204 No Content; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxConsumers")
    @DELETE
    @ApiOperation("Remove maxConsumers config for specified topic.")
    public void removeMaxConsumers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalRemoveMaxConsumers(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove max consumers: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeMaxConsumers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Handles {@code GET} on the {@code maxMessageSize} sub-resource of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then queries
     * {@code internalGetMaxMessageSize}. The response is resumed with the contained value when
     * one is present, otherwise with 204 No Content. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed with the lookup result
     * @param str tenant path parameter
     * @param str2 namespace path parameter
     * @param str3 encoded topic path parameter
     * @param z {@code isGlobal} query parameter (default {@code false})
     * @param z2 {@code authoritative} query parameter (default {@code false}), passed to
     *           {@code preValidation}
     */
    @GET
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
    @ApiOperation("Get maxMessageSize config for specified topic.")
    public void getMaxMessageSize(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        // The lambda type is inferred; the decompiled cast named an undeclared type variable.
        preValidation(z2)
                .thenCompose(ignored -> internalGetMaxMessageSize(z))
                .thenAccept(maxMessageSize -> asyncResponse.resume(
                        maxMessageSize.isPresent() ? maxMessageSize.get() : Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxMessageSize", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the maxMessageSize configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalSetMaxMessageSize} is invoked with the request body value. On success the
     * request is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z2 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     * @param i max message size taken from the request body
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Invalid value of maxMessageSize")})
    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
    @ApiOperation("Set maxMessageSize config for specified topic.")
    @POST
    public void setMaxMessageSize(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @ApiParam("The max message size of the topic") int i) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(__ -> internalSetMaxMessageSize(i, z))
                .thenRun(() -> {
                    log.info("[{}] Successfully set max message size: namespace={}, topic={}, maxMessageSize={}, isGlobal={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), i, z);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("setMaxMessageSize", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the maxMessageSize configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalSetMaxMessageSize} is invoked with a {@code null} size. On success the
     * request is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z2 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
    @ApiOperation("Remove maxMessageSize config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeMaxMessageSize(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(__ -> internalSetMaxMessageSize(null, z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove max message size: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("removeMaxMessageSize", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Terminates a topic.
     *
     * <p>The name is checked with {@code validatePersistentTopicName} and the work is then
     * delegated to {@code internalTerminate}.
     *
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to the internal call
     * @return the {@link MessageId} returned by {@code internalTerminate}
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/terminate")
    @ApiOperation("Terminate a topic. A topic that is terminated will not accept any more messages to be published and will let consumer to drain existing messages in backlog")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Termination of a partitioned topic is not allowed"),
            @ApiResponse(code = 406, message = "Need to provide a persistent topic name"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public MessageId terminate(
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validatePersistentTopicName(str, str2, str3);
        final MessageId terminationResult = internalTerminate(z);
        return terminationResult;
    }

    /**
     * Terminates a partitioned topic.
     *
     * <p>The topic name is validated and the work is then delegated to
     * {@code internalTerminatePartitionedTopic}, which receives the suspended response.
     *
     * @param asyncResponse suspended response passed on to the internal call
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to the internal call
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/terminate/partitions")
    @ApiOperation("Terminate all partitioned topic. A topic that is terminated will not accept any more messages to be published and will let consumer to drain existing messages in backlog")
    @ApiResponses({
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Termination of a non-partitioned topic is not allowed"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void terminatePartitionedTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3);
        internalTerminatePartitionedTopic(asyncResponse, z);
    }

    /**
     * Triggers a compaction operation on a topic.
     *
     * <p>The topic name is validated and the work is delegated to
     * {@code internalTriggerCompaction}. Exceptions thrown synchronously by either call are
     * reported through {@code asyncResponse}: a {@link WebApplicationException} is passed on
     * unchanged, anything else is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response passed on to the internal call and used to report failures
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to the internal call
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"), @ApiResponse(code = 409, message = "Compaction already running"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/compaction")
    @ApiOperation("Trigger a compaction operation on a topic.")
    @PUT
    public void compact(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalTriggerCompaction(asyncResponse, z);
        } catch (WebApplicationException wae) {
            // The more specific type must be caught first; after catch (Exception) it is unreachable.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Returns the status of a compaction operation for a topic.
     *
     * <p>The topic name is validated and the lookup is delegated to
     * {@code internalCompactionStatus}.
     *
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to the internal call
     * @return the {@link LongRunningProcessStatus} returned by {@code internalCompactionStatus}
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/compaction")
    @ApiOperation("Get the status of a compaction operation for a topic.")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run"),
            @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public LongRunningProcessStatus compactionStatus(
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3);
        final LongRunningProcessStatus compactionState = internalCompactionStatus(z);
        return compactionState;
    }

    /**
     * Triggers an offload of a topic prefix to long term storage.
     *
     * <p>A missing message id is reported as a 400 {@link RestException} through
     * {@code asyncResponse}. Otherwise the topic name is validated and the work is delegated to
     * {@code internalTriggerOffload}; exceptions thrown synchronously are reported through
     * {@code asyncResponse}, with non-{@link WebApplicationException} failures wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response passed on to the internal call and used to report failures
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to the internal call
     * @param messageIdImpl message id from the request body; must not be {@code null}
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}/offload")
    @ApiOperation("Offload a prefix of a topic to long term storage")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 400, message = "Message ID is null"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
            @ApiResponse(code = 409, message = "Offload already running"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void triggerOffload(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z,
            MessageIdImpl messageIdImpl) {
        // Guard clause: report the missing body directly instead of throwing and catching it.
        if (messageIdImpl == null) {
            asyncResponse.resume(new RestException(Response.Status.BAD_REQUEST, "messageId is null"));
            return;
        }
        try {
            validateTopicName(str, str2, str3);
            internalTriggerOffload(asyncResponse, z, messageIdImpl);
        } catch (WebApplicationException webFailure) {
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            asyncResponse.resume(new RestException(unexpected));
        }
    }

    /**
     * Reports the status of an offload operation for a topic.
     *
     * <p>The topic name is validated and the work is delegated to
     * {@code internalOffloadStatus}. Exceptions thrown synchronously by either call are reported
     * through {@code asyncResponse}: a {@link WebApplicationException} is passed on unchanged,
     * anything else is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response passed on to the internal call and used to report failures
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to the internal call
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/offload")
    @ApiOperation("Get the status of an offload operation for a topic.")
    public void offloadStatus(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalOffloadStatus(asyncResponse, z);
        } catch (WebApplicationException wae) {
            // The more specific type must be caught first; after catch (Exception) it is unreachable.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Looks up the last message id of a topic.
     *
     * <p>The topic name is validated and the work is delegated to
     * {@code internalGetLastMessageId}. Any exception thrown synchronously by either call is
     * wrapped in a {@link RestException} and reported through {@code asyncResponse}.
     *
     * @param asyncResponse suspended response passed on to the internal call and used to report failures
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to the internal call
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/lastMessageId")
    @ApiOperation("Return the last commit message id of topic")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void getLastMessageId(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalGetLastMessageId(asyncResponse, z);
        } catch (Exception failure) {
            final RestException wrapped = new RestException(failure);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * Returns the dispatch rate configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalGetDispatchRate} is invoked; its result is used to resume the request.
     * Any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code applied} query parameter, passed to the internal call
     * @param z2 value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z3 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
    @ApiOperation("Get dispatch rate configuration for specified topic.")
    public void getDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("applied") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(__ -> internalGetDispatchRate(z, z2))
                // Method reference replaces the decompiled lambda that used an undefined receiver.
                .thenApply(asyncResponse::resume)
                .exceptionally(error -> {
                    handleTopicPolicyException("getDispatchRate", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the message dispatch rate configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalSetDispatchRate} is invoked with the request body. On success the new
     * value is logged as JSON and the request is resumed with 204 No Content; any failure is
     * handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to {@code preValidation}
     * @param z2 value of the {@code isGlobal} query parameter, passed to the internal call
     * @param dispatchRateImpl dispatch rate taken from the request body
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
    @ApiOperation("Set message dispatch rate configuration for specified topic.")
    @POST
    public void setDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @ApiParam("Dispatch rate for the specified topic") DispatchRateImpl dispatchRateImpl) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(__ -> internalSetDispatchRate(dispatchRateImpl, z2))
                .thenRun(() -> {
                    try {
                        log.info("[{}] Successfully set topic dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
                                clientAppId(), str, str2, topicName.getLocalName(),
                                jsonMapper().writeValueAsString(dispatchRateImpl));
                    } catch (JsonProcessingException e) {
                        // Serialization only feeds the log line: record the problem, never fail the request.
                        log.warn("[{}] Set topic dispatch rate but failed to serialize it for logging: tenant={}, namespace={}, topic={}",
                                clientAppId(), str, str2, topicName.getLocalName(), e);
                    }
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("setDispatchRate", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the message dispatch rate configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalRemoveDispatchRate} is invoked. On success the request is resumed with
     * 204 No Content; any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z2 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
    @ApiOperation("Remove message dispatch rate configuration for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(__ -> internalRemoveDispatchRate(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}",
                            clientAppId(), str, str2, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("removeDispatchRate", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the subscription message dispatch rate configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalGetSubscriptionDispatchRate} is invoked; its result is used to resume the
     * request. Any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code applied} query parameter, passed to the internal call
     * @param z2 value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z3 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
    @ApiOperation("Get subscription message dispatch rate configuration for specified topic.")
    public void getSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("applied") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(__ -> internalGetSubscriptionDispatchRate(z, z2))
                // Method reference replaces the decompiled lambda that used an undefined receiver.
                .thenApply(asyncResponse::resume)
                .exceptionally(error -> {
                    handleTopicPolicyException("getSubscriptionDispatchRate", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the subscription message dispatch rate configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalSetSubscriptionDispatchRate} is invoked with the request body. On success
     * the new value is logged as JSON and the request is resumed with 204 No Content; any
     * failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to {@code preValidation}
     * @param z2 value of the {@code isGlobal} query parameter, passed to the internal call
     * @param dispatchRateImpl subscription dispatch rate taken from the request body
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
    @ApiOperation("Set subscription message dispatch rate configuration for specified topic.")
    @POST
    public void setSubscriptionDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @ApiParam("Subscription message dispatch rate for the specified topic") DispatchRateImpl dispatchRateImpl) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRateImpl, z2))
                .thenRun(() -> {
                    try {
                        log.info("[{}] Successfully set topic subscription dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
                                clientAppId(), str, str2, topicName.getLocalName(),
                                jsonMapper().writeValueAsString(dispatchRateImpl));
                    } catch (JsonProcessingException e) {
                        // Serialization only feeds the log line: record the problem, never fail the request.
                        log.warn("[{}] Set topic subscription dispatch rate but failed to serialize it for logging: tenant={}, namespace={}, topic={}",
                                clientAppId(), str, str2, topicName.getLocalName(), e);
                    }
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("setSubscriptionDispatchRate", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the subscription message dispatch rate configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalRemoveSubscriptionDispatchRate} is invoked. On success the request is
     * resumed with 204 No Content; any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z2 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
    @ApiOperation("Remove subscription message dispatch rate configuration for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeSubscriptionDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(__ -> internalRemoveSubscriptionDispatchRate(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
                            clientAppId(), str, str2, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("removeSubscriptionDispatchRate", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the compaction threshold configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalGetCompactionThreshold} is invoked; its result is used to resume the
     * request. Any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code applied} query parameter, passed to the internal call
     * @param z2 value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z3 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
    @ApiOperation("Get compaction threshold configuration for specified topic.")
    public void getCompactionThreshold(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("applied") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(__ -> internalGetCompactionThreshold(z, z2))
                // Method reference replaces the decompiled lambda that used an undefined receiver.
                .thenApply(asyncResponse::resume)
                .exceptionally(error -> {
                    handleTopicPolicyException("getCompactionThreshold", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the compaction threshold configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalSetCompactionThreshold} is invoked with the request body value. On success
     * the request is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code authoritative} query parameter, passed to {@code preValidation}
     * @param z2 value of the {@code isGlobal} query parameter, passed to the internal call
     * @param j compaction threshold taken from the request body
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
    @ApiOperation("Set compaction threshold configuration for specified topic.")
    @POST
    public void setCompactionThreshold(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @ApiParam("Compaction threshold for the specified topic") long j) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(__ -> internalSetCompactionThreshold(j, z2))
                .thenRun(() -> {
                    // A long needs no JSON serialization to be logged, so no exception can be swallowed here.
                    log.info("[{}] Successfully set topic compaction threshold: tenant={}, namespace={}, topic={}, compactionThreshold={}",
                            clientAppId(), str, str2, topicName.getLocalName(), j);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("setCompactionThreshold", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the compaction threshold configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalRemoveCompactionThreshold} is invoked. On success the request is resumed
     * with 204 No Content; any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z2 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
    @ApiOperation("Remove compaction threshold configuration for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeCompactionThreshold(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(__ -> internalRemoveCompactionThreshold(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
                            clientAppId(), str, str2, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("removeCompactionThreshold", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the max consumers per subscription configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalGetMaxConsumersPerSubscription} is invoked. If the resulting optional
     * holds a value the request is resumed with it, otherwise with 204 No Content. Any failure
     * is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z2 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
    @ApiOperation("Get max consumers per subscription configuration for specified topic.")
    public void getMaxConsumersPerSubscription(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(__ -> internalGetMaxConsumersPerSubscription(z))
                // No explicit cast: the lambda type is inferred from the preceding stage.
                .thenAccept(maxConsumers -> {
                    if (maxConsumers.isPresent()) {
                        asyncResponse.resume(maxConsumers.get());
                    } else {
                        asyncResponse.resume(Response.noContent().build());
                    }
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("getMaxConsumersPerSubscription", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the max consumers per subscription configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalSetMaxConsumersPerSubscription} is invoked with the request body value.
     * On success the request is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z2 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     * @param i max consumers per subscription taken from the request body
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
    @ApiOperation("Set max consumers per subscription configuration for specified topic.")
    @POST
    public void setMaxConsumersPerSubscription(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @ApiParam("Max consumers per subscription for the specified topic") int i) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(__ -> internalSetMaxConsumersPerSubscription(i, z))
                .thenRun(() -> {
                    // An int needs no JSON serialization to be logged, so no exception can be swallowed here.
                    log.info("[{}] Successfully set topic max consumers per subscription: tenant={}, namespace={}, topic={}, maxConsumersPerSubscription={}",
                            clientAppId(), str, str2, topicName.getLocalName(), i);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("setMaxConsumersPerSubscription", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the max consumers per subscription configuration of a topic.
     *
     * <p>The topic name is validated, {@code preValidation} is run and then
     * {@code internalRemoveMaxConsumersPerSubscription} is invoked. On success the request is
     * resumed with 204 No Content; any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the asynchronous chain finishes
     * @param str value of the {@code tenant} path parameter
     * @param str2 value of the {@code namespace} path parameter
     * @param str3 encoded value of the {@code topic} path parameter
     * @param z value of the {@code isGlobal} query parameter, passed to the internal call
     * @param z2 value of the {@code authoritative} query parameter, passed to {@code preValidation}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
    @ApiOperation("Remove max consumers per subscription configuration for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeMaxConsumersPerSubscription(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(__ -> internalRemoveMaxConsumersPerSubscription(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic max consumers per subscription: tenant={}, namespace={}, topic={}",
                            clientAppId(), str, str2, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    handleTopicPolicyException("removeMaxConsumersPerSubscription", error, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the publish-rate policy of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation} and then asks {@code internalGetPublishRate}
     * for the policy. If a value is present it is sent as the response entity; otherwise the request is
     * resumed with a no-content response. Failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the lookup completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter, forwarded to the internal getter
     * @param z2 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/publishRate")
    @ApiOperation("Get publish rate configuration for specified topic.")
    public void getPublishRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalGetPublishRate(z))
                // No explicit cast: the lambda's target type is inferred from the future, and the previous
                // cast referred to a type variable that is not declared anywhere in this method.
                .thenAccept(optional -> asyncResponse.resume(
                        optional.isPresent() ? optional.get() : Response.noContent().build()))
                .exceptionally(th -> {
                    handleTopicPolicyException("getPublishRate", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the message publish-rate policy of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation} and delegates to
     * {@code internalSetPublishRate}. On success the change is logged and the request is resumed with a
     * no-content response; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the operation completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter, forwarded to the internal setter
     * @param z2 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param publishRate request body: the publish rate to apply
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/publishRate")
    @ApiOperation("Set message publish rate configuration for specified topic.")
    @POST
    public void setPublishRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @ApiParam("Publish rate for the specified topic") PublishRate publishRate) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetPublishRate(publishRate, z))
                .thenRun(() -> {
                    String publishRateJson;
                    try {
                        publishRateJson = jsonMapper().writeValueAsString(publishRate);
                    } catch (JsonProcessingException e) {
                        // The JSON form is only used for the log line. A serialization failure must neither
                        // fail the request nor hide the success entry, so fall back to the plain string form.
                        log.warn("[{}] Failed to serialize publish rate for logging: tenant={}, namespace={}, topic={}",
                                clientAppId(), str, str2, this.topicName.getLocalName(), e);
                        publishRateJson = String.valueOf(publishRate);
                    }
                    log.info("[{}] Successfully set topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}, publishRate={}",
                            clientAppId(), str, str2, this.topicName.getLocalName(), z, publishRateJson);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(th -> {
                    handleTopicPolicyException("setPublishRate", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the message publish-rate policy of a topic.
     *
     * <p>After topic-name validation and {@code preValidation}, the removal is delegated to
     * {@code internalRemovePublishRate}. Success is logged and answered with a no-content response;
     * failures are routed through {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the operation completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter, forwarded to the internal remover
     * @param z2 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/publishRate")
    @DELETE
    @ApiOperation("Remove message publish rate configuration for specified topic.")
    public void removePublishRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalRemovePublishRate(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}",
                            clientAppId(), str, str2, this.topicName.getLocalName(), z);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removePublishRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the enabled subscription types configured for a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation} and then asks
     * {@code internalGetSubscriptionTypesEnabled} for the policy. If a value is present it is sent as the
     * response entity; otherwise the request is resumed with a no-content response. Failures are handed
     * to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the lookup completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter, forwarded to the internal getter
     * @param z2 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
    @ApiOperation("Get is enable sub types for specified topic.")
    public void getSubscriptionTypesEnabled(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalGetSubscriptionTypesEnabled(z))
                // No explicit cast: the lambda's target type is inferred from the future, and the previous
                // cast referred to a type variable that is not declared anywhere in this method.
                .thenAccept(optional -> asyncResponse.resume(
                        optional.isPresent() ? optional.get() : Response.noContent().build()))
                .exceptionally(th -> {
                    handleTopicPolicyException("getSubscriptionTypesEnabled", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the enabled subscription types of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation} and delegates to
     * {@code internalSetSubscriptionTypesEnabled}. On success the change is logged and the request is
     * resumed with a no-content response; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the operation completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter, forwarded to the internal setter
     * @param z2 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param set request body: the subscription types to enable
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
    @ApiOperation("Set is enable sub types for specified topic")
    @POST
    public void setSubscriptionTypesEnabled(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @ApiParam("Enable sub types for the specified topic") Set<SubscriptionType> set) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetSubscriptionTypesEnabled(set, z))
                .thenRun(() -> {
                    String subscriptionTypesJson;
                    try {
                        subscriptionTypesJson = jsonMapper().writeValueAsString(set);
                    } catch (JsonProcessingException e) {
                        // The JSON form is only used for the log line. A serialization failure must neither
                        // fail the request nor hide the success entry, so fall back to the plain string form.
                        log.warn("[{}] Failed to serialize subscription types for logging: tenant={}, namespace={}, topic={}",
                                clientAppId(), str, str2, this.topicName.getLocalName(), e);
                        subscriptionTypesJson = String.valueOf(set);
                    }
                    log.info("[{}] Successfully set topic is enabled sub types : tenant={}, namespace={}, topic={}, subscriptionTypesEnabled={}",
                            clientAppId(), str, str2, this.topicName.getLocalName(), subscriptionTypesJson);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(th -> {
                    handleTopicPolicyException("setSubscriptionTypesEnabled", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the enabled-subscription-types policy of a topic.
     *
     * <p>After topic-name validation and {@code preValidation}, the removal is delegated to
     * {@code internalRemoveSubscriptionTypesEnabled}. Success is logged and answered with a no-content
     * response; failures are routed through {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the operation completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter, forwarded to the internal remover
     * @param z2 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
    @DELETE
    @ApiOperation("Remove subscription types enabled for specified topic.")
    public void removeSubscriptionTypesEnabled(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalRemoveSubscriptionTypesEnabled(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove subscription types enabled: namespace={}, topic={}",
                            clientAppId(), this.namespaceName, this.topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(th -> {
                    handleTopicPolicyException("removeSubscriptionTypesEnabled", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the subscribe-rate policy of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation} and then resumes the request with whatever
     * {@code internalGetSubscribeRate} yields. Failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the lookup completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code applied} query parameter, forwarded to the internal getter
     * @param z2 value of the {@code isGlobal} query parameter, forwarded to the internal getter
     * @param z3 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
    @ApiOperation("Get subscribe rate configuration for specified topic.")
    public void getSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("applied") @DefaultValue("false") boolean z, @QueryParam("isGlobal") @DefaultValue("false") boolean z2, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(ignored -> internalGetSubscribeRate(z, z2))
                // Bound method reference replaces a lambda that called resume(...) on an undefined
                // variable and a cast to an undeclared type variable.
                .thenApply(asyncResponse::resume)
                .exceptionally(th -> {
                    handleTopicPolicyException("getSubscribeRate", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the subscribe-rate policy of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation} and delegates to
     * {@code internalSetSubscribeRate}. On success the change is logged and the request is resumed with a
     * no-content response; failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the operation completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter, forwarded to the internal setter
     * @param z2 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param subscribeRate request body: the subscribe rate to apply
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
    @ApiOperation("Set subscribe rate configuration for specified topic.")
    @POST
    public void setSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2, @ApiParam("Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalSetSubscribeRate(subscribeRate, z))
                .thenRun(() -> {
                    String subscribeRateJson;
                    try {
                        subscribeRateJson = jsonMapper().writeValueAsString(subscribeRate);
                    } catch (JsonProcessingException e) {
                        // The JSON form is only used for the log line. A serialization failure must neither
                        // fail the request nor hide the success entry, so fall back to the plain string form.
                        log.warn("[{}] Failed to serialize subscribe rate for logging: tenant={}, namespace={}, topic={}",
                                clientAppId(), str, str2, this.topicName.getLocalName(), e);
                        subscribeRateJson = String.valueOf(subscribeRate);
                    }
                    log.info("[{}] Successfully set topic subscribe rate: tenant={}, namespace={}, topic={}, isGlobal={} subscribeRate={}",
                            clientAppId(), str, str2, this.topicName.getLocalName(), z, subscribeRateJson);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(th -> {
                    handleTopicPolicyException("setSubscribeRate", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the subscribe-rate policy of a topic.
     *
     * <p>After topic-name validation and {@code preValidation}, the removal is delegated to
     * {@code internalRemoveSubscribeRate}. Success is logged and answered with a no-content response;
     * failures are routed through {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the operation completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter, forwarded to the internal remover
     * @param z2 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param subscribeRate request body; not read by this method
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
    @DELETE
    @ApiOperation("Remove subscribe rate configuration for specified topic.")
    public void removeSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") @Encoded String str3, @QueryParam("isGlobal") @DefaultValue("false") boolean z, @QueryParam("authoritative") @DefaultValue("false") boolean z2, @ApiParam("Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalRemoveSubscribeRate(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}, isGlobal={}",
                            clientAppId(), str, str2, this.topicName.getLocalName(), z);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeSubscribeRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Truncates a topic.
     *
     * <p>Validates the topic name and hands the request to {@code internalTruncateTopic}, which receives
     * the suspended response and is therefore expected to resume it.
     *
     * @param asyncResponse suspended response passed on to {@code internalTruncateTopic}
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter, forwarded to the internal call
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/truncate")
    @DELETE
    @ApiOperation(value = "Truncate a topic.", notes = "The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.")
    public void truncateTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3);
        internalTruncateTopic(asyncResponse, z);
    }

    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or subscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist"), @ApiResponse(code = 405, message = "Operation not allowed on this topic"), @ApiResponse(code = 412, message = "Can't find owner for topic"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus")
    @ApiOperation("Enable or disable a replicated subscription on a topic.")
    @POST
    public void setReplicatedSubscriptionStatus(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @PathParam("subName") @ApiParam(value = "Name of subscription", required = true) String str4, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @ApiParam(value = "Whether to enable replicated subscription", required = true) boolean z2) {
        try {
            validateTopicName(str, str2, str3);
            internalSetReplicatedSubscriptionStatus(asyncResponse, Codec.decode(str4), z, z2);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        } catch (WebApplicationException e2) {
            asyncResponse.resume(e2);
        }
    }

    /**
     * Looks up the replicated-subscription status of a subscription on a topic.
     *
     * <p>Validates the topic name, decodes the subscription name and hands the request to
     * {@code internalGetReplicatedSubscriptionStatus}, which receives the suspended response.
     *
     * @param asyncResponse suspended response passed on to the internal call
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param str4 subscription name path segment; decoded with {@code Codec.decode} before use
     * @param z value of the {@code authoritative} query parameter, forwarded to the internal call
     */
    @GET
    @ApiResponses({@ApiResponse(code = 401, message = "Don't have permission to administrate resources"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Can't find owner for topic"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus")
    @ApiOperation("Get replicated subscription status on a topic.")
    public void getReplicatedSubscriptionStatus(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @PathParam("subName") @ApiParam(value = "Name of subscription", required = true) String str4, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3);
        // Decode only after the topic name has been validated, matching the original evaluation order.
        final String subscriptionName = Codec.decode(str4);
        internalGetReplicatedSubscriptionStatus(asyncResponse, subscriptionName, z);
    }

    /**
     * Returns the schema compatibility strategy of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation} and then resumes the request with whatever
     * {@code internalGetSchemaCompatibilityStrategy} yields. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the lookup completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code applied} query parameter, forwarded to the internal getter
     * @param z2 value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy")
    @ApiOperation("Get schema compatibility strategy on a topic")
    public void getSchemaCompatibilityStrategy(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("applied") @DefaultValue("false") boolean z, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignored -> internalGetSchemaCompatibilityStrategy(z))
                // Bound method reference replaces a lambda that called resume(...) on an undefined
                // variable and a cast to an undeclared type variable.
                .thenAccept(asyncResponse::resume)
                .exceptionally(th -> {
                    handleTopicPolicyException("getSchemaCompatibilityStrategy", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the schema compatibility strategy of a topic.
     *
     * <p>After topic-name validation and {@code preValidation}, the new strategy is passed to
     * {@code internalSetSchemaCompatibilityStrategy}. Success is logged and answered with a no-content
     * response; failures are routed through {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the operation completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param schemaCompatibilityStrategy request body: the strategy to apply
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy")
    @ApiOperation("Set schema compatibility strategy on a topic")
    @PUT
    public void setSchemaCompatibilityStrategy(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @ApiParam("Strategy used to check the compatibility of new schema") SchemaCompatibilityStrategy schemaCompatibilityStrategy) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetSchemaCompatibilityStrategy(schemaCompatibilityStrategy))
                .thenRun(() -> {
                    log.info("[{}] Successfully set topic schema compatibility strategy: tenant={}, namespace={}, topic={}, schemaCompatibilityStrategy={}",
                            clientAppId(), str, str2, this.topicName.getLocalName(), schemaCompatibilityStrategy);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setSchemaCompatibilityStrategy", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the schema compatibility strategy of a topic.
     *
     * <p>After topic-name validation and {@code preValidation}, the strategy is cleared by calling
     * {@code internalSetSchemaCompatibilityStrategy} with {@code null}. Success is logged and answered
     * with a no-content response; failures are routed through {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed when the operation completes
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 encoded topic path segment
     * @param z value of the {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param schemaCompatibilityStrategy request body; not read by this method
     */
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy")
    @DELETE
    @ApiOperation("Remove schema compatibility strategy on a topic")
    public void removeSchemaCompatibilityStrategy(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @ApiParam("Strategy used to check the compatibility of new schema") SchemaCompatibilityStrategy schemaCompatibilityStrategy) {
        validateTopicName(str, str2, str3);
        preValidation(z)
                .thenCompose(ignored -> internalSetSchemaCompatibilityStrategy(null))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic schema compatibility strategy: tenant={}, namespace={}, topic={}",
                            clientAppId(), str, str2, this.topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeSchemaCompatibilityStrategy", failure, asyncResponse);
                    return null;
                });
    }
}
