/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.bedrock.runtime.concurrent.socket;

import com.oracle.bedrock.Option;
import com.oracle.bedrock.annotations.Internal;
import com.oracle.bedrock.io.NetworkHelper;
import com.oracle.bedrock.predicate.Predicates;
import com.oracle.bedrock.runtime.concurrent.AbstractControllableRemoteChannel;
import com.oracle.bedrock.runtime.concurrent.RemoteCallable;
import com.oracle.bedrock.runtime.concurrent.RemoteEvent;
import com.oracle.bedrock.runtime.concurrent.RemoteEventListener;
import com.oracle.bedrock.runtime.concurrent.RemoteRunnable;
import com.oracle.bedrock.runtime.concurrent.socket.SocketBasedRemoteChannel;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

@Internal
public class SocketBasedRemoteChannelServer
extends AbstractControllableRemoteChannel {
    private static final Logger LOGGER = Logger.getLogger(SocketBasedRemoteChannelServer.class.getName());
    private ServerSocket serverSocket;
    private ServerThread serverThread;
    private ConcurrentHashMap<Integer, SocketBasedRemoteChannel> remoteChannels;
    private AtomicBoolean isTerminating;
    private String name;
    private final int port;

    public SocketBasedRemoteChannelServer(String name) {
        this(name, 0);
    }

    public SocketBasedRemoteChannelServer(String name, int port) {
        this.name = name;
        this.port = port;
        this.serverSocket = null;
        this.serverThread = null;
        this.remoteChannels = new ConcurrentHashMap();
        this.isTerminating = new AtomicBoolean(false);
    }

    public synchronized InetAddress open() throws IOException {
        if (!this.isOpen()) {
            this.serverSocket = new ServerSocket(this.port);
            this.serverSocket.setReuseAddress(true);
            this.serverThread = new ServerThread();
            this.serverThread.start();
            this.setOpen(true);
        }
        return this.getInetAddress(Predicates.allOf((Predicate[])new Predicate[]{NetworkHelper.LOOPBACK_ADDRESS, NetworkHelper.DEFAULT_ADDRESS}));
    }

    public synchronized int getPort() {
        if (this.serverSocket != null) {
            return this.serverSocket.getLocalPort();
        }
        throw new IllegalStateException("Server is closed");
    }

    public synchronized InetAddress getInetAddress(Predicate<InetAddress> predicate) {
        if (this.serverSocket != null) {
            try {
                predicate = predicate == null ? NetworkHelper.DEFAULT_ADDRESS : predicate;
                InetAddress inetAddress = NetworkHelper.getInetAddress(predicate);
                return inetAddress == null ? this.serverSocket.getInetAddress() : inetAddress;
            }
            catch (SocketException e) {
                return this.serverSocket.getInetAddress();
            }
        }
        throw new IllegalStateException("Server is closed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected synchronized void onClose() {
        this.isTerminating.set(true);
        for (SocketBasedRemoteChannel executor : this.remoteChannels.values()) {
            try {
                executor.close();
            }
            catch (Exception exception) {}
        }
        try {
            this.serverSocket.close();
        }
        catch (IOException iOException) {
        }
        finally {
            this.serverSocket = null;
        }
    }

    @Override
    public <T> CompletableFuture<T> submit(RemoteCallable<T> callable, Option ... options) throws IllegalStateException {
        SocketBasedRemoteChannelServer socketBasedRemoteChannelServer = this;
        synchronized (socketBasedRemoteChannelServer) {
            if (this.isOpen() && !this.isTerminating.get()) {
                List<CompletableFuture> futures = this.remoteChannels.values().stream().map(channel -> channel.submit(callable, new Option[0])).collect(Collectors.toList());
                if (futures.isEmpty()) {
                    throw new IllegalStateException("Failed to submit the request [" + callable + "].  There are no RemoteChannels connected");
                }
                return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()]));
            }
            throw new IllegalStateException("Can't submit the request [" + callable + " as the RemoteChannel is closing or is closed");
        }
    }

    @Override
    public CompletableFuture<Void> submit(RemoteRunnable runnable, Option ... options) throws IllegalStateException {
        SocketBasedRemoteChannelServer socketBasedRemoteChannelServer = this;
        synchronized (socketBasedRemoteChannelServer) {
            if (this.isOpen() && !this.isTerminating.get()) {
                List<CompletableFuture> futures = this.remoteChannels.values().stream().map(channel -> channel.submit(runnable, new Option[0])).collect(Collectors.toList());
                if (futures.isEmpty()) {
                    throw new IllegalStateException("Failed to submit the request [" + runnable + "].  There are no RemoteChannels connected");
                }
                return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
            }
            throw new IllegalStateException("Can't submit the request [" + runnable + "] as the RemoteChannel is closing or is closed");
        }
    }

    @Override
    public void addListener(RemoteEventListener listener, Option ... options) {
        super.addListener(listener, options);
        this.remoteChannels.forEach((id, remoteChannel) -> remoteChannel.addListener(listener, options));
    }

    @Override
    public void removeListener(RemoteEventListener listener, Option ... options) {
        super.removeListener(listener, options);
        this.remoteChannels.forEach((id, remoteChannel) -> remoteChannel.removeListener(listener, options));
    }

    @Override
    public CompletableFuture<Void> raise(RemoteEvent event, Option ... options) {
        if (this.isOpen()) {
            List<CompletableFuture> futures = this.remoteChannels.values().stream().map(channel -> {
                try {
                    return channel.raise(event, options);
                }
                catch (Throwable e) {
                    return CompletableFuture.completedFuture(null);
                }
            }).collect(Collectors.toList());
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        }
        return CompletableFuture.completedFuture(null);
    }

    public Iterable<SocketBasedRemoteChannel> getRemoteChannels() {
        return this.remoteChannels.values();
    }

    private class ServerThread
    extends Thread {
        private SocketBasedRemoteChannel remoteChannel;

        private ServerThread() {
            super("RemoteChannelServer-" + SocketBasedRemoteChannelServer.this.name);
        }

        @Override
        public void run() {
            int channelId = 0;
            while (!SocketBasedRemoteChannelServer.this.isTerminating.get()) {
                int remoteChannelId = ++channelId;
                boolean connected = false;
                int attempts = 0;
                int maxAttempts = 5;
                while (!connected && attempts < maxAttempts) {
                    ++attempts;
                    try {
                        Socket socket = SocketBasedRemoteChannelServer.this.serverSocket.accept();
                        SocketBasedRemoteChannel channel = new SocketBasedRemoteChannel(socket);
                        SocketBasedRemoteChannelServer.this.eventListenersByStreamName.forEach((streamName, listeners) -> listeners.forEach(listener -> channel.addListener((RemoteEventListener)listener, (Option)streamName)));
                        SocketBasedRemoteChannelServer.this.channelListeners.forEach(channel::addListener);
                        channel.open();
                        if (channel.isOpen()) {
                            SocketBasedRemoteChannelServer.this.remoteChannels.put(remoteChannelId, channel);
                            this.remoteChannel = channel;
                            connected = true;
                            continue;
                        }
                        --attempts;
                        LOGGER.severe("SocketBasedRemoteChannelServer \"" + SocketBasedRemoteChannelServer.this.name + "\" Rejected connection on " + socket);
                    }
                    catch (Throwable e) {
                        if (this.remoteChannel != null && this.remoteChannel.isOpen()) {
                            try {
                                this.remoteChannel.onClose();
                            }
                            catch (Throwable throwable) {
                                // empty catch block
                            }
                        }
                        this.remoteChannel = null;
                        SocketBasedRemoteChannelServer.this.remoteChannels.remove(remoteChannelId);
                    }
                }
                SocketBasedRemoteChannelServer.this.isTerminating.compareAndSet(false, !connected);
            }
        }
    }
}

