package com.networknt.kafka.streams;

import com.networknt.config.Config;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.entity.StreamsDLQMetadata;
import com.networknt.utility.ModuleRegistry;
import com.networknt.utility.ObjectUtils;
import com.networknt.utility.StringUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/kafka/streams/LightStreams.class */
/**
 * Contract and shared helpers for components that run a Kafka Streams topology.
 *
 * <p>Implementations supply {@link #start(String, int)} and {@link #close()}; the default
 * methods provide module registration, state-store lookups that retry while the store is
 * unavailable, and the common sequence for building and starting a {@link KafkaStreams}
 * instance.
 */
public interface LightStreams {
    /** Logger shared by the default methods of this interface. */
    Logger logger = LoggerFactory.getLogger(LightStreams.class);

    /** Maximum time, in milliseconds, that a state-store lookup keeps retrying. */
    long WAIT_THRESHOLD = 30000;

    /**
     * Starts the streams application.
     *
     * @param str presumably the host that ends up in {@code application.server} (see
     *            {@link #startStream}) - confirm against implementations
     * @param i   presumably the port paired with {@code str} - confirm against implementations
     */
    void start(String str, int i);

    /** Stops the streams application. */
    void close();

    /**
     * Registers the {@code kafka-streams} configuration with the {@link ModuleRegistry}.
     *
     * <p>The list handed to the registry names three credential-bearing configuration keys;
     * presumably the registry masks their values when exposing the configuration - confirm
     * against {@code ModuleRegistry.registerModule}.
     */
    default void registerModule() {
        final ArrayList<String> sensitiveKeys = new ArrayList<>();
        sensitiveKeys.add("basic.auth.user.info");
        sensitiveKeys.add("sasl.jaas.config");
        sensitiveKeys.add("schema.registry.ssl.truststore.password");
        ModuleRegistry.registerModule(
                "kafka-streams",
                LightStreams.class.getName(),
                Config.getInstance().getJsonMapConfigNoCache("kafka-streams"),
                sensitiveKeys);
    }

    /**
     * Looks up a single value in a state store, retrying while the store is unavailable.
     *
     * <p>Each time the store throws {@link InvalidStateStoreException} the lookup is repeated
     * after a 100 ms pause, until {@link #WAIT_THRESHOLD} milliseconds have elapsed.
     *
     * @param readOnlyKeyValueStore the store to query
     * @param str                   the key to look up
     * @return the value returned by the store (which may itself be {@code null}), or
     *         {@code null} if the wait timed out or the calling thread was interrupted
     */
    default Object getKafkaValueByKey(ReadOnlyKeyValueStore<String, ?> readOnlyKeyValueStore, String str) {
        final long deadline = System.currentTimeMillis() + WAIT_THRESHOLD;
        while (System.currentTimeMillis() < deadline) {
            try {
                return readOnlyKeyValueStore.get(str);
            } catch (InvalidStateStoreException e) {
                // Store not queryable right now; fall through to the back-off below and retry.
                logger.debug(e.getMessage());
            }
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e2) {
                logger.error(e2.getMessage(), e2);
                // Keep the interrupt visible to the caller and stop waiting instead of
                // spinning through sleeps that would fail immediately.
                Thread.currentThread().interrupt();
                return null;
            }
        }
        // The deadline has passed: give up rather than looping forever.
        if (logger.isDebugEnabled()) {
            logger.debug("Timeout period is passed after 30 seconds.");
        }
        return null;
    }

    /**
     * Obtains an iterator over every entry of a state store, retrying while the store is
     * unavailable.
     *
     * <p>Each time the store throws {@link InvalidStateStoreException} the call is repeated
     * after a 100 ms pause, until {@link #WAIT_THRESHOLD} milliseconds have elapsed.
     *
     * @param readOnlyKeyValueStore the store to query
     * @return the iterator returned by {@code all()}, or {@code null} if the wait timed out or
     *         the calling thread was interrupted. NOTE(review): Kafka Streams documents that
     *         such iterators should be closed by the caller - confirm call sites do so.
     */
    default Object getAllKafkaValue(ReadOnlyKeyValueStore<?, ?> readOnlyKeyValueStore) {
        final long deadline = System.currentTimeMillis() + WAIT_THRESHOLD;
        while (System.currentTimeMillis() < deadline) {
            try {
                return readOnlyKeyValueStore.all();
            } catch (InvalidStateStoreException e) {
                // Store not queryable right now; fall through to the back-off below and retry.
                logger.debug(e.getMessage());
            }
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e2) {
                logger.error(e2.getMessage(), e2);
                // Keep the interrupt visible to the caller and stop waiting instead of
                // spinning through sleeps that would fail immediately.
                Thread.currentThread().interrupt();
                return null;
            }
        }
        // The deadline has passed: give up rather than looping forever.
        if (logger.isDebugEnabled()) {
            logger.debug("Timeout period is passed after 30 seconds.");
        }
        return null;
    }

    /**
     * Completes the given topology with the configured audit and dead-letter sinks, then
     * creates and starts a {@link KafkaStreams} instance for it.
     *
     * @param str                host part of the {@code application.server} property
     * @param i                  port part of the {@code application.server} property
     * @param topology           topology to extend and run; sinks are added to it in place
     * @param kafkaStreamsConfig source of the streams properties and the audit, dead-letter
     *                           and clean-up switches
     * @param map                dead-letter topic name mapped to the metadata (serde and parent
     *                           node names) of its sink; must be non-empty when dead-letter
     *                           handling is enabled
     * @param strArr             parent node names for the audit sink
     * @return the started {@link KafkaStreams} instance
     * @throws RuntimeException if dead-letter handling is enabled but {@code map} is null or
     *                          empty
     */
    default KafkaStreams startStream(String str, int i, Topology topology, KafkaStreamsConfig kafkaStreamsConfig, Map<String, StreamsDLQMetadata> map, String... strArr) throws RuntimeException {
        final Properties streamProps = new Properties();
        streamProps.putAll(kafkaStreamsConfig.getProperties());
        streamProps.put("application.server", str + ":" + i);

        // The audit sink is only wired in when auditing is on and its target is a topic.
        if (kafkaStreamsConfig.isAuditEnabled()
                && !StringUtils.isEmpty(kafkaStreamsConfig.getAuditTarget())
                && kafkaStreamsConfig.getAuditTarget().equalsIgnoreCase("topic")) {
            topology.addSink(
                    "AuditSink",
                    kafkaStreamsConfig.getAuditTopic(),
                    Serdes.String().serializer(),
                    Serdes.String().serializer(),
                    strArr);
        }

        if (kafkaStreamsConfig.isDeadLetterEnabled()) {
            if (ObjectUtils.isEmpty(map) || map.isEmpty()) {
                throw new RuntimeException("DLQ is enabled, SreamsDLQMetadata can not be null");
            }
            // One sink per dead-letter topic, named after the trimmed topic name.
            for (Map.Entry<String, StreamsDLQMetadata> dlqEntry : map.entrySet()) {
                final String dlqTopic = dlqEntry.getKey().trim();
                final StreamsDLQMetadata dlqMetadata = dlqEntry.getValue();
                topology.addSink(
                        dlqTopic + "_DLQSink",
                        dlqTopic,
                        Serdes.ByteArray().serializer(),
                        dlqMetadata.getSerde().serializer(),
                        dlqMetadata.getParentNames().toArray(String[]::new));
            }
        }

        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        if (kafkaStreamsConfig.isCleanUp()) {
            // Must happen before start(); clean-up is not permitted on a running instance.
            streams.cleanUp();
        }
        streams.start();
        return streams;
    }
}
