package org.apache.pulsar.broker.admin.v2;

import com.google.common.annotations.VisibleForTesting;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.SchemasResourceBase;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Api(value = "/schemas", description = "Schemas related admin APIs", tags = {"schemas"})
@Path("/schemas")
/* loaded from: input_file:org/apache/pulsar/broker/admin/v2/SchemasResource.class */
public class SchemasResource extends SchemasResourceBase {
    private static final Logger log = LoggerFactory.getLogger(SchemasResource.class);

    @VisibleForTesting
    public SchemasResource() {
    }

    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"), @ApiResponse(code = 403, message = "Client is not authenticated"), @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"), @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"), @ApiResponse(code = 500, message = "Internal Server Error")})
    @Path("/{tenant}/{namespace}/{topic}/schema")
    @ApiOperation(value = "Get the schema of a topic", response = GetSchemaResponse.class)
    @Produces({"application/json"})
    public void getSchema(@PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") String str3, @QueryParam("authoritative") @DefaultValue("false") boolean z, @Suspended AsyncResponse asyncResponse) {
        validateTopicName(str, str2, str3);
        CompletableFuture<U> thenApply = getSchemaAsync(z).thenApply(schemaAndMetadata -> {
            return this.convertToSchemaResponse(schemaAndMetadata);
        });
        Objects.requireNonNull(asyncResponse);
        thenApply.thenApply((Function<? super U, ? extends U>) (v1) -> {
            return r1.resume(v1);
        }).exceptionally(th -> {
            if (shouldPrintErrorLog(th)) {
                log.error("[{}] Failed to get schema for topic {}", new Object[]{clientAppId(), this.topicName, th});
            }
            resumeAsyncResponseExceptionally(asyncResponse, th);
            return null;
        });
    }

    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"), @ApiResponse(code = 403, message = "Client is not authenticated"), @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"), @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"), @ApiResponse(code = 500, message = "Internal Server Error")})
    @Path("/{tenant}/{namespace}/{topic}/schema/{version}")
    @ApiOperation(value = "Get the schema of a topic at a given version", response = GetSchemaResponse.class)
    @Produces({"application/json"})
    public void getSchema(@PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") String str3, @PathParam("version") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z, @Suspended AsyncResponse asyncResponse) {
        validateTopicName(str, str2, str3);
        CompletableFuture<U> thenApply = getSchemaAsync(z, str4).thenApply(schemaAndMetadata -> {
            return this.convertToSchemaResponse(schemaAndMetadata);
        });
        Objects.requireNonNull(asyncResponse);
        thenApply.thenAccept((Consumer<? super U>) (v1) -> {
            r1.resume(v1);
        }).exceptionally(th -> {
            if (shouldPrintErrorLog(th)) {
                log.error("[{}] Failed to get schema for topic {} with version {}", new Object[]{clientAppId(), this.topicName, str4, th});
            }
            resumeAsyncResponseExceptionally(asyncResponse, th);
            return null;
        });
    }

    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"), @ApiResponse(code = 403, message = "Client is not authenticated"), @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"), @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"), @ApiResponse(code = 500, message = "Internal Server Error")})
    @Path("/{tenant}/{namespace}/{topic}/schemas")
    @ApiOperation(value = "Get the all schemas of a topic", response = GetAllVersionsSchemaResponse.class)
    @Produces({"application/json"})
    public void getAllSchemas(@PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") String str3, @QueryParam("authoritative") @DefaultValue("false") boolean z, @Suspended AsyncResponse asyncResponse) {
        validateTopicName(str, str2, str3);
        CompletableFuture<U> thenApply = getAllSchemasAsync(z).thenApply(list -> {
            return this.convertToAllVersionsSchemaResponse(list);
        });
        Objects.requireNonNull(asyncResponse);
        thenApply.thenAccept((Consumer<? super U>) (v1) -> {
            r1.resume(v1);
        }).exceptionally(th -> {
            if (shouldPrintErrorLog(th)) {
                log.error("[{}] Failed to get all schemas for topic {}", new Object[]{clientAppId(), this.topicName, th});
            }
            resumeAsyncResponseExceptionally(asyncResponse, th);
            return null;
        });
    }

    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"), @ApiResponse(code = 403, message = "Client is not authenticated"), @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"), @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"), @ApiResponse(code = 500, message = "Internal Server Error")})
    @Path("/{tenant}/{namespace}/{topic}/schema")
    @DELETE
    @ApiOperation(value = "Delete the schema of a topic", response = DeleteSchemaResponse.class)
    @Produces({"application/json"})
    public void deleteSchema(@PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") String str3, @QueryParam("authoritative") @DefaultValue("false") boolean z, @QueryParam("force") @DefaultValue("false") boolean z2, @Suspended AsyncResponse asyncResponse) {
        validateTopicName(str, str2, str3);
        deleteSchemaAsync(z, z2).thenAccept(schemaVersion -> {
            asyncResponse.resume(DeleteSchemaResponse.builder().version(getLongSchemaVersion(schemaVersion)).build());
        }).exceptionally(th -> {
            if (shouldPrintErrorLog(th)) {
                log.error("[{}] Failed to delete schemas for topic {}", new Object[]{clientAppId(), this.topicName, th});
            }
            resumeAsyncResponseExceptionally(asyncResponse, th);
            return null;
        });
    }

    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"), @ApiResponse(code = 403, message = "Client is not authenticated"), @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"), @ApiResponse(code = 409, message = "Incompatible schema"), @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"), @ApiResponse(code = 422, message = "Invalid schema data"), @ApiResponse(code = 500, message = "Internal Server Error")})
    @Path("/{tenant}/{namespace}/{topic}/schema")
    @Consumes({"application/json"})
    @ApiOperation(value = "Update the schema of a topic", response = PostSchemaResponse.class)
    @POST
    @Produces({"application/json"})
    public void postSchema(@PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") String str3, @ApiParam(value = "A JSON value presenting a schema payload. An example of the expected schema can be found down here.", examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }")})) PostSchemaPayload postSchemaPayload, @QueryParam("authoritative") @DefaultValue("false") boolean z, @Suspended AsyncResponse asyncResponse) {
        validateTopicName(str, str2, str3);
        postSchemaAsync(postSchemaPayload, z).thenAccept(schemaVersion -> {
            asyncResponse.resume(PostSchemaResponse.builder().version(schemaVersion).build());
        }).exceptionally(th -> {
            Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
            if (unwrapCompletionException instanceof IncompatibleSchemaException) {
                asyncResponse.resume(Response.status(Response.Status.CONFLICT.getStatusCode(), unwrapCompletionException.getMessage()).build());
                return null;
            }
            if (unwrapCompletionException instanceof InvalidSchemaDataException) {
                asyncResponse.resume(Response.status(422, unwrapCompletionException.getMessage()).build());
                return null;
            }
            if (shouldPrintErrorLog(th)) {
                log.error("[{}] Failed to post schemas for topic {}", new Object[]{clientAppId(), this.topicName, unwrapCompletionException});
            }
            resumeAsyncResponseExceptionally(asyncResponse, th);
            return null;
        });
    }

    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"), @ApiResponse(code = 403, message = "Client is not authenticated"), @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"), @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"), @ApiResponse(code = 500, message = "Internal Server Error")})
    @Path("/{tenant}/{namespace}/{topic}/compatibility")
    @Consumes({"application/json"})
    @ApiOperation(value = "test the schema compatibility", response = IsCompatibilityResponse.class)
    @POST
    @Produces({"application/json"})
    public void testCompatibility(@PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") String str3, @ApiParam(value = "A JSON value presenting a schema payload. An example of the expected schema can be found down here.", examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }")})) PostSchemaPayload postSchemaPayload, @QueryParam("authoritative") @DefaultValue("false") boolean z, @Suspended AsyncResponse asyncResponse) {
        validateTopicName(str, str2, str3);
        testCompatibilityAsync(postSchemaPayload, z).thenAccept(pair -> {
            asyncResponse.resume(Response.accepted().entity(IsCompatibilityResponse.builder().isCompatibility(((Boolean) pair.getLeft()).booleanValue()).schemaCompatibilityStrategy(((SchemaCompatibilityStrategy) pair.getRight()).name()).build()).build());
        }).exceptionally(th -> {
            if (shouldPrintErrorLog(th)) {
                log.error("[{}] Failed to test compatibility for topic {}", new Object[]{clientAppId(), this.topicName, th});
            }
            resumeAsyncResponseExceptionally(asyncResponse, th);
            return null;
        });
    }

    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"), @ApiResponse(code = 403, message = "Client is not authenticated"), @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"), @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"), @ApiResponse(code = 422, message = "Invalid schema data"), @ApiResponse(code = 500, message = "Internal Server Error")})
    @Path("/{tenant}/{namespace}/{topic}/version")
    @Consumes({"application/json"})
    @ApiOperation(value = "get the version of the schema", response = LongSchemaVersion.class)
    @POST
    @Produces({"application/json"})
    public void getVersionBySchema(@PathParam("tenant") String str, @PathParam("namespace") String str2, @PathParam("topic") String str3, @ApiParam(value = "A JSON value presenting a schema payload. An example of the expected schema can be found down here.", examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }")})) PostSchemaPayload postSchemaPayload, @QueryParam("authoritative") @DefaultValue("false") boolean z, @Suspended AsyncResponse asyncResponse) {
        validateTopicName(str, str2, str3);
        getVersionBySchemaAsync(postSchemaPayload, z).thenAccept(l -> {
            asyncResponse.resume(LongSchemaVersionResponse.builder().version(l).build());
        }).exceptionally(th -> {
            if (shouldPrintErrorLog(th)) {
                log.error("[{}] Failed to get version by schema for topic {}", new Object[]{clientAppId(), this.topicName, th});
            }
            resumeAsyncResponseExceptionally(asyncResponse, th);
            return null;
        });
    }
}
