package com.jaffa.rpc.lib;

import com.jaffa.rpc.lib.annotations.Api;
import com.jaffa.rpc.lib.annotations.ApiClient;
import com.jaffa.rpc.lib.annotations.ApiServer;
import com.jaffa.rpc.lib.common.FinalizationWorker;
import com.jaffa.rpc.lib.common.RequestInvoker;
import com.jaffa.rpc.lib.entities.Protocol;
import com.jaffa.rpc.lib.exception.JaffaRpcSystemException;
import com.jaffa.rpc.lib.http.receivers.HttpAsyncAndSyncRequestReceiver;
import com.jaffa.rpc.lib.http.receivers.HttpAsyncResponseReceiver;
import com.jaffa.rpc.lib.kafka.KafkaRequestSender;
import com.jaffa.rpc.lib.kafka.receivers.KafkaAsyncRequestReceiver;
import com.jaffa.rpc.lib.kafka.receivers.KafkaAsyncResponseReceiver;
import com.jaffa.rpc.lib.kafka.receivers.KafkaReceiver;
import com.jaffa.rpc.lib.kafka.receivers.KafkaSyncRequestReceiver;
import com.jaffa.rpc.lib.rabbitmq.RabbitMQRequestSender;
import com.jaffa.rpc.lib.rabbitmq.receivers.RabbitMQAsyncAndSyncRequestReceiver;
import com.jaffa.rpc.lib.rabbitmq.receivers.RabbitMQAsyncResponseReceiver;
import com.jaffa.rpc.lib.serialization.Serializer;
import com.jaffa.rpc.lib.spring.ClientEndpoints;
import com.jaffa.rpc.lib.spring.ServerEndpoints;
import com.jaffa.rpc.lib.zeromq.CurveUtils;
import com.jaffa.rpc.lib.zeromq.ZeroMqRequestSender;
import com.jaffa.rpc.lib.zeromq.receivers.ZMQAsyncAndSyncRequestReceiver;
import com.jaffa.rpc.lib.zeromq.receivers.ZMQAsyncResponseReceiver;
import com.jaffa.rpc.lib.zookeeper.Utils;
import com.jaffa.rpc.lib.zookeeper.ZooKeeperConnection;
import java.io.Closeable;
import java.io.IOException;
import java.lang.Thread;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.annotation.PostConstruct;
import kafka.admin.RackAwareMode$Disabled$;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import kafka.zookeeper.ZooKeeperClient;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.KeeperException;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.zeromq.ZContext;
import scala.Option;

/* loaded from: input_file:com/jaffa/rpc/lib/JaffaService.class */
/**
 * Bootstraps and shuts down the Jaffa RPC library inside a Spring application context.
 *
 * <p>On startup ({@link #init()}) it loads configuration, prepares the transport infrastructure for
 * the configured {@link Protocol} (Kafka topics or RabbitMQ exchange/queues), creates and starts the
 * receiver threads, registers the server endpoints and starts the finalizer. {@link #close()}
 * reverses those steps.
 *
 * <p>Transport-level state (Kafka properties, topic names, broker count, RabbitMQ connection
 * factory) is kept in static fields and exposed through static getters, so it is shared by every
 * instance of this class in the same class loader.
 */
public class JaffaService {
    private static final Logger log = LoggerFactory.getLogger(JaffaService.class);

    /** How long to wait for a receiver thread to die before interrupting it again, in milliseconds. */
    private static final long RECEIVER_JOIN_TIMEOUT_MS = 100L;

    /** Banner logged once startup has completed; the single {@code {}} placeholder is the startup time. */
    private static final String STARTUP_BANNER = "\n    .---.                                             \n    |   |                                               \n    '---'                                               \n    .---.                 _.._       _.._               \n    |   |               .' .._|    .' .._|              \n    |   |     __        | '        | '         __       \n    |   |  .:--.'.    __| |__    __| |__    .:--.'.     \n    |   | / |   \\ |  |__   __|  |__   __|  / |   \\ |  \n    |   | `\" __ | |     | |        | |     `\" __ | |  \n    |   |  .'.''| |     | |        | |      .'.''| |    \n __.'   ' / /   | |_    | |        | |     / /   | |_   \n|      '  \\ \\._,\\ '/    | |        | |     \\ \\._,\\ '/ \n|____.'    `--'  `\"     |_|        |_|      `--'  `\"  \n                                       STARTED IN {} MS \n";

    private static final Properties producerProps = new Properties();
    private static final Properties consumerProps = new Properties();

    private static KafkaZkClient zkClient;
    private static AdminZkClient adminZkClient;
    private static int brokersCount = 0;
    private static Set<String> serverAsyncTopics;
    private static Set<String> clientAsyncTopics;
    private static Set<String> serverSyncTopics;
    private static Set<String> clientSyncTopics;
    private static RabbitAdmin adminRabbitMQ;
    private static ConnectionFactory connectionFactory;

    private final List<KafkaReceiver> kafkaReceivers = new ArrayList<>();

    /** Closeable receivers; despite the name this also holds the HTTP and RabbitMQ receivers. */
    private final List<Closeable> zmqReceivers = new ArrayList<>();

    private final List<Thread> receiverThreads = new ArrayList<>();

    @Autowired
    private ServerEndpoints serverEndpoints;

    @Autowired
    private ClientEndpoints clientEndpoints;

    @Autowired
    private ApplicationContext context;

    /**
     * Fills the static Kafka consumer/producer properties when the configured protocol is Kafka;
     * does nothing for any other protocol.
     */
    private static void loadInternalProperties() {
        if (!Utils.getRpcProtocol().equals(Protocol.KAFKA)) {
            return;
        }
        String bootstrapServers = Utils.getRequiredOption("jaffa.rpc.protocol.kafka.bootstrap.servers");

        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("enable.auto.commit", String.valueOf(false));
        // A fresh random group id is generated on every start.
        consumerProps.put("group.id", UUID.randomUUID().toString());

        producerProps.put("bootstrap.servers", bootstrapServers);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        if (Boolean.parseBoolean(System.getProperty("jaffa.rpc.protocol.kafka.use.ssl", String.valueOf(false)))) {
            // The same SSL settings are applied to both the consumer and the producer.
            Map<String, String> sslProps = new HashMap<>();
            sslProps.put("security.protocol", "SSL");
            sslProps.put("ssl.truststore.location", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.truststore.location"));
            sslProps.put("ssl.truststore.password", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.truststore.password"));
            sslProps.put("ssl.keystore.location", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.keystore.location"));
            sslProps.put("ssl.keystore.password", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.keystore.password"));
            sslProps.put("ssl.key.password", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.key.password"));
            consumerProps.putAll(sslProps);
            producerProps.putAll(sslProps);
        }
    }

    /**
     * Binds every server endpoint bean to its first implemented interface in
     * {@link RequestInvoker}, registers that interface through {@link Utils#registerService} and
     * hands the application context to {@link RequestInvoker}.
     *
     * @throws IllegalArgumentException if a server endpoint class implements no interface
     */
    private void registerServices() {
        Map<Class<?>, Class<?>> apiByImplementation = new HashMap<>();
        for (Class<?> implementation : this.serverEndpoints.getEndpoints()) {
            log.info("Server endpoint: {}", implementation.getName());
            Class<?>[] interfaces = implementation.getInterfaces();
            // Endpoints are only validated on the Kafka path (getTopicNames), so check here too
            // instead of failing with a bare ArrayIndexOutOfBoundsException.
            if (interfaces.length == 0) {
                throw new IllegalArgumentException(String.format("Class %s does not implement Api interface!", implementation.getName()));
            }
            apiByImplementation.put(implementation, interfaces[0]);
        }
        for (Map.Entry<Class<?>, Class<?>> binding : apiByImplementation.entrySet()) {
            Class<?> api = binding.getValue();
            RequestInvoker.getWrappedServices().put(api, this.context.getBean(binding.getKey()));
            Utils.registerService(api.getName(), Utils.getRpcProtocol());
        }
        RequestInvoker.setContext(this.context);
    }

    /**
     * Connects to ZooKeeper and prepares the broker-side infrastructure of the configured protocol:
     * Kafka topics for {@link Protocol#KAFKA}, exchange and queues for {@link Protocol#RABBIT}.
     *
     * @throws ClassNotFoundException if a client endpoint's service interface cannot be loaded
     */
    private void prepareServiceRegistration() throws ClassNotFoundException {
        Utils.connect(Utils.getRequiredOption("jaffa.rpc.zookeeper.connection"));
        Protocol rpcProtocol = Utils.getRpcProtocol();
        if (rpcProtocol.equals(Protocol.KAFKA)) {
            prepareKafka();
        }
        if (rpcProtocol.equals(Protocol.RABBIT)) {
            prepareRabbit();
        }
    }

    /** Creates the Kafka ZooKeeper clients, reads the broker count and creates/validates all topics. */
    private void prepareKafka() throws ClassNotFoundException {
        ZooKeeperClient zooKeeperClient = new ZooKeeperClient(
                Utils.getRequiredOption("jaffa.rpc.zookeeper.connection"),
                200000,
                15000,
                10,
                Time.SYSTEM,
                UUID.randomUUID().toString(),
                UUID.randomUUID().toString(),
                (Option) null,
                Option.apply(ZooKeeperConnection.getZkConfig()));
        setZkClient(new KafkaZkClient(zooKeeperClient, false, Time.SYSTEM));
        setAdminZkClient(new AdminZkClient(zkClient));
        setBrokersCount(zkClient.getAllBrokersInCluster().size());
        log.info("Kafka brokers: {}", Integer.valueOf(brokersCount));
        setServerAsyncTopics(createKafkaTopics("server-async"));
        setClientAsyncTopics(createKafkaTopics("client-async"));
        setServerSyncTopics(createKafkaTopics("server-sync"));
        setClientSyncTopics(createKafkaTopics("client-sync"));
    }

    /** Builds the RabbitMQ connection factory and declares the exchange and the three queues. */
    private void prepareRabbit() {
        String host = Utils.getRequiredOption("jaffa.rpc.protocol.rabbit.host");
        int port = Integer.parseInt(Utils.getRequiredOption("jaffa.rpc.protocol.rabbit.port"));
        RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
        factoryBean.setHost(host);
        factoryBean.setPort(port);
        // Credentials fall back to "guest"/"guest" when the system properties are not set.
        factoryBean.setUsername(System.getProperty("jaffa.rpc.protocol.rabbit.login", "guest"));
        factoryBean.setPassword(System.getProperty("jaffa.rpc.protocol.rabbit.password", "guest"));
        if (Boolean.parseBoolean(System.getProperty("jaffa.rpc.protocol.rabbit.use.ssl", "false"))) {
            factoryBean.setUseSSL(true);
            factoryBean.setKeyStore(Utils.getRequiredOption("jaffa.rpc.protocol.rabbit.ssl.keystore.location"));
            factoryBean.setKeyStorePassphrase(Utils.getRequiredOption("jaffa.rpc.protocol.rabbit.ssl.keystore.password"));
            factoryBean.setTrustStore(Utils.getRequiredOption("jaffa.rpc.protocol.rabbit.ssl.truststore.location"));
            factoryBean.setTrustStorePassphrase(Utils.getRequiredOption("jaffa.rpc.protocol.rabbit.ssl.truststore.password"));
        }
        setConnectionFactory(new CachingConnectionFactory(factoryBean.getRabbitConnectionFactory()));
        setAdminRabbitMQ(new RabbitAdmin(connectionFactory));
        adminRabbitMQ.declareExchange(new DirectExchange(RabbitMQRequestSender.EXCHANGE_NAME, true, false));
        declareQueueIfAbsent(RabbitMQRequestSender.SERVER);
        declareQueueIfAbsent(RabbitMQRequestSender.CLIENT_ASYNC_NAME);
        declareQueueIfAbsent(RabbitMQRequestSender.CLIENT_SYNC_NAME);
    }

    /** Declares the named RabbitMQ queue unless the broker already reports it. */
    private static void declareQueueIfAbsent(String queueName) {
        if (adminRabbitMQ.getQueueInfo(queueName) == null) {
            adminRabbitMQ.declareQueue(new Queue(queueName));
        }
    }

    /**
     * Computes the topic names for one topic kind. Each name has the form
     * {@code <api interface name>-<jaffa.rpc.module.id>-<str>}.
     *
     * @param str topic kind suffix; server endpoints are used when it contains {@code "server"},
     *            client endpoints otherwise
     * @return the set of topic names, empty when there are no matching endpoints
     * @throws IllegalArgumentException if an endpoint class is not annotated or shaped as required
     * @throws ClassNotFoundException   if a client endpoint's service interface cannot be loaded
     */
    private Set<String> getTopicNames(String str) throws ClassNotFoundException {
        Set<Class<?>> apiInterfaces = new HashSet<>();
        if (str.contains("server")) {
            for (Class<?> server : this.serverEndpoints.getEndpoints()) {
                apiInterfaces.add(validateServerEndpoint(server));
            }
        } else {
            for (Class<?> client : this.clientEndpoints.getEndpoints()) {
                if (!client.isAnnotationPresent(ApiClient.class)) {
                    throw new IllegalArgumentException("Class " + client.getName() + " does not have ApiClient annotation!");
                }
                apiInterfaces.add(Class.forName(Utils.getServiceInterfaceNameFromClient(client.getName())));
            }
        }
        Set<String> topicNames = new HashSet<>();
        for (Class<?> api : apiInterfaces) {
            topicNames.add(api.getName() + "-" + Utils.getRequiredOption("jaffa.rpc.module.id") + "-" + str);
        }
        return topicNames;
    }

    /**
     * Checks that a server endpoint is annotated with {@link ApiServer}, that its first interface is
     * annotated with {@link Api}, and that it has a public no-argument constructor.
     *
     * @return the endpoint's first implemented interface
     * @throws IllegalArgumentException if any of the checks fails
     */
    private static Class<?> validateServerEndpoint(Class<?> server) {
        if (!server.isAnnotationPresent(ApiServer.class)) {
            throw new IllegalArgumentException(String.format("Class %s is not annotated as ApiServer!", server.getName()));
        }
        if (server.getInterfaces().length == 0) {
            throw new IllegalArgumentException(String.format("Class %s does not implement Api interface!", server.getName()));
        }
        Class<?> api = server.getInterfaces()[0];
        if (!api.isAnnotationPresent(Api.class)) {
            throw new IllegalArgumentException(String.format("Class %s does not implement Api interface!", server.getName()));
        }
        try {
            server.getConstructor();
        } catch (NoSuchMethodException e) {
            log.error("General error during endpoint initialization", e);
            throw new IllegalArgumentException(String.format("Class %s does not have default constructor!", server.getName()), e);
        }
        return api;
    }

    /**
     * Creates every missing Kafka topic of the given kind with one partition per broker and a
     * replication factor of 1, and verifies that existing topics have that same partition count.
     *
     * @param str topic kind suffix, see {@link #getTopicNames(String)}
     * @return the topic names of that kind
     * @throws IllegalStateException  if an existing topic's partition count differs from the broker count
     * @throws ClassNotFoundException if a client endpoint's service interface cannot be loaded
     */
    private Set<String> createKafkaTopics(String str) throws ClassNotFoundException {
        Set<String> topicNames = getTopicNames(str);
        for (String topic : topicNames) {
            if (!zkClient.topicExists(topic)) {
                adminZkClient.createTopic(topic, brokersCount, 1, new Properties(), RackAwareMode$Disabled$.MODULE$);
                continue;
            }
            // The partition count comes back as a Scala Option of a boxed value, hence the
            // round trip through its string form.
            int partitions = Integer.parseInt(String.valueOf(zkClient.getTopicPartitionCount(topic).get()));
            if (partitions != brokersCount) {
                throw new IllegalStateException("Topic " + topic + " has wrong config");
            }
        }
        return topicNames;
    }

    /**
     * Starts the library: loads properties, prepares the transport, creates and starts the receiver
     * threads for the configured protocol, waits for the Kafka consumers (if any), registers the
     * server endpoints and starts the finalizer.
     *
     * @throws JaffaRpcSystemException wrapping any exception raised during startup
     */
    @PostConstruct
    private void init() {
        try {
            Utils.loadExternalProperties();
            loadInternalProperties();
            long startedAt = System.currentTimeMillis();
            prepareServiceRegistration();
            // Stays null unless Kafka receivers are created.
            CountDownLatch consumersReady = null;
            Serializer.init();
            switch (Utils.getRpcProtocol()) {
                case KAFKA: {
                    boolean clientSide = !clientSyncTopics.isEmpty() && !clientAsyncTopics.isEmpty();
                    boolean serverSide = !serverSyncTopics.isEmpty() && !serverAsyncTopics.isEmpty();
                    // Two consumer kinds (sync + async) per active side; presumably each of them
                    // counts the latch down once per broker - TODO confirm in the receivers.
                    int consumerKinds = (clientSide ? 2 : 0) + (serverSide ? 2 : 0);
                    if (consumerKinds != 0) {
                        consumersReady = new CountDownLatch(brokersCount * consumerKinds);
                    }
                    if (serverSide) {
                        KafkaSyncRequestReceiver syncRequests = new KafkaSyncRequestReceiver(consumersReady);
                        KafkaAsyncRequestReceiver asyncRequests = new KafkaAsyncRequestReceiver(consumersReady);
                        this.kafkaReceivers.add(asyncRequests);
                        this.kafkaReceivers.add(syncRequests);
                        this.receiverThreads.add(new Thread(syncRequests));
                        this.receiverThreads.add(new Thread(asyncRequests));
                    }
                    if (clientSide) {
                        KafkaAsyncResponseReceiver asyncResponses = new KafkaAsyncResponseReceiver(consumersReady);
                        this.kafkaReceivers.add(asyncResponses);
                        KafkaRequestSender.initSyncKafkaConsumers(brokersCount, consumersReady);
                        this.receiverThreads.add(new Thread(asyncResponses));
                    }
                    break;
                }
                case ZMQ:
                    if (this.serverEndpoints.getEndpoints().length != 0) {
                        addReceiver(new ZMQAsyncAndSyncRequestReceiver());
                    }
                    if (this.clientEndpoints.getEndpoints().length != 0) {
                        addReceiver(new ZMQAsyncResponseReceiver());
                    }
                    if (Boolean.parseBoolean(System.getProperty("jaffa.rpc.protocol.zmq.curve.enabled", String.valueOf(false)))) {
                        CurveUtils.readClientKeys();
                        CurveUtils.readServerKeys();
                    }
                    break;
                case HTTP:
                    HttpAsyncAndSyncRequestReceiver.initClient();
                    if (this.serverEndpoints.getEndpoints().length != 0) {
                        addReceiver(new HttpAsyncAndSyncRequestReceiver());
                    }
                    if (this.clientEndpoints.getEndpoints().length != 0) {
                        addReceiver(new HttpAsyncResponseReceiver());
                    }
                    break;
                case RABBIT:
                    if (this.serverEndpoints.getEndpoints().length != 0) {
                        addReceiver(new RabbitMQAsyncAndSyncRequestReceiver());
                    }
                    if (this.clientEndpoints.getEndpoints().length != 0) {
                        RabbitMQAsyncResponseReceiver asyncResponses = new RabbitMQAsyncResponseReceiver();
                        asyncResponses.setContext(this.context);
                        addReceiver(asyncResponses);
                    }
                    RabbitMQRequestSender.init();
                    break;
                default:
                    throw new JaffaRpcSystemException(JaffaRpcSystemException.NO_PROTOCOL_DEFINED);
            }
            this.receiverThreads.forEach(Thread::start);
            if (consumersReady != null) {
                consumersReady.await();
            }
            registerServices();
            FinalizationWorker.startFinalizer(this.context);
            log.info(STARTUP_BANNER, Long.valueOf(System.currentTimeMillis() - startedAt));
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            log.error("Exception during Jaffa RPC library startup:", e);
            throw new JaffaRpcSystemException(e);
        }
    }

    /** Remembers a receiver for shutdown and queues a (not yet started) thread that will run it. */
    private <T extends Closeable & Runnable> void addReceiver(T receiver) {
        this.zmqReceivers.add(receiver);
        this.receiverThreads.add(new Thread(receiver));
    }

    /**
     * Shuts the library down: closes the Kafka receivers and consumers, removes the service
     * registrations from ZooKeeper, closes the remaining receivers, the ZeroMQ context and the
     * RabbitMQ sender, stops the receiver threads and finally the finalizer.
     *
     * <p>If the calling thread is interrupted while shutting down, the shutdown still runs to
     * completion and the interrupt flag is restored before returning.
     *
     * @throws JaffaRpcSystemException if closing one of the receivers fails with an {@link IOException}
     */
    public void close() {
        log.info("Close started");
        boolean interrupted = false;
        try {
            this.kafkaReceivers.forEach(KafkaReceiver::close);
            log.info("Kafka receivers closed");
            KafkaRequestSender.shutDownConsumers();
            log.info("Kafka sync response consumers closed");
            interrupted |= unregisterServices();
            log.info("Services were unregistered");
            for (Closeable receiver : this.zmqReceivers) {
                try {
                    receiver.close();
                } catch (IOException e) {
                    log.error("Unable to shut down ZeroMQ receivers", e);
                    throw new JaffaRpcSystemException(e);
                }
            }
            ZContext zContext = ZeroMqRequestSender.context;
            if (!zContext.isClosed()) {
                zContext.close();
            }
            RabbitMQRequestSender.close();
            log.info("All ZMQ sockets were closed");
            interrupted |= stopReceiverThreads();
            log.info("All receiver threads stopped");
            FinalizationWorker.stopFinalizer();
            log.info("Finalizer was stopped");
            log.info("Jaffa RPC shutdown completed");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Deletes all service registrations and closes the ZooKeeper connection, if one is open.
     * Failures are logged and otherwise ignored so that the rest of the shutdown can proceed.
     *
     * @return {@code true} if the calling thread was interrupted while unregistering
     */
    private static boolean unregisterServices() {
        if (Utils.getConn() == null) {
            return false;
        }
        try {
            for (String service : Utils.getServices()) {
                Utils.deleteAllRegistrations(service);
            }
            if (Utils.getConn() != null) {
                Utils.getConn().close();
            }
            Utils.setConn(null);
            return false;
        } catch (KeeperException | InterruptedException | ParseException | UnknownHostException e) {
            log.error("Unable to unregister services from ZooKeeper cluster, probably it was done earlier", e);
            return e instanceof InterruptedException;
        }
    }

    /**
     * Interrupts every running receiver thread until it has terminated, waiting between attempts
     * instead of spinning. Threads that were never started are skipped.
     *
     * @return {@code true} if the calling thread was interrupted while waiting
     */
    private boolean stopReceiverThreads() {
        boolean interrupted = false;
        for (Thread receiverThread : this.receiverThreads) {
            while (receiverThread.isAlive()) {
                receiverThread.interrupt();
                try {
                    receiverThread.join(RECEIVER_JOIN_TIMEOUT_MS);
                } catch (InterruptedException e) {
                    // Remember the interrupt but keep waiting: shutdown must finish.
                    interrupted = true;
                }
            }
        }
        return interrupted;
    }

    /** @return the shared Kafka producer properties (the live instance, not a copy) */
    public static Properties getProducerProps() {
        return producerProps;
    }

    /** @return the shared Kafka consumer properties (the live instance, not a copy) */
    public static Properties getConsumerProps() {
        return consumerProps;
    }

    /** @return the Kafka ZooKeeper client, or {@code null} if it has not been created */
    public static KafkaZkClient getZkClient() {
        return zkClient;
    }

    private static void setZkClient(KafkaZkClient kafkaZkClient) {
        zkClient = kafkaZkClient;
    }

    /** @return the number of Kafka brokers read at startup, {@code 0} if never read */
    public static int getBrokersCount() {
        return brokersCount;
    }

    private static void setBrokersCount(int i) {
        brokersCount = i;
    }

    /** @return the server-side async topic names, or {@code null} if not initialised */
    public static Set<String> getServerAsyncTopics() {
        return serverAsyncTopics;
    }

    private static void setServerAsyncTopics(Set<String> set) {
        serverAsyncTopics = set;
    }

    /** @return the client-side async topic names, or {@code null} if not initialised */
    public static Set<String> getClientAsyncTopics() {
        return clientAsyncTopics;
    }

    private static void setClientAsyncTopics(Set<String> set) {
        clientAsyncTopics = set;
    }

    /** @return the server-side sync topic names, or {@code null} if not initialised */
    public static Set<String> getServerSyncTopics() {
        return serverSyncTopics;
    }

    private static void setServerSyncTopics(Set<String> set) {
        serverSyncTopics = set;
    }

    /** @return the client-side sync topic names, or {@code null} if not initialised */
    public static Set<String> getClientSyncTopics() {
        return clientSyncTopics;
    }

    private static void setClientSyncTopics(Set<String> set) {
        clientSyncTopics = set;
    }

    private static void setAdminZkClient(AdminZkClient adminZkClient2) {
        adminZkClient = adminZkClient2;
    }

    private static void setAdminRabbitMQ(RabbitAdmin rabbitAdmin) {
        adminRabbitMQ = rabbitAdmin;
    }

    private static void setConnectionFactory(ConnectionFactory connectionFactory2) {
        connectionFactory = connectionFactory2;
    }

    /** @return the RabbitMQ connection factory, or {@code null} if it has not been created */
    public static ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }
}
