/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.schemaregistry.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.pravega.common.Exceptions;
import io.pravega.common.util.CertificateUtils;
import io.pravega.common.util.Retry;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.client.SchemaRegistryClientConfig;
import io.pravega.schemaregistry.client.exceptions.RegistryExceptions;
import io.pravega.schemaregistry.common.AuthHelper;
import io.pravega.schemaregistry.common.ContinuationTokenIterator;
import io.pravega.schemaregistry.contract.data.CodecType;
import io.pravega.schemaregistry.contract.data.Compatibility;
import io.pravega.schemaregistry.contract.data.EncodingId;
import io.pravega.schemaregistry.contract.data.GroupHistoryRecord;
import io.pravega.schemaregistry.contract.data.GroupProperties;
import io.pravega.schemaregistry.contract.data.SchemaWithVersion;
import io.pravega.schemaregistry.contract.generated.rest.model.AddedTo;
import io.pravega.schemaregistry.contract.generated.rest.model.CanRead;
import io.pravega.schemaregistry.contract.generated.rest.model.CodecTypes;
import io.pravega.schemaregistry.contract.generated.rest.model.CreateGroupRequest;
import io.pravega.schemaregistry.contract.generated.rest.model.EncodingInfo;
import io.pravega.schemaregistry.contract.generated.rest.model.GetEncodingIdRequest;
import io.pravega.schemaregistry.contract.generated.rest.model.GroupHistory;
import io.pravega.schemaregistry.contract.generated.rest.model.ListGroupsResponse;
import io.pravega.schemaregistry.contract.generated.rest.model.SchemaInfo;
import io.pravega.schemaregistry.contract.generated.rest.model.SchemaVersionsList;
import io.pravega.schemaregistry.contract.generated.rest.model.UpdateCompatibilityRequest;
import io.pravega.schemaregistry.contract.generated.rest.model.Valid;
import io.pravega.schemaregistry.contract.generated.rest.model.ValidateRequest;
import io.pravega.schemaregistry.contract.generated.rest.model.VersionInfo;
import io.pravega.schemaregistry.contract.transform.ModelHelper;
import io.pravega.schemaregistry.contract.v1.ApiV1;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.AbstractMap;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.proxy.WebResourceFactory;

public class SchemaRegistryClientImpl
implements SchemaRegistryClient {
    private static final Retry.RetryAndThrowConditionally RETRY = Retry.withExpBackoff(100L, 2, 10, 1000L).retryWhen(x -> Exceptions.unwrap(x) instanceof RegistryExceptions.ConnectionException);
    private static final int GROUP_LIMIT = 100;
    private static final String HTTPS = "https";
    private static final String TLS = "TLS";
    private final ApiV1.GroupsApi groupProxy;
    private final ApiV1.SchemasApi schemaProxy;
    private final String namespace;
    private final Client client;

    SchemaRegistryClientImpl(SchemaRegistryClientConfig config, String namespace) {
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(config.getSchemaRegistryUri());
        ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(new ClientConfig());
        if (HTTPS.equalsIgnoreCase(config.getSchemaRegistryUri().getScheme())) {
            clientBuilder = clientBuilder.sslContext(this.getSSLContext(config));
            if (!config.isValidateHostName()) {
                clientBuilder.hostnameVerifier((a, b) -> true);
            }
        }
        this.client = clientBuilder.build();
        if (config.isAuthEnabled()) {
            this.client.register(context -> context.getHeaders().add("Authorization", AuthHelper.getAuthorizationHeader(config.getAuthMethod(), config.getAuthToken())));
        }
        this.namespace = namespace;
        this.groupProxy = WebResourceFactory.newResource(ApiV1.GroupsApi.class, this.client.target(config.getSchemaRegistryUri()));
        this.schemaProxy = WebResourceFactory.newResource(ApiV1.SchemasApi.class, this.client.target(config.getSchemaRegistryUri()));
    }

    @VisibleForTesting
    SchemaRegistryClientImpl(ApiV1.GroupsApi groupProxy) {
        this(groupProxy, null);
    }

    @VisibleForTesting
    SchemaRegistryClientImpl(ApiV1.GroupsApi groupProxy, ApiV1.SchemasApi schemaProxy) {
        this.groupProxy = groupProxy;
        this.schemaProxy = schemaProxy;
        this.namespace = null;
        this.client = null;
    }

    @Override
    public boolean addGroup(String groupId, GroupProperties groupProperties) {
        return this.withRetry(() -> {
            CreateGroupRequest request = new CreateGroupRequest().groupName(groupId).groupProperties(ModelHelper.encode(groupProperties));
            Response response = this.groupProxy.createGroup(this.namespace, request);
            Response.Status status = Response.Status.fromStatusCode(response.getStatus());
            switch (status) {
                case CREATED: {
                    return true;
                }
                case CONFLICT: {
                    return false;
                }
                case BAD_REQUEST: {
                    throw new RegistryExceptions.BadArgumentException("Group properties invalid.");
                }
            }
            return (Boolean)this.handleResponse(status, "Internal Service error. Failed to add the group.");
        });
    }

    @Override
    public void removeGroup(String groupId) {
        this.withRetry(() -> {
            Response response = this.groupProxy.deleteGroup(this.namespace, groupId);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case NO_CONTENT: {
                    return;
                }
            }
            this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to remove the group.");
        });
    }

    @Override
    public Iterator<Map.Entry<String, GroupProperties>> listGroups() {
        Function<String, Map.Entry> function = continuationToken -> {
            ListGroupsResponse entity = this.getListGroupsResponse((String)continuationToken);
            LinkedList<AbstractMap.SimpleEntry<String, GroupProperties>> map = new LinkedList<AbstractMap.SimpleEntry<String, GroupProperties>>();
            for (Map.Entry<String, io.pravega.schemaregistry.contract.generated.rest.model.GroupProperties> entry : entity.getGroups().entrySet()) {
                ModelHelper.decode(entry.getValue().getSerializationFormat());
                map.add(new AbstractMap.SimpleEntry<String, GroupProperties>(entry.getKey(), ModelHelper.decode(entry.getValue())));
            }
            return new AbstractMap.SimpleEntry(entity.getContinuationToken(), map);
        };
        return new ContinuationTokenIterator(function, null);
    }

    private ListGroupsResponse getListGroupsResponse(String continuationToken) {
        return this.withRetry(() -> {
            Response response = this.groupProxy.listGroups(this.namespace, continuationToken, 100);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return response.readEntity(ListGroupsResponse.class);
                }
            }
            return (ListGroupsResponse)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to list groups.");
        });
    }

    @Override
    public GroupProperties getGroupProperties(String groupId) {
        return this.withRetry(() -> {
            Response response = this.groupProxy.getGroupProperties(this.namespace, groupId);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return ModelHelper.decode(response.readEntity(io.pravega.schemaregistry.contract.generated.rest.model.GroupProperties.class));
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Group not found.");
                }
            }
            return (GroupProperties)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to list groups.");
        });
    }

    @Override
    public boolean updateCompatibility(String groupId, Compatibility compatibility, @Nullable Compatibility previous) {
        return this.withRetry(() -> {
            UpdateCompatibilityRequest request = new UpdateCompatibilityRequest().compatibility(ModelHelper.encode(compatibility));
            if (previous != null) {
                request.setPreviousCompatibility(ModelHelper.encode(previous));
            }
            Response response = this.groupProxy.updateCompatibility(this.namespace, groupId, request);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case CONFLICT: {
                    return false;
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Group not found.");
                }
                case OK: {
                    return true;
                }
            }
            return (Boolean)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to update compatibility.");
        });
    }

    @Override
    public List<SchemaWithVersion> getSchemas(String groupId) {
        return this.latestSchemas(groupId, null);
    }

    private List<SchemaWithVersion> latestSchemas(String groupId, String type) {
        return this.withRetry(() -> {
            Response response = this.groupProxy.getSchemas(this.namespace, groupId, type);
            SchemaVersionsList objectsList = response.readEntity(SchemaVersionsList.class);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return objectsList.getSchemas().stream().map(ModelHelper::decode).collect(Collectors.toList());
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Group not found.");
                }
            }
            return (List)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to get object types.");
        });
    }

    @Override
    public io.pravega.schemaregistry.contract.data.VersionInfo addSchema(String groupId, io.pravega.schemaregistry.contract.data.SchemaInfo schemaInfo) {
        return this.withRetry(() -> {
            Response response = this.groupProxy.addSchema(this.namespace, groupId, ModelHelper.encode(schemaInfo));
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case CREATED: {
                    return ModelHelper.decode(response.readEntity(VersionInfo.class));
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Group not found.");
                }
                case CONFLICT: {
                    throw new RegistryExceptions.SchemaValidationFailedException("Schema is incompatible.");
                }
                case EXPECTATION_FAILED: {
                    throw new RegistryExceptions.SerializationMismatchException("Serialization format disallowed.");
                }
                case BAD_REQUEST: {
                    throw new RegistryExceptions.MalformedSchemaException("Schema is malformed. Verify the schema data and type");
                }
            }
            return (io.pravega.schemaregistry.contract.data.VersionInfo)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to addSchema.");
        });
    }

    @Override
    public void deleteSchemaVersion(String groupId, io.pravega.schemaregistry.contract.data.VersionInfo versionInfo) {
        this.withRetry(() -> {
            Response response = this.groupProxy.deleteSchemaForId(this.namespace, groupId, versionInfo.getId());
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
                throw new RegistryExceptions.ResourceNotFoundException("Group not found.");
            }
            if (response.getStatus() != Response.Status.NO_CONTENT.getStatusCode()) {
                this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to get schema.");
            }
        });
    }

    @Override
    public io.pravega.schemaregistry.contract.data.SchemaInfo getSchemaForVersion(String groupId, io.pravega.schemaregistry.contract.data.VersionInfo versionInfo) {
        return this.withRetry(() -> {
            Response response = this.groupProxy.getSchemaForId(this.namespace, groupId, versionInfo.getId());
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return ModelHelper.decode(response.readEntity(SchemaInfo.class));
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Schema not found.");
                }
            }
            return (io.pravega.schemaregistry.contract.data.SchemaInfo)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to get schema.");
        });
    }

    @Override
    public io.pravega.schemaregistry.contract.data.EncodingInfo getEncodingInfo(String groupId, EncodingId encodingId) {
        return this.withRetry(() -> {
            Response response = this.groupProxy.getEncodingInfo(this.namespace, groupId, encodingId.getId());
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return ModelHelper.decode(response.readEntity(EncodingInfo.class));
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Encoding not found.");
                }
            }
            return (io.pravega.schemaregistry.contract.data.EncodingInfo)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to get encoding info.");
        });
    }

    @Override
    public EncodingId getEncodingId(String groupId, io.pravega.schemaregistry.contract.data.VersionInfo versionInfo, String codecType) {
        return this.withRetry(() -> {
            GetEncodingIdRequest getEncodingIdRequest = new GetEncodingIdRequest();
            getEncodingIdRequest.codecType(codecType).versionInfo(ModelHelper.encode(versionInfo));
            Response response = this.groupProxy.getEncodingId(this.namespace, groupId, getEncodingIdRequest);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return ModelHelper.decode(response.readEntity(io.pravega.schemaregistry.contract.generated.rest.model.EncodingId.class));
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("getEncodingId failed. Either Group or Version does not exist.");
                }
                case PRECONDITION_FAILED: {
                    throw new RegistryExceptions.CodecTypeNotRegisteredException(String.format("Codec type %s not registered.", codecType));
                }
            }
            return (EncodingId)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to get encoding info.");
        });
    }

    @Override
    public SchemaWithVersion getLatestSchemaVersion(String groupId, @Nullable String schemaType) {
        List<SchemaWithVersion> list = this.latestSchemas(groupId, schemaType);
        if (schemaType == null) {
            return list.stream().max(Comparator.comparingInt(x -> x.getVersionInfo().getId())).orElse(null);
        }
        return list.get(0);
    }

    @Override
    public List<SchemaWithVersion> getSchemaVersions(String groupId, @Nullable String schemaType) {
        return this.withRetry(() -> {
            Response response = this.groupProxy.getSchemaVersions(this.namespace, groupId, schemaType);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    SchemaVersionsList schemaList = response.readEntity(SchemaVersionsList.class);
                    return schemaList.getSchemas().stream().map(ModelHelper::decode).collect(Collectors.toList());
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("getSchemaVersions failed. Group does not exist.");
                }
            }
            return (List)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to get schema versions for group.");
        });
    }

    @Override
    public List<GroupHistoryRecord> getGroupHistory(String groupId) {
        return this.withRetry(() -> {
            Response response = this.groupProxy.getGroupHistory(this.namespace, groupId);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    GroupHistory history = response.readEntity(GroupHistory.class);
                    return history.getHistory().stream().map(ModelHelper::decode).collect(Collectors.toList());
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("getGroupHistory failed. Either Group or Version does not exist.");
                }
            }
            return (List)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to get schema evolution history for group.");
        });
    }

    @Override
    public Map<String, io.pravega.schemaregistry.contract.data.VersionInfo> getSchemaReferences(io.pravega.schemaregistry.contract.data.SchemaInfo schemaInfo) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException {
        return this.withRetry(() -> {
            Response response = this.schemaProxy.getSchemaReferences(ModelHelper.encode(schemaInfo), this.namespace);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    AddedTo addedTo = response.readEntity(AddedTo.class);
                    return addedTo.getGroups().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, x -> ModelHelper.decode((VersionInfo)x.getValue())));
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("getSchemaReferences failed. Either Group or Version does not exist.");
                }
            }
            return (Map)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to get schema evolution history for group.");
        });
    }

    @Override
    public io.pravega.schemaregistry.contract.data.VersionInfo getVersionForSchema(String groupId, io.pravega.schemaregistry.contract.data.SchemaInfo schema) {
        return this.withRetry(() -> {
            SchemaInfo schemaInfo = ModelHelper.encode(schema);
            Response response = this.groupProxy.getSchemaVersion(this.namespace, groupId, schemaInfo);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return ModelHelper.decode(response.readEntity(VersionInfo.class));
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Schema not registered.");
                }
            }
            return (io.pravega.schemaregistry.contract.data.VersionInfo)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error. Failed to get schema version.");
        });
    }

    @Override
    public boolean validateSchema(String groupId, io.pravega.schemaregistry.contract.data.SchemaInfo schemaInfo) {
        return this.withRetry(() -> {
            ValidateRequest validateRequest = new ValidateRequest().schemaInfo(ModelHelper.encode(schemaInfo));
            Response response = this.groupProxy.validate(this.namespace, groupId, validateRequest);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return response.readEntity(Valid.class).isValid();
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Group not found.");
                }
            }
            return (Boolean)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error.");
        });
    }

    @Override
    public boolean canReadUsing(String groupId, io.pravega.schemaregistry.contract.data.SchemaInfo schemaInfo) {
        return this.withRetry(() -> {
            SchemaInfo request = ModelHelper.encode(schemaInfo);
            Response response = this.groupProxy.canRead(this.namespace, groupId, request);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return response.readEntity(CanRead.class).isCompatible();
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Schema not found.");
                }
            }
            return (Boolean)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Internal Service error.");
        });
    }

    @Override
    public List<CodecType> getCodecTypes(String groupId) {
        return this.withRetry(() -> {
            Response response = this.groupProxy.getCodecTypesList(this.namespace, groupId);
            CodecTypes list = response.readEntity(CodecTypes.class);
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case OK: {
                    return list.getCodecTypes().stream().map(ModelHelper::decode).collect(Collectors.toList());
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Group not found.");
                }
            }
            return (List)this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Failed to get codecTypes. Internal server error.");
        });
    }

    @Override
    public void addCodecType(String groupId, CodecType codecType) {
        this.withRetry(() -> {
            Response response = this.groupProxy.addCodecType(this.namespace, groupId, ModelHelper.encode(codecType));
            switch (Response.Status.fromStatusCode(response.getStatus())) {
                case CREATED: {
                    return;
                }
                case NOT_FOUND: {
                    throw new RegistryExceptions.ResourceNotFoundException("Group not found.");
                }
            }
            this.handleResponse(Response.Status.fromStatusCode(response.getStatus()), "Failed to add codec type. Internal server error.");
        });
    }

    private <T> T withRetry(Supplier<T> supplier) {
        return (T)RETRY.run(supplier::get);
    }

    private void withRetry(Runnable runnable) {
        RETRY.run(() -> {
            runnable.run();
            return null;
        });
    }

    private <T> T handleResponse(Response.Status status, String errorMessage) {
        switch (status) {
            case UNAUTHORIZED: 
            case FORBIDDEN: {
                throw new RegistryExceptions.UnauthorizedException("User not authorized.");
            }
        }
        throw new RegistryExceptions.InternalServerError(errorMessage);
    }

    @Override
    public void close() throws Exception {
        if (this.client != null) {
            this.client.close();
        }
    }

    private SSLContext getSSLContext(SchemaRegistryClientConfig config) {
        try {
            KeyStore trustStore;
            if (config.getTrustStore() != null) {
                trustStore = this.getTrustStore(config);
            } else if (config.getCertificate() != null) {
                trustStore = CertificateUtils.createTrustStore(config.getCertificate());
            } else {
                return SSLContext.getDefault();
            }
            TrustManagerFactory factory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            factory.init(trustStore);
            SSLContext tlsContext = SSLContext.getInstance(TLS);
            tlsContext.init(null, factory.getTrustManagers(), null);
            return tlsContext;
        }
        catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
            throw new IllegalStateException("Failure initializing trust store", e);
        }
    }

    private KeyStore getTrustStore(SchemaRegistryClientConfig config) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
        KeyStore trustStore = KeyStore.getInstance(config.getTrustStoreType());
        try (FileInputStream fin = new FileInputStream(config.getTrustStore());){
            String trustStorePassword = config.getTrustStorePassword();
            if (trustStorePassword != null) {
                trustStore.load(fin, trustStorePassword.toCharArray());
            } else {
                trustStore.load(fin, null);
            }
        }
        return trustStore;
    }
}

