package ml.shifu.guagua.master;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.Thread;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import ml.shifu.guagua.BasicCoordinator;
import ml.shifu.guagua.GuaguaConstants;
import ml.shifu.guagua.GuaguaRuntimeException;
import ml.shifu.guagua.io.Bytable;
import ml.shifu.guagua.io.BytableWrapper;
import ml.shifu.guagua.io.Combinable;
import ml.shifu.guagua.io.HaltBytable;
import ml.shifu.guagua.io.NettyBytableDecoder;
import ml.shifu.guagua.io.NettyBytableEncoder;
import ml.shifu.guagua.io.Serializer;
import ml.shifu.guagua.master.AbstractMasterCoordinator;
import ml.shifu.guagua.util.AppendList;
import ml.shifu.guagua.util.BytableDiskList;
import ml.shifu.guagua.util.BytableMemoryDiskList;
import ml.shifu.guagua.util.ClassUtils;
import ml.shifu.guagua.util.NetworkUtils;
import ml.shifu.guagua.util.NumberFormatUtils;
import ml.shifu.guagua.util.ReflectionUtils;
import ml.shifu.guagua.util.StringUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ml/shifu/guagua/master/NettyMasterCoordinator.class */
public class NettyMasterCoordinator<MASTER_RESULT extends Bytable, WORKER_RESULT extends Bytable> extends AbstractMasterCoordinator<MASTER_RESULT, WORKER_RESULT> {
    private static final Logger LOG = LoggerFactory.getLogger(NettyMasterCoordinator.class);

    /**
     * Monitor guarding the worker-result bookkeeping ({@code iterResults}, {@code indexMap},
     * {@code currentInteration}, {@code canUpdateWorkerResultMap}); it is taken both by the Netty
     * {@code ServerHandler} and by the master life-cycle callbacks of this class.
     */
    private static final Object LOCK = new Object();

    /** Netty server bootstrap created in {@code startNettyServer} and released in {@code postApplication}. */
    private ServerBootstrap messageServer;

    /** Port the message server is bound to; published to ZooKeeper by {@code initMasterZnode}. */
    private int messageServerPort;

    /** Worker results collected for the running iteration; re-created by {@code initIterResults}. */
    private AppendList<WorkerResultWrapper> iterResults;

    /** Iteration whose worker results are currently accepted by {@code ServerHandler}. */
    private int currentInteration;

    /** Total iteration count taken from the master context in {@code preApplication}. */
    private int totalInteration;

    /** Worker-result serializer shared with {@code WorkerResultWrapper}; assigned in {@code preApplication}. */
    private static Serializer<Bytable> serializer;

    /** Container id -> position (raw size - 1) recorded when that container's result was appended. */
    private Map<String, Integer> indexMap = new HashMap<String, Integer>();

    /** When {@code false}, incoming worker messages are logged and dropped. */
    private boolean canUpdateWorkerResultMap = true;

    /** Master result of the last finished iteration; consulted to detect a halt on stop messages. */
    private MASTER_RESULT masterResult = null;

    /** Worker result class name used to deserialize incoming worker payloads. */
    private String workerClassName = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ml/shifu/guagua/master/NettyMasterCoordinator$MasterThreadFactory.class */
    public static class MasterThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        MasterThreadFactory() {
            SecurityManager securityManager = System.getSecurityManager();
            this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
            this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            if (thread.getPriority() != 5) {
                thread.setPriority(5);
            }
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: ml.shifu.guagua.master.NettyMasterCoordinator.MasterThreadFactory.1
                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread2, Throwable th) {
                    NettyMasterCoordinator.LOG.warn("Error message in thread {} with error message {}, error root cause {}.", new Object[]{thread2, th, th.getCause()});
                }
            });
            return thread;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ml/shifu/guagua/master/NettyMasterCoordinator$MergeWorkerResultList.class */
    /**
     * In-memory worker result list that folds combinable results together so that only a bounded
     * number of them are held at once.
     *
     * <p>{@link #size()} reports the number of wrappers ever added (the raw count), while
     * {@link #mergedSize()} reports how many elements are physically stored.
     */
    public class MergeWorkerResultList extends LinkedList<WorkerResultWrapper> implements AppendList<WorkerResultWrapper> {
        private static final long serialVersionUID = -33662960498334446L;
        /** Number of wrappers passed to {@link #add} since creation or the last {@link #clear()}. */
        private int rawSize;
        /** A merge is triggered when the unmerged combinable count reaches {@code threshold - 1}. */
        private final int threshold;
        /** Combinable wrappers stored since the last merge (a merged wrapper counts as one). */
        private int currIndex = 0;

        /**
         * @param i merge threshold, must be positive
         * @throws IllegalArgumentException if {@code i <= 0}
         */
        public MergeWorkerResultList(int i) {
            if (i <= 0) {
                throw new IllegalArgumentException("Threshold cannot be <= 0.");
            }
            this.threshold = i;
        }

        /**
         * Adds a wrapper. Non-combinable wrappers are stored as they are. Combinable wrappers are
         * stored until {@code threshold - 1} of them are pending; at that point the previously
         * stored ones are removed from the tail and combined into the incoming wrapper, which is
         * then stored as the single merged element.
         *
         * @param workerResultWrapper wrapper to add
         * @return the result of {@link LinkedList#add(Object)}
         */
        @Override // java.util.LinkedList, java.util.AbstractList, java.util.AbstractCollection, java.util.Collection, java.util.List, java.util.Deque, java.util.Queue
        public synchronized boolean add(WorkerResultWrapper workerResultWrapper) {
            this.rawSize++;
            if (!workerResultWrapper.isWorkerCombinable()) {
                return super.add(workerResultWrapper);
            }
            this.currIndex++;
            if (this.currIndex != this.threshold - 1) {
                return super.add(workerResultWrapper);
            }
            // Fold every pending combinable element into the incoming one.
            // NOTE(review): removeLast() assumes the tail elements are the pending combinable ones,
            // i.e. that no non-combinable wrapper was interleaved - confirm against callers.
            while (this.currIndex > 1) {
                workerResultWrapper.combine(removeLast());
                this.currIndex--;
            }
            if (this.currIndex != 1) {
                throw new IllegalStateException();
            }
            return super.add(workerResultWrapper);
        }

        /** @return the raw number of wrappers added, not the number of stored elements */
        @Override // java.util.LinkedList, java.util.AbstractCollection, java.util.Collection, java.util.List, java.util.Deque, ml.shifu.guagua.util.AppendList
        public synchronized int size() {
            return this.rawSize;
        }

        /** @return the number of elements physically stored after merging */
        public synchronized int mergedSize() {
            return super.size();
        }

        /** Removes all elements and resets the raw and pending counters so {@link #size()} returns 0. */
        @Override
        public synchronized void clear() {
            super.clear();
            this.rawSize = 0;
            this.currIndex = 0;
        }

        /** Same as {@link #add(WorkerResultWrapper)}. */
        @Override // ml.shifu.guagua.util.AppendList
        public synchronized boolean append(WorkerResultWrapper workerResultWrapper) {
            return add(workerResultWrapper);
        }

        /** No-op: this list has no separate write/read states. */
        @Override // ml.shifu.guagua.util.AppendList
        public synchronized void switchState() {
        }

        /** No-op; present so the reflective "close" lookup in {@code closeIterResults} finds a method. */
        public synchronized void close() {
        }
    }

    /* loaded from: input_file:ml/shifu/guagua/master/NettyMasterCoordinator$ServerHandler.class */
    /**
     * Netty upstream handler that receives worker messages and records them in the enclosing
     * coordinator's result list under {@code LOCK}.
     */
    private class ServerHandler extends SimpleChannelUpstreamHandler {
        private ServerHandler() {
        }

        /**
         * Logs channel state events (except INTEREST_OPS changes) at debug level and then lets the
         * superclass dispatch the event.
         */
        public void handleUpstream(ChannelHandlerContext channelHandlerContext, ChannelEvent channelEvent) throws Exception {
            if ((channelEvent instanceof ChannelStateEvent) && ((ChannelStateEvent) channelEvent).getState() != ChannelState.INTEREST_OPS) {
                NettyMasterCoordinator.LOG.debug(channelEvent.toString());
            }
            super.handleUpstream(channelHandlerContext, channelEvent);
        }

        /**
         * Records one worker message.
         *
         * <p>Stop messages are accepted only at iteration {@code totalInteration + 1} or once the
         * master result is halted; regular messages are accepted only for the current iteration.
         * In both cases a container is recorded at most once per collection round.
         *
         * @throws IllegalStateException if the decoded message is not a {@link BytableWrapper}
         */
        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
            Object message = messageEvent.getMessage();
            // Check the exact type that is cast to below, so a foreign Bytable is rejected with the
            // intended IllegalStateException instead of a ClassCastException.
            if (!(message instanceof BytableWrapper)) {
                throw new IllegalStateException("Message should be bytable instance.");
            }
            BytableWrapper bytableWrapper = (BytableWrapper) message;
            String containerId = bytableWrapper.getContainerId();
            NettyMasterCoordinator.LOG.debug("Received container id {} with message:{}", containerId, bytableWrapper);
            synchronized (NettyMasterCoordinator.LOCK) {
                if (!NettyMasterCoordinator.this.canUpdateWorkerResultMap) {
                    // The master already stopped collecting for this iteration: drop the message.
                    NettyMasterCoordinator.LOG.info("Cannot update worker result with message: containerId {} iteration {} currentIteration {}", new Object[]{containerId, Integer.valueOf(bytableWrapper.getCurrentIteration()), Integer.valueOf(NettyMasterCoordinator.this.currentInteration)});
                    return;
                }
                boolean alreadyRecorded = NettyMasterCoordinator.this.indexMap.containsKey(containerId);
                if (bytableWrapper.isStopMessage()) {
                    Bytable bytable = NettyMasterCoordinator.this.masterResult;
                    boolean halted = (bytable instanceof HaltBytable) && ((HaltBytable) bytable).isHalt();
                    boolean finalIteration = bytableWrapper.getCurrentIteration() == NettyMasterCoordinator.this.totalInteration + 1;
                    if ((finalIteration || halted) && !alreadyRecorded) {
                        // Stop messages carry no payload; an empty wrapper only counts the worker.
                        NettyMasterCoordinator.this.iterResults.append(new WorkerResultWrapper(bytableWrapper.getCurrentIteration(), null, null));
                        NettyMasterCoordinator.this.indexMap.put(containerId, Integer.valueOf(NettyMasterCoordinator.this.iterResults.size() - 1));
                    }
                } else if (!alreadyRecorded && NettyMasterCoordinator.this.currentInteration == bytableWrapper.getCurrentIteration()) {
                    String str = NettyMasterCoordinator.this.workerClassName;
                    NettyMasterCoordinator.this.iterResults.append(new WorkerResultWrapper(bytableWrapper.getCurrentIteration(), NettyMasterCoordinator.this.getWorkerSerializer().bytesToObject(bytableWrapper.getBytes(), str), str));
                    NettyMasterCoordinator.this.indexMap.put(containerId, Integer.valueOf(NettyMasterCoordinator.this.iterResults.size() - 1));
                }
            }
        }

        /**
         * Logs the failure, closes the channel and rethrows the cause, wrapping it in a
         * {@link GuaguaRuntimeException} unless it already is one.
         */
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
            Throwable cause = exceptionEvent.getCause();
            NettyMasterCoordinator.LOG.error("error in service handler", cause);
            exceptionEvent.getChannel().close();
            if (cause instanceof GuaguaRuntimeException) {
                throw ((GuaguaRuntimeException) cause);
            }
            throw new GuaguaRuntimeException(cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ml/shifu/guagua/master/NettyMasterCoordinator$WorkerResultWrapper.class */
    /**
     * Worker result tagged with the iteration it belongs to and the class name needed to
     * deserialize it. A wrapper with a {@code null} result is used as a payload-free marker.
     */
    public static class WorkerResultWrapper implements Bytable, Combinable<WorkerResultWrapper> {
        private static final Charset UTF_8 = Charset.forName("UTF-8");

        private int currIter;
        private Bytable workerResult;
        private String className;

        /**
         * @param i       iteration the result belongs to
         * @param bytable worker result, may be {@code null}
         * @param str     class name of the worker result, may be {@code null}
         */
        public WorkerResultWrapper(int i, Bytable bytable, String str) {
            this.currIter = i;
            this.workerResult = bytable;
            this.className = str;
        }

        /**
         * @return {@code true} when both class name and result are present and the named class
         *         implements {@link Combinable}; {@code false} otherwise, including when the class
         *         cannot be loaded
         */
        public boolean isWorkerCombinable() {
            if (this.className == null || this.workerResult == null) {
                return false;
            }
            try {
                return Combinable.class.isAssignableFrom(Class.forName(this.className));
            } catch (ClassNotFoundException e) {
                return false;
            }
        }

        /**
         * Combines the other wrapper's result into this wrapper's result when this wrapper is
         * combinable; otherwise does nothing.
         *
         * @return this wrapper
         */
        @Override // ml.shifu.guagua.io.Combinable
        @SuppressWarnings({"unchecked", "rawtypes"})
        public WorkerResultWrapper combine(WorkerResultWrapper workerResultWrapper) {
            if (isWorkerCombinable()) {
                ((Combinable) this.workerResult).combine(workerResultWrapper.workerResult);
            }
            return this;
        }

        /**
         * Layout: iteration (int), class-name length + UTF-8 bytes, result length + serialized
         * bytes. A length of 0 stands for an absent value.
         */
        @Override // ml.shifu.guagua.io.Bytable
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(this.currIter);
            if (this.className == null) {
                dataOutput.writeInt(0);
            } else {
                writeBytes(dataOutput, this.className.getBytes(UTF_8));
            }
            if (this.workerResult == null) {
                dataOutput.writeInt(0);
            } else {
                writeBytes(dataOutput, NettyMasterCoordinator.serializer.objectToBytes(this.workerResult));
            }
        }

        /** Writes a length-prefixed byte array. */
        private void writeBytes(DataOutput dataOutput, byte[] bArr) throws IOException {
            dataOutput.writeInt(bArr.length);
            dataOutput.write(bArr);
        }

        /** Reads a length-prefixed byte array; returns {@code null} for the length-0 marker. */
        private byte[] readBytes(DataInput dataInput) throws IOException {
            int length = dataInput.readInt();
            if (length == 0) {
                return null;
            }
            byte[] bytes = new byte[length];
            dataInput.readFully(bytes);
            return bytes;
        }

        /**
         * Reads the layout produced by {@link #write(DataOutput)}. Both the class name and the
         * result are reset to {@code null} when absent, so a reused instance never keeps state
         * from a previous record.
         */
        @Override // ml.shifu.guagua.io.Bytable
        public void readFields(DataInput dataInput) throws IOException {
            this.currIter = dataInput.readInt();
            byte[] nameBytes = readBytes(dataInput);
            this.className = nameBytes == null ? null : new String(nameBytes, UTF_8);
            byte[] resultBytes = readBytes(dataInput);
            this.workerResult = resultBytes == null ? null : NettyMasterCoordinator.serializer.bytesToObject(resultBytes, this.className);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the superclass initialization and then creates the worker result list for the first
     * collection round.
     *
     * @param properties configuration passed on to the superclass and to {@code initIterResults}
     */
    @Override // ml.shifu.guagua.BasicCoordinator
    public void initialize(Properties properties) {
        super.initialize(properties);
        // Must come after super.initialize so the result list is built on an initialized coordinator.
        initIterResults(properties);
    }

    /**
     * Tells whether the named class implements {@link Combinable}.
     *
     * @param str fully qualified class name, may be {@code null}
     * @return {@code true} only when the class can be loaded and is assignable to
     *         {@link Combinable}
     */
    private boolean isWorkerCombinable(String str) {
        if (str == null) {
            return false;
        }
        try {
            Class<?> workerClass = Class.forName(str);
            return Combinable.class.isAssignableFrom(workerClass);
        } catch (ClassNotFoundException ignored) {
            // An unloadable class is simply treated as not combinable.
            return false;
        }
    }

    /**
     * Creates a fresh worker result list under {@code LOCK}.
     *
     * <p>A merging in-memory list is used when the non-spill switch is on (default "true") and the
     * configured worker result class is combinable; otherwise a memory list backed by a disk list
     * is used, sized as a fraction of the JVM's maximum memory.
     *
     * @param properties configuration holding the switch, merge threshold and memory fraction
     */
    private void initIterResults(Properties properties) {
        synchronized (LOCK) {
            String nonSpillFlag = properties.getProperty(GuaguaConstants.GUAGUA_MASTER_RESULT_NONSPILL, "true");
            boolean mergeInMemory = "true".equalsIgnoreCase(nonSpillFlag)
                    && isWorkerCombinable(properties.getProperty(GuaguaConstants.GUAGUA_WORKER_RESULT_CLASS));
            if (!mergeInMemory) {
                BytableDiskList spillList = new BytableDiskList(String.valueOf(System.currentTimeMillis()), WorkerResultWrapper.class.getName());
                String fractionText = properties.getProperty(GuaguaConstants.GUAGUA_MASTER_WORKERESULTS_MEMORY_FRACTION, GuaguaConstants.GUAGUA_MASTER_WORKERESULTS_DEFAULT_MEMORY_FRACTION);
                double memoryFraction = Double.parseDouble(fractionText);
                long memoryBudget = (long) (Runtime.getRuntime().maxMemory() * memoryFraction);
                LOG.info("Memory size in BytableMemoryDiskList for worker result list: {}", Long.valueOf(memoryBudget));
                this.iterResults = new BytableMemoryDiskList(memoryBudget, spillList);
            } else {
                String thresholdText = properties.getProperty(GuaguaConstants.GUAGUA_MASTER_RESULT_MERGE_THRESHOLD, "10");
                int mergeThreshold = NumberFormatUtils.getInt(thresholdText, 10);
                this.iterResults = new MergeWorkerResultList(mergeThreshold);
            }
        }
    }

    @Override // ml.shifu.guagua.master.MasterInterceptor
    public void preApplication(final MasterContext<MASTER_RESULT, WORKER_RESULT> masterContext) {
        initialize(masterContext.getProps());
        serializer = getWorkerSerializer();
        this.workerClassName = masterContext.getWorkerResultClassName();
        this.totalInteration = masterContext.getTotalIteration();
        new AbstractMasterCoordinator.FailOverCommand(masterContext).execute();
        if (!masterContext.isInitIteration()) {
            new BasicCoordinator.BasicCoordinatorCommand() { // from class: ml.shifu.guagua.master.NettyMasterCoordinator.1
                @Override // ml.shifu.guagua.BasicCoordinator.BasicCoordinatorCommand
                public void doExecute() throws KeeperException, InterruptedException {
                    String appId = masterContext.getAppId();
                    int currentIteration = masterContext.getCurrentIteration();
                    NettyMasterCoordinator.this.setMasterResult(masterContext, NettyMasterCoordinator.this.getCurrentMasterNode(appId, currentIteration).toString(), NettyMasterCoordinator.this.getCurrentMasterSplitNode(appId, currentIteration).toString());
                }
            }.execute();
        }
        startNettyServer(masterContext.getProps());
        synchronized (LOCK) {
            if (masterContext.isInitIteration()) {
                this.currentInteration = 0;
            } else {
                this.currentInteration = masterContext.getCurrentIteration();
            }
        }
        initMasterZnode(masterContext);
        if (masterContext.isInitIteration()) {
            clear(masterContext.getProps());
            LOG.info("All workers are initiliazed successfully.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resets the worker-result bookkeeping under {@code LOCK}: closes and clears the current
     * result list, replaces it with a fresh one, forgets which containers have reported and
     * re-enables result collection.
     *
     * @param properties configuration used by {@code initIterResults} to build the new list
     */
    public void clear(Properties properties) {
        synchronized (LOCK) {
            // Close before clearing/replacing so the old list can release whatever it holds.
            closeIterResults();
            this.iterResults.clear();
            initIterResults(properties);
            this.indexMap.clear();
            this.canUpdateWorkerResultMap = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Invokes the result list's {@code close} method reflectively when it declares one.
     *
     * @throws RuntimeException wrapping any failure other than the method being absent
     */
    public void closeIterResults() {
        try {
            Method closer = ClassUtils.getDeclaredMethod("close", this.iterResults.getClass());
            if (closer == null) {
                return;
            }
            closer.invoke(this.iterResults, (Object[]) null);
        } catch (NoSuchMethodException ignored) {
            // Lists without a close method have nothing to release.
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Creates the master base znode if needed and publishes this master's server info
     * ({@code host:port:version}) on the iteration-0 master znode.
     *
     * <p>When the iteration-0 znode already exists, the version read from its third
     * colon-separated field is incremented and the data is overwritten.
     *
     * @param masterContext context providing the application id
     * @throws GuaguaRuntimeException on any failure other than a node that already exists
     */
    private void initMasterZnode(MasterContext<MASTER_RESULT, WORKER_RESULT> masterContext) {
        String baseNode = null;
        try {
            baseNode = getMasterBaseNode(masterContext.getAppId()).toString();
            getZooKeeper().createExt(baseNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
        } catch (KeeperException.NodeExistsException e) {
            LOG.warn("Node exists: {}", baseNode);
        } catch (Exception e2) {
            throw new GuaguaRuntimeException(e2);
        }

        String masterNode = null;
        try {
            masterNode = getCurrentMasterNode(masterContext.getAppId(), 0).toString();
            String hostName = InetAddress.getLocalHost().getHostName();
            if (getZooKeeper().exists(masterNode, false) == null) {
                String serverInfo = hostName + ":" + this.messageServerPort + ":1";
                getZooKeeper().createExt(masterNode, serverInfo.getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                LOG.info("Master znode initialization with server info {}", serverInfo);
            } else {
                String previousInfo = new String(getZooKeeper().getData(masterNode, (Watcher) null, (Stat) null), Charset.forName("UTF-8"));
                int nextVersion = NumberFormatUtils.getInt(previousInfo.split(":")[2], true) + 1;
                String serverInfo = hostName + ":" + this.messageServerPort + ":" + nextVersion;
                getZooKeeper().createOrSetExt(masterNode, serverInfo.getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false, -1);
                LOG.info("Master znode re-initialization with server info {}", serverInfo);
            }
        } catch (KeeperException.NodeExistsException e3) {
            // Must precede the generic handler (a later, narrower catch is unreachable); report
            // the node that actually collided.
            LOG.warn("Node exists: {}", masterNode);
        } catch (Exception e4) {
            throw new GuaguaRuntimeException(e4);
        }
    }

    /**
     * Starts the Netty message server on a valid port derived from the configured (or default)
     * port, retrying once on a newly probed port when the first bind fails.
     *
     * @param properties configuration holding the desired server port
     * @throws GuaguaRuntimeException if the local host cannot be resolved for the start-up log
     */
    private void startNettyServer(Properties properties) {
        this.messageServerPort = NumberFormatUtils.getInt(properties.getProperty(GuaguaConstants.GUAGUA_NETTY_SEVER_PORT), GuaguaConstants.GUAGUA_NETTY_SEVER_DEFAULT_PORT);
        this.messageServerPort = NetworkUtils.getValidServerPort(this.messageServerPort);

        // First executor: fixed pool of 8 threads; second executor: cached pool.
        NioServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(
                Executors.newFixedThreadPool(8, new MasterThreadFactory()),
                Executors.newCachedThreadPool(new MasterThreadFactory()));
        this.messageServer = new ServerBootstrap(channelFactory);
        this.messageServer.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                ChannelHandler[] handlers = new ChannelHandler[]{new NettyBytableEncoder(), new NettyBytableDecoder(), new ServerHandler()};
                return Channels.pipeline(handlers);
            }
        });

        try {
            this.messageServer.bind(new InetSocketAddress(this.messageServerPort));
        } catch (ChannelException bindFailure) {
            LOG.warn(bindFailure.getMessage() + "; try to rebind again.");
            // Probe for another usable port and try exactly once more.
            this.messageServerPort = NetworkUtils.getValidServerPort(this.messageServerPort);
            this.messageServer.bind(new InetSocketAddress(this.messageServerPort));
        }

        try {
            InetAddress localHost = InetAddress.getLocalHost();
            String endpoint = localHost.getHostName() + ":" + localHost.getHostAddress() + ":" + this.messageServerPort;
            LOG.info("Master netty server is started at {}", endpoint);
        } catch (UnknownHostException unresolved) {
            throw new GuaguaRuntimeException(unresolved);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Waits until enough workers have reported for the current iteration (or the wait times out),
     * stops accepting further results and exposes the collected results on the context.
     *
     * <p>The exposed {@link Iterable} yields only results recorded for the iteration that was
     * current when this method ran. Its iterators honour the {@link Iterator} contract:
     * {@code hasNext()} may be called repeatedly without consuming elements, and {@code next()}
     * advances by exactly one element.
     *
     * @param masterContext context of the running master; receives the worker results
     */
    @Override // ml.shifu.guagua.master.MasterInterceptor
    public void preIteration(final MasterContext<MASTER_RESULT, WORKER_RESULT> masterContext) {
        synchronized (LOCK) {
            this.currentInteration = masterContext.getCurrentIteration();
            this.canUpdateWorkerResultMap = true;
        }
        long nanoTime = System.nanoTime();
        new BasicCoordinator.RetryCoordinatorCommand(isFixedTime(), getSleepTime()) {
            @Override // ml.shifu.guagua.BasicCoordinator.RetryCoordinatorCommand
            public boolean retryExecution() throws KeeperException, InterruptedException {
                int size;
                synchronized (NettyMasterCoordinator.LOCK) {
                    size = NettyMasterCoordinator.this.iterResults.size();
                }
                // Cheap log sampling: only emit the progress line on a fraction of the polls.
                if (System.nanoTime() % 30 == 0) {
                    NettyMasterCoordinator.LOG.info("Iteration {}, workers compelted: {}, still {} workers are not synced.", new Object[]{Integer.valueOf(masterContext.getCurrentIteration()), Integer.valueOf(size), Integer.valueOf(masterContext.getWorkers() - size)});
                }
                // First and last iterations use a fixed 60s timeout; others use the configured one.
                long timeout = (masterContext.isFirstIteration() || masterContext.getCurrentIteration() == masterContext.getTotalIteration()) ? 60000L : masterContext.getMinWorkersTimeOut();
                boolean isTerminated = isTerminated(size, masterContext.getWorkers(), masterContext.getMinWorkersRatio(), timeout);
                if (isTerminated) {
                    synchronized (NettyMasterCoordinator.LOCK) {
                        NettyMasterCoordinator.this.canUpdateWorkerResultMap = false;
                    }
                    // Report the timeout that was actually applied, not a fixed constant.
                    NettyMasterCoordinator.LOG.info("Iteration {}, master waiting is terminated by workers {} doneWorkers {} minWorkersRatio {} minWorkersTimeOut {}.", new Object[]{Integer.valueOf(masterContext.getCurrentIteration()), Integer.valueOf(masterContext.getWorkers()), Integer.valueOf(size), Double.valueOf(masterContext.getMinWorkersRatio()), Long.valueOf(timeout)});
                }
                return isTerminated;
            }
        }.execute();
        synchronized (LOCK) {
            this.canUpdateWorkerResultMap = false;
            this.iterResults.switchState();
        }
        if (this.iterResults instanceof BytableMemoryDiskList) {
            LOG.info("Worker result memory count in iteration {} is {}.", Integer.valueOf(this.currentInteration), Long.valueOf(((BytableMemoryDiskList) this.iterResults).getMemoryCount()));
            LOG.info("Worker result dist count in iteration {} is {}.", Integer.valueOf(this.currentInteration), Long.valueOf(((BytableMemoryDiskList) this.iterResults).getDiskCount()));
        } else {
            LOG.info("Worker result merge list raw and merged count in iteration {} are {}, {}.", new Object[]{Integer.valueOf(this.currentInteration), Integer.valueOf(((MergeWorkerResultList) this.iterResults).size()), Integer.valueOf(((MergeWorkerResultList) this.iterResults).mergedSize())});
        }
        final int i = this.currentInteration;
        masterContext.setWorkerResults(new Iterable<WORKER_RESULT>() {
            @Override // java.lang.Iterable
            public Iterator<WORKER_RESULT> iterator() {
                return new Iterator<WORKER_RESULT>() {
                    /** Underlying list iterator; created lazily and re-created once exhausted. */
                    private Iterator<WorkerResultWrapper> localItr;
                    /** Element already fetched by hasNext() but not yet handed out by next(). */
                    private WorkerResultWrapper pending = null;

                    @Override // java.util.Iterator
                    public boolean hasNext() {
                        synchronized (NettyMasterCoordinator.LOCK) {
                            if (this.pending != null) {
                                return true;
                            }
                            if (this.localItr == null) {
                                this.localItr = NettyMasterCoordinator.this.iterResults.iterator();
                            }
                            // Skip wrappers recorded for any other iteration.
                            while (this.localItr.hasNext()) {
                                WorkerResultWrapper candidate = this.localItr.next();
                                if (candidate.currIter == i) {
                                    this.pending = candidate;
                                    return true;
                                }
                            }
                            // Exhausted: rewind so this iterator can walk the results again.
                            this.localItr = NettyMasterCoordinator.this.iterResults.iterator();
                            return false;
                        }
                    }

                    @Override // java.util.Iterator
                    @SuppressWarnings("unchecked")
                    public WORKER_RESULT next() {
                        synchronized (NettyMasterCoordinator.LOCK) {
                            if (!hasNext()) {
                                throw new java.util.NoSuchElementException();
                            }
                            WorkerResultWrapper wrapper = this.pending;
                            this.pending = null;
                            return (WORKER_RESULT) wrapper.workerResult;
                        }
                    }

                    @Override // java.util.Iterator
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        });
        LOG.info("Application {} container {} iteration {} waiting ends with {}ms execution time.", new Object[]{masterContext.getAppId(), masterContext.getContainerId(), Integer.valueOf(masterContext.getCurrentIteration()), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
    }

    /**
     * Publishes the master result of the finished iteration to its znode, resets the worker-result
     * bookkeeping for the next iteration and deletes master znodes that are older than the
     * configured clean-up interval (default 2).
     *
     * @param masterContext context of the running master
     */
    @Override // ml.shifu.guagua.master.AbstractMasterCoordinator, ml.shifu.guagua.master.MasterInterceptor
    public void postIteration(final MasterContext<MASTER_RESULT, WORKER_RESULT> masterContext) {
        new BasicCoordinator.BasicCoordinatorCommand() {
            @Override // ml.shifu.guagua.BasicCoordinator.BasicCoordinatorCommand
            public void doExecute() throws KeeperException, InterruptedException {
                NettyMasterCoordinator.this.masterResult = masterContext.getMasterResult();
                NettyMasterCoordinator.this.updateMasterHaltStatus(masterContext);

                boolean resultWasSplit = false;
                String masterNode = NettyMasterCoordinator.this.getCurrentMasterNode(masterContext.getAppId(), masterContext.getCurrentIteration()).toString();
                String splitNode = NettyMasterCoordinator.this.getCurrentMasterSplitNode(masterContext.getAppId(), masterContext.getCurrentIteration()).toString();
                NettyMasterCoordinator.LOG.debug("master result:{}", masterContext.getMasterResult());
                try {
                    byte[] resultBytes = NettyMasterCoordinator.this.getMasterSerializer().objectToBytes(masterContext.getMasterResult());
                    resultWasSplit = NettyMasterCoordinator.this.setBytesToZNode(masterNode, splitNode, resultBytes, CreateMode.PERSISTENT);
                    // Once the result is published, get ready for the next iteration right away.
                    synchronized (NettyMasterCoordinator.LOCK) {
                        NettyMasterCoordinator.this.clear(masterContext.getProps());
                        NettyMasterCoordinator.this.currentInteration = masterContext.getCurrentIteration() + 1;
                        NettyMasterCoordinator.this.canUpdateWorkerResultMap = true;
                    }
                } catch (KeeperException.NodeExistsException alreadyThere) {
                    NettyMasterCoordinator.LOG.warn("Has such node:", alreadyThere);
                }

                int cleanupInterval = NumberFormatUtils.getInt(masterContext.getProps().getProperty(GuaguaConstants.GUAGUA_CLEANUP_INTERVAL), 2);
                if (masterContext.getCurrentIteration() >= cleanupInterval + 1) {
                    // Tracks the node being deleted so the handler below can name it.
                    String staleNode = NettyMasterCoordinator.this.getMasterNode(masterContext.getAppId(), masterContext.getCurrentIteration() - cleanupInterval).toString();
                    try {
                        NettyMasterCoordinator.this.getZooKeeper().deleteExt(staleNode, -1, false);
                        if (resultWasSplit) {
                            staleNode = NettyMasterCoordinator.this.getCurrentMasterSplitNode(masterContext.getAppId(), masterContext.getCurrentIteration() - cleanupInterval).toString();
                            NettyMasterCoordinator.this.getZooKeeper().deleteExt(staleNode, -1, true);
                        }
                    } catch (KeeperException.NoNodeException missing) {
                        // Sampled warning: a missing node is only reported on a fraction of occurrences.
                        if (System.nanoTime() % 20 == 0) {
                            NettyMasterCoordinator.LOG.warn("No such node:{}", staleNode);
                        }
                    }
                }
                NettyMasterCoordinator.LOG.info("master results write to znode.");
            }
        }.execute();
    }

    /**
     * Shuts the master down after the last iteration.
     *
     * <p>When ZooKeeper clean-up is enabled (default "true"), waits - for at most 120 seconds -
     * until all workers have unregistered and then deletes the application znode. Regardless of
     * how that goes, the Netty server, the ZooKeeper client and the worker result list are
     * released exactly once.
     *
     * @param masterContext context of the running master
     */
    @Override // ml.shifu.guagua.master.AbstractMasterCoordinator, ml.shifu.guagua.master.MasterInterceptor
    public void postApplication(final MasterContext<MASTER_RESULT, WORKER_RESULT> masterContext) {
        synchronized (LOCK) {
            this.currentInteration = masterContext.getCurrentIteration();
        }
        new BasicCoordinator.BasicCoordinatorCommand() {
            @Override // ml.shifu.guagua.BasicCoordinator.BasicCoordinatorCommand
            public void doExecute() throws Exception, InterruptedException {
                try {
                    String str = StringUtils.get(masterContext.getProps().getProperty(GuaguaConstants.GUAGUA_ZK_CLEANUP_ENABLE), "true");
                    final long nanoTime = System.nanoTime();
                    if (Boolean.TRUE.toString().equalsIgnoreCase(str)) {
                        new BasicCoordinator.RetryCoordinatorCommand(NettyMasterCoordinator.this.isFixedTime(), NettyMasterCoordinator.this.getSleepTime()) {
                            @Override // ml.shifu.guagua.BasicCoordinator.RetryCoordinatorCommand
                            public boolean retryExecution() throws KeeperException, InterruptedException {
                                int size;
                                synchronized (NettyMasterCoordinator.LOCK) {
                                    size = NettyMasterCoordinator.this.iterResults.size();
                                }
                                // Hard cap on the unregister wait.
                                if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) > 120000) {
                                    NettyMasterCoordinator.LOG.info("unregister step, worker(s) compelted: {}, still {} workers are not unregistered, but time out to terminate.", Integer.valueOf(size), Integer.valueOf(masterContext.getWorkers() - size));
                                    return true;
                                }
                                // Cheap log sampling of the progress line.
                                if (System.nanoTime() % 30 == 0) {
                                    NettyMasterCoordinator.LOG.info("unregister step, worker(s) compelted: {}, still {} workers are not unregistered.", Integer.valueOf(size), Integer.valueOf(masterContext.getWorkers() - size));
                                }
                                return isTerminated(size, masterContext.getWorkers(), 1.0d, 60000L);
                            }
                        }.execute();
                        String sb = NettyMasterCoordinator.this.getAppNode(masterContext.getAppId()).toString();
                        try {
                            NettyMasterCoordinator.this.getZooKeeper().deleteExt(sb, -1, true);
                        } catch (KeeperException.NoNodeException e) {
                            if (System.nanoTime() % 20 == 0) {
                                NettyMasterCoordinator.LOG.warn("No such node:{}", sb);
                            }
                        }
                    }
                } finally {
                    // Single clean-up path: runs once whether or not the steps above failed.
                    if (NettyMasterCoordinator.this.messageServer != null) {
                        // "shutdown" is looked up reflectively and only invoked when it is found.
                        Method method = ReflectionUtils.getMethod(NettyMasterCoordinator.this.messageServer.getClass(), "shutdown");
                        if (method != null) {
                            method.invoke(NettyMasterCoordinator.this.messageServer, (Object[]) null);
                        }
                        NettyMasterCoordinator.this.messageServer.releaseExternalResources();
                    }
                    NettyMasterCoordinator.super.closeZooKeeper();
                    NettyMasterCoordinator.this.closeIterResults();
                    NettyMasterCoordinator.this.iterResults.clear();
                }
            }
        }.execute();
    }
}
