package io.nosqlbench.driver.pulsar;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;

/* loaded from: input_file:io/nosqlbench/driver/pulsar/PulsarSpace.class */
public class PulsarSpace {
    private static final Logger logger = LogManager.getLogger(PulsarSpace.class);
    private final String spaceName;
    private final PulsarNBClientConf pulsarNBClientConf;
    private final String pulsarSvcUrl;
    private final String webSvcUrl;
    private final PulsarAdmin pulsarAdmin;
    private final Timer createTransactionTimer;
    private final ActivityDef activityDef;
    private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
    private final Set<String> pulsarClusterMetadata = new HashSet();
    private PulsarClient pulsarClient = null;
    private Schema<?> pulsarSchema = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/nosqlbench/driver/pulsar/PulsarSpace$GaugeImpl.class */
    public static class GaugeImpl implements Gauge<Object> {
        private final Producer<?> producer;
        private final Function<ProducerStats, Object> valueExtractor;

        GaugeImpl(Producer<?> producer, Function<ProducerStats, Object> function) {
            this.producer = producer;
            this.valueExtractor = function;
        }

        public Object getValue() {
            Object apply;
            synchronized (this.producer) {
                apply = this.valueExtractor.apply(this.producer.getStats());
            }
            return apply;
        }
    }

    public PulsarSpace(String str, PulsarNBClientConf pulsarNBClientConf, String str2, String str3, PulsarAdmin pulsarAdmin, ActivityDef activityDef, Timer timer) {
        this.spaceName = str;
        this.pulsarNBClientConf = pulsarNBClientConf;
        this.pulsarSvcUrl = str2;
        this.webSvcUrl = str3;
        this.pulsarAdmin = pulsarAdmin;
        this.activityDef = activityDef;
        this.createTransactionTimer = timer;
        createPulsarClientFromConf();
        createPulsarSchemaFromConf();
        try {
            CollectionUtils.addAll(this.pulsarClusterMetadata, pulsarAdmin.clusters().getClusters().listIterator());
        } catch (PulsarAdminException e) {
            String str4 = "Fail to create PulsarClient from global configuration: " + e.getMessage();
            logger.error(str4);
            throw new RuntimeException(str4);
        }
    }

    private void createPulsarClientFromConf() {
        ClientBuilder builder = PulsarClient.builder();
        try {
            Map<String, Object> clientConfMap = this.pulsarNBClientConf.getClientConfMap();
            clientConfMap.remove("serviceUrl");
            builder.loadConf(clientConfMap).serviceUrl(this.pulsarSvcUrl);
            String str = (String) this.pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label);
            String str2 = (String) this.pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label);
            boolean z = BooleanUtils.toBoolean((String) this.pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label));
            String str3 = (String) this.pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
            boolean z2 = BooleanUtils.toBoolean((String) this.pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label));
            boolean z3 = BooleanUtils.toBoolean((String) this.pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label));
            if (!StringUtils.isAnyBlank(new CharSequence[]{str, str2})) {
                builder.authentication(str, str2);
            }
            if (z) {
                builder.useKeyStoreTls(z).enableTlsHostnameVerification(z3);
                if (!StringUtils.isBlank(str3)) {
                    builder.tlsTrustCertsFilePath(str3);
                }
            }
            builder.allowTlsInsecureConnection(z2);
            this.pulsarClient = builder.build();
        } catch (PulsarClientException e) {
            String str4 = "Fail to create PulsarClient from global configuration: " + e.getMessage();
            logger.error(str4);
            throw new RuntimeException(str4);
        }
    }

    private void createPulsarSchemaFromConf() {
        Object schemaConfValue = this.pulsarNBClientConf.getSchemaConfValue("schema.type");
        String obj = schemaConfValue != null ? schemaConfValue.toString() : "";
        if (PulsarActivityUtil.isAvroSchemaTypeStr(obj)) {
            Object schemaConfValue2 = this.pulsarNBClientConf.getSchemaConfValue("schema.definition");
            this.pulsarSchema = PulsarActivityUtil.getAvroSchema(obj, schemaConfValue2 != null ? schemaConfValue2.toString() : "");
        } else {
            if (!PulsarActivityUtil.isPrimitiveSchemaTypeStr(obj)) {
                throw new RuntimeException("Unsupported schema type string: " + obj + "; Only primitive type and Avro type are supported at the moment!");
            }
            this.pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema(obj);
        }
    }

    public PulsarClient getPulsarClient() {
        return this.pulsarClient;
    }

    public PulsarNBClientConf getPulsarClientConf() {
        return this.pulsarNBClientConf;
    }

    public Schema<?> getPulsarSchema() {
        return this.pulsarSchema;
    }

    public PulsarAdmin getPulsarAdmin() {
        return this.pulsarAdmin;
    }

    public String getPulsarSvcUrl() {
        return this.pulsarSvcUrl;
    }

    public String getWebSvcUrl() {
        return this.webSvcUrl;
    }

    public Set<String> getPulsarClusterMetadata() {
        return this.pulsarClusterMetadata;
    }

    private String getEffectiveProducerTopicName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String producerTopicName = this.pulsarNBClientConf.getProducerTopicName();
        if (StringUtils.isBlank(producerTopicName)) {
            throw new RuntimeException(" topic name must be set at either global level or cycle level!");
        }
        return producerTopicName;
    }

    private String getEffectiveProducerName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String producerName = this.pulsarNBClientConf.getProducerName();
        return !StringUtils.isBlank(producerName) ? producerName : "";
    }

    public Supplier<Transaction> getTransactionSupplier() {
        PulsarClient pulsarClient = getPulsarClient();
        return () -> {
            try {
                try {
                    Timer.Context time = this.createTransactionTimer.time();
                    try {
                        Transaction transaction = (Transaction) pulsarClient.newTransaction().build().get();
                        if (time != null) {
                            time.close();
                        }
                        return transaction;
                    } catch (Throwable th) {
                        if (time != null) {
                            try {
                                time.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (InterruptedException | ExecutionException e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Error while starting a new transaction", e);
                    }
                    throw new RuntimeException(e);
                }
            } catch (PulsarClientException e2) {
                throw new RuntimeException("Transactions are not enabled on Pulsar Client, please set client.enableTransaction=true in your Pulsar Client configuration");
            }
        };
    }

    private static String buildCacheKey(String... strArr) {
        return String.join("::", strArr);
    }

    public Producer<?> getProducer(String str, String str2) {
        String str3;
        String effectiveProducerTopicName = getEffectiveProducerTopicName(str);
        String effectiveProducerName = getEffectiveProducerName(str2);
        if (StringUtils.isBlank(effectiveProducerTopicName)) {
            throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level");
        }
        String buildCacheKey = buildCacheKey(effectiveProducerName, effectiveProducerTopicName);
        Producer<?> producer = this.producers.get(buildCacheKey);
        if (producer == null) {
            PulsarClient pulsarClient = getPulsarClient();
            Map<String, Object> producerConfMap = this.pulsarNBClientConf.getProducerConfMap();
            producerConfMap.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, effectiveProducerTopicName);
            if (StringUtils.isBlank(effectiveProducerName)) {
                str3 = "producer" + this.producers.size() + "_";
            } else {
                producerConfMap.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, effectiveProducerName);
                str3 = effectiveProducerName + "_";
            }
            String replace = (str3 + effectiveProducerTopicName + "_").replace("persistent://public/default/", "").replace("non-persistent://", "").replace("persistent://", "").replace("/", "_");
            try {
                ProducerBuilder newProducer = pulsarClient.newProducer(this.pulsarSchema);
                newProducer.loadConf(producerConfMap);
                producer = newProducer.create();
                this.producers.put(buildCacheKey, producer);
                ActivityMetrics.gauge(this.activityDef, replace + "totalbytessent", safeExtractMetric(producer, producerStats -> {
                    return Long.valueOf(producerStats.getTotalBytesSent() + producerStats.getNumBytesSent());
                }));
                ActivityMetrics.gauge(this.activityDef, replace + "totalmsgssent", safeExtractMetric(producer, producerStats2 -> {
                    return Long.valueOf(producerStats2.getTotalMsgsSent() + producerStats2.getNumMsgsSent());
                }));
                ActivityMetrics.gauge(this.activityDef, replace + "totalsendfailed", safeExtractMetric(producer, producerStats3 -> {
                    return Long.valueOf(producerStats3.getTotalSendFailed() + producerStats3.getNumSendFailed());
                }));
                ActivityMetrics.gauge(this.activityDef, replace + "totalacksreceived", safeExtractMetric(producer, producerStats4 -> {
                    return Long.valueOf(producerStats4.getTotalAcksReceived() + producerStats4.getNumAcksReceived());
                }));
                ActivityMetrics.gauge(this.activityDef, replace + "sendbytesrate", safeExtractMetric(producer, (v0) -> {
                    return v0.getSendBytesRate();
                }));
                ActivityMetrics.gauge(this.activityDef, replace + "sendmsgsrate", safeExtractMetric(producer, (v0) -> {
                    return v0.getSendMsgsRate();
                }));
            } catch (PulsarClientException e) {
                throw new RuntimeException("Unable to create a Pulsar producer!", e);
            }
        }
        return producer;
    }

    static Gauge<Object> safeExtractMetric(Producer<?> producer, Function<ProducerStats, Object> function) {
        return new GaugeImpl(producer, function);
    }

    private String getEffectiveTopicNamesStr(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerTopicNames = this.pulsarNBClientConf.getConsumerTopicNames();
        return !StringUtils.isBlank(consumerTopicNames) ? consumerTopicNames : "";
    }

    private List<String> getEffectiveTopicNames(String str) {
        String[] split = getEffectiveTopicNamesStr(str).split("[;,]");
        ArrayList arrayList = new ArrayList();
        for (String str2 : split) {
            if (!StringUtils.isBlank(str2)) {
                arrayList.add(str2.trim());
            }
        }
        return arrayList;
    }

    private String getEffectiveTopicPatternStr(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerTopicPattern = this.pulsarNBClientConf.getConsumerTopicPattern();
        return !StringUtils.isBlank(consumerTopicPattern) ? consumerTopicPattern : "";
    }

    private Pattern getEffectiveTopicPattern(String str) {
        Pattern pattern;
        String effectiveTopicPatternStr = getEffectiveTopicPatternStr(str);
        try {
            pattern = !StringUtils.isBlank(effectiveTopicPatternStr) ? Pattern.compile(effectiveTopicPatternStr) : null;
        } catch (PatternSyntaxException e) {
            pattern = null;
        }
        return pattern;
    }

    private String getEffectiveSubscriptionName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerSubscriptionName = this.pulsarNBClientConf.getConsumerSubscriptionName();
        if (StringUtils.isBlank(consumerSubscriptionName)) {
            throw new RuntimeException("Consumer::Subscription name must be set at either global level or cycle level!");
        }
        return consumerSubscriptionName;
    }

    private String getEffectiveSubscriptionTypeStr(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerSubscriptionType = this.pulsarNBClientConf.getConsumerSubscriptionType();
        return !StringUtils.isBlank(consumerSubscriptionType) ? consumerSubscriptionType : "";
    }

    private SubscriptionType getEffectiveSubscriptionType(String str) {
        String effectiveSubscriptionTypeStr = getEffectiveSubscriptionTypeStr(str);
        SubscriptionType subscriptionType = SubscriptionType.Exclusive;
        if (!StringUtils.isBlank(effectiveSubscriptionTypeStr)) {
            if (!PulsarActivityUtil.isValidSubscriptionType(effectiveSubscriptionTypeStr)) {
                throw new RuntimeException("Consumer::Invalid subscription type (\"" + effectiveSubscriptionTypeStr + "\"). \nValid subscription types: " + PulsarActivityUtil.getValidSubscriptionTypeList());
            }
            subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionTypeStr);
        }
        return subscriptionType;
    }

    private String getEffectiveConsumerName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerName = this.pulsarNBClientConf.getConsumerName();
        return !StringUtils.isBlank(consumerName) ? consumerName : "";
    }

    public Consumer<?> getConsumer(String str, String str2, String str3, String str4, String str5, String str6) {
        List<String> effectiveTopicNames = getEffectiveTopicNames(str2);
        String effectiveTopicPatternStr = getEffectiveTopicPatternStr(str3);
        Pattern effectiveTopicPattern = getEffectiveTopicPattern(str3);
        String effectiveSubscriptionName = getEffectiveSubscriptionName(str4);
        SubscriptionType effectiveSubscriptionType = getEffectiveSubscriptionType(str5);
        String effectiveConsumerName = getEffectiveConsumerName(str6);
        if (StringUtils.isBlank(str) && effectiveTopicNames.isEmpty() && effectiveTopicPattern == null) {
            throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!");
        }
        String buildCacheKey = !effectiveTopicNames.isEmpty() ? buildCacheKey(effectiveConsumerName, effectiveSubscriptionName, String.join("|", effectiveTopicNames)) : effectiveTopicPattern != null ? buildCacheKey(effectiveConsumerName, effectiveSubscriptionName, effectiveTopicPatternStr) : buildCacheKey(effectiveConsumerName, effectiveSubscriptionName, str);
        Consumer<?> consumer = this.consumers.get(buildCacheKey);
        if (consumer == null) {
            PulsarClient pulsarClient = getPulsarClient();
            HashMap hashMap = new HashMap(this.pulsarNBClientConf.getConsumerConfMap());
            hashMap.remove("timeout");
            if (!effectiveTopicNames.isEmpty()) {
                hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
                hashMap.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, effectiveTopicNames);
            } else if (effectiveTopicPattern != null) {
                hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
                hashMap.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label, getEffectiveTopicPattern(str3));
            } else {
                effectiveTopicNames.add(str);
                hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
                hashMap.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, effectiveTopicNames);
            }
            hashMap.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, effectiveSubscriptionName);
            hashMap.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, effectiveSubscriptionType);
            if (!StringUtils.isBlank(effectiveConsumerName)) {
                hashMap.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, effectiveConsumerName);
            }
            try {
                consumer = pulsarClient.newConsumer(this.pulsarSchema).loadConf(hashMap).subscribe();
                this.consumers.put(buildCacheKey, consumer);
            } catch (PulsarClientException e) {
                e.printStackTrace();
                throw new RuntimeException("Unable to create a Pulsar consumer!");
            }
        }
        return consumer;
    }

    private String getEffectiveReaderTopicName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String readerTopicName = this.pulsarNBClientConf.getReaderTopicName();
        if (StringUtils.isBlank(readerTopicName)) {
            throw new RuntimeException("Reader topic name must be set at either global level or cycle level!");
        }
        return readerTopicName;
    }

    private String getEffectiveReaderName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerName = this.pulsarNBClientConf.getConsumerName();
        return !StringUtils.isBlank(consumerName) ? consumerName : "";
    }

    private String getEffectiveStartMsgPosStr(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String startMsgPosStr = this.pulsarNBClientConf.getStartMsgPosStr();
        return !StringUtils.isBlank(startMsgPosStr) ? startMsgPosStr : PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label;
    }

    public Reader<?> getReader(String str, String str2, String str3) {
        String effectiveReaderTopicName = getEffectiveReaderTopicName(str);
        if (StringUtils.isBlank(effectiveReaderTopicName)) {
            throw new RuntimeException("Reader:: must specify a topic name either at the global level or the cycle level");
        }
        String effectiveReaderName = getEffectiveReaderName(str2);
        String effectiveStartMsgPosStr = getEffectiveStartMsgPosStr(str3);
        if (!PulsarActivityUtil.isValideReaderStartPosition(effectiveStartMsgPosStr)) {
            throw new RuntimeException("Reader:: Invalid value for Reader start message position!");
        }
        String buildCacheKey = buildCacheKey(effectiveReaderTopicName, effectiveReaderName, effectiveStartMsgPosStr);
        Reader<?> reader = this.readers.get(buildCacheKey);
        if (reader == null) {
            PulsarClient pulsarClient = getPulsarClient();
            Map<String, Object> readerConfMap = this.pulsarNBClientConf.getReaderConfMap();
            readerConfMap.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), effectiveReaderTopicName);
            if (!StringUtils.isBlank(effectiveReaderName)) {
                readerConfMap.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), effectiveReaderName);
            }
            readerConfMap.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
            try {
                ReaderBuilder loadConf = pulsarClient.newReader(this.pulsarSchema).loadConf(readerConfMap);
                MessageId messageId = MessageId.latest;
                if (effectiveStartMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) {
                    messageId = MessageId.earliest;
                }
                if (messageId != null) {
                    loadConf = loadConf.startMessageId(messageId);
                }
                reader = loadConf.create();
                this.readers.put(buildCacheKey, reader);
            } catch (PulsarClientException e) {
                e.printStackTrace();
                throw new RuntimeException("Unable to create a Pulsar reader!");
            }
        }
        return reader;
    }
}
