package blazingcache.network.jvm;

import blazingcache.network.Channel;
import blazingcache.network.Message;
import blazingcache.network.ReplyCallback;
import blazingcache.network.SendResultCallback;
import blazingcache.network.netty.MessageUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:blazingcache/network/jvm/JVMChannel.class */
/**
 * In-JVM {@link Channel} implementation: instead of using a network transport,
 * messages are handed directly to a paired {@code JVMChannel} (the "other side").
 *
 * <p>Every message is passed through {@link MessageUtils} encode/decode before
 * delivery, so the receiver never shares a {@link Message} instance with the sender.
 * Outgoing sends are queued on a single-threaded executor and therefore processed
 * one at a time in submission order; user callbacks run on a separate cached thread pool.
 */
public class JVMChannel extends Channel {
    private static final Logger LOGGER = Logger.getLogger(JVMChannel.class.getName());

    /** Peer channel; {@code null} until {@link #setOtherSide(JVMChannel)} is called. */
    private volatile JVMChannel otherSide;
    private volatile boolean active = false;
    /** Reply callbacks waiting for an answer, keyed by the id of the request message. */
    private final Map<String, ReplyCallback> pendingReplyMessages = new ConcurrentHashMap<>();
    /** The (cloned) request messages matching {@link #pendingReplyMessages}, same keys. */
    private final Map<String, Message> pendingReplyMessagesSource = new ConcurrentHashMap<>();
    /** Runs user-supplied callbacks and the messages receiver. */
    private final ExecutorService callbackexecutor = Executors.newCachedThreadPool();
    /** Single worker thread: serializes delivery of outgoing messages to the peer. */
    private final ExecutorService executionserializer = Executors.newFixedThreadPool(1);
    private final String id = UUID.randomUUID().toString();
    private volatile boolean closed = false;

    @Override
    public String toString() {
        return "JVMChannel{active=" + this.active + ", id=" + this.id + '}';
    }

    /**
     * Produces an independent copy of {@code message} by encoding it into a
     * buffer and decoding it again.
     *
     * @param message the message to copy
     * @return the decoded copy
     */
    private Message cloneMessage(Message message) {
        ByteBuf buffer = Unpooled.buffer();
        MessageUtils.encodeMessage(buffer, message);
        return MessageUtils.decodeMessage(buffer);
    }

    /**
     * Entry point used by the peer channel to deliver a message to this side.
     * Replies (messages carrying a reply-message id) are routed to the pending
     * reply callback; everything else goes to the messages receiver on the
     * callback executor. Any error raised by the receiver closes the channel.
     *
     * @param message the message sent by the peer
     */
    private void receiveMessageFromPeer(Message message) {
        Message cloneMessage = cloneMessage(message);
        if (cloneMessage.getReplyMessageId() != null) {
            handleReply(cloneMessage);
        } else {
            submitCallback(() -> {
                try {
                    this.messagesReceiver.messageReceived(cloneMessage);
                } catch (Throwable th) {
                    LOGGER.log(Level.SEVERE, this + ": error " + th, th);
                    close();
                }
            });
        }
    }

    /**
     * Connects this channel to its peer and marks this side as active.
     *
     * @param jVMChannel the channel that will receive messages sent from this one
     */
    public void setOtherSide(JVMChannel jVMChannel) {
        this.otherSide = jVMChannel;
        this.active = true;
    }

    /**
     * Sends a message without expecting a reply. A fresh message id is assigned
     * to {@code message}. If the channel is not active or already shut down the
     * message is dropped silently and the callback is not invoked.
     *
     * @param message the message to send
     * @param sendResultCallback notified (with a {@code null} error) after the
     *        message has been handed to the peer
     */
    @Override // blazingcache.network.Channel
    public void sendOneWayMessage(Message message, SendResultCallback sendResultCallback) {
        message.setMessageId(UUID.randomUUID().toString());
        Message cloneMessage = cloneMessage(message);
        if (!this.active || this.executionserializer.isShutdown()) {
            return;
        }
        try {
            this.executionserializer.submit(() -> {
                this.otherSide.receiveMessageFromPeer(cloneMessage);
                submitCallback(() -> {
                    sendResultCallback.messageSent(cloneMessage, null);
                });
            });
        } catch (RejectedExecutionException ignored) {
            // close() won the race between the isShutdown() check and submit():
            // treat it exactly like the "already shut down" case above.
        }
    }

    /**
     * Dispatches a reply to the callback registered for the request it answers.
     * The callback is claimed with a single atomic {@code remove}, so a given
     * request's callback can be picked up by at most one reply.
     *
     * @param message the reply message (its reply-message id identifies the request)
     */
    private void handleReply(Message message) {
        String requestId = message.getReplyMessageId();
        ReplyCallback replyCallback = this.pendingReplyMessages.remove(requestId);
        if (replyCallback == null) {
            return;
        }
        Message request = this.pendingReplyMessagesSource.remove(requestId);
        if (request != null) {
            submitCallback(() -> {
                replyCallback.replyReceived(request, message, null);
            });
        }
    }

    /**
     * Sends {@code message2} to the peer as the reply to {@code message}.
     * A fresh message id is assigned to {@code message2}. The reply is logged and
     * discarded if the channel is shut down or no longer active.
     *
     * @param message the request being answered
     * @param message2 the reply to deliver
     */
    @Override // blazingcache.network.Channel
    public void sendReplyMessage(Message message, Message message2) {
        message2.setMessageId(UUID.randomUUID().toString());
        Message cloneMessage = cloneMessage(message2);
        if (this.executionserializer.isShutdown()) {
            LOGGER.log(Level.SEVERE, "channel shutdown, discarding reply message " + cloneMessage);
            return;
        }
        try {
            this.executionserializer.submit(() -> {
                if (!this.active) {
                    LOGGER.log(Level.SEVERE, "channel not active, discarding reply message " + cloneMessage);
                } else {
                    cloneMessage.setReplyMessageId(message.messageId);
                    this.otherSide.receiveMessageFromPeer(cloneMessage);
                }
            });
        } catch (RejectedExecutionException ignored) {
            // The executor was shut down after the isShutdown() check.
            LOGGER.log(Level.SEVERE, "channel shutdown, discarding reply message " + cloneMessage);
        }
    }

    /**
     * Schedules {@code runnable} on the callback executor. Callbacks submitted
     * after the channel has been closed are dropped (best effort).
     *
     * @param runnable the callback to run
     */
    private void submitCallback(Runnable runnable) {
        try {
            this.callbackexecutor.submit(runnable);
        } catch (RejectedExecutionException ignored) {
            // Deliberately best effort: the channel is shutting down.
            LOGGER.log(Level.FINE, "{0}: callback executor shut down, callback dropped", this);
        }
    }

    /**
     * Sends a message and registers {@code replyCallback} to be invoked when the
     * peer replies, or with an error if the connection is not active or the
     * channel gets closed. A fresh message id is assigned to {@code message}.
     * If the channel is already shut down the request is logged and discarded.
     *
     * @param message the request to send
     * @param j reply timeout; not used by this implementation
     * @param replyCallback receives the reply or the failure
     */
    @Override // blazingcache.network.Channel
    public void sendMessageWithAsyncReply(Message message, long j, ReplyCallback replyCallback) {
        message.setMessageId(UUID.randomUUID().toString());
        Message cloneMessage = cloneMessage(message);
        if (this.executionserializer.isShutdown()) {
            LOGGER.log(Level.SEVERE, "[JVM] channel shutdown, discarding sendMessageWithAsyncReply");
            return;
        }
        try {
            this.executionserializer.submit(() -> {
                if (!this.active) {
                    // Go through submitCallback so that a concurrently shut down
                    // callback executor does not raise inside this task.
                    submitCallback(() -> {
                        replyCallback.replyReceived(cloneMessage, null, new Exception("connection is not active"));
                    });
                    return;
                }
                this.pendingReplyMessages.put(cloneMessage.getMessageId(), replyCallback);
                this.pendingReplyMessagesSource.put(cloneMessage.getMessageId(), cloneMessage);
                this.otherSide.receiveMessageFromPeer(cloneMessage);
            });
        } catch (RejectedExecutionException ignored) {
            // The executor was shut down after the isShutdown() check.
            LOGGER.log(Level.SEVERE, "[JVM] channel shutdown, discarding sendMessageWithAsyncReply");
        }
    }

    /**
     * @return {@code true} while the channel is connected to a peer and not closed
     */
    @Override // blazingcache.network.Channel
    public boolean isValid() {
        return this.active;
    }

    /**
     * Atomically flips the closed flag.
     *
     * @return {@code true} only for the first caller
     */
    private synchronized boolean markClosed() {
        if (this.closed) {
            return false;
        }
        this.closed = true;
        return true;
    }

    /**
     * Closes this side of the channel: fails every pending reply callback,
     * closes the peer if it is still active, shuts down both executors and
     * notifies the messages receiver. Calling it more than once has no further
     * effect.
     */
    @Override // blazingcache.network.Channel, java.lang.AutoCloseable
    public void close() {
        if (!markClosed()) {
            return;
        }
        LOGGER.log(Level.SEVERE, this + ": closing");
        this.active = false;
        this.pendingReplyMessages.forEach((str, replyCallback) -> {
            submitCallback(() -> {
                // Removing the source message decides who notifies the callback:
                // if a reply already consumed it, nothing is reported here.
                Message request = this.pendingReplyMessagesSource.remove(str);
                if (request != null) {
                    replyCallback.replyReceived(request, null, new Exception("comunication channel closed"));
                }
            });
        });
        this.pendingReplyMessages.clear();
        // The peer may be missing if close() is called before setOtherSide().
        JVMChannel peer = this.otherSide;
        if (peer != null && peer.active) {
            peer.close();
        }
        this.executionserializer.shutdown();
        this.callbackexecutor.shutdown();
        // Defensive: do not fail the shutdown sequence if no receiver is registered.
        if (this.messagesReceiver != null) {
            this.messagesReceiver.channelClosed();
        }
    }

    /** No-op: this implementation performs no idle handling. */
    @Override // blazingcache.network.Channel
    public void channelIdle() {
    }
}
