package org.cacheonix.impl.net.cluster;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.cacheonix.CacheonixException;
import org.cacheonix.ShutdownException;
import org.cacheonix.ShutdownMode;
import org.cacheonix.impl.cache.distributed.partitioned.CacheProcessor;
import org.cacheonix.impl.cache.distributed.partitioned.ShutdownCacheProcessorMessage;
import org.cacheonix.impl.clock.Clock;
import org.cacheonix.impl.net.ClusterNodeAddress;
import org.cacheonix.impl.net.multicast.sender.MulticastSender;
import org.cacheonix.impl.net.processor.AbstractRequestProcessor;
import org.cacheonix.impl.net.processor.Command;
import org.cacheonix.impl.net.processor.Frame;
import org.cacheonix.impl.net.processor.Message;
import org.cacheonix.impl.net.processor.Prepareable;
import org.cacheonix.impl.net.processor.Request;
import org.cacheonix.impl.net.processor.Router;
import org.cacheonix.impl.net.processor.SenderInetAddressAware;
import org.cacheonix.impl.net.processor.UUID;
import org.cacheonix.impl.net.serializer.SerializerFactory;
import org.cacheonix.impl.util.Assert;
import org.cacheonix.impl.util.exception.ExceptionUtils;
import org.cacheonix.impl.util.logging.Logger;
import org.cacheonix.impl.util.thread.ActionableTimeout;
import org.cacheonix.impl.util.time.Timeout;

/* loaded from: input_file:org/cacheonix/impl/net/cluster/ClusterProcessorImpl.class */
/**
 * Default {@link ClusterProcessor} implementation.
 *
 * <p>The processor keeps the cluster-related state of a node in a {@link ClusterProcessorState},
 * collects multicast frames received from the network, hands assembled multicast messages to their
 * destinations and coordinates graceful and forced shutdown of itself and of the cache processors
 * registered with it.
 *
 * <p>NOTE(review): numeric state codes, destination codes and result codes used below are inlined
 * constants; their symbolic names are not visible in this file.
 */
public final class ClusterProcessorImpl extends AbstractRequestProcessor implements ClusterProcessor {
    private static final Logger LOG = Logger.getLogger(ClusterProcessorImpl.class);

    /** Time, in milliseconds, given to each cache processor to finish shutting down. */
    private final long gracefulShutdownTimeoutMillis;

    /** Locally submitted requests whose waiters want to know when the request was delivered. */
    private final Queue<DeliveryNotificationEntry> awaitingDeliveryNotification;

    /** Sequenced multicast frames accepted by {@link #receiveFrame(Frame)}. */
    private final Queue<Frame> receivedFrames;

    /** Source of fully assembled multicast messages. */
    private final MessageAssembler messageAssembler;

    /** Timeout controlled through {@link #resetMarkerTimeout()} and {@link #cancelMarkerTimeout()}. */
    private final ActionableTimeout markerTimeout;

    /** Leave timeout; stays {@code null} until a shutdown begins. */
    private final AtomicReference<ActionableTimeout> leaveTimeout;

    private final MulticastSender multicastSender;
    private final ClusterAnnouncer clusterAnnouncer;
    private final MulticastMessageListenerList multicastMessageListeners;

    /** Shutdown latch; a non-null value means that a shutdown has been initiated. */
    private final AtomicReference<CountDownLatch> shutdownLatch;

    /** Splits outgoing messages before they are put into the submittal queue. */
    private final PayloadPartitioner partitioner;

    /**
     * Exception passed to {@link #forceShutdown(CacheonixException)}, if any. It is re-thrown by
     * {@link #enqueue(Command)} after the processor has shut down.
     */
    private CacheonixException shutdownCause;

    /** Registered cache processors keyed by cache name. */
    private final Map<String, CacheProcessor> cacheProcessors;

    private final ClusterProcessorState processorState;

    /**
     * Creates the processor and initializes its state with a cluster view made of this node only.
     *
     * @param str                cluster name
     * @param clock              clock passed to the super class and to the cluster announcer
     * @param timer              timer passed to the super class
     * @param router             router passed to the super class
     * @param multicastSender    sender used for multicast frames and cluster announcements
     * @param clusterNodeAddress address of this node
     * @param j                  requested home alone timeout, raised if needed to three times the
     *                           adjusted cluster survey timeout
     * @param j2                 worst case latency, in milliseconds
     * @param j3                 graceful shutdown timeout, in milliseconds
     * @param j4                 requested cluster survey timeout, raised if needed to four times
     *                           the cluster announcement timeout
     * @param j5                 cluster announcement timeout, in milliseconds
     * @param uuid               UUID of the initial cluster view
     */
    public ClusterProcessorImpl(String str, Clock clock, Timer timer, Router router, MulticastSender multicastSender, ClusterNodeAddress clusterNodeAddress, long j, long j2, long j3, long j4, long j5, UUID uuid) {
        super(clock, timer, "ClusterProcessor:" + clusterNodeAddress.getTcpPort(), clusterNodeAddress, router);
        this.awaitingDeliveryNotification = new ConcurrentLinkedQueue<>();
        this.receivedFrames = new ConcurrentLinkedQueue<>();
        this.messageAssembler = new MessageAssemblerImpl();
        this.leaveTimeout = new AtomicReference<>(null);
        this.multicastMessageListeners = new MulticastMessageListenerListImpl();
        this.shutdownLatch = new AtomicReference<>(null);
        this.partitioner = new PayloadPartitioner();
        this.cacheProcessors = new ConcurrentHashMap<>(11);
        this.processorState = new ClusterProcessorStateImpl();
        this.processorState.setClusterName(str);
        this.processorState.setAddress(clusterNodeAddress);
        // The survey timeout is adjusted first because the home alone timeout is derived from it.
        long clusterSurveyTimeoutMillis = adjustClusterSurveyTimeout(j4, j5);
        this.processorState.setHomeAloneTimeout(new Timeout(adjustHomeAloneTimeout(j, clusterSurveyTimeoutMillis)));
        this.processorState.setClusterAnnouncementTimeoutMillis(j5);
        this.gracefulShutdownTimeoutMillis = j3;
        this.processorState.setWorstCaseLatencyMillis(j2);
        this.processorState.setClusterView(new ClusterViewImpl(uuid, clusterNodeAddress));
        this.markerTimeout = new ObtainMulticastMarkerTimeout(this);
        this.multicastSender = multicastSender;
        this.clusterAnnouncer = new ClusterAnnouncer(clock, this.multicastSender, str, clusterNodeAddress);
        this.processorState.setJoinStatus(new JoinStatusImpl(clusterSurveyTimeoutMillis));
        this.processorState.getHomeAloneTimeout().reset();
        reset();
    }

    /**
     * Announces the current cluster view (UUID, representative and size) through the cluster
     * announcer.
     *
     * @param z flag passed through to the announcer unchanged
     * @throws IOException if the announcer fails to send the announcement
     */
    @Override
    public void announceCluster(boolean z) throws IOException {
        ClusterView clusterView = this.processorState.getClusterView();
        this.clusterAnnouncer.announce(clusterView.getClusterUUID(), clusterView.getRepresentative(), clusterView.getSize(), z);
    }

    /**
     * Sends a frame using the multicast sender.
     *
     * @param frame frame to send
     * @throws IOException if the sender fails
     */
    @Override
    public void sendMulticastFrame(Frame frame) throws IOException {
        this.multicastSender.sendFrame(frame);
    }

    /**
     * Processes a message, rejecting it if it requires the same cluster but carries a different
     * cluster UUID.
     *
     * @param message message to process
     * @throws InterruptedException if processing was interrupted
     * @throws IOException          if processing failed with an I/O error
     */
    @Override
    public void processMessage(Message message) throws InterruptedException, IOException {
        processMessage(message, true);
    }

    /**
     * Processes a message, optionally checking that it belongs to this cluster.
     *
     * @param message message to process
     * @param z       {@code true} to verify the cluster UUID of messages that require the same
     *                cluster; {@code false} to process the message unconditionally
     */
    private void processMessage(Message message, boolean z) throws InterruptedException, IOException {
        if (!z || !message.isRequiresSameCluster() || this.processorState.getClusterView().getClusterUUID().equals(message.getClusterUUID())) {
            super.processMessage(message);
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received message from other cluster: " + message);
        }
        // A request from another cluster is answered so that its sender does not wait in vain.
        if (message instanceof Request) {
            Request request = (Request) message;
            if (request.isResponseRequired()) {
                // NOTE(review): 4 is an inlined result code; its symbolic name is not visible here.
                post(request.createResponse(4));
            }
        }
    }

    /**
     * Starts the processor, switches it to state 3, notifies multicast listeners that the node is
     * blocked and posts a {@link BlockedMarker} addressed to this node.
     */
    @Override
    public void startup() {
        super.startup();
        ClusterView clusterView = this.processorState.getClusterView();
        Assert.assertTrue(clusterView.getSize() == 1, "Initial cluster view can only be of size 1: {0}", clusterView);
        // NOTE(review): 3 is an inlined state constant; given the notification that follows it
        // presumably denotes the blocked state - confirm.
        this.processorState.setState(3);
        this.multicastMessageListeners.notifyNodeBlocked();
        BlockedMarker blockedMarker = new BlockedMarker(clusterView.getClusterUUID());
        blockedMarker.setTargetMajorityClusterSize(this.processorState.getTargetMajoritySize());
        blockedMarker.setNextAnnouncementTime(getClock().currentTime());
        blockedMarker.setReceiver(getAddress());
        post(blockedMarker);
    }

    /**
     * @return the live queue of received multicast frames
     */
    @Override
    public Queue<Frame> getReceivedFrames() {
        return this.receivedFrames;
    }

    /**
     * Adds a listener to the list of multicast message listeners.
     *
     * @param multicastMessageListener listener to add
     */
    @Override
    public void subscribeMulticastMessageListener(MulticastMessageListener multicastMessageListener) {
        this.multicastMessageListeners.add(multicastMessageListener);
    }

    /**
     * Shuts the processor down gracefully.
     */
    @Override
    public void shutdown() {
        shutdown(ShutdownMode.GRACEFUL_SHUTDOWN);
    }

    /**
     * Begins a forced or a graceful shutdown and then waits for the shutdown to finish.
     *
     * @param shutdownMode {@link ShutdownMode#FORCED_SHUTDOWN} to force the shutdown; any other
     *                     mode results in a graceful shutdown
     */
    @Override
    public void shutdown(ShutdownMode shutdownMode) {
        if (shutdownMode.equals(ShutdownMode.FORCED_SHUTDOWN)) {
            beginForcedShutdown();
        } else {
            beginGracefulShutdown();
        }
        // NOTE(review): 0 presumably means "wait without a time limit" - confirm against the
        // inherited waitForShutdown().
        waitForShutdown(0L);
    }

    /**
     * Asks cache processors to shut down, waits for them and then arms the leave timeout. Must not
     * be called from the processor thread because it executes a request against this processor.
     */
    private void beginGracefulShutdown() {
        Assert.assertTrue(!isProcessorThread(), "This method cannot be called from the processor thread");
        if (isShutdown()) {
            return;
        }
        boolean otherMembersPresent;
        try {
            GetClusterViewSizeRequest getClusterViewSizeRequest = new GetClusterViewSizeRequest();
            getClusterViewSizeRequest.setReceiver(getAddress());
            otherMembersPresent = ((Integer) execute(getClusterViewSizeRequest)).intValue() > 1;
        } catch (Exception e) {
            // Best effort: if the view size cannot be obtained, proceed as if this node were alone.
            otherMembersPresent = false;
        }
        beginCacheProcessorsShutdown(otherMembersPresent);
        waitForCacheProcessorsToShutdown();
        // Only the caller that wins both races starts the leave timeout.
        if (this.shutdownLatch.compareAndSet(null, new CountDownLatch(1)) && this.leaveTimeout.compareAndSet(null, new LeaveTimeout(this))) {
            this.leaveTimeout.get().reset();
        }
    }

    /**
     * Enqueues a command that shuts this processor down. Does nothing if the processor is already
     * shut down; failures to enqueue are ignored or logged and never propagated.
     */
    @Override
    public final void beginForcedShutdown() {
        try {
            if (isShutdown()) {
                return;
            }
            enqueue(new ShutdownClusterProcessorCommand(this));
        } catch (InterruptedException e) {
            ExceptionUtils.ignoreException(e, "already shutdown");
        } catch (ShutdownException e2) {
            ExceptionUtils.ignoreException(e2, "already shutdown");
        } catch (RuntimeException e3) {
            LOG.warn("Unexpected exception while processing leave timeout: " + e3, e3);
        }
    }

    /**
     * Shuts the processor down immediately. May be called only from the processor thread.
     *
     * @param cacheonixException cause of the shutdown that {@link #enqueue(Command)} re-throws
     *                           afterwards, or {@code null} to have it throw a plain
     *                           {@link ShutdownException}
     */
    @Override
    public void forceShutdown(CacheonixException cacheonixException) {
        Assert.assertTrue(isProcessorThread(), "This method may be called only from the processor thread");
        if (isShutdown()) {
            return;
        }
        this.shutdownCause = cacheonixException;
        beginCacheProcessorsShutdown(false);
        waitForCacheProcessorsToShutdown();
        // The timeout and the latch may not exist yet if no graceful shutdown preceded this call.
        this.leaveTimeout.compareAndSet(null, new LeaveTimeout(this));
        this.leaveTimeout.get().shutdown();
        this.markerTimeout.shutdown();
        super.shutdown();
        this.shutdownLatch.compareAndSet(null, new CountDownLatch(1));
        this.shutdownLatch.get().countDown();
    }

    /**
     * Posts a shutdown message to every registered cache processor. A failure to post to one
     * processor does not prevent posting to the others.
     *
     * @param z flag passed to each {@link ShutdownCacheProcessorMessage}
     */
    private void beginCacheProcessorsShutdown(boolean z) {
        for (CacheProcessor cacheProcessor : this.cacheProcessors.values()) {
            try {
                String cacheName = cacheProcessor.getCacheName();
                ShutdownCacheProcessorMessage shutdownCacheProcessorMessage = new ShutdownCacheProcessorMessage(cacheName, z);
                shutdownCacheProcessorMessage.setReceiver(getAddress());
                cacheProcessor.post(shutdownCacheProcessorMessage);
                LOG.debug("Requested cache processor: " + cacheName + ':' + getAddress() + " to shutdown");
            } catch (Exception e) {
                ExceptionUtils.ignoreException(e, "Shutdown in progress");
            }
        }
    }

    /**
     * Waits for each registered cache processor to shut down, giving each of them up to the
     * graceful shutdown timeout, and logs the outcome.
     */
    private void waitForCacheProcessorsToShutdown() {
        for (CacheProcessor cacheProcessor : this.cacheProcessors.values()) {
            // The wait is not part of the logging expression on purpose: it must always happen.
            boolean finishedInTime = cacheProcessor.waitForShutdown(this.gracefulShutdownTimeoutMillis);
            LOG.debug("Cache processor: " + cacheProcessor.getCacheName() + ':' + getAddress() + ' ' + (finishedInTime ? "shutdown" : "didn't shutdown") + " before timeout " + this.gracefulShutdownTimeoutMillis + " ms expired ");
        }
    }

    /**
     * @return the message assembler used by this processor
     */
    @Override
    public MessageAssembler getMessageAssembler() {
        return this.messageAssembler;
    }

    /**
     * Clears the current frame, the received list, the message assembler, the submittal queue, the
     * highest delivered sequence number and pending delivery notifications, and then notifies the
     * waiter list about the reset.
     */
    @Override
    public void reset() {
        this.processorState.setCurrent(null);
        this.processorState.getReceivedList().clear();
        this.messageAssembler.clear();
        this.processorState.getSubmittalQueue().clear();
        this.processorState.setHighestSequenceNumberDelivered(null);
        this.awaitingDeliveryNotification.clear();
        getWaiterList().notifyReset();
    }

    /**
     * @return {@code true} if a shutdown has been initiated, that is, the shutdown latch exists
     */
    @Override
    public boolean isShuttingDown() {
        return this.shutdownLatch.get() != null;
    }

    /**
     * Cancels the marker timeout.
     */
    @Override
    public void cancelMarkerTimeout() {
        this.markerTimeout.cancel();
    }

    /**
     * Resets the marker timeout.
     */
    @Override
    public void resetMarkerTimeout() {
        this.markerTimeout.reset();
    }

    /**
     * Accepts a frame received from the network.
     *
     * <p>A frame with a non-negative sequence number is queued only if the processor is in state 1
     * or 4 and the frame carries the UUID of the current cluster view; otherwise it is dropped. A
     * frame with a negative sequence number is deserialized right away and, if it holds a
     * {@link ClusterAnnouncement} and the processor is not shut down, is enqueued for processing.
     *
     * @param frame received frame
     */
    @Override
    public final void receiveFrame(Frame frame) {
        if (frame.getSequenceNumber() >= 0) {
            // NOTE(review): 1 and 4 are inlined state constants; their names are not visible here.
            int state = this.processorState.getState();
            if ((state == 1 || state == 4) && frame.getClusterUUID().equals(this.processorState.getClusterView().getClusterUUID())) {
                this.receivedFrames.add(frame);
            }
            return;
        }
        try {
            Message message = (Message) SerializerFactory.getInstance().getSerializer(frame.getSerializerType()).deserialize(frame.getPayload());
            if (message instanceof SenderInetAddressAware) {
                ((SenderInetAddressAware) message).setSenderInetAddress(frame.getSenderInetAddress());
            }
            if ((message instanceof ClusterAnnouncement) && !isShutdown()) {
                getClock().adjust(message.getTimestamp());
                enqueue(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e2) {
            LOG.error(e2.toString(), e2);
        }
    }

    /**
     * Drains the message assembler and delivers every assembled message to its destination.
     *
     * <p>For a request that was sent by this node, the start frame number of the assembled message
     * is recorded in the matching pending delivery notification entry so that
     * {@link #notifyDeliveredToAll(long)} can later notify the waiter.
     *
     * @throws IOException if processing of a message fails with an I/O error
     */
    @Override
    public void deliverAssembledMulticastMessages() throws IOException {
        for (AssembledMessage assembledMessage = this.messageAssembler.poll(); assembledMessage != null; assembledMessage = this.messageAssembler.poll()) {
            Message message = assembledMessage.getMessage();
            getClock().adjust(message.getTimestamp());
            if ((message instanceof Request) && message.getSender().equals(getAddress())) {
                Request request = (Request) message;
                // The scan has to be bounded by the iterator; it must not spin once the queue has
                // been walked. It stops at the first match, on the assumption that request UUIDs
                // are unique within the queue.
                for (DeliveryNotificationEntry entry : this.awaitingDeliveryNotification) {
                    if (entry.getRequest().getUuid().equals(request.getUuid())) {
                        entry.setStartFrameNumber(assembledMessage.getStartFrameNumber());
                        break;
                    }
                }
            }
            // NOTE(review): 9 and 10 are inlined destination constants; judging by the handling,
            // 9 is processed by this processor and 10 goes to multicast listeners - confirm names.
            switch (message.getDestination()) {
                case 9:
                    try {
                        // The cluster UUID check is skipped for messages delivered by multicast.
                        processMessage(message, false);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    break;
                case 10:
                    this.multicastMessageListeners.notify(message);
                    break;
                default:
                    LOG.warn("Unknown multicast message destination: " + message.getDestination());
                    break;
            }
        }
    }

    /**
     * Notifies and removes every pending delivery entry whose start frame number is known and does
     * not exceed the given frame number.
     *
     * @param j frame number up to which, inclusive, entries are notified
     */
    @Override
    public void notifyDeliveredToAll(long j) {
        Iterator<DeliveryNotificationEntry> it = this.awaitingDeliveryNotification.iterator();
        while (it.hasNext()) {
            DeliveryNotificationEntry entry = it.next();
            if (entry.hasStartFrameNumber() && entry.getStartFrameNumber() <= j) {
                it.remove();
                ((DeliveryAware) entry.getRequest().getWaiter()).notifyDelivered();
            }
        }
    }

    /**
     * @return the list of multicast message listeners
     */
    @Override
    public MulticastMessageListenerList getMulticastMessageListeners() {
        return this.multicastMessageListeners;
    }

    /**
     * Registers a cache processor under its cache name, replacing any processor registered under
     * the same name.
     *
     * @param cacheProcessor cache processor to register
     */
    @Override
    public void registerCacheProcessor(CacheProcessor cacheProcessor) {
        this.cacheProcessors.put(cacheProcessor.getCacheName(), cacheProcessor);
    }

    /**
     * Removes the cache processor registered under the given cache name.
     *
     * @param str cache name
     * @return the removed cache processor, or {@code null} if none was registered under the name
     */
    @Override
    public CacheProcessor unregisterCacheProcessor(String str) {
        return this.cacheProcessors.remove(str);
    }

    /**
     * Enqueues a command.
     *
     * <p>A message with destination 9 or 10 is timestamped and put into the submittal queue,
     * unless it is a {@link Prepareable} that has not been prepared yet, in which case it goes
     * through the regular queue of the super class. Everything else goes to the super class.
     *
     * @param command command to enqueue
     * @throws InterruptedException if the thread was interrupted while enqueueing
     * @throws ShutdownException    if the processor is shut down and no shutdown cause was
     *                              recorded; if a cause was recorded, that cause is thrown instead
     */
    @Override
    public void enqueue(Command command) throws InterruptedException, ShutdownException {
        if (isShutdown()) {
            if (this.shutdownCause == null) {
                throw new ShutdownException();
            }
            throw this.shutdownCause;
        }
        if (!(command instanceof Message)) {
            super.enqueue(command);
            return;
        }
        Message message = (Message) command;
        switch (message.getDestination()) {
            case 9:
            case 10:
                message.setTimestamp(getClock().currentTime());
                if (!(message instanceof Prepareable) || ((Prepareable) message).isPrepared()) {
                    addToSubmittalQueue(message);
                } else {
                    super.enqueue(message);
                }
                return;
            default:
                super.enqueue(message);
                return;
        }
    }

    /**
     * Puts the partitioned message into the submittal queue, first registering it for a delivery
     * notification if it is a request whose waiter is {@link DeliveryAware}.
     *
     * @param message message to submit
     */
    private void addToSubmittalQueue(Message message) {
        if ((message instanceof Request) && (((Request) message).getWaiter() instanceof DeliveryAware)) {
            this.awaitingDeliveryNotification.add(new DeliveryNotificationEntry((Request) message));
        }
        this.processorState.getSubmittalQueue().add(this.partitioner.partition(message));
    }

    /**
     * Makes sure the cluster survey timeout is at least four times the second argument.
     *
     * @param j  requested cluster survey timeout
     * @param j2 timeout the result is measured against
     * @return the larger of {@code j} and {@code j2 * 4}
     */
    private static long adjustClusterSurveyTimeout(long j, long j2) {
        return Math.max(j, j2 * 4);
    }

    /**
     * Makes sure the home alone timeout is at least three times the second argument.
     *
     * @param j  requested home alone timeout
     * @param j2 timeout the result is measured against
     * @return the larger of {@code j} and {@code j2 * 3}
     */
    private static long adjustHomeAloneTimeout(long j, long j2) {
        return Math.max(j, j2 * 3);
    }

    /**
     * @return the mutable state of this processor
     */
    @Override
    public ClusterProcessorState getProcessorState() {
        return this.processorState;
    }
}
