package org.apache.pulsar.broker.admin.v2;

import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/json"})
@Api(value = "/non-persistent", description = "Non-Persistent topic admin apis", tags = {"non-persistent topic"})
@Path("/non-persistent")
/* loaded from: input_file:org/apache/pulsar/broker/admin/v2/NonPersistentTopics.class */
public class NonPersistentTopics extends PersistentTopics {
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopics.class);

    @Override // org.apache.pulsar.broker.admin.v2.PersistentTopics
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "The tenant/namespace/topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation("Get partitioned topic metadata.")
    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("checkAllowAutoCreation") @ApiParam("Is check configuration required to automatically create topic") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        return getPartitionedTopicMetadata(this.topicName, z, z2);
    }

    @Override // org.apache.pulsar.broker.admin.v2.PersistentTopics
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "The tenant/namespace/topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("{tenant}/{namespace}/{topic}/stats")
    @ApiOperation("Get the stats for the topic.")
    /* renamed from: getStats, reason: merged with bridge method [inline-methods] */
    public NonPersistentTopicStats mo27getStats(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("getPreciseBacklog") @ApiParam("If return precise backlog or imprecise backlog") @DefaultValue("false") boolean z2, @QueryParam("subscriptionBacklogSize") @ApiParam("If return backlog size for each subscription, require locking on ledger so be careful not to use when there's heavy traffic.") @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        validateAdminOperationOnTopic(this.topicName, z);
        return ((NonPersistentTopic) getTopicReference(this.topicName)).mo122getStats(z2, z3);
    }

    @Override // org.apache.pulsar.broker.admin.v2.PersistentTopics
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "The tenant/namespace/topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("{tenant}/{namespace}/{topic}/internalStats")
    @ApiOperation("Get the internal stats for the topic.")
    public PersistentTopicInternalStats getInternalStats(@PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z, @QueryParam("metadata") @DefaultValue("false") boolean z2) {
        boolean z3;
        validateTopicName(str, str2, str3);
        validateAdminOperationOnTopic(this.topicName, z);
        Topic topicReference = getTopicReference(this.topicName);
        if (z2) {
            try {
                if (hasSuperUserAccess()) {
                    z3 = true;
                    return topicReference.getInternalStats(z3).get();
                }
            } catch (Exception e) {
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e instanceof ExecutionException ? e.getCause().getMessage() : e.getMessage());
            }
        }
        z3 = false;
        return topicReference.getInternalStats(z3).get();
    }

    @Override // org.apache.pulsar.broker.admin.v2.PersistentTopics
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "The tenant/namespace does not exist"), @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code = 409, message = "Partitioned topic already exists"), @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
    @PUT
    public void createPartitionedTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0") int i) {
        try {
            validateGlobalNamespaceOwnership(str, str2);
            validateTopicName(str, str2, str3);
            validateAdminAccessForTenant(this.topicName.getTenant());
            internalCreatePartitionedTopic(asyncResponse, i);
        } catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", new Object[]{clientAppId(), this.topicName, e});
            resumeAsyncResponseExceptionally(asyncResponse, e);
        }
    }

    @Override // org.apache.pulsar.broker.admin.v2.PersistentTopics
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "This operation requires super-user access"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "The tenant/namespace/topic does not exist"), @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{topic}/unload")
    @ApiOperation("Unload a topic")
    @PUT
    public void unloadTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3, @QueryParam("authoritative") @ApiParam("Is authentication required to perform this operation") @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalUnloadTopic(asyncResponse, z);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        } catch (WebApplicationException e2) {
            asyncResponse.resume(e2);
        }
    }

    @Override // org.apache.pulsar.broker.admin.v2.PersistentTopics
    @GET
    @ApiResponses({@ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "The tenant/namespace does not exist"), @ApiResponse(code = 412, message = "Namespace name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}")
    @ApiOperation(value = "Get the list of non-persistent topics under a namespace.", response = String.class, responseContainer = "List")
    public void getList(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2) {
        try {
            validateNamespaceName(str, str2);
            if (log.isDebugEnabled()) {
                log.debug("[{}] list of topics on namespace {}", clientAppId(), this.namespaceName);
            }
            validateAdminAccessForTenant(str);
            Policies namespacePolicies = getNamespacePolicies(this.namespaceName);
            validateGlobalNamespaceOwnership(this.namespaceName);
            ArrayList newArrayList = Lists.newArrayList();
            List boundaries = namespacePolicies.bundles.getBoundaries();
            for (int i = 0; i < boundaries.size() - 1; i++) {
                String format = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
                try {
                    newArrayList.add(pulsar().getAdminClient().topics().getListInBundleAsync(this.namespaceName.toString(), format));
                } catch (PulsarServerException e) {
                    log.error("[{}] Failed to get list of topics under namespace {}/{}", new Object[]{clientAppId(), this.namespaceName, format, e});
                    asyncResponse.resume(new RestException((Throwable) e));
                    return;
                }
            }
            ArrayList newArrayList2 = Lists.newArrayList();
            FutureUtil.waitForAll(newArrayList).handle((r11, th) -> {
                for (int i2 = 0; i2 < newArrayList.size(); i2++) {
                    try {
                        if (((CompletableFuture) newArrayList.get(i2)).isDone() && ((CompletableFuture) newArrayList.get(i2)).get() != null) {
                            newArrayList2.addAll((Collection) ((CompletableFuture) newArrayList.get(i2)).get());
                        }
                    } catch (InterruptedException | ExecutionException e2) {
                        log.error("[{}] Failed to get list of topics under namespace {}", new Object[]{clientAppId(), this.namespaceName, e2});
                        asyncResponse.resume(new RestException(e2 instanceof ExecutionException ? e2.getCause() : e2));
                        return null;
                    }
                }
                asyncResponse.resume((List) newArrayList2.stream().filter(str3 -> {
                    return !TopicName.get(str3).isPersistent();
                }).collect(Collectors.toList()));
                return null;
            });
        } catch (WebApplicationException e2) {
            asyncResponse.resume(e2);
        } catch (Exception e3) {
            asyncResponse.resume(new RestException(e3));
        }
    }

    @GET
    @ApiResponses({@ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist"), @ApiResponse(code = 412, message = "Namespace name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    @Path("/{tenant}/{namespace}/{bundle}")
    @ApiOperation(value = "Get the list of non-persistent topics under a namespace bundle.", response = String.class, responseContainer = "List")
    public void getListFromBundle(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str, @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2, @PathParam("bundle") @ApiParam(value = "Bundle range of a topic", required = true) String str3) {
        validateNamespaceName(str, str2);
        if (log.isDebugEnabled()) {
            log.debug("[{}] list of topics on namespace bundle {}/{}", new Object[]{clientAppId(), this.namespaceName, str3});
        }
        validateAdminAccessForTenant(str);
        Policies namespacePolicies = getNamespacePolicies(this.namespaceName);
        validateGlobalNamespaceOwnership(this.namespaceName);
        isBundleOwnedByAnyBroker(this.namespaceName, namespacePolicies.bundles, str3).thenAccept(bool -> {
            if (!bool.booleanValue()) {
                log.info("[{}] Namespace bundle is not owned by any broker {}/{}", new Object[]{clientAppId(), this.namespaceName, str3});
                asyncResponse.resume(Response.noContent().build());
                return;
            }
            try {
                NamespaceBundle validateNamespaceBundleOwnership = validateNamespaceBundleOwnership(this.namespaceName, namespacePolicies.bundles, str3, true, true);
                try {
                    ArrayList newArrayList = Lists.newArrayList();
                    pulsar().getBrokerService().forEachTopic(topic -> {
                        if (validateNamespaceBundleOwnership.includes(TopicName.get(topic.getName()))) {
                            newArrayList.add(topic.getName());
                        }
                    });
                    asyncResponse.resume(newArrayList);
                } catch (Exception e) {
                    log.error("[{}] Failed to list topics on namespace bundle {}/{}", new Object[]{clientAppId(), this.namespaceName, str3, e});
                    asyncResponse.resume(new RestException(e));
                }
            } catch (WebApplicationException e2) {
                asyncResponse.resume(e2);
            }
        }).exceptionally(th -> {
            log.error("[{}] Failed to list topics on namespace bundle {}/{}", new Object[]{clientAppId(), this.namespaceName, str3, th});
            if (th.getCause() instanceof WebApplicationException) {
                asyncResponse.resume(th.getCause());
                return null;
            }
            asyncResponse.resume(new RestException(th.getCause()));
            return null;
        });
    }

    protected void validateAdminOperationOnTopic(TopicName topicName, boolean z) {
        validateAdminAccessForTenant(topicName.getTenant());
        validateTopicOwnership(topicName, z);
    }

    private Topic getTopicReference(TopicName topicName) {
        return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join().orElseThrow(() -> {
            return new RestException(Response.Status.NOT_FOUND, "Topic not found");
        });
    }
}
