package ai.superstream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.JetStream;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.client.api.ServerInfo;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:ai/superstream/Superstream.class */
public class Superstream {
    public Connection brokerConnection;
    public JetStream jetstream;
    public String superstreamJwt;
    public String superstreamNkey;
    public byte[] descriptorAsBytes;
    public Descriptors.Descriptor descriptor;
    public String natsConnectionID;
    public int clientID;
    public String accountName;
    public int learningFactor;
    private static final ObjectMapper objectMapper = new ObjectMapper();
    public Map<String, Object> configs;
    private Subscription updatesSubscription;
    private String host;
    private String token;
    public String type;
    public Boolean reductionEnabled;
    public int learningFactorCounter = 0;
    public boolean learningRequestSent = false;
    public String ProducerSchemaID = "0";
    public String ConsumerSchemaID = "0";
    public Map<String, Descriptors.Descriptor> SchemaIDMap = new HashMap();
    public SuperstreamCounters clientCounters = new SuperstreamCounters();
    public Map<String, Set<Integer>> topicPartitions = new ConcurrentHashMap();
    public ExecutorService executorService = Executors.newCachedThreadPool();
    private Integer kafkaConnectionID = 0;

    public Superstream(String str, String str2, Integer num, Map<String, Object> map, Boolean bool, String str3) {
        this.learningFactor = 20;
        this.learningFactor = num.intValue();
        this.token = str;
        this.host = str2;
        this.configs = map;
        this.reductionEnabled = bool;
        this.type = str3;
    }

    public void init() {
        try {
            initializeNatsConnection(this.token, this.host);
            if (this.brokerConnection != null) {
                registerClient(this.configs);
                subscribeToUpdates();
                reportClientsUpdate();
                sendClientTypeUpdateReq();
            }
        } catch (Exception e) {
            handleError(e.getMessage());
        }
    }

    public void close() {
        try {
            if (this.brokerConnection != null) {
                this.brokerConnection.close();
            }
            this.executorService.shutdown();
        } catch (Exception e) {
        }
    }

    private void initializeNatsConnection(String str, String str2) {
        try {
            Connection connect = Nats.connect(new Options.Builder().server(str2).userInfo(Consts.superstreamInternalUsername, str).maxReconnects(-1).reconnectWait(Duration.ofSeconds(1L)).connectionListener(new ConnectionListener() { // from class: ai.superstream.Superstream.1
                public void connectionEvent(Connection connection, ConnectionListener.Events events) {
                    if (events == ConnectionListener.Events.DISCONNECTED) {
                        System.out.println("superstream: Disconnected");
                        return;
                    }
                    if (events == ConnectionListener.Events.RECONNECTED) {
                        try {
                            if (Superstream.this.brokerConnection != null) {
                                Superstream.this.natsConnectionID = Superstream.this.generateNatsConnectionID();
                                HashMap hashMap = new HashMap();
                                hashMap.put("new_nats_connection_id", Superstream.this.natsConnectionID);
                                hashMap.put("client_id", Integer.valueOf(Superstream.this.clientID));
                                Superstream.this.brokerConnection.request(Consts.clientReconnectionUpdateSubject, new ObjectMapper().writeValueAsBytes(hashMap), Duration.ofSeconds(30L));
                            }
                        } catch (Exception e) {
                            System.out.println("superstream: Failed to send reconnection update: " + e.getMessage());
                        }
                        System.out.println("superstream: Reconnected to superstream");
                    }
                }
            }).build());
            if (connect == null) {
                throw new Exception(String.format("Failed to connect to host: %s", str2));
            }
            JetStream jetStream = connect.jetStream();
            if (jetStream == null) {
                throw new Exception(String.format("Failed to connect to host: %s", str2));
            }
            this.brokerConnection = connect;
            this.jetstream = jetStream;
            this.natsConnectionID = generateNatsConnectionID();
        } catch (Exception e) {
            System.out.println(String.format("superstream: %s", e.getMessage()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String generateNatsConnectionID() {
        ServerInfo serverInfo = this.brokerConnection.getServerInfo();
        String serverName = serverInfo.getServerName();
        this.clientID = serverInfo.getClientId();
        return serverName + ":" + this.clientID;
    }

    public void registerClient(Map<String, ?> map) {
        try {
            this.kafkaConnectionID = 0;
            HashMap hashMap = new HashMap();
            hashMap.put("nats_connection_id", this.natsConnectionID);
            hashMap.put("language", "java");
            hashMap.put("learning_factor", Integer.valueOf(this.learningFactor));
            hashMap.put("version", Consts.sdkVersion);
            hashMap.put("config", normalizeClientConfig(map));
            hashMap.put("reduction_enabled", this.reductionEnabled);
            hashMap.put("connection_id", this.kafkaConnectionID);
            ObjectMapper objectMapper2 = new ObjectMapper();
            Message request = this.brokerConnection.request(Consts.clientRegisterSubject, objectMapper2.writeValueAsBytes(hashMap), Duration.ofSeconds(30L));
            if (request != null) {
                Map map2 = (Map) objectMapper2.readValue(request.getData(), Map.class);
                Object obj = map2.get("client_id");
                if (obj instanceof Integer) {
                    this.clientID = ((Integer) obj).intValue();
                } else if (obj instanceof String) {
                    try {
                        this.clientID = Integer.parseInt((String) obj);
                    } catch (NumberFormatException e) {
                        System.err.println("superstream: client_id is not a valid integer: " + obj);
                    }
                } else {
                    System.err.println("superstream: client_id is not a valid integer: " + obj);
                }
                Object obj2 = map2.get("account_name");
                if (obj2 != null) {
                    this.accountName = obj2.toString();
                } else {
                    System.err.println("superstream: account_name is not a valid string: " + obj2);
                }
                Object obj3 = map2.get("learning_factor");
                if (obj3 instanceof Integer) {
                    this.learningFactor = ((Integer) obj3).intValue();
                } else if (obj3 instanceof String) {
                    try {
                        this.learningFactor = Integer.parseInt((String) obj3);
                    } catch (NumberFormatException e2) {
                        System.err.println("superstream: learning_factor is not a valid integer: " + obj3);
                    }
                } else {
                    System.err.println("superstream: learning_factor is not a valid integer: " + obj3);
                }
            } else {
                System.out.println("superstream: registering client: No reply received within the timeout period.");
            }
        } catch (Exception e3) {
            System.out.println(String.format("superstream: %s", e3.getMessage()));
        }
    }

    private String consumeConnectionID() {
        Properties copyAuthConfig = copyAuthConfig();
        copyAuthConfig.put("key.deserializer", StringDeserializer.class.getName());
        copyAuthConfig.put("value.deserializer", StringDeserializer.class.getName());
        copyAuthConfig.put("auto.offset.reset", "earliest");
        copyAuthConfig.put(Consts.superstreamInnerConsumerKey, "true");
        String str = null;
        KafkaConsumer kafkaConsumer = new KafkaConsumer(copyAuthConfig);
        try {
            kafkaConsumer.assign(Collections.singletonList(new TopicPartition(Consts.superstreamMetadataTopic, 0)));
            Iterator it = kafkaConsumer.poll(Duration.ofSeconds(10L)).iterator();
            if (it.hasNext()) {
                str = (String) ((ConsumerRecord) it.next()).value();
            }
            return str;
        } finally {
            kafkaConsumer.close();
        }
    }

    private Properties copyAuthConfig() {
        Properties properties = new Properties();
        for (String str : new String[]{"security.protocol", "ssl.truststore.location", "ssl.truststore.password", "ssl.keystore.location", "ssl.keystore.password", "ssl.key.password", "ssl.endpoint.identification.algorithm", "sasl.mechanism", "sasl.jaas.config", "sasl.kerberos.service.name", "bootstrap.servers", "client.dns.lookup", "connections.max.idle.ms", "request.timeout.ms", "metadata.max.age.ms", "reconnect.backoff.ms", "reconnect.backoff.max.ms"}) {
            if (this.configs.containsKey(str)) {
                properties.put(str, String.valueOf(this.configs.get(str)));
            }
        }
        return properties;
    }

    public void sendClientTypeUpdateReq() {
        if (this.type == "" || this.type == null) {
            return;
        }
        if (this.type == "consumer" || this.type == "producer") {
            try {
                HashMap hashMap = new HashMap();
                hashMap.put("client_id", Integer.valueOf(this.clientID));
                hashMap.put("type", this.type);
                this.brokerConnection.request(Consts.clientTypeUpdateSubject, new ObjectMapper().writeValueAsBytes(hashMap), Duration.ofSeconds(30L));
            } catch (Exception e) {
                handleError(String.format("sendClientTypeUpdateReq: %s", e.getMessage()));
            }
        }
    }

    public void subscribeToUpdates() {
        try {
            this.updatesSubscription = this.brokerConnection.createDispatcher(updatesHandler()).subscribe(String.format(Consts.superstreamUpdatesSubject, Integer.valueOf(this.clientID)), updatesHandler());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void reportClientsUpdate() {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            try {
                byte[] writeValueAsBytes = objectMapper.writeValueAsBytes(this.clientCounters);
                HashMap hashMap = new HashMap();
                if (!this.topicPartitions.isEmpty()) {
                    Map<String, Integer[]> convertMap = convertMap(this.topicPartitions);
                    String str = this.type;
                    boolean z = -1;
                    switch (str.hashCode()) {
                        case -1003761774:
                            if (str.equals("producer")) {
                                z = false;
                                break;
                            }
                            break;
                        case -567770122:
                            if (str.equals("consumer")) {
                                z = true;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            hashMap.put("producer_topics_partitions", convertMap);
                            hashMap.put("consumer_group_topics_partitions", new HashMap());
                            break;
                        case true:
                            hashMap.put("producer_topics_partitions", new HashMap());
                            hashMap.put("consumer_group_topics_partitions", convertMap);
                            break;
                    }
                }
                byte[] writeValueAsBytes2 = objectMapper.writeValueAsBytes(hashMap);
                this.brokerConnection.publish(String.format(Consts.superstreamClientsUpdateSubject, "counters", Integer.valueOf(this.clientID)), writeValueAsBytes);
                this.brokerConnection.publish(String.format(Consts.superstreamClientsUpdateSubject, "config", Integer.valueOf(this.clientID)), writeValueAsBytes2);
            } catch (Exception e) {
                handleError("reportClientsUpdate: " + e.getMessage());
            }
        }, 0L, 30L, TimeUnit.SECONDS);
    }

    public static Map<String, Integer[]> convertMap(Map<String, Set<Integer>> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Set<Integer>> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), (Integer[]) entry.getValue().toArray(new Integer[0]));
        }
        return hashMap;
    }

    public void sendLearningMessage(byte[] bArr) {
        try {
            this.brokerConnection.publish(String.format(Consts.superstreamLearningSubject, Integer.valueOf(this.clientID)), bArr);
        } catch (Exception e) {
            handleError("sendLearningMessage: " + e.getMessage());
        }
    }

    public void sendRegisterSchemaReq() {
        try {
            this.brokerConnection.publish(String.format(Consts.superstreamRegisterSchemaSubject, Integer.valueOf(this.clientID)), new byte[0]);
            this.learningRequestSent = true;
        } catch (Exception e) {
            handleError("sendLearningMessage: " + e.getMessage());
        }
    }

    public byte[] jsonToProto(byte[] bArr) throws IOException {
        try {
            String str = new String(bArr);
            DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(this.descriptor);
            JsonFormat.parser().merge(str, newBuilder);
            return newBuilder.build().toByteArray();
        } catch (Exception e) {
            return e.getMessage().contains("Cannot find field") ? bArr : bArr;
        }
    }

    public byte[] protoToJson(byte[] bArr, Descriptors.Descriptor descriptor) throws IOException {
        try {
            return JsonFormat.printer().omittingInsignificantWhitespace().print(DynamicMessage.parseFrom(descriptor, bArr)).getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            return e.getMessage().contains("the input ended unexpectedly") ? bArr : bArr;
        }
    }

    private MessageHandler updatesHandler() {
        return message -> {
            try {
                processUpdate((Map) objectMapper.readValue(message.getData(), Map.class));
            } catch (IOException e) {
                handleError("updatesHandler at json.Unmarshal: " + e.getMessage());
            }
        };
    }

    private void processUpdate(Map<String, Object> map) {
        String str = (String) map.get("type");
        try {
            Map map2 = (Map) objectMapper.readValue(Base64.getDecoder().decode((String) map.get("payload")), Map.class);
            boolean z = -1;
            switch (str.hashCode()) {
                case -1664004769:
                    if (str.equals("ToggleReduction")) {
                        z = true;
                        break;
                    }
                    break;
                case -104073916:
                    if (str.equals("LearnedSchema")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    this.descriptor = compileMsgDescriptor((String) map2.get("desc"), (String) map2.get("master_msg_name"), (String) map2.get("file_name"));
                    this.ProducerSchemaID = (String) map2.get("schema_id");
                    break;
                case true:
                    if (!((Boolean) map2.get("enable_reduction")).booleanValue()) {
                        this.reductionEnabled = false;
                        break;
                    } else {
                        this.reductionEnabled = true;
                        break;
                    }
            }
        } catch (Exception e) {
            handleError("processUpdate: " + e.getMessage());
        }
    }

    public void sendGetSchemaRequest(String str) {
        try {
            HashMap hashMap = new HashMap();
            hashMap.put("schema_id", str);
            Message request = this.brokerConnection.request(String.format(Consts.superstreamGetSchemaSubject, Integer.valueOf(this.clientID)), new ObjectMapper().writeValueAsBytes(hashMap), Duration.ofSeconds(30L));
            if (request == null) {
                throw new Exception("Could not get descriptor");
            }
            Map map = (Map) objectMapper.readValue(new String(request.getData(), StandardCharsets.UTF_8), Map.class);
            if (!map.containsKey("desc") || !(map.get("desc") instanceof String)) {
                throw new Exception("Response map does not contain expected keys.");
            }
            Descriptors.Descriptor compileMsgDescriptor = compileMsgDescriptor((String) map.get("desc"), (String) map.get("master_msg_name"), (String) map.get("file_name"));
            if (compileMsgDescriptor == null) {
                throw new Exception("Error compiling schema.");
            }
            this.SchemaIDMap.put((String) map.get("schema_id"), compileMsgDescriptor);
        } catch (Exception e) {
            handleError(String.format("sendGetSchemaRequest: %s", e.getMessage()));
        }
    }

    private Descriptors.Descriptor compileMsgDescriptor(String str, String str2, String str3) {
        try {
            byte[] decode = Base64.getDecoder().decode(str);
            if (decode == null) {
                throw new Exception("error decoding descriptor bytes");
            }
            Descriptors.FileDescriptor fileDescriptor = null;
            Iterator it = DescriptorProtos.FileDescriptorSet.parseFrom(decode).getFileList().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                DescriptorProtos.FileDescriptorProto fileDescriptorProto = (DescriptorProtos.FileDescriptorProto) it.next();
                if (fileDescriptorProto.getName().equals(str3)) {
                    fileDescriptor = Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, new Descriptors.FileDescriptor[0]);
                    break;
                }
            }
            if (fileDescriptor == null) {
                throw new Exception("file not found");
            }
            for (Descriptors.Descriptor descriptor : fileDescriptor.getMessageTypes()) {
                if (descriptor.getName().equals(str2)) {
                    return descriptor;
                }
            }
            return null;
        } catch (Exception e) {
            handleError(String.format("compileMsgDescriptor: %s", e.getMessage()));
            return null;
        }
    }

    public void handleError(String str) {
        if (this.brokerConnection != null) {
            this.brokerConnection.publish(Consts.superstreamErrorSubject, String.format("[account name: %s][clientID: %d][sdk: java][version: %s] %s", this.accountName, Integer.valueOf(this.clientID), Consts.sdkVersion, str).getBytes(StandardCharsets.UTF_8));
        }
    }

    public static Map<String, Object> normalizeClientConfig(Map<String, ?> map) {
        HashMap hashMap = new HashMap();
        mapIfPresent(map, "max.request.size", hashMap, "producer_max_messages_bytes");
        mapIfPresent(map, "acks", hashMap, "producer_required_acks");
        mapIfPresent(map, "delivery.timeout.ms", hashMap, "producer_timeout");
        mapIfPresent(map, "retries", hashMap, "producer_retry_max");
        mapIfPresent(map, "retry.backoff.ms", hashMap, "producer_retry_backoff");
        mapIfPresent(map, "compression.type", hashMap, "producer_compression_level");
        mapIfPresent(map, "fetch.min.bytes", hashMap, "consumer_fetch_min");
        mapIfPresent(map, "fetch.max.bytes", hashMap, "consumer_fetch_default");
        mapIfPresent(map, "retry.backoff.ms", hashMap, "consumer_retry_backoff");
        mapIfPresent(map, "max.poll.interval.ms", hashMap, "consumer_max_wait_time");
        mapIfPresent(map, "max.poll.records", hashMap, "consumer_max_processing_time");
        mapIfPresent(map, "auto.commit.interval.ms", hashMap, "consumer_offset_auto_commit_interval");
        mapIfPresent(map, "session.timeout.ms", hashMap, "consumer_group_session_timeout");
        mapIfPresent(map, "heartbeat.interval.ms", hashMap, "consumer_group_heart_beat_interval");
        mapIfPresent(map, "retry.backoff.ms", hashMap, "consumer_group_rebalance_retry_back_off");
        mapIfPresent(map, "group.id", hashMap, "consumer_group_id");
        mapIfPresent(map, "bootstrap.servers", hashMap, "servers");
        return hashMap;
    }

    private static void mapIfPresent(Map<String, ?> map, String str, Map<String, Object> map2, String str2) {
        if (map.containsKey(str)) {
            if (str == "bootstrap.servers") {
                map2.put(str2, Arrays.toString((String[]) map.get(str)));
            } else {
                map2.put(str2, map.get(str));
            }
        }
    }

    public static Map<String, Object> initSuperstreamConfig(Map<String, Object> map, String str) {
        Map<String, String> map2;
        String str2;
        String str3 = (String) map.get(Consts.superstreamInnerConsumerKey);
        if (str3 != null && str3.equals("true")) {
            return map;
        }
        String str4 = (String) map.get("interceptor.classes");
        boolean z = -1;
        switch (str.hashCode()) {
            case -1003761774:
                if (str.equals("producer")) {
                    z = false;
                    break;
                }
                break;
            case -567770122:
                if (str.equals("consumer")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                str4 = (str4 == null || str4.isEmpty()) ? SuperstreamProducerInterceptor.class.getName() : str4 + "," + SuperstreamProducerInterceptor.class.getName();
                if (map.containsKey("value.serializer") && !map.containsKey(Consts.originalSerializer)) {
                    map.put(Consts.originalSerializer, map.get("value.serializer"));
                    map.put("value.serializer", SuperstreamSerializer.class.getName());
                    break;
                }
                break;
            case true:
                str4 = (str4 == null || str4.isEmpty()) ? SuperstreamConsumerInterceptor.class.getName() : str4 + "," + SuperstreamConsumerInterceptor.class.getName();
                if (map.containsKey("value.deserializer") && !map.containsKey(Consts.originalDeserializer)) {
                    map.put(Consts.originalDeserializer, map.get("value.deserializer"));
                    map.put("value.deserializer", SuperstreamDeserializer.class.getName());
                    break;
                }
                break;
        }
        if (str4 != null) {
            map.put("interceptor.classes", str4);
        }
        try {
            map2 = System.getenv();
            str2 = map2.get("SUPERSTREAM_HOST");
        } catch (Exception e) {
            System.out.println(String.format("superstream: error initializing superstream: %s", e.getMessage()));
        }
        if (str2 == null) {
            throw new Exception("host is required");
        }
        map.put(Consts.superstreamHostKey, str2);
        String str5 = map2.get("SUPERSTREAM_TOKEN");
        if (str5 == null) {
            str5 = Consts.superstreamDefaultToken;
        }
        map.put(Consts.superstreamTokenKey, str5);
        String str6 = map2.get("SUPERSTREAM_LEARNING_FACTOR");
        Integer num = Consts.superstreamDefaultLearningFactor;
        if (str6 != null) {
            num = Integer.valueOf(Integer.parseInt(str6));
        }
        map.put(Consts.superstreamLearningFactorKey, num);
        Boolean bool = false;
        String str7 = map2.get("SUPERSTREAM_REDUCTION_ENABLED");
        if (str7 != null) {
            bool = Boolean.valueOf(Boolean.parseBoolean(str7));
        }
        map.put(Consts.superstreamReductionEnabledKey, bool);
        Superstream superstream = new Superstream(str5, str2, num, map, bool, str);
        superstream.init();
        map.put(Consts.superstreamConnectionKey, superstream);
        return map;
    }

    public static Properties initSuperstreamProps(Properties properties, String str) {
        Map<String, String> map;
        String str2;
        String str3 = (String) properties.get("interceptor.classes");
        boolean z = -1;
        switch (str.hashCode()) {
            case -1003761774:
                if (str.equals("producer")) {
                    z = false;
                    break;
                }
                break;
            case -567770122:
                if (str.equals("consumer")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                str3 = (str3 == null || str3.isEmpty()) ? SuperstreamProducerInterceptor.class.getName() : str3 + "," + SuperstreamProducerInterceptor.class.getName();
                if (properties.containsKey("value.serializer") && !properties.containsKey(Consts.originalSerializer)) {
                    properties.put(Consts.originalSerializer, properties.get("value.serializer"));
                    properties.put("value.serializer", SuperstreamSerializer.class.getName());
                    break;
                }
                break;
            case true:
                str3 = (str3 == null || str3.isEmpty()) ? SuperstreamConsumerInterceptor.class.getName() : str3 + "," + SuperstreamConsumerInterceptor.class.getName();
                if (properties.containsKey("value.deserializer") && !properties.containsKey(Consts.originalDeserializer)) {
                    properties.put(Consts.originalDeserializer, properties.get("value.deserializer"));
                    properties.put("value.deserializer", SuperstreamDeserializer.class.getName());
                    break;
                }
                break;
        }
        if (str3 != null) {
            properties.put("interceptor.classes", str3);
        }
        try {
            map = System.getenv();
            str2 = map.get("SUPERSTREAM_HOST");
        } catch (Exception e) {
            System.out.println(String.format("superstream: error initializing superstream: %s", e.getMessage()));
        }
        if (str2 == null) {
            throw new Exception("host is required");
        }
        properties.put(Consts.superstreamHostKey, str2);
        String str4 = map.get("SUPERSTREAM_TOKEN");
        if (str4 == null) {
            str4 = Consts.superstreamDefaultToken;
        }
        properties.put(Consts.superstreamTokenKey, str4);
        String str5 = map.get("SUPERSTREAM_LEARNING_FACTOR");
        Integer num = Consts.superstreamDefaultLearningFactor;
        if (str5 != null) {
            num = Integer.valueOf(Integer.parseInt(str5));
        }
        properties.put(Consts.superstreamLearningFactorKey, num);
        Boolean bool = false;
        String str6 = map.get("SUPERSTREAM_REDUCTION_ENABLED");
        if (str6 != null) {
            bool = Boolean.valueOf(Boolean.parseBoolean(str6));
        }
        properties.put(Consts.superstreamReductionEnabledKey, bool);
        Superstream superstream = new Superstream(str4, str2, num, propertiesToMap(properties), bool, str);
        superstream.init();
        properties.put(Consts.superstreamConnectionKey, superstream);
        return properties;
    }

    public static Map<String, Object> propertiesToMap(Properties properties) {
        return (Map) properties.entrySet().stream().collect(Collectors.toMap(entry -> {
            return String.valueOf(entry.getKey());
        }, entry2 -> {
            return entry2.getValue();
        }));
    }

    public void updateTopicPartitions(String str, Integer num) {
        Set<Integer> computeIfAbsent = this.topicPartitions.computeIfAbsent(str, str2 -> {
            return new HashSet();
        });
        if (computeIfAbsent.contains(num)) {
            return;
        }
        computeIfAbsent.add(num);
    }
}
