package io.mantisrx.connector.job.core;

import com.sampullara.cli.Args;
import com.sampullara.cli.Argument;
import io.mantisrx.client.MantisSSEJob;
import io.mantisrx.client.SinkConnectionsStatus;
import io.mantisrx.client.examples.SubmitEphemeralJob;
import io.mantisrx.runtime.parameter.SinkParameters;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observer;
import rx.Subscription;

/* loaded from: input_file:io/mantisrx/connector/job/core/MantisSourceJobConnector.class */
public class MantisSourceJobConnector {

    @Argument(alias = "p", description = "Specify a configuration file")
    private static String propFile = "";
    private static final Logger LOGGER = LoggerFactory.getLogger(MantisSourceJobConnector.class);
    private final Properties props;
    public static final String MANTIS_SOURCEJOB_CLIENT_ID_PARAM = "clientId";
    public static final String MANTIS_SOURCEJOB_SUBSCRIPTION_ID = "subscriptionId";
    public static final String MANTIS_SOURCEJOB_CLIENT_ID = "clientId";
    public static final String MANTIS_SOURCEJOB_CRITERION = "criterion";
    public static final String MANTIS_SOURCEJOB_NAME_PARAM = "sourceJobName";
    public static final String MANTIS_SOURCEJOB_TARGET_KEY = "target";
    public static final String MANTIS_SOURCEJOB_IS_BROADCAST_MODE = "isBroadcastMode";
    public static final String MANTIS_SOURCEJOB_SAMPLE_PER_SEC_KEY = "sample";
    public static final String MANTIS_ENABLE_PINGS = "enablePings";
    public static final String MANTIS_ENABLE_META_MESSAGES = "enableMetaMessages";
    public static final String MANTIS_META_MESSAGE_INTERVAL_SEC = "metaMessagesSec";
    public static final String MANTIS_MQL_THREADING_PARAM = "mantis.mql.threading.enabled";
    private static final String ZK_CONNECT_STRING = "mantis.zookeeper.connectString";
    private static final String ZK_ROOT = "mantis.zookeeper.root";
    private static final String ZK_LEADER_PATH = "mantis.zookeeper.leader.announcement.path";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/connector/job/core/MantisSourceJobConnector$NoOpSinkConnectionsStatusObserver.class */
    public static class NoOpSinkConnectionsStatusObserver implements Observer<SinkConnectionsStatus> {
        public void onCompleted() {
            MantisSourceJobConnector.LOGGER.warn("Got Completed on SinkConnectionStatus ");
        }

        public void onError(Throwable th) {
            MantisSourceJobConnector.LOGGER.error("Got Error on SinkConnectionStatus ", th);
        }

        public void onNext(SinkConnectionsStatus sinkConnectionsStatus) {
            MantisSourceJobConnector.LOGGER.info("Got Sink Connection Status update " + sinkConnectionsStatus);
        }
    }

    public MantisSourceJobConnector(Properties properties) {
        this.props = properties;
    }

    public MantisSourceJobConnector() {
        String str;
        String str2;
        String str3;
        this.props = new Properties();
        Map<String, String> map = System.getenv();
        if (map == null || map.isEmpty()) {
            str = "127.0.0.1:2181";
            str2 = "/mantis/master";
            str3 = "/leader";
        } else {
            str = map.getOrDefault(ZK_CONNECT_STRING, "127.0.0.1:2181");
            str2 = map.getOrDefault(ZK_ROOT, "/mantis/master");
            str3 = map.getOrDefault(ZK_LEADER_PATH, "/leader");
            LOGGER.info("Mantis Zk settings read from ENV: connectString {} root {} path {}", new Object[]{map.get(ZK_CONNECT_STRING), map.get(ZK_ROOT), map.get(ZK_LEADER_PATH)});
        }
        if (str == null || str.isEmpty() || str2 == null || str2.isEmpty() || str3 == null || str3.isEmpty()) {
            throw new RuntimeException("Zookeeper properties not available!");
        }
        this.props.put(ZK_CONNECT_STRING, str);
        this.props.put(ZK_ROOT, str2);
        this.props.put(ZK_LEADER_PATH, str3);
        this.props.put("mantis.zookeeper.connectionTimeMs", "2000");
        this.props.put("mantis.zookeeper.connection.retrySleepMs", "500");
        this.props.put("mantis.zookeeper.connection.retryCount", "5");
        LOGGER.info("Mantis Zk settings used for Source Job connector: connectString {} root {} path {}", new Object[]{str, str2, str3});
    }

    @Deprecated
    public MantisSSEJob connecToJob(String str) {
        return connectToJob(str, new SinkParameters.Builder().build(), new NoOpSinkConnectionsStatusObserver());
    }

    public MantisSSEJob connectToJob(String str, SinkParameters sinkParameters) {
        return connectToJob(str, sinkParameters, new NoOpSinkConnectionsStatusObserver());
    }

    @Deprecated
    public MantisSSEJob connectToJob(String str, SinkParameters sinkParameters, int i, int i2) {
        return connectToJob(str, sinkParameters, new NoOpSinkConnectionsStatusObserver());
    }

    @Deprecated
    public MantisSSEJob connectToJob(String str, SinkParameters sinkParameters, int i, int i2, Observer<SinkConnectionsStatus> observer) {
        return connectToJob(str, sinkParameters, observer);
    }

    public MantisSSEJob connectToJob(String str, SinkParameters sinkParameters, Observer<SinkConnectionsStatus> observer) {
        return new MantisSSEJob.Builder(this.props).name(str).sinkConnectionsStatusObserver(observer).onConnectionReset(th -> {
            LOGGER.error("Reconnecting due to error: " + th.getMessage());
        }).sinkParams(sinkParameters).buildJobConnector();
    }

    public static void main(String[] strArr) {
        try {
            SinkParameters build = new SinkParameters.Builder().withParameter(MANTIS_SOURCEJOB_SUBSCRIPTION_ID, "id1").withParameter(MANTIS_SOURCEJOB_CRITERION, "select * where true").build();
            Args.parse(MantisSourceJobConnector.class, strArr);
            CountDownLatch countDownLatch = new CountDownLatch(20);
            MantisSSEJob connectToJob = new MantisSourceJobConnector().connectToJob("TestSourceJob", build);
            Subscription subscribe = connectToJob.connectAndGetObservable().doOnNext(mantisServerSentEvent -> {
                LOGGER.info("Got event:  data: " + mantisServerSentEvent.getEventAsString());
                countDownLatch.countDown();
            }).subscribe();
            connectToJob.connectAndGetObservable().doOnNext(mantisServerSentEvent2 -> {
                LOGGER.info("    2nd: Got event:  data: " + mantisServerSentEvent2.getEventAsString());
                countDownLatch.countDown();
            }).subscribe();
            try {
                if (countDownLatch.await(300L, TimeUnit.SECONDS)) {
                    System.out.println("PASSED");
                } else {
                    System.err.println("FAILED!");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscribe.unsubscribe();
            System.out.println("Unsubscribed");
        } catch (IllegalArgumentException e2) {
            Args.usage(SubmitEphemeralJob.class);
            System.exit(1);
        } catch (Exception e3) {
            e3.printStackTrace();
            System.exit(1);
        }
    }
}
