package org.simpleframework.transport.reactor;

import java.io.IOException;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.simpleframework.common.thread.Daemon;
import org.simpleframework.transport.trace.Trace;

/* loaded from: input_file:org/simpleframework/transport/reactor/ActionDistributor.class */
class ActionDistributor extends Daemon implements OperationDistributor {
    private Map<Channel, ActionSet> executing;
    private Map<Channel, ActionSet> selecting;
    private Queue<Channel> invalid;
    private Queue<Action> pending;
    private ActionSelector selector;
    private Executor executor;
    private Latch latch;
    private long expiry;
    private long update;
    private boolean cancel;

    public ActionDistributor(Executor executor) throws IOException {
        this(executor, true);
    }

    public ActionDistributor(Executor executor, boolean z) throws IOException {
        this(executor, z, 120000L);
    }

    public ActionDistributor(Executor executor, boolean z, long j) throws IOException {
        this.selecting = new LinkedHashMap();
        this.executing = new LinkedHashMap();
        this.pending = new ConcurrentLinkedQueue();
        this.invalid = new ConcurrentLinkedQueue();
        this.selector = new ActionSelector();
        this.latch = new Latch();
        this.executor = executor;
        this.cancel = z;
        this.expiry = j;
        start();
    }

    @Override // org.simpleframework.transport.reactor.OperationDistributor
    public void process(Operation operation, int i) throws IOException {
        ExecuteAction executeAction = new ExecuteAction(operation, i, this.expiry);
        if (!isActive()) {
            throw new IOException("Distributor is closed");
        }
        this.pending.offer(executeAction);
        this.selector.wake();
    }

    @Override // org.simpleframework.transport.reactor.OperationDistributor
    public void close() throws IOException {
        stop();
        this.selector.wake();
        this.latch.close();
    }

    public int size() {
        return this.selecting.size();
    }

    public void run() {
        try {
            execute();
            purge();
        } catch (Throwable th) {
            purge();
            throw th;
        }
    }

    private void execute() {
        while (isActive()) {
            try {
                register();
                cancel();
                expire();
                distribute();
                validate();
            } catch (Exception e) {
                report(e);
            }
        }
    }

    private void purge() {
        try {
            register();
            cancel();
            clear();
        } catch (Exception e) {
            report(e);
        }
    }

    private void report(Exception exc) {
        for (Channel channel : this.selecting.keySet()) {
            for (Action action : this.selecting.get(channel).list()) {
                try {
                    action.getOperation().getTrace().trace(ReactorEvent.ERROR, exc);
                } catch (Exception e) {
                    this.invalid.offer(channel);
                }
            }
        }
        this.invalid.clear();
    }

    private void clear() throws IOException {
        for (ActionSet actionSet : this.selector.registeredSets()) {
            for (Action action : actionSet.list()) {
                Trace trace = action.getOperation().getTrace();
                try {
                    trace.trace(ReactorEvent.CLOSE_SELECTOR);
                    expire(actionSet, Long.MAX_VALUE);
                } catch (Exception e) {
                    trace.trace(ReactorEvent.ERROR, e);
                }
            }
        }
        this.selector.close();
        this.latch.signal();
    }

    private void expire() throws IOException {
        List<ActionSet> registeredSets = this.selector.registeredSets();
        if (this.cancel) {
            long currentTimeMillis = System.currentTimeMillis();
            if (this.update <= currentTimeMillis) {
                Iterator<ActionSet> it = registeredSets.iterator();
                while (it.hasNext()) {
                    expire(it.next(), currentTimeMillis);
                }
                this.update = currentTimeMillis + 10000;
            }
        }
    }

    private void expire(ActionSet actionSet, long j) throws IOException {
        Action[] list = actionSet.list();
        SelectionKey key = actionSet.key();
        if (key.isValid()) {
            int interestOps = key.interestOps();
            for (Action action : list) {
                int interest = action.getInterest();
                if (action.getExpiry() < j) {
                    expire(actionSet, action);
                    interestOps &= interest ^ (-1);
                }
            }
            update(actionSet, interestOps);
        }
    }

    private void update(ActionSet actionSet, int i) throws IOException {
        SelectionKey key = actionSet.key();
        if (i != 0) {
            key.interestOps(i);
            return;
        }
        this.selecting.remove(key.channel());
        key.cancel();
    }

    private void expire(ActionSet actionSet, Action action) throws IOException {
        CancelAction cancelAction = new CancelAction(action);
        if (actionSet != null) {
            Trace trace = action.getOperation().getTrace();
            int interest = action.getInterest();
            try {
                trace.trace(ReactorEvent.SELECT_EXPIRED, Integer.valueOf(interest));
                actionSet.remove(interest);
                execute(cancelAction);
            } catch (Exception e) {
                trace.trace(ReactorEvent.ERROR, e);
            }
        }
    }

    private void validate() throws IOException {
        for (Channel channel : this.selecting.keySet()) {
            if (!this.selecting.get(channel).key().isValid()) {
                this.invalid.offer(channel);
            }
        }
        Iterator<Channel> it = this.invalid.iterator();
        while (it.hasNext()) {
            invalidate(it.next());
        }
        this.invalid.clear();
    }

    private void invalidate(Channel channel) throws IOException {
        for (Action action : this.selecting.remove(channel).list()) {
            Trace trace = action.getOperation().getTrace();
            try {
                trace.trace(ReactorEvent.INVALID_KEY);
                execute(action);
            } catch (Exception e) {
                trace.trace(ReactorEvent.ERROR, e);
            }
        }
    }

    private void cancel() throws IOException {
        for (ActionSet actionSet : this.executing.values()) {
            for (Action action : actionSet.list()) {
                action.getOperation().getTrace().trace(ReactorEvent.SELECT_CANCEL);
            }
            actionSet.cancel();
            actionSet.clear();
        }
        this.executing.clear();
    }

    private void register() throws IOException {
        while (!this.pending.isEmpty()) {
            Action poll = this.pending.poll();
            if (poll != null) {
                SelectableChannel channel = poll.getChannel();
                ActionSet remove = this.executing.remove(channel);
                if (remove == null) {
                    remove = this.selecting.get(channel);
                }
                if (remove != null) {
                    update(poll, remove);
                } else {
                    register(poll);
                }
            }
        }
    }

    private void register(Action action) throws IOException {
        SelectableChannel channel = action.getChannel();
        Trace trace = action.getOperation().getTrace();
        try {
            if (channel.isOpen()) {
                trace.trace(ReactorEvent.SELECT);
                select(action);
            } else {
                trace.trace(ReactorEvent.CHANNEL_CLOSED);
                this.selecting.remove(channel);
                execute(action);
            }
        } catch (Exception e) {
            trace.trace(ReactorEvent.ERROR, e);
        }
    }

    private void update(Action action, ActionSet actionSet) throws IOException {
        Trace trace = action.getOperation().getTrace();
        SelectionKey key = actionSet.key();
        int interest = action.getInterest();
        int interestOps = key.interestOps() | interest;
        try {
            if (1 == (interest & 1)) {
                trace.trace(ReactorEvent.UPDATE_READ_INTEREST);
            }
            if (4 == (interest & 4)) {
                trace.trace(ReactorEvent.UPDATE_WRITE_INTEREST);
            }
            trace.trace(ReactorEvent.UPDATE_INTEREST, Integer.valueOf(interestOps));
            key.interestOps(interestOps);
            actionSet.attach(action);
        } catch (Exception e) {
            trace.trace(ReactorEvent.ERROR, e);
        }
    }

    private void select(Action action) throws IOException {
        SelectableChannel channel = action.getChannel();
        Trace trace = action.getOperation().getTrace();
        int interest = action.getInterest();
        if (interest > 0) {
            ActionSet register = this.selector.register(channel, interest);
            if (1 == (interest & 1)) {
                trace.trace(ReactorEvent.REGISTER_READ_INTEREST);
            }
            if (4 == (interest & 4)) {
                trace.trace(ReactorEvent.REGISTER_WRITE_INTEREST);
            }
            trace.trace(ReactorEvent.REGISTER_INTEREST, Integer.valueOf(interest));
            register.attach(action);
            this.selecting.put(channel, register);
        }
    }

    private void distribute() throws IOException {
        if (this.selector.select(5000L) <= 0 || !isActive()) {
            return;
        }
        process();
    }

    private void process() throws IOException {
        for (ActionSet actionSet : this.selector.selectedSets()) {
            process(actionSet);
            remove(actionSet);
        }
    }

    private void process(ActionSet actionSet) throws IOException {
        for (Action action : actionSet.ready()) {
            Trace trace = action.getOperation().getTrace();
            int interest = action.getInterest();
            try {
                if (1 == (interest & 1)) {
                    trace.trace(ReactorEvent.READ_INTEREST_READY, Integer.valueOf(interest));
                }
                if (4 == (interest & 4)) {
                    trace.trace(ReactorEvent.WRITE_INTEREST_READY, Integer.valueOf(interest));
                }
                execute(action);
            } catch (Exception e) {
                trace.trace(ReactorEvent.ERROR, e);
            }
        }
    }

    private void remove(ActionSet actionSet) throws IOException {
        SelectableChannel channel = actionSet.channel();
        SelectionKey key = actionSet.key();
        if (!key.isValid()) {
            this.selecting.remove(channel);
            return;
        }
        int interest = actionSet.interest();
        int readyOps = key.readyOps();
        if (this.cancel) {
            int i = interest & (readyOps ^ (-1));
            if (i == 0) {
                this.executing.put(channel, actionSet);
            } else {
                key.interestOps(i);
            }
            actionSet.remove(readyOps);
        }
    }

    private void execute(Action action) {
        Trace trace = action.getOperation().getTrace();
        try {
            trace.trace(ReactorEvent.EXECUTE_ACTION, Integer.valueOf(action.getInterest()));
            this.executor.execute(action);
        } catch (Exception e) {
            trace.trace(ReactorEvent.ERROR, e);
        }
    }
}
