package com.betfair.cougar.client.socket;

import com.betfair.cougar.client.socket.HeapState;
import com.betfair.cougar.core.api.ev.ExecutionObserver;
import com.betfair.cougar.core.api.ev.ExecutionResult;
import com.betfair.cougar.core.api.ev.Subscription;
import com.betfair.cougar.core.api.exception.CougarClientException;
import com.betfair.cougar.core.api.exception.ServerFaultCode;
import com.betfair.cougar.core.impl.ev.ConnectedResponseImpl;
import com.betfair.cougar.netutil.nio.HeapDelta;
import com.betfair.cougar.netutil.nio.NioLogger;
import com.betfair.cougar.netutil.nio.NioUtils;
import com.betfair.cougar.netutil.nio.TerminateSubscription;
import com.betfair.cougar.netutil.nio.connected.InitialUpdate;
import com.betfair.cougar.transport.api.protocol.CougarObjectIOFactory;
import com.betfair.cougar.transport.api.protocol.socket.InvocationResponse;
import com.betfair.cougar.transport.api.protocol.socket.NewHeapSubscription;
import com.betfair.platform.virtualheap.ImmutableHeap;
import com.betfair.platform.virtualheap.conflate.Conflater;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/betfair/cougar/client/socket/ClientConnectedObjectManager.class */
public class ClientConnectedObjectManager {
    // ---- Collaborators and tuning values, injected through the setters of this class ----

    /** Transport-level logger used for all per-session trace messages. */
    private NioLogger nioLogger;
    /** Number of {@code ConnectedObjectPuller} threads created by {@link #start()}. */
    private int numProcessingThreads;
    /** Maximum time, in milliseconds, to wait on a heap's initial-population latch. */
    private long maxInitialPopulationWait;
    /** Maximum time, in milliseconds, a puller blocks waiting for a session with updates. */
    private long pullerAwaitTimeout;
    /** Passed to {@code HeapState.checkDeltaQueueHealth} together with {@link #maxDeltaQueue}. */
    private long missingDeltaTimeout;
    /** Passed to {@code HeapState.checkDeltaQueueHealth} together with {@link #missingDeltaTimeout}. */
    private int maxDeltaQueue;
    /** Conflater handed to every newly created {@code ImmutableHeap}; may be {@code null}. */
    private Conflater newListenerConflater;
    private CougarObjectIOFactory objectIOFactory;

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientConnectedObjectManager.class);
    /** Sequence used to give each initial-population waiter thread a unique name. */
    private static final AtomicLong initialPopulationThreadIdSource = new AtomicLong();

    // ---- Runtime state ----

    /** Heap registries keyed by session id (as produced by {@code NioUtils.getSessionId}). */
    private ConcurrentHashMap<String, ConnectedHeaps> heapsByServer = new ConcurrentHashMap<>();
    /** Session ids that have at least one queued heap delta; drained by the puller threads. */
    private BlockingDeque<String> sessionsWithUpdates = new LinkedBlockingDeque<>();
    /** Pullers created by {@link #start()}, retained so that {@link #stop()} can signal them. */
    private final List<ConnectedObjectPuller> pullers = new ArrayList<>();
    /** Held while heaps or subscriptions are being added to / removed from the registries. */
    private final Lock heapSubMutationLock = new ReentrantLock();
    /** Acquired with {@code tryLock()} so that at most one puller runs the queue health check at a time. */
    private final Lock queueHealthCheckLock = new ReentrantLock();

    /* loaded from: input_file:com/betfair/cougar/client/socket/ClientConnectedObjectManager$ConnectedHeaps.class */
    /**
     * Book-keeping for the heaps received over one server session: the per-heap
     * {@link HeapState}, the latch used to signal a heap's initial population,
     * and the queue of heap ids that have deltas waiting to be applied.
     *
     * <p>All structural changes to the maps are made while holding the enclosing
     * manager's {@code heapSubMutationLock}. The lock is re-entrant, so these
     * methods may be called by code that already holds it.
     */
    public class ConnectedHeaps {
        /** Heap state keyed by heap id. */
        private final Map<Long, HeapState> heapStates = new HashMap<>();
        /** One latch per heap that is still awaiting its initial update, keyed by heap id. */
        private final Map<Long, CountDownLatch> initialLatches = new HashMap<>();
        /** Ids of heaps with queued deltas, in arrival order (an id may appear more than once). */
        private final BlockingDeque<Long> heapsWithUpdates = new LinkedBlockingDeque<>();
        /** Running count of entries in {@link #heapsWithUpdates}. */
        private final AtomicLong queueLength = new AtomicLong();

        public ConnectedHeaps() {
        }

        /**
         * Registers a new heap together with its initial-population latch.
         *
         * @param j   the heap id
         * @param str the heap URI used to construct the local {@code ImmutableHeap}
         * @return {@code true} if the heap was added, {@code false} if a heap with
         *         this id is already registered (nothing is changed in that case)
         */
        public boolean addHeap(long j, String str) {
            heapSubMutationLock.lock();
            try {
                if (heapStates.containsKey(j)) {
                    return false;
                }
                ImmutableHeap heap = new ImmutableHeap(str, newListenerConflater);
                initialLatches.put(j, new CountDownLatch(1));
                heapStates.put(j, new HeapState(heap));
                return true;
            } finally {
                // Exactly one unlock per lock(): unlocking twice would either throw
                // IllegalMonitorStateException or release a hold owned by the caller.
                heapSubMutationLock.unlock();
            }
        }

        /**
         * @param j the heap id
         * @return the initial-population latch for the heap, or {@code null} if none is registered
         */
        public CountDownLatch getInitialPopulationLatch(long j) {
            return initialLatches.get(j);
        }

        /**
         * Discards the initial-population latch of the given heap, if any.
         *
         * @param j the heap id
         */
        public void removeInitialPopulationLatch(long j) {
            heapSubMutationLock.lock();
            try {
                initialLatches.remove(j);
            } finally {
                heapSubMutationLock.unlock();
            }
        }

        /**
         * @param j the heap id
         * @return the state of the heap, or {@code null} if the heap is not registered
         */
        public HeapState getHeapState(long j) {
            return heapStates.get(j);
        }

        /**
         * Removes the next heap id from the update queue without blocking.
         *
         * @return the next heap id with queued updates, or {@code null} if the queue is empty
         */
        public Long pollNextHeapId() {
            Long next = heapsWithUpdates.pollFirst();
            if (next != null) {
                queueLength.decrementAndGet();
            }
            return next;
        }

        /**
         * Records that the given heap has a delta waiting to be applied.
         *
         * @param j the heap id
         */
        public void queueUpdatedHeap(long j) {
            heapsWithUpdates.add(j);
            queueLength.incrementAndGet();
        }

        /**
         * Unregisters a heap, terminating all of its subscriptions and dropping
         * its initial-population latch. Does nothing for an unknown heap id
         * beyond removing a stale latch.
         *
         * @param j           the heap id
         * @param closeReason the reason passed on to every terminated subscription
         */
        public void terminateHeap(long j, Subscription.CloseReason closeReason) {
            heapSubMutationLock.lock();
            try {
                HeapState removed = heapStates.remove(j);
                if (removed != null) {
                    removed.terminateAllSubscriptions(closeReason);
                }
                initialLatches.remove(j);
            } finally {
                heapSubMutationLock.unlock();
            }
        }

        /**
         * Terminates every registered heap.
         *
         * @param closeReason the reason passed on to every terminated subscription
         */
        public void terminateAllHeaps(Subscription.CloseReason closeReason) {
            heapSubMutationLock.lock();
            try {
                // Iterate over a snapshot: terminateHeap removes entries from heapStates.
                for (Long heapId : new ArrayList<Long>(heapStates.keySet())) {
                    terminateHeap(heapId, closeReason);
                }
            } finally {
                heapSubMutationLock.unlock();
            }
        }

        /** @return {@code true} if no heap is currently registered */
        public boolean isEmpty() {
            return heapStates.isEmpty();
        }

        /** @return a snapshot copy of the ids of all registered heaps */
        public List<Long> getAllHeapIds() {
            heapSubMutationLock.lock();
            try {
                return new ArrayList<Long>(heapStates.keySet());
            } finally {
                heapSubMutationLock.unlock();
            }
        }

        /** @return the number of registered heaps */
        public int getHeapCount() {
            return heapStates.size();
        }

        /** @return the current value of the update-queue length counter */
        public long getQueueLength() {
            return queueLength.get();
        }
    }

    /* loaded from: input_file:com/betfair/cougar/client/socket/ClientConnectedObjectManager$ConnectedObjectPuller.class */
    private class ConnectedObjectPuller implements Runnable {
        private volatile boolean running;

        private ConnectedObjectPuller() {
            this.running = true;
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    String str = (String) ClientConnectedObjectManager.this.sessionsWithUpdates.pollFirst(ClientConnectedObjectManager.this.pullerAwaitTimeout, TimeUnit.MILLISECONDS);
                    if (str != null) {
                        ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "Found session with queued heap update", new Object[0]);
                        ConnectedHeaps connectedHeaps = (ConnectedHeaps) ClientConnectedObjectManager.this.heapsByServer.get(str);
                        if (connectedHeaps != null) {
                            Long pollNextHeapId = connectedHeaps.pollNextHeapId();
                            if (pollNextHeapId != null) {
                                ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "Queued heap update found for heapId = %s", new Object[]{pollNextHeapId});
                                HeapState heapState = connectedHeaps.getHeapState(pollNextHeapId.longValue());
                                if (heapState != null) {
                                    Lock heapUpdateLock = heapState.getHeapUpdateLock();
                                    heapUpdateLock.lock();
                                    try {
                                        try {
                                            HeapDelta peekNextDelta = heapState.peekNextDelta();
                                            if (peekNextDelta == null) {
                                                ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "All contiguous deltas already processed for heapId = %s", new Object[]{pollNextHeapId});
                                            }
                                            while (peekNextDelta != null) {
                                                HeapDelta heapDelta = peekNextDelta;
                                                ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "Applying delta %s for heapId = %s", new Object[]{Long.valueOf(heapDelta.getUpdateId()), pollNextHeapId});
                                                if (heapDelta.containsHeapTermination()) {
                                                    ClientConnectedObjectManager.this.heapSubMutationLock.lock();
                                                    try {
                                                        heapDelta.applyTo(heapState.getHeap().asListener());
                                                        heapState.popNextDelta();
                                                        ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "Found heap termination in delta %s for heapId = %s", new Object[]{Long.valueOf(heapDelta.getUpdateId()), pollNextHeapId});
                                                        ClientConnectedObjectManager.this.terminateSubscriptions(str, pollNextHeapId.longValue(), Subscription.CloseReason.REQUESTED_BY_PUBLISHER);
                                                        peekNextDelta = null;
                                                        ClientConnectedObjectManager.this.heapSubMutationLock.unlock();
                                                    } catch (Throwable th) {
                                                        ClientConnectedObjectManager.this.heapSubMutationLock.unlock();
                                                        throw th;
                                                    }
                                                } else {
                                                    heapDelta.applyTo(heapState.getHeap().asListener());
                                                    heapState.popNextDelta();
                                                    peekNextDelta = heapState.peekNextDelta();
                                                }
                                                if (heapDelta.containsFirstUpdate()) {
                                                    ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "Found initial update in delta for heapId = %s", new Object[]{pollNextHeapId});
                                                    CountDownLatch initialPopulationLatch = connectedHeaps.getInitialPopulationLatch(pollNextHeapId.longValue());
                                                    if (initialPopulationLatch != null) {
                                                        initialPopulationLatch.countDown();
                                                    }
                                                }
                                            }
                                            heapUpdateLock.unlock();
                                        } catch (Throwable th2) {
                                            heapUpdateLock.unlock();
                                            throw th2;
                                        }
                                    } catch (Exception e) {
                                        ClientConnectedObjectManager.LOGGER.warn("Error processing update", e);
                                        ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "Error occurred processing update for heapId = %s, terminating heap", new Object[]{pollNextHeapId});
                                        ClientConnectedObjectManager.this.terminateSubscriptions(str, pollNextHeapId.longValue(), Subscription.CloseReason.INTERNAL_ERROR);
                                        heapUpdateLock.unlock();
                                    }
                                } else {
                                    ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "Received updated for unknown heap, id = %s, assuming it's already been processed by another thread", new Object[]{pollNextHeapId});
                                }
                            } else {
                                ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "Queued heap update already processed by another thread", new Object[0]);
                            }
                        } else if (ClientConnectedObjectManager.this.nioLogger.isLogging(NioLogger.LoggingLevel.TRANSPORT)) {
                            ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "No heaps found for session, they must have been terminated", new Object[0]);
                        }
                    }
                    if (ClientConnectedObjectManager.this.queueHealthCheckLock.tryLock()) {
                        try {
                            Iterator it = new ArrayList(ClientConnectedObjectManager.this.heapsByServer.keySet()).iterator();
                            while (it.hasNext()) {
                                String str2 = (String) it.next();
                                ConnectedHeaps connectedHeaps2 = (ConnectedHeaps) ClientConnectedObjectManager.this.heapsByServer.get(str2);
                                for (Long l : connectedHeaps2.getAllHeapIds()) {
                                    HeapState heapState2 = connectedHeaps2.getHeapState(l.longValue());
                                    if (heapState2 != null) {
                                        HeapState.QueueHealth checkDeltaQueueHealth = heapState2.checkDeltaQueueHealth(ClientConnectedObjectManager.this.maxDeltaQueue, ClientConnectedObjectManager.this.missingDeltaTimeout);
                                        if (checkDeltaQueueHealth != HeapState.QueueHealth.HEALTHY) {
                                            switch (checkDeltaQueueHealth) {
                                                case QUEUE_TOO_LONG:
                                                    ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str2, "Queued up too many changes looking for next update for heapId = %s, terminating heap", new Object[]{l});
                                                    break;
                                                case WAITED_TOO_LONG:
                                                    ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str2, "Waited too long for next update for heapId = %s, terminating heap", new Object[]{l});
                                                    break;
                                                default:
                                                    ClientConnectedObjectManager.LOGGER.warn("Unrecognized health for queue: " + checkDeltaQueueHealth);
                                                    break;
                                            }
                                            ClientConnectedObjectManager.this.terminateSubscriptions(str2, l.longValue(), Subscription.CloseReason.INTERNAL_ERROR);
                                        }
                                    } else {
                                        ClientConnectedObjectManager.LOGGER.warn("Couldn't find heap state for heapId: " + l);
                                    }
                                }
                            }
                            ClientConnectedObjectManager.this.queueHealthCheckLock.unlock();
                        } catch (Throwable th3) {
                            ClientConnectedObjectManager.this.queueHealthCheckLock.unlock();
                            throw th3;
                        }
                    }
                } catch (InterruptedException e2) {
                } catch (Exception e3) {
                    ClientConnectedObjectManager.LOGGER.warn("Error processing update", e3);
                }
            }
        }

        public void stop() {
            this.running = false;
        }
    }

    /** Package-private accessor for the live map of heap registries, keyed by session id. */
    ConcurrentHashMap<String, ConnectedHeaps> getHeapsByServer() {
        return heapsByServer;
    }

    /** Package-private accessor for the live queue of session ids that have pending updates. */
    BlockingDeque<String> getSessionsWithUpdates() {
        return sessionsWithUpdates;
    }

    /** @return the lock held while heaps and subscriptions are added or removed */
    public Lock getHeapSubMutationLock() {
        return heapSubMutationLock;
    }

    /** @param i the number of puller threads that {@link #start()} will create */
    public void setNumProcessingThreads(int i) {
        numProcessingThreads = i;
    }

    /** @param j the maximum wait, in milliseconds, for a heap's initial population */
    public void setMaxInitialPopulationWait(long j) {
        maxInitialPopulationWait = j;
    }

    /** @param j how long, in milliseconds, a puller blocks waiting for a session with updates */
    public void setPullerAwaitTimeout(long j) {
        pullerAwaitTimeout = j;
    }

    /** @param j the timeout value handed to the per-heap delta-queue health check */
    public void setMissingDeltaTimeout(long j) {
        missingDeltaTimeout = j;
    }

    /** @param i the queue-size limit handed to the per-heap delta-queue health check */
    public void setMaxDeltaQueue(int i) {
        maxDeltaQueue = i;
    }

    /** @param cougarObjectIOFactory the object IO factory to store on this manager */
    public void setObjectIOFactory(CougarObjectIOFactory cougarObjectIOFactory) {
        objectIOFactory = cougarObjectIOFactory;
    }

    /**
     * Sets the conflater passed to newly created heaps. The
     * {@code ConflaterFactory.NULL_CONFLATER} sentinel is stored as {@code null}.
     *
     * @param conflater the conflater to use, or the null-conflater sentinel
     */
    public void setNewListenerConflater(Conflater conflater) {
        final boolean isSentinel = (conflater == ConflaterFactory.NULL_CONFLATER);
        newListenerConflater = isSentinel ? null : conflater;
    }

    /**
     * Creates {@code numProcessingThreads} pullers, records each of them, and
     * starts one thread per puller, named {@code ConnectedObjectPuller-1},
     * {@code ConnectedObjectPuller-2}, and so on.
     */
    public void start() {
        for (int index = 0; index < numProcessingThreads; index++) {
            final String threadName = "ConnectedObjectPuller-" + (index + 1);
            final ConnectedObjectPuller puller = new ConnectedObjectPuller();
            pullers.add(puller);
            final Thread worker = new Thread(puller, threadName);
            worker.start();
        }
    }

    /** Signals every puller created by {@link #start()} to stop; does not wait for the threads to exit. */
    public void stop() {
        for (ConnectedObjectPuller puller : pullers) {
            puller.stop();
        }
    }

    /** @param nioLogger the transport logger used for per-session trace messages */
    public void setNioLogger(NioLogger nioLogger) {
        // The parameter shadows the field, so the qualifier is required here.
        ClientConnectedObjectManager.this.nioLogger = nioLogger;
    }

    /**
     * @param ioSession the session to look up
     * @return the heap registry for the session, or {@code null} if none is registered
     */
    public ConnectedHeaps getHeapsForSession(IoSession ioSession) {
        final String sessionId = NioUtils.getSessionId(ioSession);
        return heapsByServer.get(sessionId);
    }

    /**
     * Handles the server's response to a subscription request.
     *
     * <p>Under {@code heapSubMutationLock} the heap registry for the session is
     * looked up (and created if absent) and, when the response carries a heap
     * URI, the heap is registered. The outcome is then delivered to
     * {@code executionObserver} in one of three ways:
     * <ul>
     *   <li>no heap state can be found: a {@code FrameworkError} result is sent;</li>
     *   <li>the heap was already known and has seen its initial update: the
     *       subscription is added and returned on the calling thread (a
     *       duplicate subscription id yields an error and closes the session);</li>
     *   <li>otherwise: a new thread is started that waits up to
     *       {@code maxInitialPopulationWait} ms on the heap's initial-population
     *       latch before returning the heap or an error.</li>
     * </ul>
     * Any exception raised on the calling thread is reported to the observer as
     * a {@code FrameworkError}.
     *
     * @param ioSession          the session the response arrived on
     * @param invocationResponse the response; its result is cast to {@code NewHeapSubscription}
     * @param executionObserver  receives exactly the success or failure result described above
     */
    public void handleSubscriptionResponse(final IoSession ioSession, InvocationResponse invocationResponse, final ExecutionObserver executionObserver) {
        this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Received a subscription response", new Object[0]);
        try {
            final NewHeapSubscription newHeapSubscription = (NewHeapSubscription) invocationResponse.getResult();
            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Received a subscription response for heapId %s with subscriptionId %s", new Object[]{Long.valueOf(newHeapSubscription.getHeapId()), newHeapSubscription.getSubscriptionId()});
            String sessionId = NioUtils.getSessionId(ioSession);
            this.heapSubMutationLock.lock();
            try {
                // Lazily create the per-session heap registry.
                ConnectedHeaps connectedHeaps = this.heapsByServer.get(sessionId);
                if (connectedHeaps == null) {
                    connectedHeaps = new ConnectedHeaps();
                    this.heapsByServer.put(sessionId, connectedHeaps);
                }
                // z: true only if this response caused a new heap to be registered.
                boolean z = false;
                if (newHeapSubscription.getUri() != null) {
                    this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Received a new heap definition, heapId = %s, heapUrl = %s", new Object[]{Long.valueOf(newHeapSubscription.getHeapId()), newHeapSubscription.getUri()});
                    z = connectedHeaps.addHeap(newHeapSubscription.getHeapId(), newHeapSubscription.getUri());
                    if (!z) {
                        this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Received a new heap definition, heapId = %s, even though we know about the heap already!", new Object[]{Long.valueOf(newHeapSubscription.getHeapId())});
                    }
                }
                // z2: the heap was not registered by this call (it pre-existed, or no URI was supplied).
                final boolean z2 = !z;
                final HeapState heapState = connectedHeaps.getHeapState(newHeapSubscription.getHeapId());
                if (heapState == null) {
                    this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Couldn't find heap definition, heapId = %s", new Object[]{Long.valueOf(newHeapSubscription.getHeapId())});
                    LOGGER.warn("Can't find the heap for this subscription result. Heap id = " + newHeapSubscription.getHeapId());
                    executionObserver.onResult(new ExecutionResult(new CougarClientException(ServerFaultCode.FrameworkError, "Can't find the heap for this subscription result. Heap id = " + newHeapSubscription.getHeapId())));
                } else {
                    // A heap registered by this call, or one that has not yet seen its initial
                    // update, is handed to a separate thread that waits for the population latch.
                    if (!z2 || !heapState.haveSeenInitialUpdate()) {
                        final ConnectedHeaps connectedHeaps2 = connectedHeaps;
                        new Thread(new Runnable() { // from class: com.betfair.cougar.client.socket.ClientConnectedObjectManager.1
                            @Override // java.lang.Runnable
                            public void run() {
                                // z3: set once a successful result has been delivered to the observer.
                                boolean z3 = false;
                                CountDownLatch initialPopulationLatch = connectedHeaps2.getInitialPopulationLatch(newHeapSubscription.getHeapId());
                                // NOTE(review): the failure-reporting sequence below is repeated on the
                                // normal, InterruptedException, RuntimeException and Throwable paths; it
                                // looks like a decompiled 'finally' block - confirm before refactoring.
                                try {
                                    // z4: true only if the latch was counted down within the wait limit.
                                    boolean z4 = false;
                                    try {
                                        if (initialPopulationLatch != null) {
                                            ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Waiting for initial heap population, heapUrl = %s", new Object[]{newHeapSubscription.getUri()});
                                            z4 = initialPopulationLatch.await(ClientConnectedObjectManager.this.maxInitialPopulationWait, TimeUnit.MILLISECONDS);
                                            connectedHeaps2.removeInitialPopulationLatch(newHeapSubscription.getHeapId());
                                        } else {
                                            // NOTE(review): with no latch, z4 stays false, so the failure path
                                            // below is taken - confirm this is the intended outcome.
                                            ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Initial heap population, heapUrl = %s", new Object[]{newHeapSubscription.getUri()});
                                        }
                                        ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Returning heap to client, heapUrl = %s", new Object[]{newHeapSubscription.getUri()});
                                        if (z4) {
                                            executionObserver.onResult(new ExecutionResult(new ConnectedResponseImpl(heapState.getHeap(), heapState.addSubscription(ClientConnectedObjectManager.this, ioSession, newHeapSubscription.getHeapId(), newHeapSubscription.getSubscriptionId()))));
                                            z3 = true;
                                        }
                                        if (z3) {
                                            return;
                                        }
                                        // Failure path: the heap is terminated only when it was registered by this call.
                                        ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Didn't get initial population message for heap, heapUrl = %s", new Object[]{newHeapSubscription.getUri()});
                                        if (!z2) {
                                            ClientConnectedObjectManager.this.terminateSubscriptions(ioSession, newHeapSubscription.getHeapId(), Subscription.CloseReason.INTERNAL_ERROR);
                                        }
                                        ClientConnectedObjectManager.LOGGER.warn("Didn't get initial population message for heap id = " + newHeapSubscription.getHeapId());
                                        executionObserver.onResult(new ExecutionResult(new CougarClientException(ServerFaultCode.FrameworkError, "Didn't get initial population message for heap id = " + newHeapSubscription.getHeapId())));
                                    } catch (InterruptedException e) {
                                        // Interrupted while waiting: treated the same as a timed-out wait.
                                        if (z3) {
                                            return;
                                        }
                                        ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Didn't get initial population message for heap, heapUrl = %s", new Object[]{newHeapSubscription.getUri()});
                                        if (!z2) {
                                            ClientConnectedObjectManager.this.terminateSubscriptions(ioSession, newHeapSubscription.getHeapId(), Subscription.CloseReason.INTERNAL_ERROR);
                                        }
                                        ClientConnectedObjectManager.LOGGER.warn("Didn't get initial population message for heap id = " + newHeapSubscription.getHeapId());
                                        executionObserver.onResult(new ExecutionResult(new CougarClientException(ServerFaultCode.FrameworkError, "Didn't get initial population message for heap id = " + newHeapSubscription.getHeapId())));
                                    } catch (RuntimeException e2) {
                                        ClientConnectedObjectManager.LOGGER.warn("Error processing initial heap population, treating as a failure", e2);
                                        if (z3) {
                                            return;
                                        }
                                        ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Didn't get initial population message for heap, heapUrl = %s", new Object[]{newHeapSubscription.getUri()});
                                        if (!z2) {
                                            ClientConnectedObjectManager.this.terminateSubscriptions(ioSession, newHeapSubscription.getHeapId(), Subscription.CloseReason.INTERNAL_ERROR);
                                        }
                                        ClientConnectedObjectManager.LOGGER.warn("Didn't get initial population message for heap id = " + newHeapSubscription.getHeapId());
                                        executionObserver.onResult(new ExecutionResult(new CougarClientException(ServerFaultCode.FrameworkError, "Didn't get initial population message for heap id = " + newHeapSubscription.getHeapId())));
                                    }
                                } catch (Throwable th) {
                                    // Anything not handled above: report the failure (if no result was sent) and rethrow.
                                    if (!z3) {
                                        ClientConnectedObjectManager.this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Didn't get initial population message for heap, heapUrl = %s", new Object[]{newHeapSubscription.getUri()});
                                        if (!z2) {
                                            ClientConnectedObjectManager.this.terminateSubscriptions(ioSession, newHeapSubscription.getHeapId(), Subscription.CloseReason.INTERNAL_ERROR);
                                        }
                                        ClientConnectedObjectManager.LOGGER.warn("Didn't get initial population message for heap id = " + newHeapSubscription.getHeapId());
                                        executionObserver.onResult(new ExecutionResult(new CougarClientException(ServerFaultCode.FrameworkError, "Didn't get initial population message for heap id = " + newHeapSubscription.getHeapId())));
                                    }
                                    throw th;
                                }
                            }
                        }, "SubscriptionResponseHandler-InitialPopulation-" + initialPopulationThreadIdSource.incrementAndGet() + "-" + heapState.getHeapUri()).start();
                        return;
                    }
                    // Heap already known and populated: add the subscription on the calling thread.
                    Subscription addSubscription = heapState.addSubscription(this, ioSession, newHeapSubscription.getHeapId(), newHeapSubscription.getSubscriptionId());
                    if (addSubscription != null) {
                        executionObserver.onResult(new ExecutionResult(new ConnectedResponseImpl(heapState.getHeap(), addSubscription)));
                        return;
                    }
                    // A null subscription is treated as a duplicate id from the server; the session is closed.
                    this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Duplicate subscription returned by the server, id = %s - closing session", new Object[]{newHeapSubscription.getSubscriptionId()});
                    LOGGER.warn("Duplicate subscription returned by the server, id = " + newHeapSubscription.getSubscriptionId() + " - closing session");
                    executionObserver.onResult(new ExecutionResult(new CougarClientException(ServerFaultCode.FrameworkError, "Duplicate subscription returned by the server, id = " + newHeapSubscription.getSubscriptionId())));
                    ioSession.close();
                }
            } finally {
                this.heapSubMutationLock.unlock();
            }
        } catch (Exception e) {
            LOGGER.warn("Error unpacking subscription result", e);
            executionObserver.onResult(new ExecutionResult(new CougarClientException(ServerFaultCode.FrameworkError, "Error unpacking subscription result", e)));
        }
    }

    /**
     * Terminates every subscription held for the given session, using
     * {@code CONNECTION_CLOSED} as the close reason.
     *
     * @param ioSession the session that has terminated
     */
    public void sessionTerminated(IoSession ioSession) {
        final Subscription.CloseReason reason = Subscription.CloseReason.CONNECTION_CLOSED;
        terminateAllSubscriptions(ioSession, reason);
    }

    /**
     * Queues a heap delta received on the given session so that a puller thread
     * can apply it. The delta is dropped with a warning if the session has no
     * registered heaps or the heap id is unknown for this session.
     *
     * @param ioSession the session the delta arrived on
     * @param heapDelta the delta to queue
     */
    public void applyDelta(IoSession ioSession, HeapDelta heapDelta) {
        final long heapId = heapDelta.getHeapId();
        final long updateId = heapDelta.getUpdateId();
        this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Applying update for heap, heapId = %s, updateId = %s", new Object[]{Long.valueOf(heapId), Long.valueOf(updateId)});

        final String sessionId = NioUtils.getSessionId(ioSession);
        ConnectedHeaps connectedHeaps = this.heapsByServer.get(sessionId);
        if (connectedHeaps == null) {
            String address = ioSession.getRemoteAddress().toString();
            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Have no heaps registered for this client, address = %s", new Object[]{address});
            // SLF4J substitutes only "{}" placeholders, so all three values use that form.
            LOGGER.warn("Received a connected object update, yet have no record of any subscriptions. {address={},heapId={},updateId={}}", new Object[]{address, Long.valueOf(heapId), Long.valueOf(updateId)});
            return;
        }

        HeapState heapState = connectedHeaps.getHeapState(heapId);
        if (heapState == null) {
            String address = ioSession.getRemoteAddress().toString();
            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Can't find this heap for this client, address = %s, heapId = %s", new Object[]{address, Long.valueOf(heapId)});
            LOGGER.warn("Received a connected object update, yet have no record of a subscription for this heap. {address={},heapId={},updateId={}}", new Object[]{address, Long.valueOf(heapId), Long.valueOf(updateId)});
            return;
        }

        if (!heapDelta.getUpdates().isEmpty() && (heapDelta.getUpdates().get(0) instanceof InitialUpdate)) {
            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Queueing initial update to local heap, heapUri = %s", new Object[]{heapState.getHeapUri()});
        } else {
            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Queueing patch to local heap, heapUri = %s", new Object[]{heapState.getHeapUri()});
        }

        // Order matters: the delta is queued on the heap before the heap id and
        // session id are published to the pullers.
        heapState.queueUpdate(heapDelta);
        connectedHeaps.queueUpdatedHeap(heapId);
        this.sessionsWithUpdates.add(sessionId);
    }

    /**
     * Handles a subscription termination message by resolving its close reason and
     * delegating to {@link #terminateSubscription(IoSession, long, String, Subscription.CloseReason)}.
     *
     * <p>The close reason arrives as a name. When it is missing or does not match any
     * {@link Subscription.CloseReason} constant, the subscription is still terminated,
     * using {@link Subscription.CloseReason#REQUESTED_BY_PUBLISHER} as the reason.
     *
     * @param ioSession             the session the message relates to
     * @param terminateSubscription the termination message carrying heap id, subscription id and reason name
     */
    public void terminateSubscription(IoSession ioSession, TerminateSubscription terminateSubscription) {
        Subscription.CloseReason closeReason = Subscription.CloseReason.REQUESTED_BY_PUBLISHER;
        String reasonName = terminateSubscription.getCloseReason();
        // Enum.valueOf throws NullPointerException for a null name, which the
        // IllegalArgumentException handler would not catch, so guard for it explicitly.
        if (reasonName != null) {
            try {
                closeReason = Subscription.CloseReason.valueOf(reasonName);
            } catch (IllegalArgumentException ignored) {
                // Best effort: keep the default reason, but leave a trace rather than swallowing silently.
                LOGGER.warn("Unrecognised close reason '{}' for subscription {}, defaulting to {}", new Object[]{reasonName, terminateSubscription.getSubscriptionId(), closeReason});
            }
        } else {
            LOGGER.warn("No close reason supplied for subscription {}, defaulting to {}", terminateSubscription.getSubscriptionId(), closeReason);
        }
        terminateSubscription(ioSession, terminateSubscription.getHeapId(), terminateSubscription.getSubscriptionId(), closeReason);
    }

    /**
     * Terminates a single subscription on a heap for the given session.
     *
     * <p>Nothing happens unless both the session's heaps and the heap's state are
     * known. When the reason is one of the subscriber-requested reasons, a
     * {@link TerminateSubscription} event is first written to the session; if that
     * write fails the session is closed and local termination still proceeds. Once
     * the subscription is terminated locally, the heap itself is terminated if it has
     * no subscriptions left. All of this runs while holding the heap/subscription
     * mutation lock.
     *
     * @param ioSession   the session the subscription belongs to
     * @param j           the heap id
     * @param str         the subscription id
     * @param closeReason the reason for termination
     */
    public void terminateSubscription(IoSession ioSession, long j, String str, Subscription.CloseReason closeReason) {
        this.heapSubMutationLock.lock();
        try {
            final String sessionId = NioUtils.getSessionId(ioSession);
            final ConnectedHeaps heapsForSession = this.heapsByServer.get(NioUtils.getSessionId(ioSession));
            if (heapsForSession == null) {
                return;
            }
            final HeapState state = heapsForSession.getHeapState(j);
            if (state == null) {
                return;
            }

            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, sessionId, "Subscription termination received for subscription %s with reason %s", new Object[]{str, closeReason});

            // Only subscriber-requested terminations are sent on to the server.
            final boolean requestedBySubscriber =
                    closeReason == Subscription.CloseReason.REQUESTED_BY_SUBSCRIBER
                            || closeReason == Subscription.CloseReason.REQUESTED_BY_SUBSCRIBER_ADMINISTRATOR;
            if (requestedBySubscriber) {
                try {
                    this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, ioSession, "Notifying server that client wants to terminate subscription %s", new Object[]{str});
                    final TerminateSubscription notification = new TerminateSubscription(j, str, closeReason.name());
                    NioUtils.writeEventMessageToSession(ioSession, notification, this.objectIOFactory);
                } catch (Exception e) {
                    this.nioLogger.log(NioLogger.LoggingLevel.SESSION, ioSession, "Error occurred whilst trying to inform server of subscription termination, closing session", new Object[0]);
                    LOGGER.info("Error occurred whilst trying to inform server of subscription termination, closing session", e);
                    ioSession.close();
                }
            }

            state.terminateSubscription(str, closeReason);
            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, sessionId, "Subscription terminated for heapId = %s and subscriptionId = %s", new Object[]{Long.valueOf(j), str});

            // NOTE(review): the heap is torn down with INTERNAL_ERROR regardless of the
            // incoming reason; with no subscriptions left this may be unobservable — confirm intent.
            final boolean noSubscriptionsLeft = state.getSubscriptions().isEmpty();
            if (noSubscriptionsLeft) {
                terminateSubscriptions(sessionId, j, Subscription.CloseReason.INTERNAL_ERROR);
            }
        } finally {
            this.heapSubMutationLock.unlock();
        }
    }

    /**
     * Terminates all subscriptions on one heap for the given session by resolving the
     * session's id and delegating to the id-based overload.
     *
     * @param ioSession   the session whose heap subscriptions should be terminated
     * @param j           the heap id
     * @param closeReason the reason for termination
     */
    public void terminateSubscriptions(IoSession ioSession, long j, Subscription.CloseReason closeReason) {
        final String sessionId = NioUtils.getSessionId(ioSession);
        terminateSubscriptions(sessionId, j, closeReason);
    }

    /**
     * Terminates one heap for the session identified by {@code str}, and forgets the
     * session entirely once it has no heaps left.
     *
     * <p>Does nothing if no heaps are recorded for the session id. Runs while holding
     * the heap/subscription mutation lock.
     *
     * @param str         the session id
     * @param j           the heap id
     * @param closeReason the reason passed on to the heap termination
     */
    public void terminateSubscriptions(String str, long j, Subscription.CloseReason closeReason) {
        this.heapSubMutationLock.lock();
        try {
            final ConnectedHeaps heapsForSession = this.heapsByServer.get(str);
            if (heapsForSession == null) {
                return;
            }

            heapsForSession.terminateHeap(j, closeReason);
            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "Subscriptions terminated for heapId = %s", new Object[]{Long.valueOf(j)});

            if (!heapsForSession.isEmpty()) {
                return;
            }
            // Last heap for this session is gone, so drop the session's entry too.
            this.heapsByServer.remove(str);
            this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, str, "All subscriptions terminated", new Object[0]);
        } finally {
            this.heapSubMutationLock.unlock();
        }
    }

    /**
     * Terminates every heap recorded for the given session and removes the session's
     * entry from the heap registry.
     *
     * <p>Does nothing if no heaps are recorded for the session. Runs while holding the
     * heap/subscription mutation lock.
     *
     * @param ioSession   the session whose subscriptions should all be terminated
     * @param closeReason the reason passed on to each heap termination
     */
    public void terminateAllSubscriptions(IoSession ioSession, Subscription.CloseReason closeReason) {
        this.heapSubMutationLock.lock();
        try {
            final String sessionId = NioUtils.getSessionId(ioSession);
            final ConnectedHeaps heapsForSession = this.heapsByServer.get(sessionId);
            if (heapsForSession == null) {
                return;
            }

            heapsForSession.terminateAllHeaps(closeReason);
            this.heapsByServer.remove(sessionId);

            final boolean transportLoggingEnabled = this.nioLogger.isLogging(NioLogger.LoggingLevel.TRANSPORT);
            if (transportLoggingEnabled) {
                this.nioLogger.log(NioLogger.LoggingLevel.TRANSPORT, sessionId, "All subscriptions terminated", new Object[0]);
            }
        } finally {
            this.heapSubMutationLock.unlock();
        }
    }
}
