package io.joynr.dispatcher;

import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import io.joynr.common.ExpiryDate;
import io.joynr.dispatcher.rpc.Callback;
import io.joynr.dispatcher.rpc.JsonRequestInterpreter;
import io.joynr.endpoints.JoynrMessagingEndpointAddress;
import io.joynr.exceptions.JoynrCommunicationException;
import io.joynr.exceptions.JoynrException;
import io.joynr.exceptions.JoynrMessageNotSentException;
import io.joynr.exceptions.JoynrSendBufferFullException;
import io.joynr.exceptions.JoynrShutdownException;
import io.joynr.messaging.IMessageReceivers;
import io.joynr.messaging.MessageReceiver;
import io.joynr.messaging.ReceiverStatusListener;
import io.joynr.pubsub.publication.PublicationManager;
import io.joynr.pubsub.subscription.AttributeSubscriptionListener;
import io.joynr.pubsub.subscription.BroadcastSubscriptionListener;
import io.joynr.pubsub.subscription.SubscriptionManager;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import joynr.JoynrMessage;
import joynr.Reply;
import joynr.Request;
import joynr.SubscriptionPublication;
import joynr.SubscriptionRequest;
import joynr.SubscriptionStop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:WEB-INF/lib/libjoynr-0.8.0.jar:io/joynr/dispatcher/RequestReplyDispatcherImpl.class */
public class RequestReplyDispatcherImpl implements RequestReplyDispatcher {
    private ReplyCallerDirectory replyCallerDirectory;
    private MessagingEndpointDirectory messagingEndpointDirectory;
    protected RequestReplySender messageSender;
    private JsonRequestInterpreter jsonRequestInterpreter;
    private static final Logger logger = LoggerFactory.getLogger(RequestReplyDispatcherImpl.class);
    private final MessageReceiver messageReceiver;
    private final ObjectMapper objectMapper;
    private PublicationManager publicationManager;
    private SubscriptionManager subscriptionManager;
    private ScheduledExecutorService cleanupScheduler;
    private Map<String, PayloadListener<?>> oneWayRecipients = Maps.newHashMap();
    private Map<String, RequestCaller> requestCallerDirectory = Maps.newHashMap();
    private ConcurrentHashMap<String, ConcurrentLinkedQueue<ContentWithExpiryDate<JoynrMessage>>> messageQueue = new ConcurrentHashMap<>();
    private boolean shutdown = false;
    private boolean registering = false;

    @Inject
    public RequestReplyDispatcherImpl(RequestReplySender requestReplySender, IMessageReceivers iMessageReceivers, MessageReceiver messageReceiver, MessagingEndpointDirectory messagingEndpointDirectory, ReplyCallerDirectory replyCallerDirectory, @Named("joynr.messaging.channelid") String str, ObjectMapper objectMapper, PublicationManager publicationManager, SubscriptionManager subscriptionManager, JsonRequestInterpreter jsonRequestInterpreter, @Named("joynr.scheduler.cleanup") ScheduledExecutorService scheduledExecutorService) {
        this.messageSender = requestReplySender;
        this.messageReceiver = messageReceiver;
        this.messagingEndpointDirectory = messagingEndpointDirectory;
        this.replyCallerDirectory = replyCallerDirectory;
        this.objectMapper = objectMapper;
        this.publicationManager = publicationManager;
        this.subscriptionManager = subscriptionManager;
        this.jsonRequestInterpreter = jsonRequestInterpreter;
        this.cleanupScheduler = scheduledExecutorService;
        iMessageReceivers.registerMessageReceiver(messageReceiver, str);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.Map<java.lang.String, io.joynr.dispatcher.PayloadListener<?>>] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v6 */
    @Override // io.joynr.dispatcher.RequestReplyDispatcher
    public void addOneWayRecipient(String str, PayloadListener<?> payloadListener) {
        ?? r0 = this.oneWayRecipients;
        synchronized (r0) {
            this.oneWayRecipients.put(str, payloadListener);
            r0 = r0;
            ConcurrentLinkedQueue<ContentWithExpiryDate<JoynrMessage>> remove = this.messageQueue.remove(str);
            if (remove != null) {
                Iterator<ContentWithExpiryDate<JoynrMessage>> it = remove.iterator();
                while (it.hasNext()) {
                    ContentWithExpiryDate<JoynrMessage> next = it.next();
                    if (!next.isExpired()) {
                        deliverMessageToListener(payloadListener, next.getContent());
                    }
                }
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.Map<java.lang.String, io.joynr.dispatcher.RequestCaller>] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    @Override // io.joynr.dispatcher.RequestReplyDispatcher
    public void addRequestCaller(String str, RequestCaller requestCaller) {
        ?? r0 = this.requestCallerDirectory;
        synchronized (r0) {
            this.requestCallerDirectory.put(str, requestCaller);
            startReceiver();
            r0 = r0;
            ConcurrentLinkedQueue<ContentWithExpiryDate<JoynrMessage>> remove = this.messageQueue.remove(str);
            if (remove != null) {
                Iterator<ContentWithExpiryDate<JoynrMessage>> it = remove.iterator();
                while (it.hasNext()) {
                    ContentWithExpiryDate<JoynrMessage> next = it.next();
                    if (!next.isExpired()) {
                        executeRequestAndReply(requestCaller, next.getContent());
                    }
                }
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v3, types: [io.joynr.messaging.MessageReceiver] */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private void startReceiver() {
        if (this.shutdown) {
            throw new JoynrShutdownException("cannot start receiver: dispatcher is already shutting down");
        }
        ?? r0 = this.messageReceiver;
        synchronized (r0) {
            if (!this.registering) {
                this.registering = true;
                this.messageReceiver.registerMessageListener(this);
                if (!this.messageReceiver.isStarted()) {
                    this.messageReceiver.startReceiver(new ReceiverStatusListener() { // from class: io.joynr.dispatcher.RequestReplyDispatcherImpl.1
                        @Override // io.joynr.messaging.ReceiverStatusListener
                        public void receiverStarted() {
                        }

                        @Override // io.joynr.messaging.ReceiverStatusListener
                        public void receiverException(Throwable th) {
                            RequestReplyDispatcherImpl.this.shutdown(false);
                        }
                    });
                }
            }
            r0 = r0;
        }
    }

    @Override // io.joynr.dispatcher.RequestReplyDispatcher
    public void addReplyCaller(String str, ReplyCaller replyCaller, long j) {
        this.replyCallerDirectory.putReplyCaller(str, replyCaller, DispatcherUtils.convertTtlToExpirationDate(j));
        startReceiver();
    }

    @Override // io.joynr.dispatcher.RequestReplyDispatcher
    public void removeReplyCaller(String str) {
        this.replyCallerDirectory.getAndRemoveReplyCaller(str);
    }

    @Override // io.joynr.messaging.MessageArrivedListener
    public void error(JoynrMessage joynrMessage, Throwable th) {
        ReplyCaller andRemoveReplyCaller;
        if (joynrMessage == null) {
            logger.error("error: ", th);
            return;
        }
        if (joynrMessage.getType().equals(JoynrMessage.MESSAGE_TYPE_REQUEST)) {
            try {
                String requestReplyId = ((Request) this.objectMapper.readValue(joynrMessage.getPayload(), Request.class)).getRequestReplyId();
                if (requestReplyId == null || (andRemoveReplyCaller = this.replyCallerDirectory.getAndRemoveReplyCaller(requestReplyId)) == null) {
                    return;
                }
                andRemoveReplyCaller.error(th);
            } catch (IOException e) {
                logger.error("Error extracting payload for message " + joynrMessage.getId() + ", raw payload: " + joynrMessage.getPayload(), e.getMessage());
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.Map<java.lang.String, io.joynr.dispatcher.RequestCaller>] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v6 */
    @Override // io.joynr.dispatcher.RequestReplyDispatcher
    public void removeRequestCaller(String str) {
        ?? r0 = this.requestCallerDirectory;
        synchronized (r0) {
            this.requestCallerDirectory.remove(str);
            r0 = r0;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.Map<java.lang.String, io.joynr.dispatcher.PayloadListener<?>>] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v6 */
    @Override // io.joynr.dispatcher.RequestReplyDispatcher
    public void removeListener(String str) {
        ?? r0 = this.oneWayRecipients;
        synchronized (r0) {
            this.oneWayRecipients.remove(str);
            r0 = r0;
        }
    }

    @Override // io.joynr.messaging.MessageArrivedListener
    public void messageArrived(JoynrMessage joynrMessage) {
        if (joynrMessage != null) {
            if (DispatcherUtils.isExpired(joynrMessage.getExpiryDate())) {
                logger.debug("TTL expired, discarding message : {}", joynrMessage.toLogMessage());
                return;
            }
            String type = joynrMessage.getType();
            if (JoynrMessage.MESSAGE_TYPE_REPLY.equals(type)) {
                handleReplyMessageReceived(joynrMessage);
                return;
            }
            if (JoynrMessage.MESSAGE_TYPE_REQUEST.equals(type)) {
                handleRequestMessageReceived(joynrMessage);
                return;
            }
            if (JoynrMessage.MESSAGE_TYPE_ONE_WAY.equals(type)) {
                handleOneWayMessageReceived(joynrMessage);
                return;
            }
            if (JoynrMessage.MESSAGE_TYPE_SUBSCRIPTION_REQUEST.equals(type) || JoynrMessage.MESSAGE_TYPE_BROADCAST_SUBSCRIPTION_REQUEST.equals(type)) {
                handleSubscriptionRequestReceived(joynrMessage);
            } else if (JoynrMessage.MESSAGE_TYPE_SUBSCRIPTION_STOP.equals(type)) {
                handleSubscriptionStopReceived(joynrMessage);
            } else if (JoynrMessage.MESSAGE_TYPE_PUBLICATION.equals(type)) {
                handlePublicationReceived(joynrMessage);
            }
        }
    }

    private void handlePublicationReceived(JoynrMessage joynrMessage) {
        logger.info("Publication received");
        deliverPublication(joynrMessage);
    }

    private void deliverPublication(JoynrMessage joynrMessage) {
        Object convertValue;
        try {
            SubscriptionPublication subscriptionPublication = (SubscriptionPublication) this.objectMapper.readValue(joynrMessage.getPayload(), SubscriptionPublication.class);
            String subscriptionId = subscriptionPublication.getSubscriptionId();
            Class<?> type = this.subscriptionManager.getType(subscriptionId);
            if (TypeReference.class.isAssignableFrom(type)) {
                convertValue = this.objectMapper.convertValue(subscriptionPublication.getResponse(), (TypeReference<?>) type.newInstance());
            } else {
                convertValue = this.objectMapper.convertValue(subscriptionPublication.getResponse(), type);
            }
            this.subscriptionManager.touchSubscriptionState(subscriptionId);
            if (this.subscriptionManager.isBroadcast(subscriptionId)) {
                callBroadcastListener(subscriptionId, convertValue);
            } else {
                callSubscriptionListener(subscriptionId, convertValue);
            }
        } catch (Exception e) {
            logger.error("Error delivering publication: {} : {}", e.getClass(), e.getMessage());
        }
    }

    private <T> void callSubscriptionListener(String str, T t) {
        AttributeSubscriptionListener<T> subscriptionListener = this.subscriptionManager.getSubscriptionListener(str);
        if (subscriptionListener == null) {
            logger.error("No subscription listener found for incoming publication!");
        } else {
            subscriptionListener.onReceive(t);
        }
    }

    private void callBroadcastListener(String str, Object obj) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        BroadcastSubscriptionListener broadcastSubscriptionListener = this.subscriptionManager.getBroadcastSubscriptionListener(str);
        List list = (List) obj;
        Object[] array = list.toArray(new Object[list.size()]);
        Method declaredMethod = broadcastSubscriptionListener.getClass().getDeclaredMethod("onReceive", getParameterTypesForBroadcastPublication(array));
        if (!declaredMethod.isAccessible()) {
            declaredMethod.setAccessible(true);
        }
        declaredMethod.invoke(broadcastSubscriptionListener, array);
    }

    private Class<?>[] getParameterTypesForBroadcastPublication(Object[] objArr) {
        ArrayList arrayList = new ArrayList(objArr.length);
        for (Object obj : objArr) {
            arrayList.add(obj.getClass());
        }
        return (Class[]) arrayList.toArray(new Class[arrayList.size()]);
    }

    private void handleSubscriptionStopReceived(JoynrMessage joynrMessage) {
        logger.info("Subscription stop received");
        try {
            this.publicationManager.stopPublication(((SubscriptionStop) this.objectMapper.readValue(joynrMessage.getPayload(), SubscriptionStop.class)).getSubscriptionId());
        } catch (Exception e) {
            logger.error("Error delivering subscription stop: {}", e.getMessage());
        }
    }

    private void handleSubscriptionRequestReceived(JoynrMessage joynrMessage) {
        String to = joynrMessage.getTo();
        String from = joynrMessage.getFrom();
        if (!this.requestCallerDirectory.containsKey(to)) {
            logger.debug("Received subscriptionRequest for unknown participant. Discarding request.");
            return;
        }
        RequestCaller requestCaller = this.requestCallerDirectory.get(to);
        try {
            SubscriptionRequest subscriptionRequest = (SubscriptionRequest) this.objectMapper.readValue(joynrMessage.getPayload(), SubscriptionRequest.class);
            this.messagingEndpointDirectory.put(from, new JoynrMessagingEndpointAddress(joynrMessage.getHeaderValue(JoynrMessage.HEADER_NAME_REPLY_CHANNELID)));
            this.publicationManager.addSubscriptionRequest(from, to, subscriptionRequest, requestCaller);
        } catch (JsonParseException e) {
            logger.error("Error parsing request payload. msgId: {}. from: {} to: {}. Reason: {}. Discarding request.", (Object[]) new String[]{from, to, joynrMessage.getId(), e.getMessage()});
        } catch (JsonMappingException e2) {
            logger.error("Error parsing request payload. msgId: {}. from: {} to: {}. Reason: {}. Discarding request.", (Object[]) new String[]{from, to, joynrMessage.getId(), e2.getMessage()});
        } catch (IOException e3) {
            logger.error("Error parsing request payload. msgId: {}. from: {} to: {}. Reason: {}. Discarding request.", (Object[]) new String[]{from, to, joynrMessage.getId(), e3.getMessage()});
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v11 */
    /* JADX WARN: Type inference failed for: r0v3, types: [java.util.Map<java.lang.String, io.joynr.dispatcher.PayloadListener<?>>] */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.lang.Throwable] */
    private void handleOneWayMessageReceived(JoynrMessage joynrMessage) {
        String headerValue = joynrMessage.getHeaderValue(JoynrMessage.HEADER_NAME_TO_PARTICIPANT_ID);
        ?? r0 = this.oneWayRecipients;
        synchronized (r0) {
            PayloadListener<?> payloadListener = this.oneWayRecipients.get(headerValue);
            if (payloadListener != null) {
                deliverMessageToListener(payloadListener, joynrMessage);
            } else {
                putMessage(headerValue, joynrMessage, ExpiryDate.fromAbsolute(joynrMessage.getExpiryDate()));
            }
            r0 = r0;
        }
    }

    private void deliverMessageToListener(PayloadListener payloadListener, JoynrMessage joynrMessage) {
        try {
            payloadListener.receive(this.objectMapper.readValue(joynrMessage.getPayload(), Object.class));
        } catch (JsonParseException e) {
            logger.error("Error parsing oneway payload. msgId: {}. from: {} to: {}. Reason: {}. Discarding oneway.", (Object[]) new String[]{joynrMessage.getFrom(), joynrMessage.getFrom(), joynrMessage.getId(), e.getMessage()});
        } catch (JsonMappingException e2) {
            logger.error("Error parsing oneway payload. msgId: {}. from: {} to: {}. Reason: {}. Discarding oneway.", (Object[]) new String[]{joynrMessage.getFrom(), joynrMessage.getFrom(), joynrMessage.getId(), e2.getMessage()});
        } catch (IOException e3) {
            logger.error("Error parsing oneway payload. msgId: {}. from: {} to: {}. Reason: {}. Discarding oneway.", (Object[]) new String[]{joynrMessage.getFrom(), joynrMessage.getFrom(), joynrMessage.getId(), e3.getMessage()});
        }
    }

    private void handleRequestMessageReceived(JoynrMessage joynrMessage) {
        String headerValue = joynrMessage.getHeaderValue(JoynrMessage.HEADER_NAME_FROM_PARTICIPANT_ID);
        String headerValue2 = joynrMessage.getHeaderValue(JoynrMessage.HEADER_NAME_TO_PARTICIPANT_ID);
        String headerValue3 = joynrMessage.getHeaderValue(JoynrMessage.HEADER_NAME_REPLY_CHANNELID);
        if (headerValue3 != null && !headerValue3.isEmpty()) {
            this.messagingEndpointDirectory.put(headerValue, new JoynrMessagingEndpointAddress(headerValue3));
        }
        if (this.requestCallerDirectory.containsKey(headerValue2)) {
            executeRequestAndReply(this.requestCallerDirectory.get(headerValue2), joynrMessage);
        } else {
            putMessage(headerValue2, joynrMessage, ExpiryDate.fromAbsolute(joynrMessage.getExpiryDate()));
            logger.info("No requestCaller found for participantId: {} queuing request message.", headerValue2);
        }
    }

    private void executeRequestAndReply(RequestCaller requestCaller, final JoynrMessage joynrMessage) {
        try {
            Request request = (Request) this.objectMapper.readValue(joynrMessage.getPayload(), Request.class);
            logger.debug("executing request from message: {} request: {}", joynrMessage.getId(), request.getRequestReplyId());
            if (requestCaller instanceof RequestCallerAsync) {
                this.jsonRequestInterpreter.execute(new Callback<Reply>() { // from class: io.joynr.dispatcher.RequestReplyDispatcherImpl.2
                    @Override // io.joynr.dispatcher.rpc.Callback
                    public void onSuccess(Reply reply) {
                        try {
                            if (DispatcherUtils.isExpired(joynrMessage.getExpiryDate())) {
                                RequestReplyDispatcherImpl.logger.error("Error: reply {} is not send to caller, as the expiryDate of the requesting message {} has been reached.", reply, new Date(joynrMessage.getExpiryDate()));
                            } else {
                                RequestReplyDispatcherImpl.this.sendReply(joynrMessage, reply);
                            }
                        } catch (Exception e) {
                            RequestReplyDispatcherImpl.logger.error("Error processing message: \r\n {} : error : {}", joynrMessage, e);
                        }
                    }

                    @Override // io.joynr.dispatcher.rpc.Callback
                    public void onFailure(JoynrException joynrException) {
                        RequestReplyDispatcherImpl.logger.error("Error processing message: \r\n {} ; error: {}", joynrMessage, joynrException);
                    }
                }, (RequestCallerAsync) requestCaller, request);
            } else if (requestCaller instanceof RequestCallerSync) {
                sendReply(joynrMessage, this.jsonRequestInterpreter.execute((RequestCallerSync) requestCaller, request));
            } else {
                logger.error("Error processing message: \r\n {}. RequestCaller type {} unknown.", joynrMessage, requestCaller.getClass().getName());
            }
        } catch (Throwable th) {
            logger.error("Error processing message: \r\n {}", joynrMessage, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendReply(JoynrMessage joynrMessage, Reply reply) throws JsonGenerationException, JsonMappingException, IOException {
        String headerValue = joynrMessage.getHeaderValue(JoynrMessage.HEADER_NAME_TO_PARTICIPANT_ID);
        String headerValue2 = joynrMessage.getHeaderValue(JoynrMessage.HEADER_NAME_FROM_PARTICIPANT_ID);
        long parseLong = Long.parseLong(joynrMessage.getHeader().get(JoynrMessage.HEADER_NAME_EXPIRY_DATE));
        if (parseLong <= System.currentTimeMillis()) {
            logger.error("Expiry Date exceeded. Reply discarded: messageId: {} requestReplyId: {}", joynrMessage.getId(), reply.getRequestReplyId());
            return;
        }
        try {
            this.messageSender.sendReply(headerValue, headerValue2, reply, ExpiryDate.fromAbsolute(parseLong));
        } catch (JoynrCommunicationException e) {
            logger.error("Responder could not reply due to a JoynCommunicationException: ", (Throwable) e);
        } catch (JoynrMessageNotSentException e2) {
            logger.error("Responder could not reply due to a JoynrMessageNotSentException: ", (Throwable) e2);
        } catch (JoynrSendBufferFullException e3) {
            logger.error("Responder could not reply due to a JoynSendBufferFullException: ", (Throwable) e3);
        }
    }

    private void handleReplyMessageReceived(JoynrMessage joynrMessage) {
        try {
            Reply reply = (Reply) this.objectMapper.readValue(joynrMessage.getPayload(), Reply.class);
            ReplyCaller andRemoveReplyCaller = this.replyCallerDirectory.getAndRemoveReplyCaller(reply.getRequestReplyId());
            if (andRemoveReplyCaller == null) {
                logger.warn("No reply caller found for id: " + reply.getRequestReplyId());
            } else {
                logger.debug("Parsed response from json with payload :" + joynrMessage.getPayload());
                andRemoveReplyCaller.messageCallBack(reply);
            }
        } catch (Exception e) {
            logger.error("Error parsing reply payload. msgId: {}. from: {} to: {}. Reason: {}. Discarding reply.", (Object[]) new String[]{joynrMessage.getFrom(), joynrMessage.getFrom(), joynrMessage.getId(), e.getMessage()});
        }
    }

    @Override // io.joynr.dispatcher.RequestReplyDispatcher
    public void shutdown(boolean z) {
        logger.info("SHUTTING DOWN Dispatcher");
        this.shutdown = true;
        try {
            this.messageReceiver.shutdown(z);
        } catch (Exception unused) {
            logger.error("error shutting down messageReceiver");
        }
        try {
            this.replyCallerDirectory.shutdown();
        } catch (Exception unused2) {
            logger.error("error shutting down replyCallerDirectory");
        }
    }

    private void putMessage(final String str, JoynrMessage joynrMessage, ExpiryDate expiryDate) {
        if (!this.messageQueue.containsKey(str)) {
            this.messageQueue.putIfAbsent(str, new ConcurrentLinkedQueue<>());
        }
        final ContentWithExpiryDate<JoynrMessage> contentWithExpiryDate = new ContentWithExpiryDate<>(joynrMessage, expiryDate);
        this.messageQueue.get(str).add(contentWithExpiryDate);
        this.cleanupScheduler.schedule(new Runnable() { // from class: io.joynr.dispatcher.RequestReplyDispatcherImpl.3
            @Override // java.lang.Runnable
            public void run() {
                ((ConcurrentLinkedQueue) RequestReplyDispatcherImpl.this.messageQueue.get(str)).remove(contentWithExpiryDate);
                JoynrMessage joynrMessage2 = (JoynrMessage) contentWithExpiryDate.getContent();
                RequestReplyDispatcherImpl.logger.warn("TTL DISCARD. msgId: {} from: {} to: {} because it has expired. ", (Object[]) new String[]{joynrMessage2.getId(), joynrMessage2.getFrom(), joynrMessage2.getTo()});
            }
        }, expiryDate.getRelativeTtl(), TimeUnit.MILLISECONDS);
    }
}
