/*
 * Decompiled with CFR 0.152.
 */
package ai.superstream;

import ai.superstream.Consts;
import ai.superstream.SuperstreamCounters;
import ai.superstream.SuperstreamDeserializer;
import ai.superstream.SuperstreamSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.client.api.ServerInfo;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Superstream {
    public Connection brokerConnection;
    public JetStream jetstream;
    public String superstreamJwt;
    public String superstreamNkey;
    public byte[] descriptorAsBytes;
    public Descriptors.Descriptor descriptor;
    public String natsConnectionID;
    public int clientID;
    public String accountName;
    public int learningFactor = 20;
    public int learningFactorCounter = 0;
    public boolean learningRequestSent = false;
    private static final ObjectMapper objectMapper = new ObjectMapper();
    public String ProducerSchemaID = "0";
    public String ConsumerSchemaID = "0";
    public Map<String, Descriptors.Descriptor> SchemaIDMap = new HashMap<String, Descriptors.Descriptor>();
    public Map<String, ?> config;
    public SuperstreamCounters clientCounters = new SuperstreamCounters();
    private Subscription subscription;

    public Superstream(String token, String host, Integer learnfactor, String type, Map<String, ?> configs) {
        this.learningFactor = learnfactor;
        try {
            this.initializeNatsConnection(token, host);
            this.registerClient(configs);
            this.subscribeToUpdates();
            this.reportClientsUpdate();
            switch (type) {
                case "producer": {
                    this.sendClientTypeUpdateReq("producer");
                    break;
                }
                case "consumer": {
                    this.sendClientTypeUpdateReq("consumer");
                    break;
                }
                default: {
                    throw new Exception(type + " is not a valid client type");
                }
            }
        }
        catch (Exception e) {
            this.handleError(e.getMessage());
        }
    }

    public void close() {
        try {
            if (this.brokerConnection != null) {
                this.brokerConnection.close();
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private void initializeNatsConnection(String token, String host) {
        try {
            Options options = new Options.Builder().server(host).userInfo("superstream_internal", token).maxReconnects(-1).reconnectWait(Duration.ofSeconds(1L)).connectionListener(new ConnectionListener(){

                public void connectionEvent(Connection conn, ConnectionListener.Events type) {
                    if (type == ConnectionListener.Events.DISCONNECTED) {
                        System.out.println("superstream: Disconnected");
                    } else if (type == ConnectionListener.Events.RECONNECTED) {
                        try {
                            Superstream.this.natsConnectionID = Superstream.this.generateNatsConnectionID();
                            HashMap<String, Object> reqData = new HashMap<String, Object>();
                            reqData.put("new_nats_connection_id", Superstream.this.natsConnectionID);
                            reqData.put("client_id", Superstream.this.clientID);
                            ObjectMapper mapper = new ObjectMapper();
                            byte[] reqBytes = mapper.writeValueAsBytes(reqData);
                            Superstream.this.brokerConnection.request("internal.clientReconnectionUpdate", reqBytes, Duration.ofSeconds(30L));
                        }
                        catch (Exception e) {
                            System.out.println("superstream: Failed to send reconnection update: " + e.getMessage());
                        }
                        System.out.println("superstream: Reconnected to superstream");
                    }
                }
            }).build();
            Connection nc = Nats.connect((Options)options);
            JetStream js = nc.jetStream();
            this.brokerConnection = nc;
            this.jetstream = js;
            this.natsConnectionID = this.generateNatsConnectionID();
        }
        catch (Exception e) {
            System.out.println(String.format("superstream: %s", e.getMessage()));
        }
    }

    private String generateNatsConnectionID() {
        ServerInfo serverInfo = this.brokerConnection.getServerInfo();
        String connectedServerName = serverInfo.getServerName();
        this.clientID = serverInfo.getClientId();
        return connectedServerName + ":" + this.clientID;
    }

    public void registerClient(Map<String, ?> configs) {
        try {
            HashMap<String, Object> reqData = new HashMap<String, Object>();
            reqData.put("nats_connection_id", this.natsConnectionID);
            reqData.put("language", "java");
            reqData.put("learning_factor", this.learningFactor);
            reqData.put("version", "1.0.0");
            reqData.put("config", Superstream.normalizeClientConfig(configs));
            ObjectMapper mapper = new ObjectMapper();
            byte[] reqBytes = mapper.writeValueAsBytes(reqData);
            Message reply = this.brokerConnection.request("internal.registerClient", reqBytes, Duration.ofSeconds(30L));
            if (reply != null) {
                Map replyData = (Map)mapper.readValue(reply.getData(), Map.class);
                Object clientIDObject = replyData.get("client_id");
                if (clientIDObject instanceof Integer) {
                    this.clientID = (Integer)clientIDObject;
                } else if (clientIDObject instanceof String) {
                    try {
                        this.clientID = Integer.parseInt((String)clientIDObject);
                    }
                    catch (NumberFormatException e) {
                        System.err.println("superstream: client_id is not a valid integer: " + clientIDObject);
                    }
                } else {
                    System.err.println("superstream: client_id is not a valid integer: " + clientIDObject);
                }
                Object accountNameObject = replyData.get("account_name");
                if (accountNameObject != null) {
                    this.accountName = accountNameObject.toString();
                } else {
                    System.err.println("superstream: account_name is not a valid string: " + accountNameObject);
                }
                Object learningFactorObject = replyData.get("learning_factor");
                if (learningFactorObject instanceof Integer) {
                    this.learningFactor = (Integer)learningFactorObject;
                } else if (learningFactorObject instanceof String) {
                    try {
                        this.learningFactor = Integer.parseInt((String)learningFactorObject);
                    }
                    catch (NumberFormatException e) {
                        System.err.println("superstream: learning_factor is not a valid integer: " + learningFactorObject);
                    }
                } else {
                    System.err.println("superstream: learning_factor is not a valid integer: " + learningFactorObject);
                }
            } else {
                System.out.println("superstream: registering client: No reply received within the timeout period.");
            }
        }
        catch (Exception e) {
            System.out.println(String.format("superstream: %s", e.getMessage()));
        }
    }

    public void sendClientTypeUpdateReq(String clientType) {
        try {
            HashMap<String, Object> reqData = new HashMap<String, Object>();
            reqData.put("client_id", this.clientID);
            reqData.put("type", clientType);
            ObjectMapper mapper = new ObjectMapper();
            byte[] reqBytes = mapper.writeValueAsBytes(reqData);
            this.brokerConnection.request("internal.clientTypeUpdate", reqBytes, Duration.ofSeconds(30L));
        }
        catch (Exception e) {
            this.handleError(String.format("sendClientTypeUpdateReq: %s", e.getMessage()));
        }
    }

    public void subscribeToUpdates() {
        try {
            String subject = String.format("internal.updates.%d", this.clientID);
            Dispatcher dispatcher = this.brokerConnection.createDispatcher(this.updatesHandler());
            this.subscription = dispatcher.subscribe(subject, this.updatesHandler());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void reportClientsUpdate() {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> {
            try {
                byte[] byteCounters = objectMapper.writeValueAsBytes((Object)this.clientCounters);
                this.brokerConnection.publish(String.format("internal_tasks.clientsUpdate.%s.%d", "counters", this.clientID), byteCounters);
            }
            catch (Exception e) {
                this.handleError("reportClientsUpdate: " + e.getMessage());
            }
        }, 0L, 30L, TimeUnit.SECONDS);
    }

    public void sendLearningMessage(byte[] msg) {
        try {
            this.brokerConnection.publish(String.format("internal.schema.learnSchema.%d", this.clientID), msg);
        }
        catch (Exception e) {
            this.handleError("sendLearningMessage: " + e.getMessage());
        }
    }

    public void sendRegisterSchemaReq() {
        try {
            this.brokerConnection.publish(String.format("internal_tasks.schema.registerSchema.%d", this.clientID), new byte[0]);
            this.learningRequestSent = true;
        }
        catch (Exception e) {
            this.handleError("sendLearningMessage: " + e.getMessage());
        }
    }

    public byte[] jsonToProto(byte[] msgBytes) throws IOException {
        try {
            String jsonString = new String(msgBytes);
            DynamicMessage.Builder newMessageBuilder = DynamicMessage.newBuilder((Descriptors.Descriptor)this.descriptor);
            JsonFormat.parser().merge(jsonString, (Message.Builder)newMessageBuilder);
            DynamicMessage message = newMessageBuilder.build();
            return message.toByteArray();
        }
        catch (Exception e) {
            if (e.getMessage().contains("Cannot find field")) {
                return msgBytes;
            }
            return msgBytes;
        }
    }

    public byte[] protoToJson(byte[] msgBytes, Descriptors.Descriptor desc) throws IOException {
        try {
            DynamicMessage message = DynamicMessage.parseFrom((Descriptors.Descriptor)desc, (byte[])msgBytes);
            String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print((MessageOrBuilder)message);
            return jsonString.getBytes(StandardCharsets.UTF_8);
        }
        catch (Exception e) {
            if (e.getMessage().contains("the input ended unexpectedly")) {
                return msgBytes;
            }
            return msgBytes;
        }
    }

    private MessageHandler updatesHandler() {
        return msg -> {
            try {
                Map update = (Map)objectMapper.readValue(msg.getData(), Map.class);
                this.processUpdate(update);
            }
            catch (IOException e) {
                this.handleError("updatesHandler at json.Unmarshal: " + e.getMessage());
            }
        };
    }

    private void processUpdate(Map<String, Object> update) {
        if ("LearnedSchema".equals(update.get("type"))) {
            try {
                String schemaID;
                String payloadBytesString = (String)update.get("payload");
                byte[] payloadBytes = Base64.getDecoder().decode(payloadBytesString);
                Map payload = (Map)objectMapper.readValue(payloadBytes, Map.class);
                String descriptorBytesString = (String)payload.get("desc");
                String masterMsgName = (String)payload.get("master_msg_name");
                String fileName = (String)payload.get("file_name");
                this.descriptor = this.compileMsgDescriptor(descriptorBytesString, masterMsgName, fileName);
                this.ProducerSchemaID = schemaID = (String)payload.get("schema_id");
            }
            catch (Exception e) {
                this.handleError("processUpdate: " + e.getMessage());
            }
        }
    }

    public void sendGetSchemaRequest(String schemaID) {
        try {
            Descriptors.Descriptor respDescriptor;
            HashMap<String, String> reqData = new HashMap<String, String>();
            reqData.put("schema_id", schemaID);
            ObjectMapper mapper = new ObjectMapper();
            byte[] reqBytes = mapper.writeValueAsBytes(reqData);
            Message msg = this.brokerConnection.request(String.format("internal.schema.getSchema.%d", this.clientID), reqBytes, Duration.ofSeconds(30L));
            if (msg == null) {
                throw new Exception("Could not get descriptor");
            }
            Map respMap = (Map)objectMapper.readValue(new String(msg.getData(), StandardCharsets.UTF_8), Map.class);
            if (respMap.containsKey("desc") && respMap.get("desc") instanceof String) {
                String fileName;
                String masterMsgName;
                String descriptorBytesString = (String)respMap.get("desc");
                respDescriptor = this.compileMsgDescriptor(descriptorBytesString, masterMsgName = (String)respMap.get("master_msg_name"), fileName = (String)respMap.get("file_name"));
                if (respDescriptor == null) {
                    throw new Exception("Error compiling schema.");
                }
            } else {
                throw new Exception("Response map does not contain expected keys.");
            }
            this.SchemaIDMap.put((String)respMap.get("schema_id"), respDescriptor);
        }
        catch (Exception e) {
            this.handleError(String.format("sendGetSchemaRequest: %s", e.getMessage()));
        }
    }

    private Descriptors.Descriptor compileMsgDescriptor(String descriptorBytesString, String masterMsgName, String fileName) {
        try {
            byte[] descriptorAsBytes = Base64.getDecoder().decode(descriptorBytesString);
            if (descriptorAsBytes == null) {
                throw new Exception("error decoding descriptor bytes");
            }
            DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom((byte[])descriptorAsBytes);
            Descriptors.FileDescriptor fileDescriptor = null;
            for (DescriptorProtos.FileDescriptorProto fdp : descriptorSet.getFileList()) {
                if (!fdp.getName().equals(fileName)) continue;
                fileDescriptor = Descriptors.FileDescriptor.buildFrom((DescriptorProtos.FileDescriptorProto)fdp, (Descriptors.FileDescriptor[])new Descriptors.FileDescriptor[0]);
                break;
            }
            if (fileDescriptor == null) {
                throw new Exception("file not found");
            }
            for (Descriptors.Descriptor md : fileDescriptor.getMessageTypes()) {
                if (!md.getName().equals(masterMsgName)) continue;
                return md;
            }
        }
        catch (Exception e) {
            this.handleError(String.format("compileMsgDescriptor: %s", e.getMessage()));
        }
        return null;
    }

    public void handleError(String msg) {
        if (this.brokerConnection != null) {
            String message = String.format("[account name: %s][clientID: %d][sdk: java][version: %s] %s", this.accountName, this.clientID, "1.0.0", msg);
            this.brokerConnection.publish("internal.clientErrors", message.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static Map<String, Object> normalizeClientConfig(Map<String, ?> javaConfig) {
        HashMap<String, Object> superstreamConfig = new HashMap<String, Object>();
        Superstream.mapIfPresent(javaConfig, "max.request.size", superstreamConfig, "producer_max_messages_bytes");
        Superstream.mapIfPresent(javaConfig, "acks", superstreamConfig, "producer_required_acks");
        Superstream.mapIfPresent(javaConfig, "delivery.timeout.ms", superstreamConfig, "producer_timeout");
        Superstream.mapIfPresent(javaConfig, "retries", superstreamConfig, "producer_retry_max");
        Superstream.mapIfPresent(javaConfig, "retry.backoff.ms", superstreamConfig, "producer_retry_backoff");
        Superstream.mapIfPresent(javaConfig, "compression.type", superstreamConfig, "producer_compression_level");
        Superstream.mapIfPresent(javaConfig, "fetch.min.bytes", superstreamConfig, "consumer_fetch_min");
        Superstream.mapIfPresent(javaConfig, "fetch.max.bytes", superstreamConfig, "consumer_fetch_default");
        Superstream.mapIfPresent(javaConfig, "retry.backoff.ms", superstreamConfig, "consumer_retry_backoff");
        Superstream.mapIfPresent(javaConfig, "max.poll.interval.ms", superstreamConfig, "consumer_max_wait_time");
        Superstream.mapIfPresent(javaConfig, "max.poll.records", superstreamConfig, "consumer_max_processing_time");
        Superstream.mapIfPresent(javaConfig, "enable.auto.commit", superstreamConfig, "consumer_offset_auto_commit_enable");
        Superstream.mapIfPresent(javaConfig, "auto.commit.interval.ms", superstreamConfig, "consumer_offset_auto_commit_interval");
        Superstream.mapIfPresent(javaConfig, "session.timeout.ms", superstreamConfig, "consumer_group_session_timeout");
        Superstream.mapIfPresent(javaConfig, "heartbeat.interval.ms", superstreamConfig, "consumer_group_heart_beat_interval");
        Superstream.mapIfPresent(javaConfig, "retry.backoff.ms", superstreamConfig, "consumer_group_rebalance_retry_back_off");
        Superstream.mapIfPresent(javaConfig, "auto.offset.reset", superstreamConfig, "consumer_group_rebalance_reset_invalid_offsets");
        Superstream.mapIfPresent(javaConfig, "group.id", superstreamConfig, "consumer_group_id");
        Superstream.mapIfPresent(javaConfig, "bootstrap.servers", superstreamConfig, "servers");
        return superstreamConfig;
    }

    private static void mapIfPresent(Map<String, ?> javaConfig, String javaKey, Map<String, Object> superstreamConfig, String superstreamKey) {
        if (javaConfig.containsKey(javaKey)) {
            superstreamConfig.put(superstreamKey, javaConfig.get(javaKey));
        }
    }

    public static Map<String, Object> initSuperstreamConfig(Map<String, Object> configs) {
        Map<String, String> envVars;
        if (configs.containsKey("value.deserializer") && !configs.containsKey("original.deserializer")) {
            configs.put("original.deserializer", configs.get("value.deserializer"));
            configs.put("value.deserializer", SuperstreamDeserializer.class.getName());
        }
        if (configs.containsKey("value.serializer") && !configs.containsKey("original.serializer")) {
            configs.put("original.serializer", configs.get("value.serializer"));
            configs.put("value.serializer", SuperstreamSerializer.class.getName());
        }
        if ((envVars = System.getenv()).containsKey("SUPERSTREAM_TOKEN")) {
            configs.put("superstream.token", envVars.get("SUPERSTREAM_TOKEN"));
        }
        if (envVars.containsKey("SUPERSTREAM_HOST")) {
            configs.put("superstream.host", envVars.get("SUPERSTREAM_HOST"));
        } else {
            configs.put("superstream.host", "broker.superstream.dev");
        }
        if (envVars.containsKey("SUPERSTREAM_LEARNING_FACTOR")) {
            String learningFactorString = envVars.get("SUPERSTREAM_LEARNING_FACTOR");
            Integer learningFactor = Integer.parseInt(learningFactorString);
            configs.put("superstream.learning.factor", learningFactor);
        } else {
            configs.put("superstream.learning.factor", Consts.superstreamDefaultLearningFactor);
        }
        return configs;
    }

    public static Properties initSuperstreamProps(Properties properties) {
        Map<String, String> envVars;
        if (properties.containsKey("value.deserializer") && !properties.containsKey("original.deserializer")) {
            properties.put("original.deserializer", properties.get("value.deserializer"));
            properties.put("value.deserializer", SuperstreamDeserializer.class.getName());
        }
        if (properties.containsKey("value.serializer") && !properties.containsKey("original.serializer")) {
            properties.put("original.serializer", properties.get("value.serializer"));
            properties.put("value.serializer", SuperstreamSerializer.class.getName());
        }
        if ((envVars = System.getenv()).containsKey("SUPERSTREAM_TOKEN")) {
            properties.put("superstream.token", envVars.get("SUPERSTREAM_TOKEN"));
        }
        if (envVars.containsKey("SUPERSTREAM_HOST")) {
            properties.put("superstream.host", envVars.get("SUPERSTREAM_HOST"));
        } else {
            properties.put("superstream.host", "broker.superstream.dev");
        }
        if (envVars.containsKey("SUPERSTREAM_LEARNING_FACTOR")) {
            String learningFactorString = envVars.get("SUPERSTREAM_LEARNING_FACTOR");
            Integer learningFactor = Integer.parseInt(learningFactorString);
            properties.put("superstream.learning.factor", learningFactor);
        } else {
            properties.put("superstream.learning.factor", Consts.superstreamDefaultLearningFactor);
        }
        return properties;
    }
}

