package io.nosqlbench.adapter.s4j;

import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil;
import io.nosqlbench.adapter.s4j.util.S4JClientConf;
import io.nosqlbench.adapter.s4j.util.S4JJMSContextWrapper;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import java.io.Serializable;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/nosqlbench/adapter/s4j/S4JSpace.class */
public class S4JSpace implements AutoCloseable {
    private static final Logger logger;
    private final String spaceName;
    private final NBConfiguration cfg;
    private final String pulsarSvcUrl;
    private final String webSvcUrl;
    private final String s4jClientConfFileName;
    private final S4JClientConf s4JClientConf;
    private final int sessionMode;
    private boolean strictMsgErrorHandling;
    private long maxS4JOpTimeInSec;
    private long s4JActivityStartTimeMills;
    private boolean trackingMsgRecvCnt;
    private int maxNumConn;
    private int maxNumSessionPerConn;
    private PulsarConnectionFactory s4jConnFactory;
    private long totalCycleNum;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ConcurrentHashMap<String, JMSContext> connLvlJmsContexts = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, S4JJMSContextWrapper> sessionLvlJmsContexts = new ConcurrentHashMap<>();
    private final AtomicLong totalOpResponseCnt = new AtomicLong(0);
    private final AtomicLong nullMsgRecvCnt = new AtomicLong(0);
    private final ThreadLocal<Integer> txnBatchTrackingCnt = ThreadLocal.withInitial(() -> {
        return 0;
    });

    public S4JSpace(String str, NBConfiguration nBConfiguration) {
        this.spaceName = str;
        this.cfg = nBConfiguration;
        this.pulsarSvcUrl = (String) nBConfiguration.get("service_url");
        this.webSvcUrl = (String) nBConfiguration.get("web_url");
        this.maxNumConn = NumberUtils.toInt((String) nBConfiguration.getOptional("num_conn").orElse("1"));
        this.maxNumSessionPerConn = NumberUtils.toInt((String) nBConfiguration.getOptional("num_session").orElse("1"));
        this.maxS4JOpTimeInSec = NumberUtils.toLong((String) nBConfiguration.getOptional("max_s4jop_time").orElse("0L"));
        this.trackingMsgRecvCnt = BooleanUtils.toBoolean((String) nBConfiguration.getOptional("track_msg_cnt").orElse("false"));
        this.strictMsgErrorHandling = BooleanUtils.toBoolean((String) nBConfiguration.getOptional("strict_msg_error_handling").orElse("false"));
        this.s4jClientConfFileName = (String) nBConfiguration.get("config");
        this.sessionMode = S4JAdapterUtil.getSessionModeFromStr((String) nBConfiguration.getOptional("session_mode").orElse(""));
        this.s4JClientConf = new S4JClientConf(this.pulsarSvcUrl, this.webSvcUrl, this.s4jClientConfFileName);
        setS4JActivityStartTimeMills(System.currentTimeMillis());
        initializeSpace(this.s4JClientConf);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        shutdownSpace();
    }

    public static NBConfigModel getConfigModel() {
        return ConfigModel.of(S4JSpace.class).add(Param.defaultTo("service_url", "pulsar://localhost:6650").setDescription("Pulsar broker service URL.")).add(Param.defaultTo("web_url", "http://localhost:8080").setDescription("Pulsar web service URL.")).add(Param.defaultTo("config", "config.properties").setDescription("Pulsar client connection configuration property file.")).add(Param.defaultTo("num_conn", 1).setDescription("Number of JMS connections")).add(Param.defaultTo("num_session", 1).setDescription("Number of JMS sessions per JMS connection")).add(Param.defaultTo("max_s4jop_time", 0).setDescription("Maximum time (in seconds) to run NB S4J testing scenario.")).add(Param.defaultTo("track_msg_cnt", false).setDescription("Whether to keep track of message count(s)")).add(Param.defaultTo("session_mode", "").setDescription("JMS session mode")).add(Param.defaultTo("strict_msg_error_handling", false).setDescription("Whether to do strict error handling which is to stop NB S4J execution.")).asReadOnly();
    }

    public ConcurrentHashMap<String, JMSContext> getConnLvlJmsContexts() {
        return this.connLvlJmsContexts;
    }

    public ConcurrentHashMap<String, S4JJMSContextWrapper> getSessionLvlJmsContexts() {
        return this.sessionLvlJmsContexts;
    }

    public long getS4JActivityStartTimeMills() {
        return this.s4JActivityStartTimeMills;
    }

    public void setS4JActivityStartTimeMills(long j) {
        this.s4JActivityStartTimeMills = j;
    }

    public long getMaxS4JOpTimeInSec() {
        return this.maxS4JOpTimeInSec;
    }

    public int getSessionMode() {
        return this.sessionMode;
    }

    public String getS4jClientConfFileName() {
        return this.s4jClientConfFileName;
    }

    public S4JClientConf getS4JClientConf() {
        return this.s4JClientConf;
    }

    public boolean isTrackingMsgRecvCnt() {
        return this.trackingMsgRecvCnt;
    }

    public int getMaxNumSessionPerConn() {
        return this.maxNumSessionPerConn;
    }

    public int getMaxNumConn() {
        return this.maxNumConn;
    }

    public boolean isStrictMsgErrorHandling() {
        return this.strictMsgErrorHandling;
    }

    public int getTxnBatchTrackingCnt() {
        return this.txnBatchTrackingCnt.get().intValue();
    }

    public void incTxnBatchTrackingCnt() {
        this.txnBatchTrackingCnt.set(Integer.valueOf(getTxnBatchTrackingCnt() + 1));
    }

    public long getTotalOpResponseCnt() {
        return this.totalOpResponseCnt.get();
    }

    public long incTotalOpResponseCnt() {
        return this.totalOpResponseCnt.incrementAndGet();
    }

    public void resetTotalOpResponseCnt() {
        this.totalOpResponseCnt.set(0L);
    }

    public long getTotalNullMsgRecvdCnt() {
        return this.nullMsgRecvCnt.get();
    }

    public void resetTotalNullMsgRecvdCnt() {
        this.nullMsgRecvCnt.set(0L);
    }

    public long incTotalNullMsgRecvdCnt() {
        return this.nullMsgRecvCnt.incrementAndGet();
    }

    public PulsarConnectionFactory getS4jConnFactory() {
        return this.s4jConnFactory;
    }

    public long getTotalCycleNum() {
        return this.totalCycleNum;
    }

    public void setTotalCycleNum(long j) {
        this.totalCycleNum = j;
    }

    public void initializeSpace(S4JClientConf s4JClientConf) {
        if (this.s4jConnFactory == null) {
            try {
                this.s4jConnFactory = new PulsarConnectionFactory(s4JClientConf.getS4jConfObjMap());
                for (int i = 0; i < getMaxNumConn(); i++) {
                    String connLvlJmsContextIdentifier = getConnLvlJmsContextIdentifier(i);
                    String encodeToString = Base64.getEncoder().encodeToString(connLvlJmsContextIdentifier.getBytes());
                    JMSContext orCreateConnLvlJMSContext = getOrCreateConnLvlJMSContext(this.s4jConnFactory, s4JClientConf, this.sessionMode);
                    orCreateConnLvlJMSContext.setClientID(encodeToString);
                    orCreateConnLvlJMSContext.setExceptionListener(jMSException -> {
                        if (logger.isDebugEnabled()) {
                            logger.error("onException::Unexpected JMS error happened:" + jMSException);
                        }
                    });
                    this.connLvlJmsContexts.put(connLvlJmsContextIdentifier, orCreateConnLvlJMSContext);
                    if (logger.isDebugEnabled()) {
                        logger.debug("[Connection level JMSContext] {} -- {}", Thread.currentThread().getName(), orCreateConnLvlJMSContext);
                    }
                }
            } catch (JMSRuntimeException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("[ERROR] Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConf.toString());
                }
                throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }

    public void shutdownSpace() {
        try {
            waitUntilAllOpFinished(System.currentTimeMillis());
            this.txnBatchTrackingCnt.remove();
            for (S4JJMSContextWrapper s4JJMSContextWrapper : this.sessionLvlJmsContexts.values()) {
                if (s4JJMSContextWrapper != null) {
                    if (s4JJMSContextWrapper.isTransactedMode()) {
                        s4JJMSContextWrapper.getJmsContext().rollback();
                    }
                    s4JJMSContextWrapper.close();
                }
            }
            for (JMSContext jMSContext : this.connLvlJmsContexts.values()) {
                if (jMSContext != null) {
                    jMSContext.close();
                }
            }
            this.s4jConnFactory.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new S4JAdapterUnexpectedException("Unexpected error when shutting down NB S4J space.");
        }
    }

    private void waitUntilAllOpFinished(long j) {
        long currentTimeMillis;
        boolean z;
        long totalCycleNum = getTotalCycleNum();
        long j2 = 0;
        long j3 = 0;
        boolean isTrackingMsgRecvCnt = isTrackingMsgRecvCnt();
        do {
            S4JAdapterUtil.pauseCurThreadExec(1);
            currentTimeMillis = System.currentTimeMillis() - j;
            z = currentTimeMillis <= 10000;
            if (isTrackingMsgRecvCnt) {
                j2 = getTotalOpResponseCnt();
                j3 = getTotalNullMsgRecvdCnt();
                z = z && j2 < totalCycleNum;
            }
            if (logger.isTraceEnabled()) {
                logger.trace(buildExecSummaryString(isTrackingMsgRecvCnt, currentTimeMillis, j2, j3));
            }
        } while (z);
        logger.info(buildExecSummaryString(isTrackingMsgRecvCnt, currentTimeMillis, j2, j3));
    }

    private String buildExecSummaryString(boolean z, long j, long j2, long j3) {
        StringBuilder sb = new StringBuilder();
        sb.append("shutdownSpace::waitUntilAllOpFinished -- ").append("shutdown time elapsed: ").append(j).append("ms; ");
        if (z) {
            sb.append("response received: ").append(j2).append("; ");
            sb.append("null msg received: ").append(j3).append("; ");
        }
        return sb.toString();
    }

    public void processMsgAck(JMSContext jMSContext, Message message, float f, int i) throws JMSException {
        int sessionMode = jMSContext.getSessionMode();
        if (sessionMode == 1 || sessionMode == 0 || RandomUtils.nextFloat(0.0f, 1.0f) >= f) {
            return;
        }
        S4JAdapterUtil.pauseCurThreadExec(i);
        message.acknowledge();
    }

    public String getConnLvlJmsContextIdentifier(int i) {
        return S4JAdapterUtil.buildCacheKey(this.spaceName, StringUtils.join(new Serializable[]{"conn-", Integer.valueOf(i)}));
    }

    public String getSessionLvlJmsContextIdentifier(int i, int i2) {
        return S4JAdapterUtil.buildCacheKey(this.spaceName, StringUtils.join(new Serializable[]{"conn-", Integer.valueOf(i)}), StringUtils.join(new Serializable[]{"session-", Integer.valueOf(i2)}));
    }

    public JMSContext getOrCreateConnLvlJMSContext(PulsarConnectionFactory pulsarConnectionFactory, S4JClientConf s4JClientConf, int i) {
        JMSContext createContext;
        if (!S4JAdapterUtil.isAuthNRequired(s4JClientConf) && S4JAdapterUtil.isUseCredentialsEnabled(s4JClientConf)) {
            throw new S4JAdapterInvalidParamException("'jms.useCredentialsFromCreateConnection' can't set be true when Pulsar client authN parameters are not set. ");
        }
        if (S4JAdapterUtil.isAuthNRequired(s4JClientConf) && S4JAdapterUtil.isUseCredentialsEnabled(s4JClientConf)) {
            String credentialUserName = S4JAdapterUtil.getCredentialUserName(s4JClientConf);
            String credentialPassword = S4JAdapterUtil.getCredentialPassword(s4JClientConf);
            if (!StringUtils.startsWith(credentialPassword, "token:")) {
                throw new S4JAdapterInvalidParamException("When 'jms.useCredentialsFromCreateConnection' is enabled, the provided password must be in format 'token:<token_value_...> ");
            }
            createContext = pulsarConnectionFactory.createContext(credentialUserName, credentialPassword, i);
        } else {
            createContext = pulsarConnectionFactory.createContext(i);
        }
        return createContext;
    }

    public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(long j) {
        return getOrCreateS4jJmsContextWrapper(j, null);
    }

    public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(long j, Map<String, Object> map) {
        int maxNumConn = getMaxNumConn();
        int i = ((int) j) % maxNumConn;
        int maxNumSessionPerConn = ((int) (j / maxNumConn)) % getMaxNumSessionPerConn();
        PulsarJMSContext pulsarJMSContext = (JMSContext) this.connLvlJmsContexts.get(getConnLvlJmsContextIdentifier(i));
        if (!$assertionsDisabled && pulsarJMSContext == null) {
            throw new AssertionError();
        }
        String sessionLvlJmsContextIdentifier = getSessionLvlJmsContextIdentifier(i, maxNumSessionPerConn);
        S4JJMSContextWrapper s4JJMSContextWrapper = this.sessionLvlJmsContexts.get(sessionLvlJmsContextIdentifier);
        if (s4JJMSContextWrapper == null) {
            s4JJMSContextWrapper = new S4JJMSContextWrapper(sessionLvlJmsContextIdentifier, (map == null || map.isEmpty()) ? pulsarJMSContext.createContext(pulsarJMSContext.getSessionMode()) : pulsarJMSContext.createContext(pulsarJMSContext.getSessionMode(), map));
            this.sessionLvlJmsContexts.put(sessionLvlJmsContextIdentifier, s4JJMSContextWrapper);
            if (logger.isDebugEnabled()) {
                logger.debug("[Session level JMSContext] {} -- {}", Thread.currentThread().getName(), s4JJMSContextWrapper);
            }
        }
        return s4JJMSContextWrapper;
    }

    static {
        $assertionsDisabled = !S4JSpace.class.desiredAssertionStatus();
        logger = LogManager.getLogger(S4JSpace.class);
    }
}
