package org.jgroups.protocols;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.ThreadFactory;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

@MBean(description = "Implementation of scopes (concurrent delivery of messages from the same sender)")
@Deprecated
/* loaded from: input_file:jgroups-3.6.2.Final.jar:org/jgroups/protocols/SCOPE.class */
public class SCOPE extends Protocol {
    protected int thread_pool_min_threads = 2;
    protected int thread_pool_max_threads = 10;
    protected long thread_pool_keep_alive_time = 30000;

    @Property(description = "Thread naming pattern for threads in this channel. Default is cl")
    protected String thread_naming_pattern = "cl";

    @Property(description = "Time in milliseconds after which an expired scope will get removed. An expired scope is one to which no messages have been added in max_expiration_time milliseconds. 0 never expires scopes")
    protected long expiration_time = 30000;

    @Property(description = "Interval in milliseconds at which the expiry task tries to remove expired scopes")
    protected long expiration_interval = 60000;
    protected Future<?> expiry_task = null;
    protected final ConcurrentMap<Address, ConcurrentMap<Short, MessageQueue>> queues = Util.createConcurrentMap();
    protected String cluster_name;
    protected Address local_addr;
    protected Executor thread_pool;
    protected ThreadFactory thread_factory;
    protected TimeScheduler timer;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:jgroups-3.6.2.Final.jar:org/jgroups/protocols/SCOPE$ExpiryTask.class */
    public class ExpiryTask implements Runnable {
        protected ExpiryTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                _run();
            } catch (Throwable th) {
                SCOPE.this.log.error("failed expiring old scopes", th);
            }
        }

        protected void _run() {
            long currentTimeMillis = System.currentTimeMillis();
            for (Map.Entry<Address, ConcurrentMap<Short, MessageQueue>> entry : SCOPE.this.queues.entrySet()) {
                Iterator<Map.Entry<Short, MessageQueue>> it = entry.getValue().entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Short, MessageQueue> next = it.next();
                    Short key = next.getKey();
                    MessageQueue value = next.getValue();
                    long lastUpdate = currentTimeMillis - value.getLastUpdate();
                    if (lastUpdate >= SCOPE.this.expiration_time && value.size() == 0) {
                        it.remove();
                        if (SCOPE.this.log.isTraceEnabled()) {
                            SCOPE.this.log.trace("expired scope " + entry.getKey() + "::" + key + " (" + lastUpdate + " ms old)");
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:jgroups-3.6.2.Final.jar:org/jgroups/protocols/SCOPE$MessageQueue.class */
    public static class MessageQueue {
        private final Queue<Message> queue = new ConcurrentLinkedQueue();
        private final AtomicBoolean processing = new AtomicBoolean(false);
        private long last_update = System.currentTimeMillis();

        protected MessageQueue() {
        }

        public boolean acquire() {
            return this.processing.compareAndSet(false, true);
        }

        public boolean release() {
            boolean compareAndSet = this.processing.compareAndSet(true, false);
            if (compareAndSet) {
                this.last_update = System.currentTimeMillis();
            }
            return compareAndSet;
        }

        public void add(Message message) {
            this.queue.add(message);
        }

        public Message remove() {
            return this.queue.poll();
        }

        public void clear() {
            this.queue.clear();
        }

        public int size() {
            return this.queue.size();
        }

        public long getLastUpdate() {
            return this.last_update;
        }
    }

    /* loaded from: input_file:jgroups-3.6.2.Final.jar:org/jgroups/protocols/SCOPE$QueueThread.class */
    protected class QueueThread implements Runnable {
        protected final MessageQueue queue;
        protected boolean first = true;

        public QueueThread(MessageQueue messageQueue) {
            this.queue = messageQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            do {
                if (this.first) {
                    this.first = false;
                } else if (!this.queue.acquire()) {
                    return;
                }
                while (true) {
                    try {
                        Message remove = this.queue.remove();
                        if (remove == null) {
                            break;
                        }
                        try {
                            SCOPE.this.up_prot.up(new Event(1, remove));
                        } catch (Throwable th) {
                            SCOPE.this.log.error("couldn't deliver message " + remove, th);
                        }
                    } finally {
                        this.queue.release();
                    }
                }
            } while (this.queue.size() != 0);
        }
    }

    /* loaded from: input_file:jgroups-3.6.2.Final.jar:org/jgroups/protocols/SCOPE$ScopeHeader.class */
    public static class ScopeHeader extends Header {
        public static final byte MSG = 1;
        public static final byte EXPIRE = 2;
        byte type;
        short scope;

        public static ScopeHeader createMessageHeader(short s) {
            return new ScopeHeader((byte) 1, s);
        }

        public static ScopeHeader createExpireHeader(short s) {
            return new ScopeHeader((byte) 2, s);
        }

        public ScopeHeader() {
            this.scope = (short) 0;
        }

        private ScopeHeader(byte b, short s) {
            this.scope = (short) 0;
            this.type = b;
            this.scope = s;
        }

        public short getScope() {
            return this.scope;
        }

        @Override // org.jgroups.Header
        public int size() {
            switch (this.type) {
                case 1:
                case 2:
                    return 3;
                default:
                    throw new IllegalStateException("type has to be MSG or EXPIRE");
            }
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws Exception {
            dataOutput.writeByte(this.type);
            switch (this.type) {
                case 1:
                case 2:
                    dataOutput.writeShort(this.scope);
                    return;
                default:
                    throw new IllegalStateException("type has to be MSG or EXPIRE");
            }
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws Exception {
            this.type = dataInput.readByte();
            switch (this.type) {
                case 1:
                case 2:
                    this.scope = dataInput.readShort();
                    return;
                default:
                    throw new IllegalStateException("type has to be MSG or EXPIRE");
            }
        }

        @Override // org.jgroups.Header
        public String toString() {
            StringBuilder sb = new StringBuilder(typeToString(this.type));
            switch (this.type) {
                case 1:
                case 2:
                    sb.append(": scope=").append((int) this.scope);
                    break;
                default:
                    sb.append("n/a");
                    break;
            }
            return sb.toString();
        }

        public static String typeToString(byte b) {
            switch (b) {
                case 1:
                    return "MSG";
                case 2:
                    return "EXPIRE";
                default:
                    return "n/a";
            }
        }
    }

    @ManagedAttribute(description = "Number of scopes in queues")
    public int getNumberOfReceiverScopes() {
        int i = 0;
        Iterator<ConcurrentMap<Short, MessageQueue>> it = this.queues.values().iterator();
        while (it.hasNext()) {
            i += it.next().keySet().size();
        }
        return i;
    }

    @ManagedAttribute(description = "Total number of messages in all queues")
    public int getNumberOfMessages() {
        int i = 0;
        Iterator<ConcurrentMap<Short, MessageQueue>> it = this.queues.values().iterator();
        while (it.hasNext()) {
            Iterator<MessageQueue> it2 = it.next().values().iterator();
            while (it2.hasNext()) {
                i += it2.next().size();
            }
        }
        return i;
    }

    @Property(name = "thread_pool.min_threads", description = "Minimum thread pool size for the regular thread pool")
    public void setThreadPoolMinThreads(int i) {
        this.thread_pool_min_threads = i;
        if (this.thread_pool instanceof ThreadPoolExecutor) {
            ((ThreadPoolExecutor) this.thread_pool).setCorePoolSize(i);
        }
    }

    public int getThreadPoolMinThreads() {
        return this.thread_pool_min_threads;
    }

    @Property(name = "thread_pool.max_threads", description = "Maximum thread pool size for the regular thread pool")
    public void setThreadPoolMaxThreads(int i) {
        this.thread_pool_max_threads = i;
        if (this.thread_pool instanceof ThreadPoolExecutor) {
            ((ThreadPoolExecutor) this.thread_pool).setMaximumPoolSize(i);
        }
    }

    public int getThreadPoolMaxThreads() {
        return this.thread_pool_max_threads;
    }

    @Property(name = "thread_pool.keep_alive_time", description = "Timeout in milliseconds to remove idle thread from regular pool")
    public void setThreadPoolKeepAliveTime(long j) {
        this.thread_pool_keep_alive_time = j;
        if (this.thread_pool instanceof ThreadPoolExecutor) {
            ((ThreadPoolExecutor) this.thread_pool).setKeepAliveTime(j, TimeUnit.MILLISECONDS);
        }
    }

    public long getThreadPoolKeepAliveTime() {
        return this.thread_pool_keep_alive_time;
    }

    @Experimental
    @ManagedOperation(description = "Removes all queues and scopes - only used for testing, might get removed any time !")
    public void removeAllQueues() {
        this.queues.clear();
    }

    @ManagedOperation(description = "Expires the given scope around the cluster")
    public void expire(short s) {
        ScopeHeader createExpireHeader = ScopeHeader.createExpireHeader(s);
        Message message = new Message();
        message.putHeader(Global.SCOPE_ID, createExpireHeader);
        message.setFlag(Message.SCOPED);
        this.down_prot.down(new Event(1, message));
    }

    public void removeScope(Address address, short s) {
        ConcurrentMap<Short, MessageQueue> concurrentMap;
        MessageQueue remove;
        if (address == null || (concurrentMap = this.queues.get(address)) == null || (remove = concurrentMap.remove(Short.valueOf(s))) == null) {
            return;
        }
        remove.clear();
    }

    @ManagedOperation(description = "Dumps all scopes associated with members")
    public String dumpScopes() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Address, ConcurrentMap<Short, MessageQueue>> entry : this.queues.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(new TreeSet(entry.getValue().keySet())).append(StringUtils.LF);
        }
        return sb.toString();
    }

    @ManagedAttribute(description = "Number of active thread in the pool")
    public int getNumActiveThreads() {
        if (this.thread_pool instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) this.thread_pool).getActiveCount();
        }
        return 0;
    }

    @Override // org.jgroups.stack.Protocol
    public void init() throws Exception {
        super.init();
        this.timer = getTransport().getTimer();
        this.thread_factory = new DefaultThreadFactory("SCOPE", false, true);
        setInAllThreadFactories(this.cluster_name, this.local_addr, this.thread_naming_pattern);
        if ((this.expiration_interval > 0 && this.expiration_time <= 0) || (this.expiration_interval <= 0 && this.expiration_time > 0)) {
            throw new IllegalArgumentException("expiration_interval (" + this.expiration_interval + ") and expiration_time (" + this.expiration_time + ") don't match");
        }
    }

    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        super.start();
        this.thread_pool = createThreadPool(this.thread_pool_min_threads, this.thread_pool_max_threads, this.thread_pool_keep_alive_time, this.thread_factory);
        if (this.expiration_interval <= 0 || this.expiration_time <= 0) {
            return;
        }
        startExpiryTask();
    }

    @Override // org.jgroups.stack.Protocol
    public void stop() {
        super.stop();
        stopExpiryTask();
        shutdownThreadPool(this.thread_pool);
        Iterator<ConcurrentMap<Short, MessageQueue>> it = this.queues.values().iterator();
        while (it.hasNext()) {
            Iterator<MessageQueue> it2 = it.next().values().iterator();
            while (it2.hasNext()) {
                it2.next().release();
            }
        }
    }

    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        switch (event.getType()) {
            case 2:
            case 80:
            case 92:
            case 93:
                this.cluster_name = (String) event.getArg();
                setInAllThreadFactories(this.cluster_name, this.local_addr, this.thread_naming_pattern);
                break;
            case 6:
                handleView((View) event.getArg());
                break;
            case 8:
                this.local_addr = (Address) event.getArg();
                break;
        }
        return this.down_prot.down(event);
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                if (message.isFlagSet(Message.SCOPED) && !message.isFlagSet(Message.Flag.OOB)) {
                    ScopeHeader scopeHeader = (ScopeHeader) message.getHeader(this.id);
                    if (scopeHeader == null) {
                        throw new IllegalStateException("message doesn't have a ScopeHeader attached");
                    }
                    if (scopeHeader.type == 2) {
                        removeScope(message.getSrc(), scopeHeader.scope);
                        return null;
                    }
                    MessageQueue orCreateQueue = getOrCreateQueue(message.getSrc(), scopeHeader.scope);
                    orCreateQueue.add(message);
                    if (!orCreateQueue.acquire()) {
                        return null;
                    }
                    this.thread_pool.execute(new QueueThread(orCreateQueue));
                    return null;
                }
                break;
            case 6:
                handleView((View) event.getArg());
                break;
        }
        return this.up_prot.up(event);
    }

    @Override // org.jgroups.stack.Protocol
    public void up(MessageBatch messageBatch) {
        Iterator<Message> it = messageBatch.iterator();
        while (it.hasNext()) {
            Message next = it.next();
            if (next.isFlagSet(Message.SCOPED) && !next.isFlagSet(Message.Flag.OOB)) {
                ScopeHeader scopeHeader = (ScopeHeader) next.getHeader(this.id);
                if (scopeHeader == null) {
                    this.log.error("message doesn't have a ScopeHeader attached");
                } else {
                    messageBatch.remove(next);
                    if (scopeHeader.type == 2) {
                        removeScope(next.getSrc(), scopeHeader.scope);
                    } else {
                        MessageQueue orCreateQueue = getOrCreateQueue(next.getSrc(), scopeHeader.scope);
                        orCreateQueue.add(next);
                        if (orCreateQueue.acquire()) {
                            this.thread_pool.execute(new QueueThread(orCreateQueue));
                        }
                    }
                }
            }
        }
        if (messageBatch.isEmpty()) {
            return;
        }
        this.up_prot.up(messageBatch);
    }

    protected MessageQueue getOrCreateQueue(Address address, short s) {
        ConcurrentMap<Short, MessageQueue> concurrentMap = this.queues.get(address);
        if (concurrentMap == null) {
            concurrentMap = Util.createConcurrentMap();
            ConcurrentMap<Short, MessageQueue> putIfAbsent = this.queues.putIfAbsent(address, concurrentMap);
            if (putIfAbsent != null) {
                concurrentMap = putIfAbsent;
            }
        }
        MessageQueue messageQueue = concurrentMap.get(Short.valueOf(s));
        if (messageQueue == null) {
            messageQueue = new MessageQueue();
            MessageQueue putIfAbsent2 = concurrentMap.putIfAbsent(Short.valueOf(s), messageQueue);
            if (putIfAbsent2 != null) {
                messageQueue = putIfAbsent2;
            }
        }
        return messageQueue;
    }

    protected synchronized void startExpiryTask() {
        if (this.expiry_task == null || this.expiry_task.isDone()) {
            this.expiry_task = this.timer.scheduleWithFixedDelay(new ExpiryTask(), this.expiration_interval, this.expiration_interval, TimeUnit.MILLISECONDS);
        }
    }

    protected synchronized void stopExpiryTask() {
        if (this.expiry_task != null) {
            this.expiry_task.cancel(true);
            this.expiry_task = null;
        }
    }

    protected static ExecutorService createThreadPool(int i, int i2, long j, ThreadFactory threadFactory) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(i, i2, j, TimeUnit.MILLISECONDS, new SynchronousQueue());
        threadPoolExecutor.setThreadFactory(threadFactory);
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return threadPoolExecutor;
    }

    protected static void shutdownThreadPool(Executor executor) {
        if (executor instanceof ExecutorService) {
            ExecutorService executorService = (ExecutorService) executor;
            executorService.shutdownNow();
            try {
                executorService.awaitTermination(3000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }
    }

    private void setInAllThreadFactories(String str, Address address, String str2) {
        for (ThreadFactory threadFactory : new ThreadFactory[]{this.thread_factory}) {
            if (str2 != null) {
                threadFactory.setPattern(str2);
            }
            if (str != null) {
                threadFactory.setClusterName(str);
            }
            if (address != null) {
                threadFactory.setAddress(address.toString());
            }
        }
    }

    private void handleView(View view) {
        List<Address> members = view.getMembers();
        HashSet hashSet = new HashSet(this.queues.keySet());
        hashSet.removeAll(members);
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            clearQueue((Address) it.next());
        }
    }

    public void clearQueue(Address address) {
        ConcurrentMap<Short, MessageQueue> remove = this.queues.remove(address);
        if (remove != null) {
            Iterator<MessageQueue> it = remove.values().iterator();
            while (it.hasNext()) {
                it.next().clear();
            }
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("removed " + address + " from receiver_table");
        }
    }
}
