package com.betfair.cougar.transport.socket;

import com.betfair.cougar.api.ExecutionContextWithTokens;
import com.betfair.cougar.api.LogExtension;
import com.betfair.cougar.api.UUIDGenerator;
import com.betfair.cougar.core.api.ev.ConnectedResponse;
import com.betfair.cougar.core.api.ev.ExecutionResult;
import com.betfair.cougar.core.api.ev.OperationDefinition;
import com.betfair.cougar.core.api.ev.Subscription;
import com.betfair.cougar.core.api.exception.CougarException;
import com.betfair.cougar.core.api.exception.CougarFrameworkException;
import com.betfair.cougar.core.api.logging.EventLogger;
import com.betfair.cougar.core.impl.logging.ConnectedObjectLogEvent;
import com.betfair.cougar.logging.CougarLogger;
import com.betfair.cougar.logging.CougarLoggingUtils;
import com.betfair.cougar.netutil.nio.CougarProtocol;
import com.betfair.cougar.netutil.nio.HeapDelta;
import com.betfair.cougar.netutil.nio.NioLogger;
import com.betfair.cougar.netutil.nio.NioUtils;
import com.betfair.cougar.netutil.nio.TerminateSubscription;
import com.betfair.cougar.netutil.nio.connected.InitialUpdate;
import com.betfair.cougar.netutil.nio.connected.TerminateHeap;
import com.betfair.cougar.netutil.nio.connected.Update;
import com.betfair.cougar.netutil.nio.message.EventMessage;
import com.betfair.cougar.transport.api.protocol.CougarObjectIOFactory;
import com.betfair.cougar.transport.api.protocol.CougarObjectOutput;
import com.betfair.cougar.transport.api.protocol.socket.NewHeapSubscription;
import com.betfair.cougar.util.UUIDGeneratorImpl;
import com.betfair.platform.virtualheap.Heap;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.apache.mina.common.IoSession;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;

@ManagedResource
/* loaded from: input_file:com/betfair/cougar/transport/socket/PooledServerConnectedObjectManager.class */
public class PooledServerConnectedObjectManager implements ServerConnectedObjectManager {
    private static CougarLogger logger;
    private static final AtomicLong heapStateInstanceIdSource;
    private EventLogger eventLogger;
    private NioLogger nioLogger;
    private CougarObjectIOFactory objectIOFactory;
    private int numProcessingThreads;
    private int maxUpdateActionsPerMessage;
    static final /* synthetic */ boolean $assertionsDisabled;
    private BlockingDeque<String> heapsWaitingForUpdate = new LinkedBlockingDeque();
    private ReentrantLock subTermLock = new ReentrantLock();
    private Map<String, HeapState> heapStates = new HashMap();
    private Map<Long, String> heapUris = new HashMap();
    private Map<IoSession, Multiset<String>> heapsByClient = new HashMap();
    private AtomicLong heapIdGenerator = new AtomicLong(0);
    private List<ConnectedObjectPusher> pushers = new ArrayList();
    private UUIDGenerator uuidGenerator = new UUIDGeneratorImpl();
    private Thread shutdownHook = new Thread(new Runnable() { // from class: com.betfair.cougar.transport.socket.PooledServerConnectedObjectManager.1
        @Override // java.lang.Runnable
        public void run() {
            PooledServerConnectedObjectManager.logger.log(Level.INFO, "Terminating all push subscriptions due to application shutdown.", new Object[0]);
            PooledServerConnectedObjectManager.this.terminateAllSubscriptions(Subscription.CloseReason.NODE_SHUTDOWN);
        }
    }, "PooledServerConnectedObjectManager-ShutdownHook");

    /* loaded from: input_file:com/betfair/cougar/transport/socket/PooledServerConnectedObjectManager$ConnectedObjectPusher.class */
    private class ConnectedObjectPusher implements Runnable {
        private volatile boolean running;

        private ConnectedObjectPusher() {
            this.running = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            HeapState heapState;
            while (this.running) {
                try {
                    try {
                        String str = (String) PooledServerConnectedObjectManager.this.heapsWaitingForUpdate.pollFirst(1000L, TimeUnit.MILLISECONDS);
                        if (str != null && (heapState = (HeapState) PooledServerConnectedObjectManager.this.heapStates.get(str)) != null) {
                            Lock updateLock = heapState.getUpdateLock();
                            updateLock.lock();
                            try {
                                try {
                                } catch (Throwable th) {
                                    updateLock.unlock();
                                    throw th;
                                    break;
                                }
                            } catch (Exception e) {
                                PooledServerConnectedObjectManager.logger.log(Level.SEVERE, "error sending updates", e, new Object[0]);
                                PooledServerConnectedObjectManager.this.terminateSubscriptions(str, Subscription.CloseReason.INTERNAL_ERROR);
                                updateLock.unlock();
                            }
                            if (heapState.isTerminated()) {
                                PooledServerConnectedObjectManager.logger.log(Level.SEVERE, "heapState.isTerminated()", new Object[0]);
                                updateLock.unlock();
                            } else {
                                LinkedList linkedList = new LinkedList();
                                Iterator<QueuedHeapChange> it = heapState.getQueuedChanges().iterator();
                                while (it.hasNext()) {
                                    linkedList.add(it.next());
                                    it.remove();
                                }
                                while (!linkedList.isEmpty()) {
                                    Iterator it2 = linkedList.iterator();
                                    while (it2.hasNext()) {
                                        QueuedHeapChange queuedHeapChange = (QueuedHeapChange) it2.next();
                                        if (!queuedHeapChange.isSub()) {
                                            break;
                                        }
                                        QueuedSubscription sub = queuedHeapChange.getSub();
                                        IoSession session = sub.getSession();
                                        long lastUpdateId = heapState.getLastUpdateId();
                                        PooledServerConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, session, "ConnectedObjectPusher: Sending initial heap state with updateId = %s for heapId = %s", new Object[]{Long.valueOf(lastUpdateId), Long.valueOf(heapState.getHeapId())});
                                        NioUtils.writeEventMessageToSession(session, new HeapDelta(heapState.getHeapId(), lastUpdateId, Collections.singletonList(sub.getInitialState())), PooledServerConnectedObjectManager.this.objectIOFactory);
                                        heapState.addSession(session);
                                        it2.remove();
                                    }
                                    ArrayList arrayList = new ArrayList();
                                    Iterator it3 = linkedList.iterator();
                                    while (it3.hasNext()) {
                                        QueuedHeapChange queuedHeapChange2 = (QueuedHeapChange) it3.next();
                                        if (!queuedHeapChange2.isUpdate()) {
                                            break;
                                        }
                                        arrayList.add(queuedHeapChange2.getUpdate());
                                        it3.remove();
                                    }
                                    if (!arrayList.isEmpty()) {
                                        int size = arrayList.size();
                                        int i = 0;
                                        while (i < size) {
                                            int i2 = 0;
                                            int i3 = 0;
                                            ArrayList arrayList2 = new ArrayList();
                                            Iterator it4 = arrayList.iterator();
                                            while (it4.hasNext()) {
                                                Update update = (Update) it4.next();
                                                int size2 = update.getActions().size();
                                                if (i2 > 0 && i3 + size2 > PooledServerConnectedObjectManager.this.maxUpdateActionsPerMessage) {
                                                    break;
                                                }
                                                arrayList2.add(update);
                                                it4.remove();
                                                i2++;
                                                i3 += size2;
                                            }
                                            HashSet<Byte> hashSet = new HashSet();
                                            Iterator<IoSession> it5 = heapState.getSessions().iterator();
                                            while (it5.hasNext()) {
                                                hashSet.add(Byte.valueOf(CougarProtocol.getProtocolVersion(it5.next())));
                                            }
                                            HashMap hashMap = new HashMap();
                                            long nextUpdateId = heapState.getNextUpdateId();
                                            for (Byte b : hashSet) {
                                                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                                                CougarObjectOutput newCougarObjectOutput = PooledServerConnectedObjectManager.this.objectIOFactory.newCougarObjectOutput(byteArrayOutputStream, b.byteValue());
                                                newCougarObjectOutput.writeObject(new HeapDelta(heapState.getHeapId(), nextUpdateId, arrayList2));
                                                newCougarObjectOutput.flush();
                                                hashMap.put(b, new EventMessage(byteArrayOutputStream.toByteArray()));
                                            }
                                            for (IoSession ioSession : heapState.getSessions()) {
                                                PooledServerConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Sending heap delta of size %s and with updateId = %s for heapId = %s", new Object[]{Integer.valueOf(arrayList2.size()), Long.valueOf(nextUpdateId), Long.valueOf(heapState.getHeapId())});
                                                ioSession.write(hashMap.get(Byte.valueOf(CougarProtocol.getProtocolVersion(ioSession))));
                                            }
                                            i += arrayList2.size();
                                        }
                                    }
                                    if (!linkedList.isEmpty() && ((QueuedHeapChange) linkedList.get(0)).isTermination()) {
                                        linkedList.remove(0);
                                        PooledServerConnectedObjectManager.this.terminateSubscriptions(str, Subscription.CloseReason.REQUESTED_BY_PUBLISHER);
                                    }
                                }
                                updateLock.unlock();
                            }
                        }
                    } catch (InterruptedException e2) {
                    }
                } catch (Error e3) {
                    PooledServerConnectedObjectManager.logger.log(Level.SEVERE, "thread died", e3, new Object[0]);
                    throw e3;
                } catch (Exception e4) {
                    PooledServerConnectedObjectManager.logger.log(Level.SEVERE, "thread died", e4, new Object[0]);
                    return;
                }
            }
        }

        public void stop() {
            this.running = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/betfair/cougar/transport/socket/PooledServerConnectedObjectManager$HeapState.class */
    public class HeapState implements HeapStateMonitoring {
        private final long heapId;
        private UpdateProducingHeapListener listener;
        private final Heap heap;
        private volatile boolean terminated;
        private Lock updateLock = new ReentrantLock();
        private final List<IoSession> sessions = new CopyOnWriteArrayList();
        private final AtomicLong updateIdGenerator = new AtomicLong();
        private final Queue<QueuedHeapChange> queuedChanges = new ConcurrentLinkedQueue();
        private final Map<String, SubscriptionDetails> subscriptions = new HashMap();
        private final Map<IoSession, List<String>> sessionSubscriptions = new HashMap();
        private final long instanceId = PooledServerConnectedObjectManager.heapStateInstanceIdSource.incrementAndGet();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:com/betfair/cougar/transport/socket/PooledServerConnectedObjectManager$HeapState$SubscriptionDetails.class */
        public class SubscriptionDetails {
            Subscription subscription;
            LogExtension logExtension;

            SubscriptionDetails() {
            }
        }

        @Override // com.betfair.cougar.transport.socket.PooledServerConnectedObjectManager.HeapStateMonitoring
        public SortedMap<String, List<String>> getSubscriptionIdsBySessionId() {
            TreeMap treeMap = new TreeMap();
            try {
                this.updateLock.lock();
                PooledServerConnectedObjectManager.this.subTermLock.lock();
                for (IoSession ioSession : this.sessionSubscriptions.keySet()) {
                    treeMap.put(NioUtils.getSessionId(ioSession), this.sessionSubscriptions.get(ioSession));
                }
                try {
                    this.updateLock.unlock();
                    PooledServerConnectedObjectManager.this.subTermLock.unlock();
                    return treeMap;
                } finally {
                }
            } catch (Throwable th) {
                try {
                    this.updateLock.unlock();
                    PooledServerConnectedObjectManager.this.subTermLock.unlock();
                    throw th;
                } finally {
                }
            }
        }

        @Override // com.betfair.cougar.transport.socket.PooledServerConnectedObjectManager.HeapStateMonitoring
        public long getLastUpdateId() {
            return this.updateIdGenerator.get();
        }

        @Override // com.betfair.cougar.transport.socket.PooledServerConnectedObjectManager.HeapStateMonitoring
        public int getSubscriptionCount() {
            return this.subscriptions.size();
        }

        @Override // com.betfair.cougar.transport.socket.PooledServerConnectedObjectManager.HeapStateMonitoring
        public int getSessionCount() {
            return this.sessions.size();
        }

        public HeapState(Heap heap) {
            this.heap = heap;
            this.heapId = PooledServerConnectedObjectManager.this.heapIdGenerator.incrementAndGet();
        }

        public Queue<QueuedHeapChange> getQueuedChanges() {
            return this.queuedChanges;
        }

        public Lock getUpdateLock() {
            return this.updateLock;
        }

        public long getHeapId() {
            return this.heapId;
        }

        public List<IoSession> getSessions() {
            return this.sessions;
        }

        public long getNextUpdateId() {
            return this.updateIdGenerator.incrementAndGet();
        }

        public void addSession(IoSession ioSession) {
            if (this.sessions.contains(ioSession)) {
                return;
            }
            this.sessions.add(ioSession);
        }

        public void removeSession(IoSession ioSession) {
            this.sessions.remove(ioSession);
        }

        public void setHeapListener(UpdateProducingHeapListener updateProducingHeapListener) {
            this.listener = updateProducingHeapListener;
        }

        public void removeListener() {
            this.heap.removeListener(this.listener);
        }

        public void terminate() {
            this.terminated = true;
        }

        public boolean isTerminated() {
            return this.terminated;
        }

        public String addSubscription(LogExtension logExtension, Subscription subscription, IoSession ioSession) {
            SubscriptionDetails subscriptionDetails = new SubscriptionDetails();
            subscriptionDetails.logExtension = logExtension;
            subscriptionDetails.subscription = subscription;
            String nextUUID = PooledServerConnectedObjectManager.this.uuidGenerator.getNextUUID();
            this.subscriptions.put(nextUUID, subscriptionDetails);
            List<String> list = this.sessionSubscriptions.get(ioSession);
            if (list == null) {
                list = new ArrayList();
                this.sessionSubscriptions.put(ioSession, list);
            }
            list.add(nextUUID);
            logSubscriptionStart(nextUUID, logExtension);
            return nextUUID;
        }

        private void logSubscriptionStart(String str, LogExtension logExtension) {
            log(str, this.heap.getUri(), "SUBSCRIPTION_START", logExtension);
        }

        public void logSubscriptionEnd(String str, LogExtension logExtension, Subscription.CloseReason closeReason) {
            if (closeReason == null) {
                PooledServerConnectedObjectManager.logger.log(Level.WARNING, "Close reason not provided for subscription " + str + " to heap " + this.heap.getUri() + " defaulting to INTERNAL_ERROR", new IllegalStateException(), new Object[0]);
                closeReason = Subscription.CloseReason.INTERNAL_ERROR;
            }
            log(str, this.heap.getUri(), closeReason.name(), logExtension);
        }

        private void log(String str, String str2, String str3, LogExtension logExtension) {
            PooledServerConnectedObjectManager.this.eventLogger.logEvent(new ConnectedObjectLogEvent("PUSH_SUBSCRIPTION-LOG", new Date(), str, str2, str3), logExtension != null ? logExtension.getFieldsToLog() : null);
        }

        public void terminateSubscriptions(IoSession ioSession, Subscription.CloseReason closeReason) {
            this.sessions.remove(ioSession);
            List<String> remove = this.sessionSubscriptions.remove(ioSession);
            if (remove != null) {
                Iterator<String> it = remove.iterator();
                while (it.hasNext()) {
                    try {
                        this.subscriptions.remove(it.next()).subscription.close(closeReason);
                    } catch (Exception e) {
                        PooledServerConnectedObjectManager.logger.log(Level.WARNING, "Error trying to close subscription", new Object[0]);
                    }
                }
            }
        }

        public boolean hasSubscriptions() {
            return this.subscriptions.isEmpty();
        }

        public List<String> getSubscriptions(IoSession ioSession) {
            return this.sessionSubscriptions.get(ioSession);
        }

        public void terminateSubscription(IoSession ioSession, String str, Subscription.CloseReason closeReason) {
            this.sessionSubscriptions.get(ioSession).remove(str);
            try {
                SubscriptionDetails remove = this.subscriptions.remove(str);
                if (closeReason != Subscription.CloseReason.REQUESTED_BY_PUBLISHER) {
                    remove.subscription.close(closeReason);
                }
            } catch (Exception e) {
                PooledServerConnectedObjectManager.logger.log(Level.WARNING, "Error trying to close subscription", new Object[0]);
            }
        }

        Map<String, SubscriptionDetails> getSubscriptions() {
            return this.subscriptions;
        }

        public long getInstanceId() {
            return this.instanceId;
        }

        public Heap getHeap() {
            return this.heap;
        }
    }

    /* loaded from: input_file:com/betfair/cougar/transport/socket/PooledServerConnectedObjectManager$HeapStateMonitoring.class */
    public interface HeapStateMonitoring {
        SortedMap<String, List<String>> getSubscriptionIdsBySessionId();

        long getLastUpdateId();

        int getSubscriptionCount();

        int getSessionCount();
    }

    /* loaded from: input_file:com/betfair/cougar/transport/socket/PooledServerConnectedObjectManager$HeapTermination.class */
    private class HeapTermination extends QueuedHeapChange {
        private HeapTermination() {
            super();
        }

        @Override // com.betfair.cougar.transport.socket.PooledServerConnectedObjectManager.QueuedHeapChange
        public boolean isTermination() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/betfair/cougar/transport/socket/PooledServerConnectedObjectManager$Multiset.class */
    public static class Multiset<T> {
        private final Map<T, Integer> map = new HashMap();

        Multiset() {
        }

        public boolean add(T t) {
            this.map.put(t, Integer.valueOf(Integer.valueOf(nullToZero(this.map.get(t))).intValue() + 1));
            return true;
        }

        public boolean remove(T t) {
            Integer num = this.map.get(t);
            if (num == null) {
                return false;
            }
            if (num.intValue() == 1) {
                this.map.remove(t);
                return true;
            }
            this.map.put(t, Integer.valueOf(num.intValue() - 1));
            return true;
        }

        public int removeAll(T t) {
            return nullToZero(this.map.remove(t));
        }

        public int count(T t) {
            return nullToZero(this.map.get(t));
        }

        public Set<T> keySet() {
            return this.map.keySet();
        }

        public boolean isEmpty() {
            return this.map.isEmpty();
        }

        private int nullToZero(Integer num) {
            if (num == null) {
                return 0;
            }
            return num.intValue();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/betfair/cougar/transport/socket/PooledServerConnectedObjectManager$QueuedHeapChange.class */
    public class QueuedHeapChange {
        private Update update;
        private QueuedSubscription sub;

        private QueuedHeapChange(Update update) {
            this.update = update;
        }

        private QueuedHeapChange(QueuedSubscription queuedSubscription) {
            this.sub = queuedSubscription;
        }

        protected QueuedHeapChange() {
        }

        public Update getUpdate() {
            return this.update;
        }

        public QueuedSubscription getSub() {
            return this.sub;
        }

        public boolean isUpdate() {
            return this.update != null;
        }

        public boolean isSub() {
            return this.sub != null;
        }

        public boolean isTermination() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/betfair/cougar/transport/socket/PooledServerConnectedObjectManager$QueuedSubscription.class */
    public class QueuedSubscription {
        private IoSession session;
        private InitialUpdate initialState;

        private QueuedSubscription(IoSession ioSession, InitialUpdate initialUpdate) {
            this.session = ioSession;
            this.initialState = initialUpdate;
        }

        public IoSession getSession() {
            return this.session;
        }

        public InitialUpdate getInitialState() {
            return this.initialState;
        }
    }

    Map<String, HeapState> getHeapStates() {
        return this.heapStates;
    }

    public Map<Long, String> getHeapUris() {
        return this.heapUris;
    }

    Map<IoSession, Multiset<String>> getHeapsByClient() {
        return this.heapsByClient;
    }

    BlockingDeque<String> getHeapsWaitingForUpdate() {
        return this.heapsWaitingForUpdate;
    }

    public List<String> getHeapsForSession(IoSession ioSession) {
        ArrayList arrayList = new ArrayList();
        try {
            this.subTermLock.lock();
            Multiset<String> multiset = this.heapsByClient.get(ioSession);
            if (multiset != null) {
                arrayList.addAll(multiset.keySet());
            }
            return arrayList;
        } finally {
            this.subTermLock.unlock();
        }
    }

    public HeapStateMonitoring getHeapStateForMonitoring(String str) {
        return this.heapStates.get(str);
    }

    @Override // com.betfair.cougar.transport.socket.ServerConnectedObjectManager
    public void setNioLogger(NioLogger nioLogger) {
        this.nioLogger = nioLogger;
    }

    public void setObjectIOFactory(CougarObjectIOFactory cougarObjectIOFactory) {
        this.objectIOFactory = cougarObjectIOFactory;
    }

    public void setNumProcessingThreads(int i) {
        this.numProcessingThreads = i;
    }

    public void setEventLogger(EventLogger eventLogger) {
        this.eventLogger = eventLogger;
    }

    public void setMaxUpdateActionsPerMessage(int i) {
        this.maxUpdateActionsPerMessage = i;
    }

    public void start() {
        for (int i = 0; i < this.numProcessingThreads; i++) {
            ConnectedObjectPusher connectedObjectPusher = new ConnectedObjectPusher();
            this.pushers.add(connectedObjectPusher);
            new Thread(connectedObjectPusher, "ConnectedObjectPusher-" + (i + 1)).start();
        }
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    }

    public void stop() {
        Iterator<ConnectedObjectPusher> it = this.pushers.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
        this.shutdownHook.run();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void terminateAllSubscriptions(Subscription.CloseReason closeReason) {
        if (this.heapsByClient != null) {
            try {
                this.subTermLock.lock();
                ArrayList arrayList = new ArrayList(this.heapsByClient.keySet());
                this.subTermLock.unlock();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    terminateSubscriptions((IoSession) it.next(), closeReason);
                }
            } catch (Throwable th) {
                this.subTermLock.unlock();
                throw th;
            }
        }
    }

    private HeapState processHeapStateCreation(ConnectedResponse connectedResponse, final String str) {
        if (!this.subTermLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("You must have the subTermLock before calling this method");
        }
        final HeapState heapState = new HeapState(connectedResponse.getHeap());
        heapState.getUpdateLock().lock();
        UpdateProducingHeapListener updateProducingHeapListener = new UpdateProducingHeapListener() { // from class: com.betfair.cougar.transport.socket.PooledServerConnectedObjectManager.2
            @Override // com.betfair.cougar.transport.socket.UpdateProducingHeapListener
            protected void doUpdate(Update update) {
                if (update.getActions().size() > 0) {
                    heapState.getQueuedChanges().add(new QueuedHeapChange(update));
                    if (update.getActions().contains(TerminateHeap.INSTANCE)) {
                        heapState.getQueuedChanges().add(new HeapTermination());
                    }
                    PooledServerConnectedObjectManager.this.heapsWaitingForUpdate.add(str);
                }
            }
        };
        heapState.setHeapListener(updateProducingHeapListener);
        connectedResponse.getHeap().addListener(updateProducingHeapListener, false);
        this.heapStates.put(str, heapState);
        this.heapUris.put(Long.valueOf(heapState.getHeapId()), str);
        return heapState;
    }

    @Override // com.betfair.cougar.transport.socket.ServerConnectedObjectManager
    public void addSubscription(final SocketTransportCommandProcessor socketTransportCommandProcessor, final SocketTransportRPCCommand socketTransportRPCCommand, ConnectedResponse connectedResponse, OperationDefinition operationDefinition, final ExecutionContextWithTokens executionContextWithTokens, final LogExtension logExtension) {
        final String uri = connectedResponse.getHeap().getUri();
        HeapState heapState = null;
        boolean z = false;
        while (!z) {
            try {
                this.subTermLock.lock();
                heapState = this.heapStates.get(uri);
                if (heapState == null) {
                    heapState = processHeapStateCreation(connectedResponse, uri);
                    z = true;
                } else {
                    this.subTermLock.unlock();
                    heapState.getUpdateLock().lock();
                    this.subTermLock.lock();
                    if (this.heapStates.containsKey(uri)) {
                        HeapState heapState2 = this.heapStates.get(uri);
                        if (heapState2.getInstanceId() == heapState.getInstanceId()) {
                            z = true;
                        } else {
                            heapState.getUpdateLock().unlock();
                            heapState = heapState2;
                        }
                    } else {
                        heapState = processHeapStateCreation(connectedResponse, uri);
                        z = true;
                    }
                }
            } catch (Throwable th) {
                this.subTermLock.unlock();
                if (!$assertionsDisabled && heapState == null) {
                    throw new AssertionError();
                }
                heapState.getUpdateLock().unlock();
                throw th;
            }
        }
        final HeapState heapState3 = heapState;
        final Subscription subscription = connectedResponse.getSubscription();
        connectedResponse.getHeap().traverse(new UpdateProducingHeapListener() { // from class: com.betfair.cougar.transport.socket.PooledServerConnectedObjectManager.3
            @Override // com.betfair.cougar.transport.socket.UpdateProducingHeapListener
            protected void doUpdate(Update update) {
                if (update.getActions().contains(TerminateHeap.INSTANCE)) {
                    PooledServerConnectedObjectManager.this.terminateSubscriptions(socketTransportRPCCommand.getSession(), uri, Subscription.CloseReason.REQUESTED_BY_PUBLISHER);
                    socketTransportCommandProcessor.writeErrorResponse((SocketTransportCommand) socketTransportRPCCommand, executionContextWithTokens, (CougarException) new CougarFrameworkException("Subscription requested for terminated heap: " + uri));
                    return;
                }
                Multiset multiset = (Multiset) PooledServerConnectedObjectManager.this.heapsByClient.get(socketTransportRPCCommand.getSession());
                if (multiset == null) {
                    multiset = new Multiset();
                    PooledServerConnectedObjectManager.this.heapsByClient.put(socketTransportRPCCommand.getSession(), multiset);
                }
                long heapId = heapState3.getHeapId();
                final String addSubscription = heapState3.addSubscription(logExtension, subscription, socketTransportRPCCommand.getSession());
                subscription.addListener(new Subscription.SubscriptionListener() { // from class: com.betfair.cougar.transport.socket.PooledServerConnectedObjectManager.3.1
                    public void subscriptionClosed(Subscription subscription2, Subscription.CloseReason closeReason) {
                        if (closeReason == Subscription.CloseReason.REQUESTED_BY_PUBLISHER) {
                            PooledServerConnectedObjectManager.this.terminateSubscription(socketTransportRPCCommand.getSession(), uri, addSubscription, closeReason);
                        }
                        heapState3.logSubscriptionEnd(addSubscription, logExtension, closeReason);
                    }
                });
                boolean z2 = multiset.count(uri) == 0;
                multiset.add(uri);
                if (!socketTransportCommandProcessor.writeSuccessResponse(socketTransportRPCCommand, new ExecutionResult(z2 ? new NewHeapSubscription(heapId, addSubscription, uri) : new NewHeapSubscription(heapId, addSubscription)))) {
                    PooledServerConnectedObjectManager.this.terminateSubscriptions(socketTransportRPCCommand.getSession(), uri, Subscription.CloseReason.INTERNAL_ERROR);
                }
                if (z2) {
                    heapState3.getQueuedChanges().add(new QueuedHeapChange(new QueuedSubscription(socketTransportRPCCommand.getSession(), new InitialUpdate(update))));
                    PooledServerConnectedObjectManager.this.heapsWaitingForUpdate.add(uri);
                }
            }
        });
        this.subTermLock.unlock();
        if (!$assertionsDisabled && heapState == null) {
            throw new AssertionError();
        }
        heapState.getUpdateLock().unlock();
    }

    @Override // com.betfair.cougar.transport.socket.ServerConnectedObjectManager
    public void terminateSubscription(IoSession ioSession, TerminateSubscription terminateSubscription) {
        Subscription.CloseReason closeReason = Subscription.CloseReason.REQUESTED_BY_PUBLISHER;
        try {
            closeReason = Subscription.CloseReason.valueOf(terminateSubscription.getCloseReason());
        } catch (IllegalArgumentException e) {
        }
        terminateSubscription(ioSession, this.heapUris.get(Long.valueOf(terminateSubscription.getHeapId())), terminateSubscription.getSubscriptionId(), closeReason);
    }

    public void terminateSubscription(IoSession ioSession, String str, String str2, Subscription.CloseReason closeReason) {
        Lock lock = null;
        try {
            HeapState heapState = this.heapStates.get(str);
            if (heapState != null) {
                lock = heapState.getUpdateLock();
                lock.lock();
            }
            this.subTermLock.lock();
            if (heapState != null && !heapState.isTerminated()) {
                heapState.terminateSubscription(ioSession, str2, closeReason);
                if (closeReason == Subscription.CloseReason.REQUESTED_BY_PUBLISHER || closeReason == Subscription.CloseReason.REQUESTED_BY_PUBLISHER_ADMINISTRATOR) {
                    try {
                        this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Notifying client that publisher has terminated subscription %s", new Object[]{str2});
                        NioUtils.writeEventMessageToSession(ioSession, new TerminateSubscription(heapState.getHeapId(), str2, closeReason.name()), this.objectIOFactory);
                    } catch (Exception e) {
                        logger.log(Level.INFO, "Error occurred whilst trying to inform client of subscription termination", e, new Object[0]);
                        this.nioLogger.log(NioLogger.LoggingLevel.SESSION, ioSession, "Error occurred whilst trying to inform client of subscription termination, closing session", new Object[0]);
                        ioSession.close();
                    }
                }
                if (heapState.hasSubscriptions()) {
                    terminateSubscriptions(str, closeReason);
                } else if (heapState.getSubscriptions(ioSession).isEmpty()) {
                    terminateSubscriptions(ioSession, str, closeReason);
                }
            }
            Multiset<String> multiset = this.heapsByClient.get(ioSession);
            if (multiset != null) {
                multiset.remove(str);
                if (multiset.isEmpty()) {
                    terminateSubscriptions(ioSession, closeReason);
                }
            }
        } finally {
            this.subTermLock.unlock();
            if (lock != null) {
                lock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void terminateSubscriptions(IoSession ioSession, String str, Subscription.CloseReason closeReason) {
        terminateSubscriptions(ioSession, this.heapStates.get(str), str, closeReason);
    }

    private void terminateSubscriptions(IoSession ioSession, HeapState heapState, String str, Subscription.CloseReason closeReason) {
        Lock lock = null;
        if (heapState != null) {
            try {
                lock = heapState.getUpdateLock();
                lock.lock();
            } finally {
                this.subTermLock.unlock();
                if (lock != null) {
                    lock.unlock();
                }
            }
        }
        this.subTermLock.lock();
        if (heapState != null && !heapState.isTerminated()) {
            heapState.terminateSubscriptions(ioSession, closeReason);
            if (heapState.getSessions().isEmpty()) {
                terminateSubscriptions(str, closeReason);
            }
        }
        Multiset<String> multiset = this.heapsByClient.get(ioSession);
        if (multiset != null) {
            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Terminating subscription on %s heaps", new Object[]{Integer.valueOf(multiset.keySet().size())});
            multiset.removeAll(str);
            if (multiset.isEmpty()) {
                terminateSubscriptions(ioSession, closeReason);
            }
        }
    }

    private void terminateSubscriptions(IoSession ioSession, Subscription.CloseReason closeReason) {
        try {
            this.subTermLock.lock();
            Multiset<String> remove = this.heapsByClient.remove(ioSession);
            this.subTermLock.unlock();
            if (remove != null) {
                Iterator<String> it = remove.keySet().iterator();
                while (it.hasNext()) {
                    terminateSubscriptions(ioSession, it.next(), closeReason);
                }
            }
        } catch (Throwable th) {
            this.subTermLock.unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void terminateSubscriptions(String str, Subscription.CloseReason closeReason) {
        HeapState heapState = this.heapStates.get(str);
        if (heapState != null) {
            try {
                heapState.getUpdateLock().lock();
                this.subTermLock.lock();
                if (!heapState.isTerminated()) {
                    this.heapStates.remove(str);
                    this.heapUris.remove(Long.valueOf(heapState.getHeapId()));
                    Iterator<IoSession> it = heapState.getSessions().iterator();
                    while (it.hasNext()) {
                        terminateSubscriptions(it.next(), heapState, str, closeReason);
                    }
                    logger.log(Level.SEVERE, "Terminating heap state '%s'", new Object[]{str});
                    heapState.terminate();
                    heapState.removeListener();
                }
            } finally {
                this.subTermLock.unlock();
                heapState.getUpdateLock().unlock();
            }
        }
    }

    public void sessionOpened(IoSession ioSession) {
    }

    public void sessionClosed(IoSession ioSession) {
        this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Session closed, terminating live subscriptions", new Object[0]);
        terminateSubscriptions(ioSession, Subscription.CloseReason.CONNECTION_CLOSED);
    }

    @ManagedAttribute(description = "Number of active heaps")
    public int getNumberOfHeaps() {
        if (this.heapUris != null) {
            return this.heapUris.size();
        }
        return 0;
    }

    @ManagedOperation(description = "Number of subscriptions for the given heap URI")
    public int getHeapSubscriptionCount(String str) {
        HeapState heapState;
        if (this.heapStates == null || (heapState = this.heapStates.get(str)) == null) {
            return -1;
        }
        return heapState.getSubscriptionCount();
    }

    @ManagedOperation(description = "Number of sessions subscribed for the given heap URI")
    public int getHeapSessionCount(String str) {
        HeapState heapState;
        if (this.heapStates == null || (heapState = this.heapStates.get(str)) == null) {
            return -1;
        }
        return heapState.getSessionCount();
    }

    @ManagedOperation(description = "Has the specified heap terminated")
    public boolean hasHeapTerminated(String str) {
        HeapState heapState;
        if (this.heapStates == null || (heapState = this.heapStates.get(str)) == null) {
            return true;
        }
        return heapState.isTerminated();
    }

    @ManagedOperation(description = "Last received update Id")
    public long getLastUpdateId(String str) {
        HeapState heapState;
        if (this.heapStates == null || (heapState = this.heapStates.get(str)) == null) {
            return -1L;
        }
        return heapState.getLastUpdateId();
    }

    @ManagedOperation(description = "Number of updates queued for the specified heap")
    public long showNumOfQueuedChanges(String str) {
        HeapState heapState;
        if (this.heapStates == null || (heapState = this.heapStates.get(str)) == null) {
            return -1L;
        }
        return heapState.getQueuedChanges().size();
    }

    @ManagedAttribute(description = "Number of pusher threads")
    public int getNumProcessingThreads() {
        return this.numProcessingThreads;
    }

    static {
        $assertionsDisabled = !PooledServerConnectedObjectManager.class.desiredAssertionStatus();
        logger = CougarLoggingUtils.getLogger(PooledServerConnectedObjectManager.class);
        heapStateInstanceIdSource = new AtomicLong();
    }
}
