package org.restcomm.imscf.common.lwcomm.service.impl;

import com.google.common.cache.Cache;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.naming.InitialContext;
import org.restcomm.imscf.common.lwcomm.config.Configuration;
import org.restcomm.imscf.common.lwcomm.config.Node;
import org.restcomm.imscf.common.lwcomm.service.LwCommService;
import org.restcomm.imscf.common.lwcomm.service.MessageReceiver;
import org.restcomm.imscf.common.lwcomm.service.messages.LwCommMessage;
import org.restcomm.imscf.common.lwcomm.service.messages.MessageSender;
import org.restcomm.imscf.common.lwcomm.service.messages.OutgoingMessage;
import org.restcomm.imscf.common.util.overload.OverloadProtectorParameters;
import org.slf4j.MDC;

@ChannelHandler.Sharable
/* loaded from: input_file:org/restcomm/imscf/common/lwcomm/service/impl/LwCommListenerHandler.class */
public final class LwCommListenerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    private static final Object MESSAGE_SENDER_STORE_ACCEPTED_VALUE = new Object();
    private static final Object MESSAGE_SENDER_STORE_REJECTED_VALUE = new Object();
    private QueueConnection queueConn;
    private Configuration.DeploymentMode deploymentMode;
    private Configuration.ReceiveMode receiveMode;
    private Configuration.AckSendStrategy ackSendStrategy;
    private volatile MessageReceiver messageReceiver;
    private volatile Cache<String, Object> processedMessageStore;
    private volatile Cache<String, Set<Node>> receivedAckStore;
    private OrderedExecutor messageDeliveryExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.restcomm.imscf.common.lwcomm.service.impl.LwCommListenerHandler$1, reason: invalid class name */
    /* loaded from: input_file:org/restcomm/imscf/common/lwcomm/service/impl/LwCommListenerHandler$1.class */
    /**
     * Compiler-generated holder of the lookup tables used to switch over enums.
     *
     * <p>Each table is indexed by the enum constant's {@code ordinal()} and holds the
     * 1-based case label assigned to that constant. An entry left at {@code 0} matches
     * no case label, so a switch over it ends up in its {@code default} branch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$restcomm$imscf$common$lwcomm$config$Configuration$ReceiveMode;
        static final /* synthetic */ int[] $SwitchMap$org$restcomm$imscf$common$lwcomm$config$Configuration$AckSendStrategy;
        static final /* synthetic */ int[] $SwitchMap$org$restcomm$imscf$common$lwcomm$service$LwCommService$AcceptMode;
        static final /* synthetic */ int[] $SwitchMap$org$restcomm$imscf$common$lwcomm$service$messages$LwCommMessage$Type;

        static {
            // A NoSuchFieldError is deliberately ignored throughout this initializer: when a
            // constant is missing from the enum class present at run time, its slot simply
            // stays 0 and the corresponding switch takes its default branch.

            // Configuration.ReceiveMode: JMS_QUEUE -> 1, LISTENER -> 2
            $SwitchMap$org$restcomm$imscf$common$lwcomm$config$Configuration$ReceiveMode = new int[Configuration.ReceiveMode.values().length];
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$config$Configuration$ReceiveMode[Configuration.ReceiveMode.JMS_QUEUE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$config$Configuration$ReceiveMode[Configuration.ReceiveMode.LISTENER.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }

            // Configuration.AckSendStrategy: IMMEDIATELY -> 1, SEND_CYCLE -> 2
            $SwitchMap$org$restcomm$imscf$common$lwcomm$config$Configuration$AckSendStrategy = new int[Configuration.AckSendStrategy.values().length];
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$config$Configuration$AckSendStrategy[Configuration.AckSendStrategy.IMMEDIATELY.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$config$Configuration$AckSendStrategy[Configuration.AckSendStrategy.SEND_CYCLE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }

            // LwCommService.AcceptMode: ACCEPT -> 1, REJECT -> 2, DROP -> 3
            $SwitchMap$org$restcomm$imscf$common$lwcomm$service$LwCommService$AcceptMode = new int[LwCommService.AcceptMode.values().length];
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$service$LwCommService$AcceptMode[LwCommService.AcceptMode.ACCEPT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$service$LwCommService$AcceptMode[LwCommService.AcceptMode.REJECT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$service$LwCommService$AcceptMode[LwCommService.AcceptMode.DROP.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }

            // LwCommMessage.Type: NORMAL -> 1, ACK -> 2, NACK -> 3, HEARTBEAT -> 4, INVALID -> 5
            $SwitchMap$org$restcomm$imscf$common$lwcomm$service$messages$LwCommMessage$Type = new int[LwCommMessage.Type.values().length];
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$service$messages$LwCommMessage$Type[LwCommMessage.Type.NORMAL.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$service$messages$LwCommMessage$Type[LwCommMessage.Type.ACK.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$service$messages$LwCommMessage$Type[LwCommMessage.Type.NACK.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$service$messages$LwCommMessage$Type[LwCommMessage.Type.HEARTBEAT.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$org$restcomm$imscf$common$lwcomm$service$messages$LwCommMessage$Type[LwCommMessage.Type.INVALID.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/restcomm/imscf/common/lwcomm/service/impl/LwCommListenerHandler$DeliverMessageHandler.class */
    public class DeliverMessageHandler implements Runnable {
        private LwCommMessage message;

        public DeliverMessageHandler(LwCommMessage lwCommMessage) {
            this.message = lwCommMessage;
        }

        @Override // java.lang.Runnable
        public void run() {
            MDC.put(LwCommUtil.LOGGER_MDC_MSGID_KEY, this.message.getId());
            deliverMessage(this.message);
        }

        /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
        /* JADX WARN: Failed to find 'out' block for switch in B:4:0x001b. Please report as an issue. */
        private void deliverMessage(LwCommMessage lwCommMessage) {
            long nanoTime = System.nanoTime();
            MDC.put(LwCommUtil.LOGGER_MDC_MSGID_KEY, lwCommMessage.getId());
            try {
                switch (LwCommListenerHandler.this.receiveMode) {
                    case JMS_QUEUE:
                        writeQueue(lwCommMessage.getTargetQueue(), lwCommMessage.getPayload());
                        long nanoTime2 = (System.nanoTime() - nanoTime) / 1000;
                        LwCommServiceImpl.LOGGER.trace("deliverMessage() took {}us", Long.valueOf(nanoTime2));
                        LwCommServiceImpl.getServiceImpl().getStatistics().timeWorker(nanoTime2);
                        return;
                    case LISTENER:
                        try {
                            LwCommListenerHandler.this.messageReceiver.onMessage(lwCommMessage.asIncomingTextMessage());
                        } catch (Exception e) {
                            LwCommServiceImpl.LOGGER.error("Error in group id locking or message receiver threw exception.", e);
                        }
                        long nanoTime22 = (System.nanoTime() - nanoTime) / 1000;
                        LwCommServiceImpl.LOGGER.trace("deliverMessage() took {}us", Long.valueOf(nanoTime22));
                        LwCommServiceImpl.getServiceImpl().getStatistics().timeWorker(nanoTime22);
                        return;
                    default:
                        long nanoTime222 = (System.nanoTime() - nanoTime) / 1000;
                        LwCommServiceImpl.LOGGER.trace("deliverMessage() took {}us", Long.valueOf(nanoTime222));
                        LwCommServiceImpl.getServiceImpl().getStatistics().timeWorker(nanoTime222);
                        return;
                }
            } catch (Throwable th) {
                long nanoTime3 = (System.nanoTime() - nanoTime) / 1000;
                LwCommServiceImpl.LOGGER.trace("deliverMessage() took {}us", Long.valueOf(nanoTime3));
                LwCommServiceImpl.getServiceImpl().getStatistics().timeWorker(nanoTime3);
                throw th;
            }
        }

        private void writeQueue(String str, String str2) {
            if (LwCommListenerHandler.this.deploymentMode == Configuration.DeploymentMode.STANDALONE) {
                return;
            }
            if (LwCommListenerHandler.this.queueConn == null) {
                LwCommServiceImpl.LOGGER.error("Message will not be sent to queue {} because queue connection could not be resolved at service init.", str);
                return;
            }
            QueueSession queueSession = null;
            QueueSender queueSender = null;
            try {
                try {
                    Queue queue = (Queue) new InitialContext().lookup(str);
                    queueSession = LwCommListenerHandler.this.queueConn.createQueueSession(false, 1);
                    queueSender = queueSession.createSender(queue);
                    queueSender.send(queueSession.createTextMessage(str2));
                    LwCommServiceImpl.getServiceImpl().getStatistics().incQueuedIncomingMessageCount();
                    if (queueSender != null) {
                        try {
                            queueSender.close();
                        } catch (Exception e) {
                            LwCommServiceImpl.LOGGER.warn("Error closing queue sender", e);
                        }
                    }
                    if (queueSession != null) {
                        try {
                            queueSession.close();
                        } catch (Exception e2) {
                            LwCommServiceImpl.LOGGER.warn("Error closing queue session", e2);
                        }
                    }
                } catch (Exception e3) {
                    LwCommServiceImpl.LOGGER.error("Error while putting message into queue", e3);
                    if (queueSender != null) {
                        try {
                            queueSender.close();
                        } catch (Exception e4) {
                            LwCommServiceImpl.LOGGER.warn("Error closing queue sender", e4);
                        }
                    }
                    if (queueSession != null) {
                        try {
                            queueSession.close();
                        } catch (Exception e5) {
                            LwCommServiceImpl.LOGGER.warn("Error closing queue session", e5);
                        }
                    }
                }
            } catch (Throwable th) {
                if (queueSender != null) {
                    try {
                        queueSender.close();
                    } catch (Exception e6) {
                        LwCommServiceImpl.LOGGER.warn("Error closing queue sender", e6);
                    }
                }
                if (queueSession != null) {
                    try {
                        queueSession.close();
                    } catch (Exception e7) {
                        LwCommServiceImpl.LOGGER.warn("Error closing queue session", e7);
                    }
                }
                throw th;
            }
        }
    }

    public LwCommListenerHandler(QueueConnection queueConnection, Configuration configuration, Cache<String, Object> cache, Cache<String, Set<Node>> cache2) {
        this.queueConn = queueConnection;
        this.deploymentMode = configuration.getDeploymentMode();
        this.receiveMode = configuration.getReceiveMode();
        this.messageReceiver = configuration.getMessageReceiver();
        this.ackSendStrategy = configuration.getAckSendStrategy();
        this.processedMessageStore = cache;
        this.receivedAckStore = cache2;
        this.messageDeliveryExecutor = new OrderedExecutor(Executors.newFixedThreadPool(configuration.getReceiveWorkerPoolConfig().getMaxThreads(), new NamingThreadFactory("lwcomm_receive_worker")));
    }

    /**
     * Decodes one received datagram as a UTF-8 LwComm message and dispatches it by type.
     *
     * <p>NORMAL, ACK and NACK messages are passed to their dedicated handlers, a HEARTBEAT
     * is reported to the node catalog, and an INVALID message is logged and counted. The
     * time spent in this method is recorded in the statistics (in microseconds) whether or
     * not the processing throws.
     *
     * @param channelHandlerContext context of the channel the datagram arrived on
     * @param datagramPacket the received datagram
     * @throws Exception whatever the message parsing or the type-specific handling throws
     */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
        long startNanos = System.nanoTime();
        try {
            // The id is not known until the message is parsed, so log under a placeholder first.
            MDC.put(LwCommUtil.LOGGER_MDC_MSGID_KEY, LwCommUtil.LOGGER_MDC_UNKNOWN_MSGID);
            String rawText = ((ByteBuf) datagramPacket.content()).toString(CharsetUtil.UTF_8);
            LwCommServiceImpl.LOGGER.trace("Message got from {}, content:\n{}", datagramPacket.sender(), rawText);
            LwCommMessage lwCommMessage = new LwCommMessage(rawText);
            MDC.put(LwCommUtil.LOGGER_MDC_MSGID_KEY, lwCommMessage.getId());
            LwCommServiceImpl.LOGGER.debug("LwCommHandler.channelRead0 - thread: {}", Thread.currentThread());
            switch (lwCommMessage.getType()) {
                case NORMAL:
                    handleNormalMessage(lwCommMessage, rawText, channelHandlerContext);
                    break;
                case ACK:
                    handleAck(lwCommMessage, rawText);
                    break;
                case NACK:
                    handleNack(lwCommMessage);
                    break;
                case HEARTBEAT:
                    LwCommServiceImpl.LOGGER.debug("heartbeat from {}", lwCommMessage.getFrom());
                    LwCommServiceImpl.getServiceImpl().getNodeCatalog().heartbeatFromNode(lwCommMessage.getFrom());
                    LwCommServiceImpl.getServiceImpl().getStatistics().incReceivedHeartbeatCount();
                    break;
                case INVALID:
                    LwCommServiceImpl.LOGGER.warn("invalid message: {}", rawText);
                    LwCommServiceImpl.getServiceImpl().getStatistics().incInvalidMessageCount();
                    break;
                default:
                    LwCommServiceImpl.LOGGER.error("Unexpected message type: {}", lwCommMessage.getType());
                    break;
            }
        } finally {
            long elapsedMicros = (System.nanoTime() - startNanos) / 1000;
            LwCommServiceImpl.getServiceImpl().getStatistics().timeSpentInChannelRead0(elapsedMicros);
            LwCommServiceImpl.LOGGER.debug("Receive handler took: {}us.", Long.valueOf(elapsedMicros));
        }
    }

    /**
     * Flushes the channel handler context once the current read operation has completed.
     *
     * @param channelHandlerContext context of the channel that finished reading
     */
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.flush();
    }

    /**
     * Logs an exception raised in the receive pipeline at error level.
     *
     * <p>The exception is only logged: the channel is not closed here and the exception
     * is not passed on.
     *
     * @param channelHandlerContext context of the channel on which the exception occurred
     * @param th the exception that was raised
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        LwCommServiceImpl.LOGGER.error("Exception in LwComm receiver handler", th);
    }

    /** Shuts down the executor on which received messages are delivered. */
    public void shutdown() {
        this.messageDeliveryExecutor.shutdown();
    }

    /**
     * Handles a NORMAL message: decides by the current accept mode whether to accept,
     * reject or drop it, delivers it at most once, and answers with an ACK or NACK.
     *
     * <p>The processed message store remembers the decision made for each message id:
     * <ul>
     *   <li>ACCEPT - a message id seen for the first time is recorded as accepted,
     *       scheduled for delivery and acknowledged;</li>
     *   <li>REJECT - a message id seen for the first time is recorded as rejected and
     *       answered with a NACK;</li>
     *   <li>DROP - nothing is recorded, and a message id not seen before gets no answer.</li>
     * </ul>
     * Whatever the mode, a message id already recorded gets the same answer again (ACK if
     * it was accepted, NACK if it was rejected), is not delivered again, and is counted
     * as an out-of-order message.
     *
     * @param lwCommMessage the parsed message
     * @param str raw text of the message, used for logging only
     * @param channelHandlerContext context used when the answer is sent immediately
     */
    private void handleNormalMessage(LwCommMessage lwCommMessage, String str, ChannelHandlerContext channelHandlerContext) {
        if (lwCommMessage.getPayloadBytes() != lwCommMessage.getCalculatedPayloadBytes()) {
            // A size mismatch is only reported; the message is still processed below.
            LwCommServiceImpl.LOGGER.error("Payload size mismatch in message id {}. In header: {}, actual: {}, raw text received is:\n{}", new Object[]{lwCommMessage.getId(), Integer.valueOf(lwCommMessage.getPayloadBytes()), Integer.valueOf(lwCommMessage.getCalculatedPayloadBytes()), str});
        }
        LwCommServiceImpl.LOGGER.debug("Message is normal message");
        LwCommService.AcceptMode acceptMode = LwCommServiceImpl.getServiceImpl().getAcceptMode(lwCommMessage.getUserTag());
        boolean sendAck = false;
        boolean sendNack = false;
        boolean deliver = false;
        // Decision previously stored for this message id, or null on first sight.
        Object previousDecision;
        switch (acceptMode) {
            case ACCEPT:
                previousDecision = this.processedMessageStore.asMap().putIfAbsent(lwCommMessage.getId(), MESSAGE_SENDER_STORE_ACCEPTED_VALUE);
                if (previousDecision == null) {
                    deliver = true;
                    sendAck = true;
                    LwCommServiceImpl.LOGGER.debug("Accepting message");
                }
                break;
            case REJECT:
                previousDecision = this.processedMessageStore.asMap().putIfAbsent(lwCommMessage.getId(), MESSAGE_SENDER_STORE_REJECTED_VALUE);
                if (previousDecision == null) {
                    sendNack = true;
                    LwCommServiceImpl.LOGGER.debug("Rejecting message");
                    LwCommServiceImpl.getServiceImpl().getStatistics().incRejectedIncomingMessageCount();
                }
                break;
            case DROP:
                // Look up only: a dropped message leaves no trace in the store.
                previousDecision = this.processedMessageStore.asMap().get(lwCommMessage.getId());
                if (previousDecision == null) {
                    LwCommServiceImpl.LOGGER.debug("Dropping message");
                    LwCommServiceImpl.getServiceImpl().getStatistics().incDroppedIncomingMessageCount();
                }
                break;
            default:
                LwCommServiceImpl.LOGGER.error("Invalid acceptMode: {}", acceptMode);
                return;
        }
        // A repeated message gets the same answer it got the first time.
        if (MESSAGE_SENDER_STORE_ACCEPTED_VALUE.equals(previousDecision)) {
            sendAck = true;
            LwCommServiceImpl.LOGGER.debug("Sending ACK again to previously accepted message");
        } else if (MESSAGE_SENDER_STORE_REJECTED_VALUE.equals(previousDecision)) {
            sendNack = true;
            LwCommServiceImpl.LOGGER.debug("Sending NACK again to previously rejected message");
        }
        if (deliver) {
            LwCommServiceImpl.LOGGER.debug("Delivering message");
            this.messageDeliveryExecutor.execute(new DeliverMessageHandler(lwCommMessage), lwCommMessage.getGroupId());
            LwCommServiceImpl.getServiceImpl().getStatistics().incProcessedIncomingMessageCount();
        } else if (previousDecision != null) {
            LwCommServiceImpl.LOGGER.info("Received message with id {} more than one times! However, this does not cause errors since the message has been delivered at most once. Message content:\n{}", lwCommMessage.getId(), str);
            LwCommServiceImpl.getServiceImpl().getStatistics().incOutOfOrderMessageCount();
        }
        LwCommServiceImpl.getServiceImpl().getStatistics().setProcessedIncomingMessageStoreSize(this.processedMessageStore.size());
        if (sendAck || sendNack) {
            sendAckOrNack(lwCommMessage, sendAck, channelHandlerContext);
        }
    }

    /**
     * Answers a message with an ACK or NACK using the configured ACK send strategy.
     *
     * @param lwCommMessage the message being answered
     * @param positive {@code true} to send an ACK, {@code false} to send a NACK
     * @param channelHandlerContext context used when the answer is sent immediately
     */
    private void sendAckOrNack(LwCommMessage lwCommMessage, boolean positive, ChannelHandlerContext channelHandlerContext) {
        switch (this.ackSendStrategy) {
            case IMMEDIATELY:
                LwCommServiceImpl.LOGGER.debug("Sending back {} immediately", positive ? LwCommMessage.ACK : "NACK");
                sendAckImmediately(lwCommMessage, positive, channelHandlerContext);
                return;
            case SEND_CYCLE:
                LwCommServiceImpl.LOGGER.debug("Sending back {} in standard loop to dedicated port", positive ? LwCommMessage.ACK : "NACK");
                MessageSender.createAck(lwCommMessage, positive, null).startSendCycle();
                return;
            default:
                LwCommServiceImpl.LOGGER.error("Invalid ackSendStrategy: {}", this.ackSendStrategy);
                return;
        }
    }

    /**
     * Writes an ACK or NACK for the given message straight to the channel, addressed to
     * the host and port of the message's originator.
     *
     * @param lwCommMessage the message being answered
     * @param z {@code true} to send an ACK, {@code false} to send a NACK
     * @param channelHandlerContext context the answer is written to and flushed on
     */
    private void sendAckImmediately(LwCommMessage lwCommMessage, boolean z, ChannelHandlerContext channelHandlerContext) {
        // The raw answer text is encoded as UTF-8 into a fresh buffer.
        ByteBuf answerBytes = Unpooled.copiedBuffer(
                (z ? OutgoingMessage.createAck(lwCommMessage) : OutgoingMessage.createNack(lwCommMessage)).toRawMessage(),
                CharsetUtil.UTF_8);
        InetSocketAddress originator = new InetSocketAddress(
                lwCommMessage.getFrom().getHost(),
                lwCommMessage.getFrom().getPort());
        channelHandlerContext.writeAndFlush(new DatagramPacket(answerBytes, originator));
    }

    /**
     * Handles an ACK: notifies the sender still waiting for it, if any, and records which
     * node acknowledged the message.
     *
     * <p>When no sender is registered for the message id, a warning is logged whose text
     * depends on whether an ACK for that id has been recorded before. ACKs arriving from
     * more than one node for the same id are logged as an error and counted as
     * out-of-order ACKs.
     *
     * @param lwCommMessage the parsed ACK message
     * @param str raw text of the message, used for logging only
     */
    private void handleAck(LwCommMessage lwCommMessage, String str) {
        LwCommServiceImpl.LOGGER.debug("ACK arrived for {}", lwCommMessage.getId());
        MessageSender messageSender = LwCommServiceImpl.getServiceImpl().getMessageSenderStore().getMessageSender(lwCommMessage.getId());
        // Nodes that have already acknowledged this message id, or null if none is recorded.
        Set<Node> ackingNodes = this.receivedAckStore.getIfPresent(lwCommMessage.getId());
        if (messageSender != null) {
            messageSender.ackArrived(lwCommMessage);
            LwCommServiceImpl.getServiceImpl().getStatistics().incProcessedAckCount();
        } else if (ackingNodes == null) {
            LwCommServiceImpl.LOGGER.warn("No MessageSender found for ACK message, and no ACK received in the near past. Message:\n{}", lwCommMessage);
        } else {
            LwCommServiceImpl.LOGGER.warn("Multiple ACK received for message with id {}. This is usually caused by retransmits. Check the sender side logs. Message: {}", lwCommMessage.getId(), lwCommMessage);
        }
        if (ackingNodes == null) {
            // NOTE(review): this lookup-then-put is not atomic and HashSet is unsynchronized;
            // confirm that ACKs for one message id are never handled on several threads at once.
            ackingNodes = new HashSet<>();
            this.receivedAckStore.put(lwCommMessage.getId(), ackingNodes);
        }
        ackingNodes.add(lwCommMessage.getFrom());
        if (ackingNodes.size() > 1) {
            LwCommServiceImpl.LOGGER.error("ACKs received for message ({}) from multiple nodes: {} -- This message has been processed multiple times! Message content:\n{}", new Object[]{lwCommMessage.getId(), ackingNodes, str});
            LwCommServiceImpl.getServiceImpl().getStatistics().incOutOfOrderAckCount();
        }
        LwCommServiceImpl.getServiceImpl().getStatistics().setReceivedAckStoreSize(this.receivedAckStore.size());
    }

    /**
     * Handles a NACK by passing it to the sender registered for the message id and
     * counting it as processed. A NACK with no registered sender is ignored apart from
     * the debug log line.
     *
     * @param lwCommMessage the parsed NACK message
     */
    private void handleNack(LwCommMessage lwCommMessage) {
        LwCommServiceImpl.LOGGER.debug("NACK arrived for {}", lwCommMessage.getId());
        MessageSender pendingSender = LwCommServiceImpl.getServiceImpl().getMessageSenderStore().getMessageSender(lwCommMessage.getId());
        if (pendingSender == null) {
            return;
        }
        pendingSender.nackArrived(lwCommMessage);
        LwCommServiceImpl.getServiceImpl().getStatistics().incProcessedNackCount();
    }
}
