package org.apache.pulsar.broker.admin.v1;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.shade.io.swagger.annotations.Api;
import org.apache.pulsar.shade.io.swagger.annotations.ApiOperation;
import org.apache.pulsar.shade.io.swagger.annotations.ApiParam;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponse;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponses;
import org.apache.pulsar.shade.javax.ws.rs.DELETE;
import org.apache.pulsar.shade.javax.ws.rs.DefaultValue;
import org.apache.pulsar.shade.javax.ws.rs.Encoded;
import org.apache.pulsar.shade.javax.ws.rs.GET;
import org.apache.pulsar.shade.javax.ws.rs.POST;
import org.apache.pulsar.shade.javax.ws.rs.PUT;
import org.apache.pulsar.shade.javax.ws.rs.Path;
import org.apache.pulsar.shade.javax.ws.rs.PathParam;
import org.apache.pulsar.shade.javax.ws.rs.Produces;
import org.apache.pulsar.shade.javax.ws.rs.QueryParam;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.container.AsyncResponse;
import org.apache.pulsar.shade.javax.ws.rs.container.Suspended;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/json"})
@Path("/persistent")
@Api(value = "/persistent", description = "Persistent topic admin apis", tags = {"persistent topic"}, hidden = true)
/* loaded from: input_file:org/apache/pulsar/broker/admin/v1/PersistentTopics.class */
public class PersistentTopics extends PersistentTopicsBase {
    private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);

    @Path("/{property}/{cluster}/{namespace}")
    @GET
    @ApiOperation(hidden = true, value = "Get the list of topics under a namespace.", response = String.class, responseContainer = "List")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin or consume permission on namespace"), @ApiResponse(code = 404, message = "Namespace doesn't exist")})
    public void getList(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        try {
            validateNamespaceName(str, str2, str3);
            asyncResponse.resume(internalGetList());
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("/{property}/{cluster}/{namespace}/partitioned")
    @GET
    @ApiOperation(hidden = true, value = "Get the list of partitioned topics under a namespace.", response = String.class, responseContainer = "List")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin or consume permission on namespace"), @ApiResponse(code = 404, message = "Namespace doesn't exist")})
    public List<String> getPartitionedTopicList(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateNamespaceName(str, str2, str3);
        return internalGetPartitionedTopicList();
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/permissions")
    @GET
    @ApiOperation(hidden = true, value = "Get permissions on a topic.", notes = "Retrieve the effective permissions for a topic. These permissions are defined by the permissions set at thenamespace level combined (union) with any eventual specific permission set on the topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist")})
    public Map<String, Set<AuthAction>> getPermissionsOnTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4) {
        validateTopicName(str, str2, str3, str4);
        return internalGetPermissionsOnTopic();
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/permissions/{role}")
    @POST
    @ApiOperation(hidden = true, value = "Grant a new permission to a role on a single topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist"), @ApiResponse(code = 409, message = "Concurrent modification")})
    public void grantPermissionsOnTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("role") String str5, Set<AuthAction> set) {
        validateTopicName(str, str2, str3, str4);
        internalGrantPermissionsOnTopic(str5, set);
    }

    @DELETE
    @Path("/{property}/{cluster}/{namespace}/{topic}/permissions/{role}")
    @ApiOperation(hidden = true, value = "Revoke permissions on a topic.", notes = "Revoke permissions to a role on a single topic. If the permission was not set at the topiclevel, but rather at the namespace level, this operation will return an error (HTTP status code 412).")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist"), @ApiResponse(code = 412, message = "Permissions are not set at the topic level")})
    public void revokePermissionsOnTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("role") String str5) {
        validateTopicName(str, str2, str3, str4);
        internalRevokePermissionsOnTopic(str5);
    }

    @PUT
    @Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
    @ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code = 409, message = "Partitioned topic already exist")})
    public void createPartitionedTopic(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, int i, @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalCreatePartitionedTopic(asyncResponse, i, z);
        } catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", new Object[]{clientAppId(), this.topicName, e});
            resumeAsyncResponseExceptionally(asyncResponse, e);
        }
    }

    @PUT
    @Path("/{tenant}/{cluster}/{namespace}/{topic}")
    @ApiOperation(value = "Create a non-partitioned topic.", notes = "This is the only REST endpoint from which non-partitioned topics could be created.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 409, message = "Partitioned topic already exist"), @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void createNonPartitionedTopic(@ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String str, @ApiParam(value = "Specify the cluster", required = true) @PathParam("cluster") String str2, @ApiParam(value = "Specify the namespace", required = true) @PathParam("namespace") String str3, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String str4, @ApiParam("Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateNamespaceName(str, str2, str3);
        validateTopicName(str, str2, str3, str4);
        validateGlobalNamespaceOwnership();
        internalCreateNonPartitionedTopic(z);
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
    @POST
    @ApiOperation(hidden = true, value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code = 409, message = "Partitioned topic does not exist")})
    public void updatePartitionedTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean z, @ApiParam("Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean z2, @QueryParam("force") @DefaultValue("false") boolean z3, int i) {
        validateTopicName(str, str2, str3, str4);
        internalUpdatePartitionedTopic(i, z, z2, z3);
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
    @GET
    @ApiOperation(hidden = true, value = "Get partitioned topic metadata.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission")})
    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z, @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3, str4);
        return internalGetPartitionedMetadata(z, z2);
    }

    @DELETE
    @Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
    @ApiOperation(hidden = true, value = "Delete a partitioned topic.", notes = "It will also delete all the partitions of the topic if it exists.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Partitioned topic does not exist")})
    public void deletePartitionedTopic(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("force") @DefaultValue("false") boolean z, @QueryParam("authoritative") @DefaultValue("false") boolean z2, @QueryParam("deleteSchema") @DefaultValue("false") boolean z3) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalDeletePartitionedTopic(asyncResponse, z2, z, z3);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @PUT
    @Path("/{property}/{cluster}/{namespace}/{topic}/unload")
    @ApiOperation(hidden = true, value = "Unload a topic")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public void unloadTopic(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3, str4);
        internalUnloadTopic(asyncResponse, z);
    }

    @DELETE
    @Path("/{property}/{cluster}/{namespace}/{topic}")
    @ApiOperation(hidden = true, value = "Delete a topic.", notes = "The topic cannot be deleted if delete is not forcefully and there's any active subscription or producer connected to the it. Force delete ignores connected clients and deletes topic by explicitly closing them.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Topic has active producers/subscriptions")})
    public void deleteTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("force") @DefaultValue("false") boolean z, @QueryParam("authoritative") @DefaultValue("false") boolean z2, @QueryParam("deleteSchema") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3, str4);
        internalDeleteTopic(z2, z, z3);
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/subscriptions")
    @GET
    @ApiOperation(hidden = true, value = "Get the list of persistent subscriptions for a given topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public void getSubscriptions(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalGetSubscriptions(asyncResponse, z);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("{property}/{cluster}/{namespace}/{topic}/stats")
    @GET
    @ApiOperation(hidden = true, value = "Get the stats for the topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public TopicStats getStats(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z, @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3, str4);
        return internalGetStats(z, z2, false);
    }

    @Path("{property}/{cluster}/{namespace}/{topic}/internalStats")
    @GET
    @ApiOperation(hidden = true, value = "Get the internal stats for the topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public PersistentTopicInternalStats getInternalStats(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z, @QueryParam("metadata") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3, str4);
        return internalGetInternalStats(z, z2);
    }

    @Path("{property}/{cluster}/{namespace}/{topic}/internal-info")
    @GET
    @ApiOperation(hidden = true, value = "Get the stored topic metadata.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public void getManagedLedgerInfo(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @Suspended AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3, str4);
        internalGetManagedLedgerInfo(asyncResponse, z);
    }

    @Path("{property}/{cluster}/{namespace}/{topic}/partitioned-stats")
    @GET
    @ApiOperation(hidden = true, value = "Get the stats for the partitioned topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public void getPartitionedStats(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("perPartition") @DefaultValue("true") boolean z, @QueryParam("authoritative") @DefaultValue("false") boolean z2) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalGetPartitionedStats(asyncResponse, z2, z, false, false);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("{property}/{cluster}/{namespace}/{topic}/partitioned-internalStats")
    @GET
    @ApiOperation(hidden = true, value = "Get the stats-internal for the partitioned topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public void getPartitionedStatsInternal(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalGetPartitionedStatsInternal(asyncResponse, z);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @DELETE
    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
    @ApiOperation(hidden = true, value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. Force delete ignores connected consumers and deletes subscription by explicitly closing them.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Subscription has active consumers")})
    public void deleteSubscription(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("subName") String str5, @QueryParam("force") @DefaultValue("false") boolean z, @QueryParam("authoritative") @DefaultValue("false") boolean z2) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalDeleteSubscription(asyncResponse, Codec.decode(str5), z2, z);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skip_all")
    @POST
    @ApiOperation(hidden = true, value = "Skip all messages on a topic subscription.", notes = "Completely clears the backlog on the subscription.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"), @ApiResponse(code = 404, message = "Topic or subscription does not exist")})
    public void skipAllMessages(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("subName") String str5, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalSkipAllMessages(asyncResponse, Codec.decode(str5), z);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}")
    @POST
    @ApiOperation(hidden = true, value = "Skip messages on a topic subscription.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist")})
    public void skipMessages(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("subName") String str5, @PathParam("numMessages") int i, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3, str4);
        internalSkipMessages(Codec.decode(str5), i, z);
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages/{expireTimeInSeconds}")
    @POST
    @ApiOperation(hidden = true, value = "Expire messages on a topic subscription.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist")})
    public void expireTopicMessages(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("subName") String str5, @PathParam("expireTimeInSeconds") int i, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalExpireMessagesByTimestamp(asyncResponse, Codec.decode(str5), i, z);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages")
    @POST
    @ApiOperation("Expiry messages on a topic subscription.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist"), @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void expireTopicMessages(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @ApiParam(value = "Specify the namespace", required = true) @PathParam("namespace") String str3, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String str4, @ApiParam("Subscription to be Expiry messages on") @PathParam("subName") String str5, @ApiParam("Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean z, @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)") ResetCursorData resetCursorData) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalExpireMessagesByPosition(asyncResponse, Codec.decode(str5), z, new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(), resetCursorData.getPartitionIndex()), resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/all_subscription/expireMessages/{expireTimeInSeconds}")
    @POST
    @ApiOperation(hidden = true, value = "Expire messages on all subscriptions of topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist")})
    public void expireMessagesForAllSubscriptions(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("expireTimeInSeconds") int i, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalExpireMessagesForAllSubscriptions(asyncResponse, i, z);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/resetcursor/{timestamp}")
    @POST
    @ApiOperation(hidden = true, value = "Reset subscription to message position closest to absolute timestamp (in ms).", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist")})
    public void resetCursor(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("subName") String str5, @PathParam("timestamp") long j, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalResetCursor(asyncResponse, Codec.decode(str5), j, z);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/resetcursor")
    @POST
    @ApiOperation(hidden = true, value = "Reset subscription to message position closest to given position.", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), @ApiResponse(code = 405, message = "Not supported for partitioned topics")})
    public void resetCursorOnPosition(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("subName") String str5, @QueryParam("authoritative") @DefaultValue("false") boolean z, ResetCursorData resetCursorData) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalResetCursorOnPosition(asyncResponse, Codec.decode(str5), z, new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(), resetCursorData.getPartitionIndex()), resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
        } catch (Exception e) {
            resumeAsyncResponseExceptionally(asyncResponse, e);
        }
    }

    @PUT
    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subscriptionName}")
    @ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 400, message = "Create subscription on non persistent topic is not supported"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), @ApiResponse(code = 405, message = "Not supported for partitioned topics")})
    public void createSubscription(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("subscriptionName") String str5, @QueryParam("authoritative") @DefaultValue("false") boolean z, MessageIdImpl messageIdImpl, @QueryParam("replicated") boolean z2) {
        try {
            validateTopicName(str, str2, str3, str4);
            if (!this.topicName.isPersistent()) {
                throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic can only be done through client");
            }
            internalCreateSubscription(asyncResponse, Codec.decode(str5), messageIdImpl, z, z2);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/position/{messagePosition}")
    @GET
    @ApiOperation(hidden = true, value = "Peek nth message on a topic subscription.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist")})
    public Response peekNthMessage(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("subName") String str5, @PathParam("messagePosition") int i, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3, str4);
        return internalPeekNthMessage(Codec.decode(str5), i, z);
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
    @GET
    @ApiOperation(hidden = true, value = "Get message by its messageId.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't java admin permission"), @ApiResponse(code = 404, message = "Topic, subscription or the messageId does not exist")})
    public void getMessageByID(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @PathParam("ledgerId") Long l, @PathParam("entryId") Long l2, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalGetMessageById(asyncResponse, l.longValue(), l2.longValue(), z);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("{property}/{cluster}/{namespace}/{topic}/backlog")
    @GET
    @ApiOperation(hidden = true, value = "Get estimated backlog for offline topic.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    public PersistentOfflineTopicStats getBacklog(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3, str4);
        return internalGetBacklog(z);
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/terminate")
    @POST
    @ApiOperation(hidden = true, value = "Terminate a topic. A topic that is terminated will not accept any more messages to be published and will let consumer to drain existing messages in backlog")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"), @ApiResponse(code = 406, message = "Need to provide a persistent topic name"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public MessageId terminate(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validatePersistentTopicName(str, str2, str3, str4);
        return internalTerminate(z);
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/terminate/partitions")
    @POST
    @ApiOperation(hidden = true, value = "Terminate all partitioned topic. A topic that is terminated will not accept any more messages to be published and will let consumer to drain existing messages in backlog")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public void terminatePartitionedTopic(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3, str4);
        internalTerminatePartitionedTopic(asyncResponse, z);
    }

    @PUT
    @Path("/{property}/{cluster}/{namespace}/{topic}/compaction")
    @ApiOperation("Trigger a compaction operation on a topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 409, message = "Compaction already running")})
    public void compact(@Suspended AsyncResponse asyncResponse, @PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalTriggerCompaction(asyncResponse, z);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }

    @Path("/{property}/{cluster}/{namespace}/{topic}/compaction")
    @GET
    @ApiOperation("Get the status of a compaction operation for a topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run")})
    public LongRunningProcessStatus compactionStatus(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3, str4);
        return internalCompactionStatus(z);
    }

    @PUT
    @Path("/{tenant}/{cluster}/{namespace}/{topic}/offload")
    @ApiOperation("Offload a prefix of a topic to long term storage")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 409, message = "Offload already running")})
    public void triggerOffload(@PathParam("tenant") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z, MessageIdImpl messageIdImpl) {
        validateTopicName(str, str2, str3, str4);
        internalTriggerOffload(z, messageIdImpl);
    }

    @Path("/{tenant}/{cluster}/{namespace}/{topic}/offload")
    @GET
    @ApiOperation("Offload a prefix of a topic to long term storage")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist")})
    public OffloadProcessStatus offloadStatus(@PathParam("tenant") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("topic") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateTopicName(str, str2, str3, str4);
        return internalOffloadStatus(z);
    }

    @Path("/{tenant}/{cluster}/{namespace}/{topic}/lastMessageId")
    @GET
    @ApiOperation("Return the last commit message id of topic")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void getLastMessageId(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String str, @ApiParam(value = "Specify the cluster", required = true) @PathParam("cluster") String str2, @ApiParam(value = "Specify the namespace", required = true) @PathParam("namespace") String str3, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String str4, @ApiParam("Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalGetLastMessageId(asyncResponse, z);
        } catch (Exception e) {
            asyncResponse.resume((Throwable) new RestException(e));
        }
    }

    @Path("/{tenant}/{cluster}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus")
    @POST
    @ApiOperation("Enable or disable a replicated subscription on a topic.")
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or subscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist"), @ApiResponse(code = 405, message = "Operation not allowed on this topic"), @ApiResponse(code = 412, message = "Can't find owner for topic"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void setReplicatedSubscriptionStatus(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String str, @ApiParam(value = "Specify the cluster", required = true) @PathParam("cluster") String str2, @ApiParam(value = "Specify the namespace", required = true) @PathParam("namespace") String str3, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String str4, @ApiParam(value = "Name of subscription", required = true) @PathParam("subName") String str5, @ApiParam("Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean z, @ApiParam(value = "Whether to enable replicated subscription", required = true) boolean z2) {
        try {
            validateTopicName(str, str2, str3, str4);
            internalSetReplicatedSubscriptionStatus(asyncResponse, Codec.decode(str5), z, z2);
        } catch (WebApplicationException e) {
            asyncResponse.resume((Throwable) e);
        } catch (Exception e2) {
            asyncResponse.resume((Throwable) new RestException(e2));
        }
    }
}
