package com.jaffa.rpc.lib.zeromq.receivers;

import com.jaffa.rpc.lib.common.RequestInvoker;
import com.jaffa.rpc.lib.entities.Command;
import com.jaffa.rpc.lib.entities.RequestContext;
import com.jaffa.rpc.lib.exception.JaffaRpcExecutionException;
import com.jaffa.rpc.lib.exception.JaffaRpcSystemException;
import com.jaffa.rpc.lib.serialization.Serializer;
import com.jaffa.rpc.lib.zeromq.CurveUtils;
import com.jaffa.rpc.lib.zeromq.ZeroMqRequestSender;
import com.jaffa.rpc.lib.zookeeper.Utils;
import java.io.Closeable;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZAuth;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import zmq.ZError;

/* loaded from: input_file:com/jaffa/rpc/lib/zeromq/receivers/ZMQAsyncAndSyncRequestReceiver.class */
public class ZMQAsyncAndSyncRequestReceiver implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(ZMQAsyncAndSyncRequestReceiver.class);
    private static final ExecutorService service = Executors.newFixedThreadPool(3);
    private ZContext context;
    private ZAuth auth;

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.context = new ZContext(10);
            this.context.setLinger(0);
            if (Boolean.parseBoolean(System.getProperty("jaffa.rpc.protocol.zmq.curve.enabled", String.valueOf(false)))) {
                this.auth = new ZAuth(this.context);
                this.auth.setVerbose(true);
                this.auth.configureCurve(Utils.getRequiredOption("jaffa.rpc.protocol.zmq.client.dir"));
            }
            ZMQ.Socket createSocket = this.context.createSocket(SocketType.REP);
            CurveUtils.makeSocketSecure(createSocket);
            createSocket.bind("tcp://" + Utils.getZeroMQBindAddress());
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Command command = (Command) Serializer.deserialize(createSocket.recv(), Command.class);
                    if (command.getCallbackKey() != null && command.getCallbackClass() != null) {
                        createSocket.send("OK");
                    }
                    if (command.getCallbackKey() == null || command.getCallbackClass() == null) {
                        RequestContext.setMetaData(command);
                        Object invoke = RequestInvoker.invoke(command);
                        RequestContext.removeMetaData();
                        createSocket.send(Serializer.serializeWithClass(RequestInvoker.getResult(invoke)));
                    } else {
                        service.execute(() -> {
                            try {
                                RequestContext.setMetaData(command);
                                Object invoke2 = RequestInvoker.invoke(command);
                                RequestContext.removeMetaData();
                                byte[] serialize = Serializer.serialize(RequestInvoker.constructCallbackContainer(command, invoke2));
                                ZMQ.Socket createSocket2 = this.context.createSocket(SocketType.REQ);
                                ZeroMqRequestSender.addCurveKeysToSocket(createSocket2, command.getSourceModuleId());
                                createSocket2.connect("tcp://" + command.getCallBackZMQ());
                                createSocket2.send(serialize);
                                createSocket2.close();
                            } catch (ClassNotFoundException | NoSuchMethodException e) {
                                log.error("Error while receiving async request", e);
                                throw new JaffaRpcExecutionException(e);
                            }
                        });
                    }
                } catch (ZMQException | ZError.IOException e) {
                    if (e.getMessage().contains("Errno 4")) {
                        continue;
                    } else if (!e.getMessage().contains("156384765")) {
                        log.error("General ZMQ exception", e);
                        throw new JaffaRpcSystemException((Throwable) e);
                    }
                }
            }
            log.info("{} terminated", getClass().getSimpleName());
        } catch (UnknownHostException e2) {
            log.error("Error during ZeroMQ request receiver startup:", e2);
            throw new JaffaRpcSystemException(e2);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (Boolean.parseBoolean(System.getProperty("jaffa.rpc.protocol.zmq.curve.enabled", String.valueOf(false)))) {
            try {
                this.auth.close();
            } catch (IOException e) {
                log.error("Error while closing ZeroMQ context", e);
            }
        } else {
            this.context.close();
        }
        service.shutdownNow();
    }
}
