package com.jaffa.rpc.lib.zeromq.receivers;

import com.jaffa.rpc.lib.common.OptionConstants;
import com.jaffa.rpc.lib.common.RequestInvocationHelper;
import com.jaffa.rpc.lib.entities.Command;
import com.jaffa.rpc.lib.exception.JaffaRpcSystemException;
import com.jaffa.rpc.lib.serialization.Serializer;
import com.jaffa.rpc.lib.zeromq.CurveUtils;
import com.jaffa.rpc.lib.zeromq.ZeroMqRequestSender;
import com.jaffa.rpc.lib.zookeeper.Utils;
import java.io.Closeable;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZAuth;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import zmq.ZError;

/* loaded from: input_file:com/jaffa/rpc/lib/zeromq/receivers/ZMQAsyncAndSyncRequestReceiver.class */
public class ZMQAsyncAndSyncRequestReceiver implements Runnable, Closeable {
    private static final Logger log;
    private static final ExecutorService service;
    private final ZContext context;
    private final ZMQ.Socket socket;
    private ZAuth auth;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ZMQAsyncAndSyncRequestReceiver() {
        try {
            this.context = new ZContext(10);
            this.context.setLinger(0);
            if (Boolean.parseBoolean(System.getProperty(OptionConstants.ZMQ_CURVE_ENABLED, String.valueOf(false)))) {
                this.auth = new ZAuth(this.context);
                this.auth.configureCurve(Utils.getRequiredOption(OptionConstants.ZMQ_CLIENT_DIR));
            }
            this.socket = this.context.createSocket(SocketType.REP);
            CurveUtils.makeSocketSecure(this.socket);
            this.socket.bind("tcp://" + Utils.getZeroMQBindAddress());
        } catch (Exception e) {
            log.error("Error during ZeroMQ request receiver startup:", e);
            throw new JaffaRpcSystemException(e);
        }
    }

    public static void checkZMQExceptionAndThrow(Exception exc) {
        if (exc.getMessage().contains("Errno 4") || exc.getMessage().contains("156384765")) {
            return;
        }
        log.error("General ZMQ exception", exc);
        throw new JaffaRpcSystemException(exc);
    }

    public static void destroySocketAndContext(ZContext zContext, ZMQ.Socket socket, Class<?> cls) {
        zContext.destroySocket(socket);
        log.info("{} socket destroyed", cls.getSimpleName());
        zContext.destroy();
        log.info("{} context destroyed", cls.getSimpleName());
    }

    @Override // java.lang.Runnable
    public void run() {
        byte[] recv;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                try {
                    recv = this.socket.recv();
                } catch (Exception e) {
                    log.error("Error while receiving sync request", e);
                }
            } catch (ZMQException | ZError.IOException e2) {
                checkZMQExceptionAndThrow(e2);
            }
            if (Objects.nonNull(recv) && recv.length == 1 && recv[0] == 7) {
                destroySocketAndContext(this.context, this.socket, ZMQAsyncAndSyncRequestReceiver.class);
                break;
            }
            Command command = (Command) Serializer.getCurrent().deserialize(recv, Command.class);
            if (Objects.nonNull(command.getCallbackKey()) && Objects.nonNull(command.getCallbackClass())) {
                this.socket.send("OK");
                service.execute(() -> {
                    try {
                        byte[] serialize = Serializer.getCurrent().serialize(RequestInvocationHelper.constructCallbackContainer(command, RequestInvocationHelper.invoke(command)));
                        log.debug("Async response to request {} is ready", command.getCallbackKey());
                        ZMQ.Socket createSocket = this.context.createSocket(SocketType.REQ);
                        ZeroMqRequestSender.addCurveKeysToSocket(createSocket, command.getSourceModuleId());
                        createSocket.connect("tcp://" + command.getCallBackHost());
                        createSocket.send(serialize, 0);
                        byte[] recv2 = createSocket.recv(0);
                        if (!$assertionsDisabled && recv2[0] != 4) {
                            throw new AssertionError();
                        }
                        this.context.destroySocket(createSocket);
                    } catch (Exception e3) {
                        log.error("Error while receiving async request", e3);
                    }
                });
            } else {
                this.socket.send(Serializer.getCurrent().serializeWithClass(RequestInvocationHelper.getResult(RequestInvocationHelper.invoke(command))));
            }
        }
        log.info("{} terminated", getClass().getSimpleName());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws UnknownHostException {
        ZMQAsyncResponseReceiver.sendKillMessageToSocket(Utils.getZeroMQBindAddress());
        if (Boolean.parseBoolean(System.getProperty(OptionConstants.ZMQ_CURVE_ENABLED, String.valueOf(false)))) {
            try {
                this.auth.close();
            } catch (IOException e) {
                log.error("Error while closing ZeroMQ context", e);
            }
        }
        service.shutdownNow();
        log.info("ZMQAsyncAndSyncRequestReceiver closed");
    }

    static {
        $assertionsDisabled = !ZMQAsyncAndSyncRequestReceiver.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(ZMQAsyncAndSyncRequestReceiver.class);
        service = Executors.newFixedThreadPool(3);
    }
}
