package org.mixql.engine.core;

import com.github.nscala_time.time.Imports$;
import com.github.nscala_time.time.RichReadableInstant$;
import com.github.nscala_time.time.RichReadableInterval$;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.joda.time.DateTime;
import org.mixql.engine.core.logger.ModuleLogger;
import org.mixql.remote.RemoteMessageConverter;
import org.mixql.remote.messages.Message;
import org.mixql.remote.messages.broker.IBrokerSender;
import org.mixql.remote.messages.broker.PlatformPongHeartBeat;
import org.mixql.remote.messages.client.Execute;
import org.mixql.remote.messages.client.ExecuteFunction;
import org.mixql.remote.messages.client.GetDefinedFunctions;
import org.mixql.remote.messages.client.IModuleReceiver;
import org.mixql.remote.messages.client.IWorkerReceiver;
import org.mixql.remote.messages.client.ShutDown;
import org.mixql.remote.messages.module.ExecuteResult;
import org.mixql.remote.messages.module.ExecuteResultFailed;
import org.mixql.remote.messages.module.ExecutedFunctionResult;
import org.mixql.remote.messages.module.ExecutedFunctionResultFailed;
import org.mixql.remote.messages.module.GetDefinedFunctionsError;
import org.mixql.remote.messages.module.IModuleSendToClient;
import org.mixql.remote.messages.module.toBroker.EngineFailed;
import org.mixql.remote.messages.module.toBroker.EngineIsReady;
import org.mixql.remote.messages.module.toBroker.EnginePingHeartBeat;
import org.mixql.remote.messages.module.toBroker.IBrokerReceiverFromModule;
import org.mixql.remote.messages.module.worker.IWorkerSendToClient;
import org.mixql.remote.messages.module.worker.SendMsgToPlatform;
import org.mixql.remote.messages.module.worker.WorkerFinished;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import scala.Function2;
import scala.Function3;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.StringOps$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.util.Failure;
import scala.util.Random$;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;
import scala.util.matching.Regex;

/* compiled from: Module.scala */
@ScalaSignature(bytes = "\u0006\u0005\t5e\u0001B\u001a5\u0001uB\u0001\u0002\u0012\u0001\u0003\u0002\u0003\u0006I!\u0012\u0005\t\u0013\u0002\u0011\t\u0011)A\u0005\u0015\"AQ\u000b\u0001B\u0001B\u0003%!\n\u0003\u0005W\u0001\t\u0005\t\u0015!\u0003X\u0011!Q\u0006A!A!\u0002\u0017Y\u0006\"\u00021\u0001\t\u0003\t\u0007bB5\u0001\u0005\u0004%\tA\u001b\u0005\u0007i\u0002\u0001\u000b\u0011B6\t\u000fU\u0004\u0001\u0019!C\u0001m\"I\u00111\u0001\u0001A\u0002\u0013\u0005\u0011Q\u0001\u0005\b\u0003#\u0001\u0001\u0015)\u0003x\u0011%\t\u0019\u0002\u0001a\u0001\n\u0007\t)\u0002C\u0005\u0002\u001e\u0001\u0001\r\u0011\"\u0001\u0002 !A\u00111\u0005\u0001!B\u0013\t9\u0002C\u0005\u0002&\u0001\u0001\r\u0011\"\u0001\u0002(!I\u0011q\u0006\u0001A\u0002\u0013\u0005\u0011\u0011\u0007\u0005\t\u0003k\u0001\u0001\u0015)\u0003\u0002*!I\u0011q\u0007\u0001A\u0002\u0013\u0005\u0011q\u0005\u0005\n\u0003s\u0001\u0001\u0019!C\u0001\u0003wA\u0001\"a\u0010\u0001A\u0003&\u0011\u0011\u0006\u0005\n\u0003\u0003\u0002!\u0019!C\u0001\u0003\u0007B\u0001\"a\u0013\u0001A\u0003%\u0011Q\t\u0005\n\u0003\u001b\u0002!\u0019!C\u0001\u0003\u0007B\u0001\"a\u0014\u0001A\u0003%\u0011Q\t\u0005\n\u0003#\u0002!\u0019!C\u0005\u0003\u0007B\u0001\"a\u0015\u0001A\u0003%\u0011Q\t\u0005\n\u0003+\u0002\u0001\u0019!C\u0005\u0003/B\u0011\"!\"\u0001\u0001\u0004%I!a\"\t\u0011\u0005-\u0005\u0001)Q\u0005\u00033B\u0011\"!$\u0001\u0005\u0004%I!a$\t\u000f\u0005E\u0005\u0001)A\u0005/\"I\u00111\u0013\u0001A\u0002\u0013%\u0011q\u0012\u0005\n\u0003+\u0003\u0001\u0019!C\u0005\u0003/Cq!a'\u0001A\u0003&q\u000bC\u0004\u0002\u001e\u0002!\t!a(\t\u000f\u0005\u0005\u0006\u0001\"\u0003\u0002$\"9\u0011\u0011\u0018\u0001\u0005\n\u0005m\u0006bBAf\u0001\u0011\u0005\u0011Q\u001a\u0005\b\u0003?\u0004A\u0011AAq\u0011\u001d\tY\u000f\u0001C\u0001\u0003[Dq!a>\u0001\t\u0003\tI\u0010C\u0004\u00032\u0001!\tAa\r\t\u000f\t%\u0003\u0001\"\u0001\u0003L!9!\u0011\f\u0001\u0005\u0002\tm\u0003\"\u0003B0\u0001\t\u0007I\u0011\u0001B1\u0011!\u0011\u0019\b\u0001Q\u0001\n\t\r\u0004\"\u0003B;\u0001\t\u0007I\u0011\u0001B<\u0011!\u0011)\t\u0001Q\u0001\n\te\u0004b\u0002BD\u0001\u0011\u0005!\u0011\u0012\u0005\b\u0005\u0017\u0003A\u0011AAP\u0005\u0019iu\u000eZ;mK*\u0011QGN\u0001\u0005G>\u0014XM\u0003\u00028q\u00051QM\\4j]\u0016T!!\u000f\u001e\u0002\u000b5L\u00070\u001d7\u000b\u0003m\n1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001 \u0011\u0005}\u0012U\"\u0001!\u000b\u0003\u0005\u000bQa]2bY\u0006L!a\u0011!\u0003\r\u0005s\u0017PU3g\u0003!)\u00070Z2vi>\u0014\bC\u0001$H\u001b\u0005!\u0014B\u0001%5\u0005=IUj\u001c3vY\u0016,\u00050Z2vi>\u0014\u0018\u0001C5eK:$\u0018\u000e^=\u0011\u0005-\u0013fB\u0001'Q!\ti\u0005)D\u0001O\u0015\tyE(\u0001\u0004=e>|GOP\u0005\u0003#\u0002\u000ba\u0001\u0015:fI\u00164\u0017BA*U\u0005\u0019\u0019FO]5oO*\u0011\u0011\u000bQ\u0001\u0005Q>\u001cH/\u0001\u0003q_J$\bCA Y\u0013\tI\u0006IA\u0002J]R\fa\u0001\\8hO\u0016\u0014\bC\u0001/_\u001b\u0005i&B\u0001.5\u0013\tyVL\u0001\u0007N_\u0012,H.\u001a'pO\u001e,'/\u0001\u0004=S:LGO\u0010\u000b\u0006E\u00164w\r\u001b\u000b\u0003G\u0012\u0004\"A\u0012\u0001\t\u000bi3\u00019A.\t\u000b\u00113\u0001\u0019A#\t\u000b%3\u0001\u0019\u0001&\t\u000bU3\u0001\u0019\u0001&\t\u000bY3\u0001\u0019A,\u0002\r\r|gNZ5h+\u0005Y\u0007C\u00017s\u001b\u0005i'BA5o\u0015\ty\u0007/\u0001\u0005usB,7/\u00194f\u0015\u0005\t\u0018aA2p[&\u00111/\u001c\u0002\u0007\u0007>tg-[4\u0002\u000f\r|gNZ5hA\u0005\u00191\r\u001e=\u0016\u0003]\u0004\"\u0001\u001f@\u000f\u0005edX\"\u0001>\u000b\u0005mT\u0014A\u0002>fe>l\u0017/\u0003\u0002~u\u0006\u0019!,T)\n\u0007}\f\tAA\u0004D_:$X\r\u001f;\u000b\u0005uT\u0018aB2uq~#S-\u001d\u000b\u0005\u0003\u000f\ti\u0001E\u0002@\u0003\u0013I1!a\u0003A\u0005\u0011)f.\u001b;\t\u0011\u0005=!\"!AA\u0002]\f1\u0001\u001f\u00132\u0003\u0011\u0019G\u000f\u001f\u0011\u0002\rM,'O^3s+\t\t9\u0002E\u0002y\u00033IA!a\u0007\u0002\u0002\t11k\\2lKR\f!b]3sm\u0016\u0014x\fJ3r)\u0011\t9!!\t\t\u0013\u0005=Q\"!AA\u0002\u0005]\u0011aB:feZ,'\u000fI\u0001\u0007a>dG.\u001a:\u0016\u0005\u0005%\u0002c\u0001=\u0002,%!\u0011QFA\u0001\u0005\u0019\u0001v\u000e\u001c7fe\u0006Q\u0001o\u001c7mKJ|F%Z9\u0015\t\u0005\u001d\u00111\u0007\u0005\n\u0003\u001f\u0001\u0012\u0011!a\u0001\u0003S\tq\u0001]8mY\u0016\u0014\b%\u0001\u0007x_J\\WM\u001d)pY2,'/\u0001\tx_J\\WM\u001d)pY2,'o\u0018\u0013fcR!\u0011qAA\u001f\u0011%\tyaEA\u0001\u0002\u0004\tI#A\u0007x_J\\WM\u001d)pY2,'\u000fI\u0001\u000ea>dG.\u001a:US6,w.\u001e;\u0016\u0005\u0005\u0015\u0003cA \u0002H%\u0019\u0011\u0011\n!\u0003\t1{gnZ\u0001\u000fa>dG.\u001a:US6,w.\u001e;!\u0003M9xN]6feB{G\u000e\\3s)&lWm\\;u\u0003Q9xN]6feB{G\u000e\\3s)&lWm\\;uA\u0005\t\u0002.Z1si\n+\u0017\r^%oi\u0016\u0014h/\u00197\u0002%!,\u0017M\u001d;CK\u0006$\u0018J\u001c;feZ\fG\u000eI\u0001\raJ|7-Z:t'R\f'\u000f^\u000b\u0003\u00033\u0002B!a\u0017\u0002~9!\u0011QLA<\u001d\u0011\ty&!\u001d\u000f\t\u0005\u0005\u00141\u000e\b\u0005\u0003G\n9GD\u0002N\u0003KJ\u0011!]\u0005\u0004\u0003S\u0002\u0018AB4ji\",(-\u0003\u0003\u0002n\u0005=\u0014a\u00038tG\u0006d\u0017m\u0018;j[\u0016T1!!\u001bq\u0013\u0011\t\u0019(!\u001e\u0002\tQLW.\u001a\u0006\u0005\u0003[\ny'\u0003\u0003\u0002z\u0005m\u0014aB%na>\u0014Ho\u001d\u0006\u0005\u0003g\n)(\u0003\u0003\u0002��\u0005\u0005%\u0001\u0003#bi\u0016$\u0016.\\3\n\t\u0005\r\u00151\u0010\u0002\f)f\u0004X-S7q_J$8/\u0001\tqe>\u001cWm]:Ti\u0006\u0014Ho\u0018\u0013fcR!\u0011qAAE\u0011%\ty\u0001HA\u0001\u0002\u0004\tI&A\u0007qe>\u001cWm]:Ti\u0006\u0014H\u000fI\u0001\rY&4XM\\3tg&s\u0017\u000e^\u000b\u0002/\u0006iA.\u001b<f]\u0016\u001c8/\u00138ji\u0002\n\u0001\u0002\\5wK:,7o]\u0001\rY&4XM\\3tg~#S-\u001d\u000b\u0005\u0003\u000f\tI\n\u0003\u0005\u0002\u0010\u0005\n\t\u00111\u0001X\u0003%a\u0017N^3oKN\u001c\b%A\u0006ti\u0006\u0014HoU3sm\u0016\u0014HCAA\u0004\u0003a\u0011X-Y2u\u001f:\u0014VmY3jm\u0016$'I]8lKJl5o\u001a\u000b\u0005\u0003\u000f\t)\u000bC\u0004\u0002(\u0012\u0002\r!!+\u0002\u000f5,7o]1hKB!\u00111VA[\u001b\t\tiK\u0003\u0003\u00020\u0006E\u0016\u0001C7fgN\fw-Z:\u000b\u0007\u0005M\u0006(\u0001\u0004sK6|G/Z\u0005\u0005\u0003o\u000biKA\u0004NKN\u001c\u0018mZ3\u00027I,\u0017m\u0019;P]J+7-Z5wK\u0012l5o\u001a$pe\u0016sw-\u001b8f)\u0011\t9!!0\t\u000f\u0005\u001dV\u00051\u0001\u0002@B!\u0011\u0011YAd\u001b\t\t\u0019M\u0003\u0003\u0002F\u00065\u0016AB2mS\u0016tG/\u0003\u0003\u0002J\u0006\r'aD%N_\u0012,H.\u001a*fG\u0016Lg/\u001a:\u0002'M,g\u000eZ'fgN\fw-\u001a+p/>\u00148.\u001a:\u0015\t\u0005=\u0017Q\u001b\t\u0004\u007f\u0005E\u0017bAAj\u0001\n9!i\\8mK\u0006t\u0007bBAlM\u0001\u0007\u0011\u0011\\\u0001\u0004[N<\u0007\u0003BAa\u00037LA!!8\u0002D\ny\u0011jV8sW\u0016\u0014(+Z2fSZ,'/\u0001\u000esK\u0006\u001cGo\u00148Fq\u0016\u001cW\u000f^3NKN\u001c\u0018mZ3Bgft7\r\u0006\u0003\u0002\b\u0005\r\bbBAlO\u0001\u0007\u0011Q\u001d\t\u0005\u0003\u0003\f9/\u0003\u0003\u0002j\u0006\r'aB#yK\u000e,H/Z\u0001#e\u0016\f7\r^(o\u000bb,7-\u001e;f\rVt7\r^5p]6+7o]1hK\u0006\u001b\u0018P\\2\u0015\t\u0005\u001d\u0011q\u001e\u0005\b\u0003/D\u0003\u0019AAy!\u0011\t\t-a=\n\t\u0005U\u00181\u0019\u0002\u0010\u000bb,7-\u001e;f\rVt7\r^5p]\u0006I\"/Z1di>s'+Z7pi\u0016lUm]:bO\u0016\f5/\u001f8d))\t9!a?\u0002��\n=!\u0011\u0004\u0005\u0007\u0003{L\u0003\u0019\u0001&\u0002\u001b\rd\u0017.\u001a8u\u0003\u0012$'/Z:t\u0011\u001d\u0011\t!\u000ba\u0001\u0005\u0007\t1\"\u001a=fGV$XMR;oGBAqH!\u0002K\u0005\u0013\tI+C\u0002\u0003\b\u0001\u0013\u0011BR;oGRLwN\u001c\u001a\u0011\u0007\u0019\u0013Y!C\u0002\u0003\u000eQ\u0012q\u0002\u00157bi\u001a|'/\\\"p]R,\u0007\u0010\u001e\u0005\b\u0005#I\u0003\u0019\u0001B\n\u0003%ygnU;dG\u0016\u001c8\u000f\u0005\u0006@\u0005+\tI+a\u0006K\u0003\u000fI1Aa\u0006A\u0005%1UO\\2uS>t7\u0007C\u0004\u0003\u001c%\u0002\rA!\b\u0002\u0013=tg)Y5mkJ,\u0007CC \u0003\u0016\t}\u0011q\u0003&\u0002\bA!!\u0011\u0005B\u0016\u001d\u0011\u0011\u0019Ca\n\u000f\u00075\u0013)#C\u0001B\u0013\r\u0011I\u0003Q\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\u0011iCa\f\u0003\u0013QC'o\\<bE2,'b\u0001B\u0015\u0001\u000692/\u001a8e\u001bN<Gk\u001c)mCR4wN]7Ce>\\WM\u001d\u000b\u0007\u0003\u001f\u0014)Da\u0012\t\u000f\u0005]'\u00061\u0001\u00038A!!\u0011\bB\"\u001b\t\u0011YD\u0003\u0003\u0003>\t}\u0012\u0001\u0003;p\u0005J|7.\u001a:\u000b\t\t\u0005\u0013QV\u0001\u0007[>$W\u000f\\3\n\t\t\u0015#1\b\u0002\u001a\u0013\n\u0013xn[3s%\u0016\u001cW-\u001b<fe\u001a\u0013x.\\'pIVdW\rC\u0003[U\u0001\u00071,A\btK:$Wj]4U_\u000ec\u0017.\u001a8u)\u0019\tyM!\u0014\u0003X!9\u0011q[\u0016A\u0002\t=\u0003\u0003\u0002B)\u0005'j!Aa\u0010\n\t\tU#q\b\u0002\u0014\u00136{G-\u001e7f'\u0016tG\rV8DY&,g\u000e\u001e\u0005\u00065.\u0002\raW\u0001\u0018e\u0016\fG-T:h\rJ|WnU3sm\u0016\u0014(I]8lKJ$B!!+\u0003^!)!\f\fa\u00017\u0006Qqo\u001c:lKJ\u001cX*\u00199\u0016\u0005\t\r\u0004c\u0002B3\u0005_R\u0015qC\u0007\u0003\u0005ORAA!\u001b\u0003l\u00059Q.\u001e;bE2,'b\u0001B7\u0001\u0006Q1m\u001c7mK\u000e$\u0018n\u001c8\n\t\tE$q\r\u0002\u0004\u001b\u0006\u0004\u0018aC<pe.,'o]'ba\u0002\n\u0011A]\u000b\u0003\u0005srAAa\u001f\u0003\u00026\u0011!Q\u0010\u0006\u0004\u0005\u007f\u0002\u0015\u0001B;uS2LAAa!\u0003~\u00051!+\u00198e_6\f!A\u001d\u0011\u00023\u001d,g.\u001a:bi\u0016,f.^:fI^{'o[3sg:\u000bW.\u001a\u000b\u0002\u0015\u0006)1\r\\8tK\u0002")
/* loaded from: input_file:org/mixql/engine/core/Module.class */
public class Module {
    private final IModuleExecutor executor;
    private final String identity;
    private final String host;
    private final int port;
    private final ModuleLogger logger;
    private final Config config = ConfigFactory.load();
    private ZMQ.Context ctx = null;
    private ZMQ.Socket server = null;
    private ZMQ.Poller poller = null;
    private ZMQ.Poller workerPoller = null;
    private final long pollerTimeout = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
        return this.config().getLong("org.mixql.engine.module.pollerTimeout");
    }).getOrElse(() -> {
        return 100L;
    }));
    private final long workerPollerTimeout = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
        return this.config().getLong("org.mixql.engine.module.workerPollerTimeout");
    }).getOrElse(() -> {
        return 95L;
    }));
    private final long heartBeatInterval = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
        return this.config().getLong("org.mixql.engine.module.heartBeatInterval");
    }).getOrElse(() -> {
        return 16500L;
    }));
    private DateTime processStart = null;
    private final int livenessInit = BoxesRunTime.unboxToInt(Try$.MODULE$.apply(() -> {
        return this.config().getInt("org.mixql.engine.module.liveness");
    }).getOrElse(() -> {
        return 3;
    }));
    private int liveness = livenessInit();
    private final Map<String, ZMQ.Socket> workersMap = (Map) Map$.MODULE$.apply(Nil$.MODULE$);
    private final Random$ r = Random$.MODULE$;

    public Config config() {
        return this.config;
    }

    public ZMQ.Context ctx() {
        return this.ctx;
    }

    public void ctx_$eq(ZMQ.Context context) {
        this.ctx = context;
    }

    public ZMQ.Socket server() {
        return this.server;
    }

    public void server_$eq(ZMQ.Socket socket) {
        this.server = socket;
    }

    public ZMQ.Poller poller() {
        return this.poller;
    }

    public void poller_$eq(ZMQ.Poller poller) {
        this.poller = poller;
    }

    public ZMQ.Poller workerPoller() {
        return this.workerPoller;
    }

    public void workerPoller_$eq(ZMQ.Poller poller) {
        this.workerPoller = poller;
    }

    public long pollerTimeout() {
        return this.pollerTimeout;
    }

    public long workerPollerTimeout() {
        return this.workerPollerTimeout;
    }

    private long heartBeatInterval() {
        return this.heartBeatInterval;
    }

    private DateTime processStart() {
        return this.processStart;
    }

    private void processStart_$eq(DateTime dateTime) {
        this.processStart = dateTime;
    }

    private int livenessInit() {
        return this.livenessInit;
    }

    private int liveness() {
        return this.liveness;
    }

    private void liveness_$eq(int i) {
        this.liveness = i;
    }

    public void startServer() {
        this.logger.logInfo("Starting main client");
        this.logger.logInfo(new StringBuilder(13).append("host of server is ").append(this.host).append(" and port is ").append(Integer.toString(this.port)).toString());
        try {
            try {
                ctx_$eq(ZMQ.context(1));
                server_$eq(ctx().socket(SocketType.DEALER));
                server().setIdentity(this.identity.getBytes());
                this.logger.logInfo(new StringBuilder(0).append("connected: ").append(server().connect(new StringBuilder(7).append("tcp://").append(this.host).append(":").append(Integer.toString(this.port)).toString())).toString());
                this.logger.logInfo("Connection established.");
                this.logger.logDebug("Setting processStart for timer");
                processStart_$eq(Imports$.MODULE$.DateTime().now());
                this.logger.logInfo("Setting poller");
                poller_$eq(ctx().poller(1));
                this.logger.logInfo("Setting workers poller");
                workerPoller_$eq(ctx().poller(14));
                this.logger.logInfo("Register server's socket pollin in poller");
                int register = poller().register(server(), 1);
                this.logger.logInfo("Sending READY message to server's broker");
                sendMsgToPlatformBroker(new EngineIsReady(this.identity), this.logger);
                while (true) {
                    poller().poll(pollerTimeout());
                    int i = -1;
                    if (workerPoller().getSize() != 0) {
                        i = workerPoller().poll(workerPollerTimeout());
                    }
                    if (poller().pollin(register)) {
                        this.logger.logDebug("Setting processStart for timer, as message was received");
                        Message readMsgFromServerBroker = readMsgFromServerBroker(this.logger);
                        if (readMsgFromServerBroker instanceof IBrokerSender) {
                            this.logger.logDebug("got broker's service message");
                            reactOnReceivedBrokerMsg((IBrokerSender) readMsgFromServerBroker);
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        } else {
                            if (!(readMsgFromServerBroker instanceof IModuleReceiver)) {
                                throw new MatchError(readMsgFromServerBroker);
                            }
                            reactOnReceivedMsgForEngine((IModuleReceiver) readMsgFromServerBroker);
                            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                        }
                        processStart_$eq(Imports$.MODULE$.DateTime().now());
                        liveness_$eq(livenessInit());
                    } else {
                        long millis$extension = RichReadableInterval$.MODULE$.millis$extension(Imports$.MODULE$.richReadableInterval(RichReadableInstant$.MODULE$.to$extension(Imports$.MODULE$.richReadableInstant(processStart()), Imports$.MODULE$.DateTime().now())));
                        this.logger.logDebug(new StringBuilder(0).append("elapsed: ").append(millis$extension).toString());
                        if (millis$extension >= heartBeatInterval()) {
                            processStart_$eq(Imports$.MODULE$.DateTime().now());
                            this.logger.logDebug(new StringBuilder(0).append("heartbeat work. Sending heart beat. Liveness: ").append(liveness()).toString());
                            sendMsgToPlatformBroker(new EnginePingHeartBeat(this.identity), this.logger);
                            liveness_$eq(liveness() - 1);
                            this.logger.logDebug(new StringBuilder(0).append("heartbeat work. After sending heart beat. Liveness: ").append(liveness()).toString());
                        }
                        if (liveness() < 0) {
                            this.logger.logError("heartbeat failure, can't reach server's broker. Shutting down");
                            throw new BrakeException();
                        }
                    }
                    if (i > 0) {
                        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), workerPoller().getSize()).foreach(obj -> {
                            return $anonfun$startServer$1(this, BoxesRunTime.unboxToInt(obj));
                        });
                    }
                }
            } catch (BrakeException unused) {
                this.logger.logDebug("BrakeException");
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                close();
                this.logger.logInfo("Stopped.");
            } catch (Exception e) {
                this.logger.logError(new StringBuilder(0).append("Error: ").append(e.getMessage()).toString());
                BoxesRunTime.boxToBoolean(sendMsgToPlatformBroker(new EngineFailed(this.identity, new StringBuilder(0).append(new StringBuilder(32).append("Module ").append(this.identity).append(" to broker: fatal error: ").toString()).append(e.getMessage()).toString()), this.logger));
                close();
                this.logger.logInfo("Stopped.");
            }
        } catch (Throwable th) {
            close();
            throw th;
        }
    }

    private void reactOnReceivedBrokerMsg(Message message) {
        if (!(message instanceof PlatformPongHeartBeat)) {
            throw new MatchError(message);
        }
        this.logger.logDebug("got pong heart beat message from broker server");
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    private void reactOnReceivedMsgForEngine(IModuleReceiver iModuleReceiver) {
        BoxedUnit boxedUnit;
        if (iModuleReceiver instanceof Execute) {
            reactOnExecuteMessageAsync((Execute) iModuleReceiver);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            return;
        }
        if (iModuleReceiver instanceof ShutDown) {
            this.logger.logInfo("Started shutdown");
            try {
                this.executor.reactOnShutDown(this.identity, iModuleReceiver.clientIdentity(), this.logger);
            } catch (Throwable th) {
                this.logger.logWarn(new StringBuilder(43).append("Warning: error while reacting on shutdown: ").append(th.getMessage()).toString());
            }
            throw new BrakeException();
        }
        if (iModuleReceiver instanceof ExecuteFunction) {
            reactOnExecuteFunctionMessageAsync((ExecuteFunction) iModuleReceiver);
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            return;
        }
        if (!(iModuleReceiver instanceof GetDefinedFunctions)) {
            if (!(iModuleReceiver instanceof IWorkerReceiver)) {
                throw new MatchError(iModuleReceiver);
            }
            sendMessageToWorker((IWorkerReceiver) iModuleReceiver);
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            return;
        }
        try {
            sendMsgToClient(this.executor.reactOnGetDefinedFunctions(this.identity, iModuleReceiver.clientIdentity(), this.logger), this.logger);
            boxedUnit = BoxedUnit.UNIT;
        } catch (Throwable th2) {
            sendMsgToClient(new GetDefinedFunctionsError(new StringBuilder(15).append(new StringBuilder(44).append("Module ").append(this.identity).append(" to ").append(iModuleReceiver.clientIdentity()).append(": error while reacting on getting").toString()).append(" functions list").append(th2.getMessage()).toString(), iModuleReceiver.clientIdentity()), this.logger);
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public boolean sendMessageToWorker(IWorkerReceiver iWorkerReceiver) {
        String workerIdentity = iWorkerReceiver.workerIdentity();
        this.logger.logInfo(new StringBuilder(20).append(new StringBuilder(51).append("received message ").append(iWorkerReceiver.type()).append(" from platfrom to workers-future-").append(workerIdentity).append(" ").toString()).append("Sending it to worker").toString());
        return ((ZMQ.Socket) workersMap().apply(workerIdentity)).send(iWorkerReceiver.toByteArray());
    }

    public void reactOnExecuteMessageAsync(Execute execute) {
        reactOnRemoteMessageAsync(execute.clientIdentity(), (str, platformContext) -> {
            this.logger.logInfo(new StringBuilder(39).append("[workers-future-").append(str).append("]: triggering onExecute").toString());
            return this.executor.reactOnExecuteAsync(execute, this.identity, execute.clientIdentity(), this.logger, platformContext);
        }, (message, socket, str2) -> {
            $anonfun$reactOnExecuteMessageAsync$2(execute, message, socket, str2);
            return BoxedUnit.UNIT;
        }, (th, socket2, str3) -> {
            $anonfun$reactOnExecuteMessageAsync$3(this, execute, th, socket2, str3);
            return BoxedUnit.UNIT;
        });
    }

    public void reactOnExecuteFunctionMessageAsync(ExecuteFunction executeFunction) {
        reactOnRemoteMessageAsync(executeFunction.clientIdentity(), (str, platformContext) -> {
            this.logger.logInfo(new StringBuilder(47).append("[workers-future-").append(str).append("]: triggering onExecuteFunction").toString());
            return this.executor.reactOnExecuteFunctionAsync(executeFunction, this.identity, executeFunction.clientIdentity(), this.logger, platformContext);
        }, (message, socket, str2) -> {
            $anonfun$reactOnExecuteFunctionMessageAsync$2(executeFunction, message, socket, str2);
            return BoxedUnit.UNIT;
        }, (th, socket2, str3) -> {
            $anonfun$reactOnExecuteFunctionMessageAsync$3(this, executeFunction, th, socket2, str3);
            return BoxedUnit.UNIT;
        });
    }

    public void reactOnRemoteMessageAsync(String str, Function2<String, PlatformContext, Message> function2, Function3<Message, ZMQ.Socket, String, BoxedUnit> function3, Function3<Throwable, ZMQ.Socket, String, BoxedUnit> function32) {
        String generateUnusedWorkersName = generateUnusedWorkersName();
        this.logger.logInfo(new StringBuilder(16).append("Creating worker ").append(generateUnusedWorkersName).toString());
        this.logger.logInfo(new StringBuilder(0).append("Register module's pair socket pollin in workersPoller for worker ").append(generateUnusedWorkersName).toString());
        ZMQ.Socket socket = ctx().socket(SocketType.PAIR);
        workerPoller().register(socket, 1);
        socket.bind(new StringBuilder(9).append("inproc://").append(generateUnusedWorkersName).toString());
        workersMap().put(generateUnusedWorkersName, socket);
        ObjectRef create = ObjectRef.create((Object) null);
        Future$.MODULE$.apply(() -> {
            this.logger.logInfo(new StringBuilder(78).append("[workers-future-").append(generateUnusedWorkersName).append("]: Creating future's pair socket for communicating with module").toString());
            create.elem = this.ctx().socket(SocketType.PAIR);
            this.logger.logInfo(new StringBuilder(57).append("[workers-future-").append(generateUnusedWorkersName).append("]: Bind future's pair socket in inproc://").append(generateUnusedWorkersName).toString());
            ((ZMQ.Socket) create.elem).connect(new StringBuilder(9).append("inproc://").append(generateUnusedWorkersName).toString());
            return (Message) function2.apply(generateUnusedWorkersName, new PlatformContext((ZMQ.Socket) create.elem, generateUnusedWorkersName, str, this.logger));
        }, ExecutionContext$Implicits$.MODULE$.global()).onComplete(r12 -> {
            $anonfun$reactOnRemoteMessageAsync$2(this, function3, create, generateUnusedWorkersName, function32, r12);
            return BoxedUnit.UNIT;
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    public boolean sendMsgToPlatformBroker(IBrokerReceiverFromModule iBrokerReceiverFromModule, ModuleLogger moduleLogger) {
        moduleLogger.logDebug("sendMsgToPlatformBroker: Send msg to server ");
        return server().send(iBrokerReceiverFromModule.toByteArray());
    }

    public boolean sendMsgToClient(IModuleSendToClient iModuleSendToClient, ModuleLogger moduleLogger) {
        moduleLogger.logDebug("sendMsgToClient: Send msg to server ");
        return server().send(iModuleSendToClient.toByteArray());
    }

    public Message readMsgFromServerBroker(ModuleLogger moduleLogger) {
        moduleLogger.logDebug(new StringBuilder(0).append("readMsgFromServerBroker: received Identity of engine ").append(new String(server().recv(0))).toString());
        byte[] recv = server().recv(0);
        moduleLogger.logDebug(new StringBuilder(0).append("have received message from server: ").append(new String(recv)).toString());
        return RemoteMessageConverter.unpackAnyMsgFromArray(recv);
    }

    public Map<String, ZMQ.Socket> workersMap() {
        return this.workersMap;
    }

    public Random$ r() {
        return this.r;
    }

    public String generateUnusedWorkersName() {
        Regex r$extension = StringOps$.MODULE$.r$extension(Predef$.MODULE$.augmentString("[0-9]+"));
        Iterable iterable = (Iterable) workersMap().keys().map(str -> {
            return BoxesRunTime.boxToInteger($anonfun$generateUnusedWorkersName$1(r$extension, str));
        });
        boolean z = false;
        IntRef create = IntRef.create(-1);
        while (!z) {
            create.elem = RichInt$.MODULE$.abs$extension(Predef$.MODULE$.intWrapper(r().nextInt()));
            Option find = iterable.find(i -> {
                return i == create.elem;
            });
            if (find instanceof Some) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                if (!None$.MODULE$.equals(find)) {
                    throw new MatchError(find);
                }
                z = true;
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        }
        return new StringBuilder(6).append("worker").append(create.elem).toString();
    }

    public void close() {
        Try$.MODULE$.apply(() -> {
            if (this.server() != null) {
                this.logger.logInfo("finally close server");
                this.server().close();
            }
        });
        if (workersMap().nonEmpty()) {
            workersMap().foreach(tuple2 -> {
                return Try$.MODULE$.apply(() -> {
                    ((ZMQ.Socket) tuple2._2()).close();
                });
            });
        }
        Try$.MODULE$.apply(() -> {
            if (this.poller() != null) {
                this.logger.logInfo("finally close poller");
                this.poller().close();
            }
        });
        Try$.MODULE$.apply(() -> {
            if (this.workerPoller() != null) {
                this.logger.logInfo("finally close workerPoller");
                this.workerPoller().close();
            }
        });
        try {
            if (ctx() != null) {
                this.logger.logInfo("finally close context");
                ctx().close();
            }
        } catch (Throwable unused) {
            this.logger.logError("tiemout of closing context exceeded:(");
        }
    }

    public static final /* synthetic */ Object $anonfun$startServer$1(Module module, int i) {
        BoxedUnit boxToBoolean;
        if (!module.workerPoller().pollin(i)) {
            return BoxedUnit.UNIT;
        }
        ZMQ.Socket socket = module.workerPoller().getSocket(i);
        Message unpackAnyMsgFromArray = RemoteMessageConverter.unpackAnyMsgFromArray(socket.recv(0));
        if (unpackAnyMsgFromArray instanceof WorkerFinished) {
            WorkerFinished workerFinished = (WorkerFinished) unpackAnyMsgFromArray;
            module.logger.logInfo(new StringBuilder(74).append("Received message WorkerFinished from worker ").append(workerFinished.workerIdentity()).append(" Remove socket from workersMap").toString());
            module.workersMap().remove(workerFinished.Id);
            module.logger.logInfo(new StringBuilder(45).append("Unregister worker's ").append(workerFinished.workerIdentity()).append(" socket from workerPoller").toString());
            module.workerPoller().unregister(socket);
            module.logger.logInfo(new StringBuilder(24).append("Closing worker's ").append(workerFinished.workerIdentity()).append(" socket").toString());
            socket.close();
            boxToBoolean = BoxedUnit.UNIT;
        } else if (unpackAnyMsgFromArray instanceof SendMsgToPlatform) {
            SendMsgToPlatform sendMsgToPlatform = (SendMsgToPlatform) unpackAnyMsgFromArray;
            module.logger.logInfo(new StringBuilder(71).append("Received message SendMsgToPlatform from worker ").append(sendMsgToPlatform.workerIdentity()).append(" and send it to platform").toString());
            boxToBoolean = BoxesRunTime.boxToBoolean(module.sendMsgToClient(sendMsgToPlatform.msg, module.logger));
        } else {
            if (!(unpackAnyMsgFromArray instanceof IWorkerSendToClient)) {
                throw new MatchError(unpackAnyMsgFromArray);
            }
            IWorkerSendToClient iWorkerSendToClient = (IWorkerSendToClient) unpackAnyMsgFromArray;
            module.logger.logInfo(new StringBuilder(59).append("Received message of type IWorkerSendToPlatform from worker ").append(iWorkerSendToClient.workerIdentity()).append(new StringBuilder(34).append(" and proxy it (type: ").append(iWorkerSendToClient.type()).append(") to platform").toString()).toString());
            boxToBoolean = BoxesRunTime.boxToBoolean(module.sendMsgToClient(iWorkerSendToClient, module.logger));
        }
        return boxToBoolean;
    }

    public static final /* synthetic */ void $anonfun$reactOnExecuteMessageAsync$2(Execute execute, Message message, ZMQ.Socket socket, String str) {
        socket.send(new SendMsgToPlatform(new ExecuteResult(execute.statement, message, execute.clientIdentity()), str).toByteArray());
    }

    public static final /* synthetic */ void $anonfun$reactOnExecuteMessageAsync$3(Module module, Execute execute, Throwable th, ZMQ.Socket socket, String str) {
        socket.send(new SendMsgToPlatform(new ExecuteResultFailed(new StringBuilder(0).append(new StringBuilder(46).append("Module ").append(module.identity).append(" to ").append(execute.clientIdentity()).append(": error while reacting on execute: ").toString()).append(th.getMessage()).toString(), execute.clientIdentity()), str).toByteArray());
    }

    public static final /* synthetic */ void $anonfun$reactOnExecuteFunctionMessageAsync$2(ExecuteFunction executeFunction, Message message, ZMQ.Socket socket, String str) {
        socket.send(new SendMsgToPlatform(new ExecutedFunctionResult(executeFunction.name, message, executeFunction.clientIdentity()), str).toByteArray());
    }

    public static final /* synthetic */ void $anonfun$reactOnExecuteFunctionMessageAsync$3(Module module, ExecuteFunction executeFunction, Throwable th, ZMQ.Socket socket, String str) {
        socket.send(new SendMsgToPlatform(new ExecutedFunctionResultFailed(new StringBuilder(0).append(new StringBuilder(53).append("Module ").append(module.identity).append(" to ").append(executeFunction.clientIdentity()).append(": error while reacting on execute function").toString()).append(new StringBuilder(2).append(executeFunction.name).append(": ").toString()).append(th.getMessage()).toString(), executeFunction.clientIdentity()), str).toByteArray());
    }

    public static final /* synthetic */ void $anonfun$reactOnRemoteMessageAsync$2(Module module, Function3 function3, ObjectRef objectRef, String str, Function3 function32, Try r10) {
        if (r10 instanceof Success) {
            function3.apply((Message) ((Success) r10).value(), (ZMQ.Socket) objectRef.elem, str);
            module.logger.logInfo(new StringBuilder(54).append("[workers-future-").append(str).append("]: Sending WorkerFinished to inproc://").append(str).toString());
            ((ZMQ.Socket) objectRef.elem).send(new WorkerFinished(str).toByteArray());
            module.logger.logInfo(new StringBuilder(55).append("[workers-future-").append(str).append("]: Close future's pair socket inproc://").append(str).toString());
            ((ZMQ.Socket) objectRef.elem).close();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (!(r10 instanceof Failure)) {
            throw new MatchError(r10);
        }
        function32.apply(((Failure) r10).exception(), (ZMQ.Socket) objectRef.elem, str);
        module.logger.logInfo(new StringBuilder(54).append("[workers-future-").append(str).append("]: Sending WorkerFinished to inproc://").append(str).toString());
        ((ZMQ.Socket) objectRef.elem).send(new WorkerFinished(str).toByteArray());
        module.logger.logInfo(new StringBuilder(55).append("[workers-future-").append(str).append("]: Close future's pair socket inproc://").append(str).toString());
        ((ZMQ.Socket) objectRef.elem).close();
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ int $anonfun$generateUnusedWorkersName$1(Regex regex, String str) {
        return StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString((String) regex.findFirstIn(str).get()));
    }

    public Module(IModuleExecutor iModuleExecutor, String str, String str2, int i, ModuleLogger moduleLogger) {
        this.executor = iModuleExecutor;
        this.identity = str;
        this.host = str2;
        this.port = i;
        this.logger = moduleLogger;
    }
}
