/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.v2;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Charsets;
import org.apache.pulsar.shade.io.swagger.annotations.Api;
import org.apache.pulsar.shade.io.swagger.annotations.ApiOperation;
import org.apache.pulsar.shade.io.swagger.annotations.ApiParam;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponse;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponses;
import org.apache.pulsar.shade.io.swagger.annotations.Example;
import org.apache.pulsar.shade.io.swagger.annotations.ExampleProperty;
import org.apache.pulsar.shade.javax.ws.rs.Consumes;
import org.apache.pulsar.shade.javax.ws.rs.DELETE;
import org.apache.pulsar.shade.javax.ws.rs.DefaultValue;
import org.apache.pulsar.shade.javax.ws.rs.Encoded;
import org.apache.pulsar.shade.javax.ws.rs.GET;
import org.apache.pulsar.shade.javax.ws.rs.POST;
import org.apache.pulsar.shade.javax.ws.rs.Path;
import org.apache.pulsar.shade.javax.ws.rs.PathParam;
import org.apache.pulsar.shade.javax.ws.rs.Produces;
import org.apache.pulsar.shade.javax.ws.rs.QueryParam;
import org.apache.pulsar.shade.javax.ws.rs.container.AsyncResponse;
import org.apache.pulsar.shade.javax.ws.rs.container.Suspended;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/schemas")
@Api(value="/schemas", description="Schemas related admin APIs", tags={"schemas"})
public class SchemasResource
extends AdminResource {
    private static final Logger log = LoggerFactory.getLogger(SchemasResource.class);
    private final Clock clock;

    /**
     * Creates a resource whose schema timestamps are taken from the system UTC clock.
     */
    public SchemasResource() {
        this(Clock.systemUTC());
    }

    /**
     * Creates a resource that reads the current time from the supplied clock.
     *
     * @param clock source of the millisecond timestamps written into the {@code SchemaData}
     *              instances built by the POST endpoints of this class
     */
    @VisibleForTesting
    public SchemasResource(Clock clock) {
        this.clock = clock;
    }

    /**
     * Extracts the numeric value of a schema version.
     *
     * @param schemaVersion version handle returned by the schema registry; may be any implementation
     * @return the wrapped value when {@code schemaVersion} is a {@link LongSchemaVersion},
     *         otherwise {@code -1} (this includes {@code null})
     */
    private static long getLongSchemaVersion(SchemaVersion schemaVersion) {
        return schemaVersion instanceof LongSchemaVersion
                ? ((LongSchemaVersion) schemaVersion).getVersion()
                : -1L;
    }

    /**
     * REST endpoint: fetches the schema stored under the schema id of the given topic, using the
     * registry lookup that takes no explicit version.
     *
     * <p>Validation failures are thrown synchronously; the lookup result is delivered through
     * {@code response} by {@code handleGetSchemaResponse}.
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param topic         topic path segment
     * @param authoritative forwarded to the topic-ownership validation
     * @param response      suspended response that is resumed once the registry lookup completes
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/schema")
    @Produces("application/json")
    @ApiOperation(value = "Get the schema of a topic", response = GetSchemaResponse.class)
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
        @ApiResponse(code = 403, message = "Client is not authenticated"),
        @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
        @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
        @ApiResponse(code = 500, message = "Internal Server Error")
    })
    public void getSchema(
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") String topic,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @Suspended AsyncResponse response) {
        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);

        final String registryKey = buildSchemaId(tenant, namespace, topic);
        final SchemaRegistry registry = pulsar().getSchemaRegistryService();
        registry.getSchema(registryKey).handle((found, failure) -> {
            // The handler always resumes the response; the mapped future value is unused.
            handleGetSchemaResponse(response, found, failure);
            return null;
        });
    }

    /**
     * REST endpoint: fetches one specific version of the schema stored for the given topic.
     *
     * <p>The {@code version} path segment must be a decimal {@code long}. It is encoded as an
     * 8-byte big-endian buffer and converted to a {@link SchemaVersion} by the schema registry.
     * A segment that is not a valid {@code long} is rejected with HTTP 400 rather than surfacing
     * as an unhandled {@link NumberFormatException} (HTTP 500).
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param topic         topic path segment
     * @param version       raw (still URL-encoded) version path segment
     * @param authoritative forwarded to the topic-ownership validation
     * @param response      suspended response that is resumed once the registry lookup completes
     * @throws RestException with status 400 when {@code version} is not a valid {@code long};
     *                       validation failures are also thrown synchronously
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/schema/{version}")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the schema of a topic at a given version", response=GetSchemaResponse.class)
    @ApiResponses(value={
        @ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code=400, message="Schema version is not a valid number"),
        @ApiResponse(code=401, message="Client is not authorized or Don't have admin permission"),
        @ApiResponse(code=403, message="Client is not authenticated"),
        @ApiResponse(code=404, message="Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
        @ApiResponse(code=412, message="Failed to find the ownership for the topic"),
        @ApiResponse(code=500, message="Internal Server Error")
    })
    public void getSchema(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="topic") String topic, @PathParam(value="version") @Encoded String version, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @Suspended AsyncResponse response) {
        this.validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
        String schemaId = this.buildSchemaId(tenant, namespace, topic);

        // The version comes straight from the URL, so treat a malformed value as a client error.
        final long requestedVersion;
        try {
            requestedVersion = Long.parseLong(version);
        } catch (NumberFormatException e) {
            throw new RestException(Response.Status.BAD_REQUEST, "Invalid schema version: " + version);
        }

        ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
        bbVersion.putLong(requestedVersion);
        SchemaVersion v = this.pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
        this.pulsar().getSchemaRegistryService().getSchema(schemaId, v).handle((schema, error) -> {
            SchemasResource.handleGetSchemaResponse(response, schema, error);
            return null;
        });
    }

    /**
     * REST endpoint: lists the schema versions of the given topic, as returned by the registry's
     * {@code trimDeletedSchemaAndGetList} call.
     *
     * <p>Validation failures are thrown synchronously; the list is delivered through
     * {@code response} by {@code handleGetAllSchemasResponse}.
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param topic         topic path segment
     * @param authoritative forwarded to the topic-ownership validation
     * @param response      suspended response that is resumed once the registry call completes
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/schemas")
    @Produces("application/json")
    @ApiOperation(value = "Get the all schemas of a topic", response = GetAllVersionsSchemaResponse.class)
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
        @ApiResponse(code = 403, message = "Client is not authenticated"),
        @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
        @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
        @ApiResponse(code = 500, message = "Internal Server Error")
    })
    public void getAllSchemas(
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") String topic,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @Suspended AsyncResponse response) {
        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);

        final String registryKey = buildSchemaId(tenant, namespace, topic);
        final SchemaRegistry registry = pulsar().getSchemaRegistryService();
        registry.trimDeletedSchemaAndGetList(registryKey).handle((versions, failure) -> {
            // The handler always resumes the response; the mapped future value is unused.
            handleGetAllSchemasResponse(response, versions, failure);
            return null;
        });
    }

    /**
     * Resumes a suspended "get schema" request from the outcome of a registry lookup.
     *
     * <ul>
     *   <li>lookup failed: the response is resumed with the error itself;</li>
     *   <li>no schema, or the schema is flagged as deleted: HTTP 404;</li>
     *   <li>otherwise: HTTP 200 with the schema converted to a {@link GetSchemaResponse}.</li>
     * </ul>
     *
     * @param response suspended response to resume
     * @param schema   lookup result, may be {@code null}
     * @param error    lookup failure, or {@code null} on success
     */
    private static void handleGetSchemaResponse(AsyncResponse response, SchemaRegistry.SchemaAndMetadata schema, Throwable error) {
        if (error != null) {
            response.resume(error);
            return;
        }
        if (schema == null || schema.schema.isDeleted()) {
            response.resume(Response.status(Response.Status.NOT_FOUND).build());
            return;
        }
        // Use type(), not encoding(): ResponseBuilder.encoding() sets the Content-Encoding header
        // (meant for values such as "gzip"), so passing a media type there yields an invalid header.
        response.resume(Response.ok()
                .type("application/json")
                .entity(SchemasResource.convertSchemaAndMetadataToGetSchemaResponse(schema))
                .build());
    }

    /**
     * Resumes a suspended "get all schemas" request from the outcome of a registry lookup.
     *
     * <ul>
     *   <li>lookup failed: the response is resumed with the error itself;</li>
     *   <li>{@code null} list: HTTP 404;</li>
     *   <li>otherwise: HTTP 200 with every entry converted to a {@link GetSchemaResponse} and
     *       wrapped in a {@link GetAllVersionsSchemaResponse}.</li>
     * </ul>
     *
     * @param response suspended response to resume
     * @param schemas  lookup result, may be {@code null}
     * @param error    lookup failure, or {@code null} on success
     */
    private static void handleGetAllSchemasResponse(AsyncResponse response, List<SchemaRegistry.SchemaAndMetadata> schemas, Throwable error) {
        if (error != null) {
            response.resume(error);
            return;
        }
        if (schemas == null) {
            response.resume(Response.status(Response.Status.NOT_FOUND).build());
            return;
        }
        List<GetSchemaResponse> converted = schemas.stream()
                .map(SchemasResource::convertSchemaAndMetadataToGetSchemaResponse)
                .collect(Collectors.toList());
        // Use type(), not encoding(): ResponseBuilder.encoding() sets the Content-Encoding header
        // (meant for values such as "gzip"), so passing a media type there yields an invalid header.
        response.resume(Response.ok()
                .type("application/json")
                .entity(GetAllVersionsSchemaResponse.builder().getSchemaResponses(converted).build())
                .build());
    }

    /**
     * REST endpoint: asks the schema registry to delete the schema of the given topic.
     *
     * <p>The requesting user passed to the registry is {@code clientAppId()}, or the empty string
     * when that is empty. On success the response is HTTP 200 with the numeric version returned
     * by the registry ({@code -1} if it is not a {@link LongSchemaVersion}); on failure the
     * response is resumed with the error.
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param topic         topic path segment
     * @param authoritative forwarded to the topic-ownership validation
     * @param response      suspended response that is resumed once the registry call completes
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/schema")
    @Produces("application/json")
    @ApiOperation(value = "Delete the schema of a topic", response = DeleteSchemaResponse.class)
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
        @ApiResponse(code = 403, message = "Client is not authenticated"),
        @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
        @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
        @ApiResponse(code = 500, message = "Internal Server Error")
    })
    public void deleteSchema(
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") String topic,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @Suspended AsyncResponse response) {
        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);

        final String registryKey = buildSchemaId(tenant, namespace, topic);
        final SchemaRegistry registry = pulsar().getSchemaRegistryService();
        final String requester = StringUtils.defaultIfEmpty(clientAppId(), "");

        registry.deleteSchema(registryKey, requester).handle((deletedVersion, failure) -> {
            if (failure != null) {
                response.resume(failure);
                return null;
            }
            final DeleteSchemaResponse body = DeleteSchemaResponse.builder()
                    .version(getLongSchemaVersion(deletedVersion))
                    .build();
            response.resume(Response.ok().entity(body).build());
            return null;
        });
    }

    /**
     * REST endpoint: registers the posted schema for the given topic unless it is already present.
     *
     * <p>The compatibility strategy is read from the namespace policies; when it is
     * {@code UNDEFINED} it is derived from the namespace auto-update policy instead. For a
     * {@code KEY_VALUE} payload the schema string is first converted with
     * {@code DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema}; any other type is
     * stored as the UTF-8 bytes of the schema string.
     *
     * <p>Outcome mapping:
     * <ul>
     *   <li>stored: HTTP 202 with the resulting schema version;</li>
     *   <li>{@link IncompatibleSchemaException}: HTTP 409;</li>
     *   <li>{@link InvalidSchemaDataException}: HTTP 422;</li>
     *   <li>{@link RestException} while loading policies: its own status code;</li>
     *   <li>anything else: HTTP 500 (and the failure is logged).</li>
     * </ul>
     * Each exception is recognised whether it arrives directly or as the cause of a wrapper such
     * as the {@code CompletionException} produced by dependent future stages.
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param topic         topic path segment
     * @param payload       schema type, definition and properties to store
     * @param authoritative forwarded to the topic-ownership validation
     * @param response      suspended response that is resumed once the operation completes
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/schema")
    @Produces(value={"application/json"})
    @Consumes(value={"application/json"})
    @ApiOperation(value="Update the schema of a topic", response=PostSchemaResponse.class)
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Client is not authorized or Don't have admin permission"), @ApiResponse(code=403, message="Client is not authenticated"), @ApiResponse(code=404, message="Tenant or Namespace or Topic doesn't exist"), @ApiResponse(code=409, message="Incompatible schema"), @ApiResponse(code=412, message="Failed to find the ownership for the topic"), @ApiResponse(code=422, message="Invalid schema data"), @ApiResponse(code=500, message="Internal Server Error")})
    public void postSchema(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="topic") String topic, @ApiParam(value="A JSON value presenting a schema playload. An example of the expected schema can be found down here.", examples=@Example(value={@ExampleProperty(mediaType="application/json", value="{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }")})) PostSchemaPayload payload, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @Suspended AsyncResponse response) {
        this.validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
        NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
        this.getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
            SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
            if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
                schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
            }
            byte[] schemaBytes = payload.getSchema().getBytes(StandardCharsets.UTF_8);
            byte[] data = SchemaType.KEY_VALUE.name().equals(payload.getType())
                    ? DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema(schemaBytes)
                    : schemaBytes;
            SchemaData schemaData = SchemaData.builder()
                    .data(data)
                    .isDeleted(false)
                    .timestamp(this.clock.millis())
                    .type(SchemaType.valueOf(payload.getType()))
                    .user(StringUtils.defaultIfEmpty(this.clientAppId(), ""))
                    .props(payload.getProperties())
                    .build();
            this.pulsar().getSchemaRegistryService()
                    .putSchemaIfAbsent(this.buildSchemaId(tenant, namespace, topic), schemaData, schemaCompatibilityStrategy)
                    .thenAccept(version -> response.resume(
                            Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build()))
                    .exceptionally(error -> {
                        // Check the error and its cause: the exception may or may not be wrapped,
                        // depending on which stage of the future chain failed.
                        IncompatibleSchemaException incompatible = findCause(error, IncompatibleSchemaException.class);
                        InvalidSchemaDataException invalid = findCause(error, InvalidSchemaDataException.class);
                        if (incompatible != null) {
                            response.resume(Response.status(Response.Status.CONFLICT.getStatusCode(), incompatible.getMessage()).build());
                        } else if (invalid != null) {
                            response.resume(Response.status(422, invalid.getMessage()).build());
                        } else {
                            log.error("Failed to post schema for topic {}/{}/{}", tenant, namespace, topic, error);
                            response.resume(Response.serverError().build());
                        }
                        return null;
                    });
        }).exceptionally(error -> {
            RestException rest = findCause(error, RestException.class);
            if (rest != null) {
                // Use the RestException's own message; the wrapper's message is prefixed with the class name.
                response.resume(Response.status(rest.getResponse().getStatus(), rest.getMessage()).build());
            } else {
                log.error("Failed to post schema for topic {}/{}/{}", tenant, namespace, topic, error);
                response.resume(Response.serverError().build());
            }
            return null;
        });
    }

    /**
     * Returns {@code error} itself, or else its direct cause, if either is an instance of
     * {@code type}; returns {@code null} when neither matches.
     */
    private static <T extends Throwable> T findCause(Throwable error, Class<T> type) {
        if (type.isInstance(error)) {
            return type.cast(error);
        }
        Throwable cause = error == null ? null : error.getCause();
        return type.isInstance(cause) ? type.cast(cause) : null;
    }

    /**
     * REST endpoint: checks whether the posted schema is compatible with the schema stored for the
     * given topic, without storing it.
     *
     * <p>The compatibility strategy is read synchronously from the namespace policies; when it is
     * {@code UNDEFINED} it is derived from the namespace auto-update policy instead. A completed
     * check is answered with HTTP 202 carrying the result and the strategy name; any failure of
     * the asynchronous check is answered with HTTP 500.
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param topic         topic path segment
     * @param payload       schema type, definition and properties to check
     * @param authoritative forwarded to the topic-ownership validation
     * @param response      suspended response that is resumed once the check completes
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/compatibility")
    @Produces("application/json")
    @Consumes("application/json")
    @ApiOperation(value = "test the schema compatibility", response = IsCompatibilityResponse.class)
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
        @ApiResponse(code = 403, message = "Client is not authenticated"),
        @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
        @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
        @ApiResponse(code = 500, message = "Internal Server Error")
    })
    public void testCompatibility(
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") String topic,
            @ApiParam(
                value = "A JSON value presenting a schema playload. An example of the expected schema can be found down here.",
                examples = @Example(@ExampleProperty(
                    mediaType = "application/json",
                    value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }")))
            PostSchemaPayload payload,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @Suspended AsyncResponse response) {
        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);

        final String registryKey = buildSchemaId(tenant, namespace, topic);
        final Policies nsPolicies = getNamespacePolicies(NamespaceName.get(tenant, namespace));

        // Assigned exactly once so that the lambda below may capture it.
        final SchemaCompatibilityStrategy strategy;
        if (nsPolicies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
            strategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(nsPolicies.schema_auto_update_compatibility_strategy);
        } else {
            strategy = nsPolicies.schema_compatibility_strategy;
        }

        final SchemaRegistry registry = pulsar().getSchemaRegistryService();
        final SchemaData candidate = SchemaData.builder()
                .data(payload.getSchema().getBytes(StandardCharsets.UTF_8))
                .isDeleted(false)
                .timestamp(clock.millis())
                .type(SchemaType.valueOf(payload.getType()))
                .user(StringUtils.defaultIfEmpty(clientAppId(), ""))
                .props(payload.getProperties())
                .build();

        registry.isCompatible(registryKey, candidate, strategy)
                .thenAccept(compatible -> {
                    final IsCompatibilityResponse body = IsCompatibilityResponse.builder()
                            .isCompatibility(compatible)
                            .schemaCompatibilityStrategy(strategy.name())
                            .build();
                    response.resume(Response.accepted().entity(body).build());
                })
                .exceptionally(failure -> {
                    response.resume(Response.serverError().build());
                    return null;
                });
    }

    /**
     * REST endpoint: asks the schema registry which stored version matches the posted schema for
     * the given topic.
     *
     * <p>A completed lookup is answered with HTTP 202 carrying the version returned by the
     * registry; any failure of the asynchronous lookup is answered with HTTP 500.
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param topic         topic path segment
     * @param payload       schema type, definition and properties to look up
     * @param authoritative forwarded to the topic-ownership validation
     * @param response      suspended response that is resumed once the lookup completes
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/version")
    @Produces("application/json")
    @Consumes("application/json")
    @ApiOperation(value = "get the version of the schema", response = LongSchemaVersion.class)
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
        @ApiResponse(code = 403, message = "Client is not authenticated"),
        @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
        @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
        @ApiResponse(code = 422, message = "Invalid schema data"),
        @ApiResponse(code = 500, message = "Internal Server Error")
    })
    public void getVersionBySchema(
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") String topic,
            @ApiParam(
                value = "A JSON value presenting a schema playload. An example of the expected schema can be found down here.",
                examples = @Example(@ExampleProperty(
                    mediaType = "application/json",
                    value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }")))
            PostSchemaPayload payload,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @Suspended AsyncResponse response) {
        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);

        final String registryKey = buildSchemaId(tenant, namespace, topic);
        final SchemaRegistry registry = pulsar().getSchemaRegistryService();
        final SchemaData candidate = SchemaData.builder()
                .data(payload.getSchema().getBytes(StandardCharsets.UTF_8))
                .isDeleted(false)
                .timestamp(clock.millis())
                .type(SchemaType.valueOf(payload.getType()))
                .user(StringUtils.defaultIfEmpty(clientAppId(), ""))
                .props(payload.getProperties())
                .build();

        registry.findSchemaVersion(registryKey, candidate)
                .thenAccept(matchedVersion -> {
                    final LongSchemaVersionResponse body = LongSchemaVersionResponse.builder()
                            .version(matchedVersion)
                            .build();
                    response.resume(Response.accepted().entity(body).build());
                })
                .exceptionally(failure -> {
                    response.resume(Response.serverError().build());
                    return null;
                });
    }

    /**
     * Converts a registry entry into the REST representation returned by the GET endpoints.
     *
     * <p>For a {@code KEY_VALUE} schema the data string is produced by decoding the schema info and
     * rendering it with {@code DefaultImplementation.convertKeyValueSchemaInfoDataToString}; for
     * any other type it is the stored bytes decoded as UTF-8.
     *
     * @param schemaAndMetadata registry entry holding the schema and its version
     * @return response object carrying version, type, timestamp, data string and properties
     */
    private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(SchemaRegistry.SchemaAndMetadata schemaAndMetadata) {
        final String renderedData;
        if (schemaAndMetadata.schema.getType() == SchemaType.KEY_VALUE) {
            renderedData = DefaultImplementation.convertKeyValueSchemaInfoDataToString(
                    DefaultImplementation.decodeKeyValueSchemaInfo(schemaAndMetadata.schema.toSchemaInfo()));
        } else {
            renderedData = new String(schemaAndMetadata.schema.getData(), StandardCharsets.UTF_8);
        }

        return GetSchemaResponse.builder()
                .version(getLongSchemaVersion(schemaAndMetadata.version))
                .type(schemaAndMetadata.schema.getType())
                .timestamp(schemaAndMetadata.schema.getTimestamp())
                .data(renderedData)
                .properties(schemaAndMetadata.schema.getProps())
                .build();
    }

    /**
     * Computes the schema-registry key for a persistent topic.
     *
     * <p>When the topic name reports {@code isPartitioned()}, the key is taken from the topic
     * named by {@code getPartitionedTopicName()} instead of from the given name. Unlike
     * {@code validateDestinationAndAdminOperation}, the topic segment is used as received, without
     * {@code Codec.decode}.
     *
     * @param tenant    tenant path segment
     * @param namespace namespace path segment
     * @param topic     topic path segment
     * @return the schema name of the resolved topic
     */
    private String buildSchemaId(String tenant, String namespace, String topic) {
        final TopicName requested = TopicName.get("persistent", tenant, namespace, topic);
        final TopicName schemaOwner = requested.isPartitioned()
                ? TopicName.get(requested.getPartitionedTopicName())
                : requested;
        return schemaOwner.getSchemaName();
    }

    /**
     * Runs the tenant admin-access check and the topic-ownership check for a persistent topic.
     *
     * <p>The topic segment is URL-decoded before the topic name is built. A {@link RestException}
     * with status 401 from either check is replaced by a 404 "Not Found" (presumably so that
     * unauthorized callers cannot tell whether the topic exists — confirm); every other
     * {@link RestException} is rethrown unchanged.
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param topic         URL-encoded topic path segment
     * @param authoritative forwarded to {@code validateTopicOwnership}
     */
    private void validateDestinationAndAdminOperation(String tenant, String namespace, String topic, boolean authoritative) {
        final TopicName target = TopicName.get("persistent", tenant, namespace, Codec.decode(topic));
        try {
            validateAdminAccessForTenant(target.getTenant());
            validateTopicOwnership(target, authoritative);
        } catch (RestException failure) {
            final boolean unauthorized =
                    failure.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode();
            throw unauthorized ? new RestException(Response.Status.NOT_FOUND, "Not Found") : failure;
        }
    }
}

