package org.objectweb.proactive.extensions.pamr.router;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.objectweb.proactive.core.exceptions.IOException6;
import org.objectweb.proactive.core.util.ProActiveRandom;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.objectweb.proactive.extensions.pamr.PAMRConfig;
import org.objectweb.proactive.extensions.pamr.exceptions.MalformedMessageException;
import org.objectweb.proactive.extensions.pamr.exceptions.PAMRException;
import org.objectweb.proactive.extensions.pamr.protocol.AgentID;
import org.objectweb.proactive.extensions.pamr.protocol.MagicCookie;
import org.objectweb.proactive.extensions.pamr.protocol.message.ErrorMessage;
import org.objectweb.proactive.extensions.pamr.protocol.message.HeartbeatRouterMessage;
import org.objectweb.proactive.utils.NamedThreadFactory;
import org.objectweb.proactive.utils.SafeTimerTask;
import org.objectweb.proactive.utils.Sleeper;
import org.objectweb.proactive.utils.SweetCountDownLatch;
import org.objectweb.proactive.utils.ThreadPools;

/* loaded from: input_file:org/objectweb/proactive/extensions/pamr/router/RouterImpl.class */
public class RouterImpl extends RouterInternal implements Runnable {
    /** Logger obtained for the {@code PAMR_ROUTER} category; used for connection and routing events. */
    public static final Logger logger = ProActiveLogger.getLogger(PAMRConfig.Loggers.PAMR_ROUTER);
    /** Logger obtained for the {@code PAMR_ROUTER_ADMIN} category; used for heartbeat and configuration events. */
    public static final Logger admin_logger = ProActiveLogger.getLogger(PAMRConfig.Loggers.PAMR_ROUTER_ADMIN);
    /** Port constant exposed to callers; it is not read anywhere in this class. */
    public static final int DEFAULT_PORT = 33647;
    /** Size in bytes of a read buffer; equals the 4096-byte buffer allocated by handleRead. */
    private static final int READ_BUFFER_SIZE = 4096;
    /** Router id constant exposed to callers; it is not read anywhere in this class. */
    public static final long DEFAULT_ROUTER_ID = Long.MIN_VALUE;
    /** Worker pool that runs message processors and disconnection broadcasts; shut down by cleanup(). */
    private final ExecutorService tpe;
    /** Address the server socket was bound with; assigned in init(). */
    private InetAddress inetAddress;
    /** Local port reported by the server socket after binding; assigned in init(). */
    private int port;
    /** Random, non-zero identifier chosen in the constructor. */
    private final long routerId;
    /** Cookie read from the "configuration" key of the configuration file; stays null when no file is configured. */
    private volatile MagicCookie adminMagicCookie;
    /** Reserved-agent configuration file taken from the router configuration; may be null. */
    private final File configFile;
    /** Heartbeat timeout in milliseconds, taken from the router configuration. */
    private final int heartbeatTimeout;
    /** Set by stop(); polled by the select loop in run(). */
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    /** Counted down at the end of cleanup(); stop() waits on it. */
    private final SweetCountDownLatch isStopped = new SweetCountDownLatch(1);
    /** Thread currently executing run(); set once with a compare-and-set. */
    private final AtomicReference<Thread> selectThread = new AtomicReference<>();
    /** Known clients indexed by agent id; the map instance is also used as a monitor by several methods. */
    private final ConcurrentHashMap<AgentID, Client> clientMap = new ConcurrentHashMap<>();
    /** Selector multiplexing the server channel and the client channels; opened in init(). */
    private Selector selector = null;
    /** Non-blocking server channel accepting client connections; opened in init(). */
    private ServerSocketChannel ssc = null;
    /** Socket backing {@link #ssc}; bound in init(). */
    private ServerSocket serverSocket = null;

    /**
     * Task that notifies a set of clients that one agent has disconnected.
     *
     * <p>The recipients are copied when the task is created, so later changes
     * to the collection passed to the constructor are not observed by
     * {@link #run()}.
     */
    public static class DisconnectionBroadcaster implements Runnable {
        /** Snapshot of the clients to notify. */
        private final List<Client> clients;
        /** Agent whose disconnection is announced. */
        private final AgentID disconnectedAgent;

        /**
         * @param collection clients to notify; copied immediately
         * @param agentID    id of the agent that disconnected
         */
        public DisconnectionBroadcaster(Collection<Client> collection, AgentID agentID) {
            this.clients = new ArrayList<>(collection);
            this.disconnectedAgent = agentID;
        }

        /**
         * Sends an {@code ERR_DISCONNECTION_BROADCAST} error message to every
         * client of the snapshot except the disconnected agent itself. A failure
         * for one recipient is logged and does not stop the broadcast.
         */
        @Override
        public void run() {
            for (Client recipient : this.clients) {
                if (this.disconnectedAgent.equals(recipient.getAgentId())) {
                    // Do not notify the agent about its own disconnection.
                    continue;
                }
                try {
                    recipient.sendMessage(
                        new ErrorMessage(ErrorMessage.ErrorType.ERR_DISCONNECTION_BROADCAST,
                            recipient.getAgentId(), this.disconnectedAgent, 0L).toByteArray());
                } catch (Exception failure) {
                    ProActiveLogger.logEatedException(RouterImpl.logger, failure);
                }
            }
        }
    }

    /* loaded from: input_file:org/objectweb/proactive/extensions/pamr/router/RouterImpl$HeartbeatTimerTask.class */
    private class HeartbeatTimerTask extends SafeTimerTask {
        private final long maxTime;
        private long heartbeatId;
        private final ThreadPoolExecutor tpe = ThreadPools.newBoundedThreadPool(32, new NamedThreadFactory("Hearbeat sender", false, 10));

        /* loaded from: input_file:org/objectweb/proactive/extensions/pamr/router/RouterImpl$HeartbeatTimerTask$SendTask.class */
        private class SendTask implements Callable<Boolean> {
            final Client client;
            final byte[] msg;
            final long heartbeatId;

            public SendTask(Client client, byte[] bArr, long j) {
                this.client = client;
                this.msg = bArr;
                this.heartbeatId = j;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                try {
                    if (this.client.isConnected()) {
                        this.client.sendMessage(this.msg);
                    }
                    return true;
                } catch (IOException e) {
                    throw new PAMRException("Failed to send heartbeat #" + this.heartbeatId + " to " + this.client, e);
                }
            }
        }

        public HeartbeatTimerTask(long j) {
            this.heartbeatId = 0L;
            this.maxTime = j;
            this.heartbeatId = 0L;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v10 */
        /* JADX WARN: Type inference failed for: r0v7, types: [java.util.concurrent.ConcurrentHashMap] */
        /* JADX WARN: Type inference failed for: r0v8, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r8v0, types: [org.objectweb.proactive.extensions.pamr.router.RouterImpl$HeartbeatTimerTask] */
        @Override // org.objectweb.proactive.utils.SafeTimerTask
        public void safeRun() {
            long currentTimeMillis = System.currentTimeMillis();
            int activeCount = this.tpe.getActiveCount();
            if (activeCount > 0) {
                RouterImpl.admin_logger.warn(String.valueOf(activeCount) + " workers [cur:" + this.tpe.getPoolSize() + ",lar:" + this.tpe.getLargestPoolSize() + ",max:" + this.tpe.getMaximumPoolSize() + "] still busy before heartbeats #" + this.heartbeatId + " being send");
            }
            ?? r0 = RouterImpl.this.clientMap;
            synchronized (r0) {
                ArrayList<Client> arrayList = new ArrayList(RouterImpl.this.clientMap.values());
                r0 = r0;
                byte[] byteArray = new HeartbeatRouterMessage(this.heartbeatId).toByteArray();
                ArrayList arrayList2 = new ArrayList(arrayList.size());
                ArrayList arrayList3 = new ArrayList(arrayList2.size());
                for (Client client : arrayList) {
                    if (client.isConnected()) {
                        SendTask sendTask = new SendTask(client, byteArray, this.heartbeatId);
                        arrayList2.add(sendTask);
                        arrayList3.add(this.tpe.submit(sendTask));
                    }
                }
                this.heartbeatId++;
                checkHeartbeat(arrayList);
                long currentTimeMillis2 = this.maxTime - (System.currentTimeMillis() - currentTimeMillis);
                if (currentTimeMillis2 > 0) {
                    new Sleeper(currentTimeMillis2).sleep();
                } else {
                    RouterImpl.admin_logger.warn("Tooks more than " + this.maxTime + " ms to submit send tasks and check received heartbeats (" + (this.maxTime - currentTimeMillis2) + "ms)");
                }
                for (int i = 0; i < arrayList3.size(); i++) {
                    Future future = (Future) arrayList3.get(i);
                    if (future.isDone()) {
                        try {
                            future.get();
                        } catch (Throwable th) {
                            RouterImpl.admin_logger.info("Exception occured while sending heartbeat to " + arrayList.get(i), th);
                        }
                    } else {
                        RouterImpl.admin_logger.info("Sending heartbeat to " + arrayList.get(i) + " took longer than " + this.maxTime + "ms.");
                    }
                }
            }
        }

        private void checkHeartbeat(Collection<Client> collection) {
            long currentTimeMillis = System.currentTimeMillis();
            for (Client client : collection) {
                if (client.isConnected()) {
                    long lastSeen = currentTimeMillis - client.getLastSeen();
                    if (lastSeen > RouterImpl.this.heartbeatTimeout) {
                        try {
                            RouterImpl.logger.info("Client " + client + " disconnected due to late heartbeat (" + lastSeen + " ms)");
                            client.disconnect();
                        } catch (IOException e) {
                            RouterImpl.logger.info("Failed to disconnected client " + client, e);
                        }
                        this.tpe.submit(new DisconnectionBroadcaster(collection, client.getAgentId()));
                    }
                }
            }
        }
    }

    /**
     * Creates a router from the given configuration: records the reserved-agent
     * configuration file and the heartbeat timeout, opens and binds the server
     * socket through {@code init}, creates the worker pool and finally draws a
     * random router id, retrying until the value is not zero.
     *
     * @param routerConfig configuration of the router
     * @throws Exception if the initialization fails
     */
    public RouterImpl(RouterConfig routerConfig) throws Exception {
        this.configFile = routerConfig.getReservedAgentConfigFile();
        this.heartbeatTimeout = routerConfig.getHeartbeatTimeout();
        init(routerConfig);
        this.tpe = Executors.newFixedThreadPool(routerConfig.getNbWorkerThreads(), new NamedThreadFactory("Proactive PAMR router worker"));

        // Zero is never used as a router id: draw again until non-zero.
        long candidateId;
        do {
            candidateId = ProActiveRandom.nextPosLong();
        } while (candidateId == 0);
        this.routerId = candidateId;
    }

    /**
     * Loads the reserved-agent configuration, then opens the selector and the
     * non-blocking server channel, binds the server socket and registers the
     * channel for accept events.
     *
     * <p>The address and port fields are re-read after binding so that they
     * describe the socket actually in use rather than the configured values.
     * If any step after the configuration load fails, the resources already
     * opened are closed before the exception is rethrown.
     *
     * @param routerConfig configuration providing the bind address and port
     * @throws Exception if the configuration file is invalid or the socket cannot be set up
     */
    private void init(RouterConfig routerConfig) throws Exception {
        reloadConfigurationFile();
        try {
            this.selector = Selector.open();
            this.ssc = ServerSocketChannel.open();
            this.ssc.configureBlocking(false);
            this.serverSocket = this.ssc.socket();
            this.inetAddress = routerConfig.getInetAddress();
            this.port = routerConfig.getPort();
            InetSocketAddress bindAddress = new InetSocketAddress(this.inetAddress, this.port);
            this.inetAddress = bindAddress.getAddress();
            this.serverSocket.bind(bindAddress);
            this.port = this.serverSocket.getLocalPort();
            logger.info("Message router listening on " + this.serverSocket.toString() + ". Heartbeat timeout is " + this.heartbeatTimeout + " ms");
            this.ssc.register(this.selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            // Do not leak the channel or the selector when the setup fails half-way.
            if (this.ssc != null) {
                try {
                    this.ssc.close();
                } catch (IOException closeFailure) {
                    ProActiveLogger.logEatedException(logger, closeFailure);
                }
            }
            if (this.selector != null) {
                try {
                    this.selector.close();
                } catch (IOException closeFailure) {
                    ProActiveLogger.logEatedException(logger, closeFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Select loop of the router. Only one thread may run it: a second caller
     * logs an error and returns immediately.
     *
     * <p>The loop starts the heartbeat timer (period: one third of the
     * heartbeat timeout), then dispatches accept and read events until
     * {@code stop()} is called. On exit, whether normal or caused by an
     * unexpected exception, the heartbeat timer is cancelled and
     * {@code cleanup()} runs, so that a thread blocked in {@code stop()} is
     * always released.
     */
    @Override
    public void run() {
        if (!this.selectThread.compareAndSet(null, Thread.currentThread())) {
            logger.error("A select thread has already been started, aborting the current thread ", new Exception());
            return;
        }
        Timer heartbeatTimer = new Timer("Heartbeat timer", true);
        try {
            long period = this.heartbeatTimeout / 3;
            heartbeatTimer.scheduleAtFixedRate(new HeartbeatTimerTask(period), new Date(), period);
            while (!this.stopped.get()) {
                try {
                    this.selector.select();
                    Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        // The selected-key set is not cleared by the selector itself.
                        keys.remove();
                        try {
                            if (key.isAcceptable()) {
                                handleAccept(key);
                            } else if (key.isReadable()) {
                                handleRead(key);
                            } else {
                                logger.warn("Unhandled SelectionKey operation");
                            }
                        } catch (CancelledKeyException e) {
                            clientDisconnected(key, e.getMessage());
                        }
                    }
                } catch (IOException e) {
                    logger.warn("Select failed", e);
                }
            }
        } finally {
            // Stop sending heartbeats once the router no longer serves clients.
            heartbeatTimer.cancel();
            cleanup();
        }
    }

    /**
     * Releases the resources of the router: shuts the worker pool down,
     * discards the attachment of every known client, closes the server socket,
     * the server channel and the selector, and finally releases the latch
     * awaited by {@code stop()}.
     *
     * <p>Each resource is closed independently so that a failure on one does
     * not prevent the others from being closed, and the latch is released even
     * if an unexpected exception occurs.
     */
    private void cleanup() {
        try {
            this.tpe.shutdown();
            for (Client client : this.clientMap.values()) {
                client.discardAttachment("Shutting down the router");
            }
            try {
                this.ssc.socket().close();
            } catch (IOException e) {
                ProActiveLogger.logEatedException(logger, e);
            }
            try {
                this.ssc.close();
            } catch (IOException e) {
                ProActiveLogger.logEatedException(logger, e);
            }
            try {
                this.selector.close();
            } catch (IOException e) {
                ProActiveLogger.logEatedException(logger, e);
            }
        } finally {
            this.isStopped.countDown();
        }
    }

    /**
     * Accepts a pending connection on the server channel and registers the new
     * channel, in non-blocking mode, for read events.
     *
     * <p>A non-blocking {@code accept()} returns {@code null} when no
     * connection is pending; that case is ignored. If the new channel cannot be
     * configured or registered it is closed instead of being leaked.
     *
     * @param selectionKey key of the server channel that is ready to accept
     */
    private void handleAccept(SelectionKey selectionKey) {
        SocketChannel accepted = null;
        try {
            accepted = ((ServerSocketChannel) selectionKey.channel()).accept();
            if (accepted == null) {
                // The pending connection vanished before it could be accepted.
                return;
            }
            accepted.configureBlocking(false);
            accepted.register(this.selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            logger.warn("Failed to accept a new connection", e);
            if (accepted != null) {
                try {
                    accepted.close();
                } catch (IOException closeFailure) {
                    ProActiveLogger.logEatedException(logger, closeFailure);
                }
            }
        }
    }

    /**
     * Drains the data currently available on a client channel and hands every
     * non-empty chunk to the assembler of the key's attachment. The attachment
     * is created and attached on the first read of a key.
     *
     * <p>The client is treated as disconnected when the stream ends, when an
     * I/O error occurs, or when the assembler rejects the data as malformed.
     *
     * @param selectionKey key of the client channel that is ready for reading
     */
    private void handleRead(SelectionKey selectionKey) {
        final ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        final SocketChannel channel = (SocketChannel) selectionKey.channel();

        Attachment attachment = (Attachment) selectionKey.attachment();
        if (attachment == null) {
            attachment = new Attachment(this, channel);
            selectionKey.attach(attachment);
        }

        int byteCount;
        try {
            while (true) {
                buffer.clear();
                byteCount = channel.read(buffer);
                buffer.flip();
                if (byteCount <= 0) {
                    // 0: nothing more to read for now; -1: end of stream.
                    break;
                }
                attachment.getAssembler().pushBuffer(buffer);
            }
        } catch (MalformedMessageException malformed) {
            clientDisconnected(selectionKey, malformed.getMessage());
            return;
        } catch (IOException ioFailure) {
            clientDisconnected(selectionKey, ioFailure.getMessage());
            return;
        }

        if (byteCount == -1) {
            clientDisconnected(selectionKey, "end of stream");
        }
    }

    /**
     * Tears down the connection behind a selection key: cancels and detaches
     * the key, closes the socket and its channel, and, when the attachment is
     * bound to a client, discards that client's attachment and schedules a
     * disconnection broadcast to all known clients.
     *
     * @param selectionKey key of the channel to close
     * @param str          reason of the disconnection, used for the client and the log
     */
    private void clientDisconnected(SelectionKey selectionKey, String str) {
        // Fetch the attachment first: it is detached from the key right below.
        final Attachment detached = (Attachment) selectionKey.attachment();
        selectionKey.cancel();
        selectionKey.attach(null);

        final SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            channel.socket().close();
        } catch (IOException socketFailure) {
            ProActiveLogger.logEatedException(logger, socketFailure);
        }
        try {
            channel.close();
        } catch (IOException channelFailure) {
            ProActiveLogger.logEatedException(logger, channelFailure);
        }

        final Client owner = detached.getClient();
        if (owner != null) {
            owner.discardAttachment(str);
            this.tpe.submit(new DisconnectionBroadcaster(this.clientMap.values(), owner.getAgentId()));
        }

        logger.debug("Client " + detached.getRemoteEndpoint() + " disconnected: " + str);
    }

    /**
     * Queues the processing of a message on the worker pool.
     *
     * @param byteBuffer message to process
     * @param attachment attachment of the connection the message was received on
     */
    @Override
    public void handleAsynchronously(ByteBuffer byteBuffer, Attachment attachment) {
        final TopLevelProcessor processor = new TopLevelProcessor(byteBuffer, attachment, this);
        this.tpe.execute(processor);
    }

    /**
     * Looks a client up by its agent id.
     *
     * <p>The lookup is done while holding the monitor of the client map, like
     * the other accesses to the map in this class.
     *
     * @param agentID id of the agent to look up
     * @return the client registered under that id, or {@code null} if there is none
     */
    @Override
    public Client getClient(AgentID agentID) {
        synchronized (this.clientMap) {
            return this.clientMap.get(agentID);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.concurrent.ConcurrentHashMap<org.objectweb.proactive.extensions.pamr.protocol.AgentID, org.objectweb.proactive.extensions.pamr.router.Client>] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v6 */
    @Override // org.objectweb.proactive.extensions.pamr.router.RouterInternal
    public void addClient(Client client) {
        ?? r0 = this.clientMap;
        synchronized (r0) {
            this.clientMap.put(client.getAgentId(), client);
            r0 = r0;
        }
    }

    /**
     * @return the local port reported by the server socket after it was bound
     */
    @Override
    public int getPort() {
        return port;
    }

    /**
     * @return the address of the socket address the server socket was bound with
     */
    @Override
    public InetAddress getInetAddr() {
        return inetAddress;
    }

    /**
     * Stops the router. If a select thread is running, it is interrupted and
     * this method waits until that thread has finished its cleanup.
     *
     * <p>The stopped flag is flipped with a single compare-and-set so that,
     * when several threads call this method concurrently, exactly one of them
     * performs the shutdown and the others get the exception.
     *
     * @throws IllegalStateException if the router has already been stopped
     */
    @Override
    public void stop() {
        if (!this.stopped.compareAndSet(false, true)) {
            throw new IllegalStateException("Router already stopped");
        }
        Thread selecting = this.selectThread.get();
        if (selecting != null) {
            // Interrupting the thread wakes it up if it is blocked in select().
            selecting.interrupt();
            this.isStopped.await();
        }
    }

    /**
     * @return the random, non-zero identifier chosen when this router was created
     */
    public long getId() {
        return routerId;
    }

    /**
     * Reads and validates the reserved-agent configuration file.
     *
     * <p>The file is a properties file. The key {@code configuration} holds the
     * administration magic cookie and is mandatory. Every other key must be a
     * number designating a reserved agent id, and its value that agent's magic
     * cookie. On success the administration cookie of this router is updated.
     *
     * @return the magic cookie of each reserved agent declared in the file
     * @throws Exception if the file is missing, unreadable or invalid
     */
    private Map<AgentID, MagicCookie> validateConfigFile() throws Exception {
        Properties properties = new Properties();
        MagicCookie configCookie = null;
        try {
            // Close the stream as soon as the properties are loaded.
            try (FileInputStream in = new FileInputStream(this.configFile)) {
                properties.load(in);
            }
            Map<AgentID, MagicCookie> reservedAgents = new HashMap<>();
            for (Object rawKey : properties.keySet()) {
                String key = (String) rawKey;
                String value = (String) properties.get(rawKey);
                if ("configuration".equals(key)) {
                    if (configCookie != null) {
                        throw new Exception("Duplicated configuration magic cookie");
                    }
                    try {
                        configCookie = new MagicCookie(properties.getProperty(key));
                    } catch (IllegalArgumentException e) {
                        throw new Exception("Invalid configuration magic cookie", e);
                    }
                    continue;
                }

                AgentID agentID;
                try {
                    agentID = new AgentID(Long.parseLong(key));
                } catch (NumberFormatException e) {
                    throw new Exception("Invalid configuration file " + this.configFile + ": Keys must be an integer but " + key + " is not", e);
                }
                MagicCookie agentCookie;
                try {
                    agentCookie = new MagicCookie(value);
                } catch (IllegalArgumentException e) {
                    throw new Exception("Invalid configuration file " + this.configFile + ": invalid cookie value  " + value + ". " + e.getMessage(), e);
                }
                if (!agentID.isReserved()) {
                    throw new Exception("Invalid configuration file " + this.configFile + ": invalid Agent ID " + key + ". Agent ID must be between 0 and 4095");
                }
                reservedAgents.put(agentID, agentCookie);
            }
            if (configCookie == null) {
                throw new Exception("Configuration magic cookie must be defined in the configuration file (key: configuration)");
            }
            admin_logger.debug("Set config magic cookie to: " + configCookie);
            this.adminMagicCookie = configCookie;
            return reservedAgents;
        } catch (FileNotFoundException e) {
            throw new IOException("Router configuration file does not exist: " + this.configFile);
        } catch (IOException e) {
            throw new IOException6("Failed to read the router configuration file: " + this.configFile, e);
        } catch (IllegalArgumentException e) {
            throw new IOException6("Failed to read the router configuation file: " + this.configFile, e);
        }
    }

    /**
     * Reloads the reserved-agent configuration file and applies it to the
     * client map. Does nothing when no configuration file is set.
     *
     * <p>Reserved agents that are no longer declared in the file are
     * disconnected and removed. Every agent declared in the file gets a fresh
     * client entry built from its magic cookie; if an entry already existed,
     * its attachment is discarded first. Nothing is changed when the file is
     * invalid, because validation happens before the map is touched.
     *
     * @throws Exception if the configuration file is missing, unreadable or invalid
     */
    public synchronized void reloadConfigurationFile() throws Exception {
        if (this.configFile == null) {
            return;
        }
        Map<AgentID, MagicCookie> reservedAgents = validateConfigFile();
        synchronized (this.clientMap) {
            // Removing entries while iterating is safe here: the iterators of a
            // ConcurrentHashMap never throw ConcurrentModificationException.
            for (AgentID known : this.clientMap.keySet()) {
                if (!known.isReserved() || reservedAgents.containsKey(known)) {
                    continue;
                }
                try {
                    Client stale = this.clientMap.get(known);
                    if (stale != null) {
                        stale.disconnect();
                    }
                } catch (IOException e) {
                    admin_logger.warn("Failed to disconnect reserved agent " + known, e);
                } finally {
                    // The agent is dropped even if the disconnection failed.
                    this.clientMap.remove(known);
                    admin_logger.debug("Removed reserved agent " + known + " (configuration change)");
                }
            }
            for (Map.Entry<AgentID, MagicCookie> declared : reservedAgents.entrySet()) {
                AgentID agentID = declared.getKey();
                Client previous = this.clientMap.get(agentID);
                if (previous != null) {
                    previous.discardAttachment("Configuration file reloaded");
                }
                this.clientMap.put(agentID, new Client(agentID, declared.getValue()));
                admin_logger.debug("Disconnected reserved agent " + agentID + " and updated magic cookie (configuration change)");
            }
        }
    }

    /**
     * @return the administration magic cookie read from the configuration file,
     *         or {@code null} if no configuration file has been loaded
     */
    public MagicCookie getAdminMagicCookie() {
        return adminMagicCookie;
    }

    /**
     * @return the reserved-agent configuration file given at construction time; may be {@code null}
     */
    public File getConfigurationFile() {
        return configFile;
    }

    /**
     * @return the heartbeat timeout, in milliseconds, given at construction time
     */
    public int getHeartbeatTimeout() {
        return heartbeatTimeout;
    }
}
