package herddb.network.netty;

import herddb.network.Channel;
import herddb.network.Message;
import herddb.network.ReplyCallback;
import herddb.network.SendResultCallback;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;

/* loaded from: input_file:herddb/network/netty/NettyChannel.class */
public class NettyChannel extends Channel {
    volatile io.netty.channel.Channel socket;
    private static final Logger LOGGER = Logger.getLogger(NettyChannel.class.getName());
    private static final AtomicLong idGenerator = new AtomicLong();
    private final ExecutorService callbackexecutor;
    private final String remoteAddress;
    private final ConcurrentLongHashMap<ReplyCallback> pendingReplyMessages = new ConcurrentLongHashMap<>();
    private final ConcurrentLongHashMap<Message> pendingReplyMessagesSource = new ConcurrentLongHashMap<>();
    private final ConcurrentLongLongHashMap pendingReplyMessagesDeadline = new ConcurrentLongLongHashMap();
    private boolean ioErrors = false;
    private final long id = idGenerator.incrementAndGet();
    private volatile boolean closed = false;

    public String toString() {
        return "NettyChannel{name=" + this.name + ", id=" + this.id + ", socket=" + this.socket + " pending " + this.pendingReplyMessages.size() + " msgs}";
    }

    public NettyChannel(String str, io.netty.channel.Channel channel, ExecutorService executorService) {
        this.name = str;
        this.socket = channel;
        this.callbackexecutor = executorService;
        if (channel instanceof SocketChannel) {
            this.remoteAddress = ((SocketChannel) channel).remoteAddress() + "";
        } else {
            this.remoteAddress = "jvm-local";
        }
    }

    public void messageReceived(Message message) {
        if (message.getReplyMessageId() >= 0) {
            handleReply(message);
        } else {
            submitCallback(() -> {
                try {
                    this.messagesReceiver.messageReceived(message, this);
                } catch (Throwable th) {
                    LOGGER.log(Level.SEVERE, this + ": error " + th, th);
                    close();
                }
            });
        }
    }

    private void handleReply(Message message) {
        ReplyCallback replyCallback = this.pendingReplyMessages.get(message.getReplyMessageId());
        this.pendingReplyMessages.remove(message.getReplyMessageId());
        this.pendingReplyMessagesDeadline.remove(message.getReplyMessageId());
        Message remove = this.pendingReplyMessagesSource.remove(message.getReplyMessageId());
        if (replyCallback == null || remove == null) {
            return;
        }
        submitCallback(() -> {
            replyCallback.replyReceived(remove, message, null);
        });
    }

    @Override // herddb.network.Channel
    public void sendOneWayMessage(final Message message, final SendResultCallback sendResultCallback) {
        if (message.getMessageId() < 0) {
            message.assignMessageId();
        }
        io.netty.channel.Channel channel = this.socket;
        if (channel == null || !channel.isOpen()) {
            sendResultCallback.messageSent(message, new Exception(this + " connection is closed"));
        } else {
            channel.writeAndFlush(message).addListener2(new GenericFutureListener() { // from class: herddb.network.netty.NettyChannel.1
                @Override // io.netty.util.concurrent.GenericFutureListener
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess()) {
                        sendResultCallback.messageSent(message, null);
                        return;
                    }
                    NettyChannel.LOGGER.log(Level.SEVERE, this + ": error " + future.cause(), future.cause());
                    sendResultCallback.messageSent(message, future.cause());
                    NettyChannel.this.close();
                }
            });
        }
    }

    @Override // herddb.network.Channel
    public void sendReplyMessage(Message message, Message message2) {
        if (message2.getMessageId() < 0) {
            message2.assignMessageId();
        }
        if (this.socket == null) {
            LOGGER.log(Level.SEVERE, this + " channel not active, discarding reply message " + message2);
        } else {
            message2.setReplyMessageId(message.messageId);
            sendOneWayMessage(message2, new SendResultCallback() { // from class: herddb.network.netty.NettyChannel.2
                @Override // herddb.network.SendResultCallback
                public void messageSent(Message message3, Throwable th) {
                    if (th != null) {
                        NettyChannel.LOGGER.log(Level.SEVERE, this + " error:" + th, th);
                    }
                }
            });
        }
    }

    private void processPendingReplyMessagesDeadline() {
        ArrayList arrayList = new ArrayList();
        long currentTimeMillis = System.currentTimeMillis();
        this.pendingReplyMessagesDeadline.forEach((j, j2) -> {
            if (j2 < currentTimeMillis) {
                arrayList.add(Long.valueOf(j));
            }
        });
        if (arrayList.isEmpty()) {
            return;
        }
        LOGGER.log(Level.SEVERE, this + " found " + arrayList + " without reply, channel will be closed");
        this.ioErrors = true;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            long longValue = ((Long) it.next()).longValue();
            Message remove = this.pendingReplyMessagesSource.remove(longValue);
            ReplyCallback remove2 = this.pendingReplyMessages.remove(longValue);
            this.pendingReplyMessagesDeadline.remove(longValue);
            if (remove != null && remove2 != null) {
                submitCallback(() -> {
                    remove2.replyReceived(remove, null, new IOException(this + " reply timeout expired, channel will be closed"));
                });
            }
        }
        close();
    }

    @Override // herddb.network.Channel
    public void sendMessageWithAsyncReply(final Message message, long j, final ReplyCallback replyCallback) {
        if (message.getMessageId() < 0) {
            message.assignMessageId();
        }
        if (!isValid()) {
            submitCallback(() -> {
                replyCallback.replyReceived(message, null, new Exception(this + " connection is not active"));
            });
            return;
        }
        this.pendingReplyMessages.put(message.getMessageId(), replyCallback);
        this.pendingReplyMessagesSource.put(message.getMessageId(), message);
        this.pendingReplyMessagesDeadline.put(message.getMessageId(), System.currentTimeMillis() + j);
        sendOneWayMessage(message, new SendResultCallback() { // from class: herddb.network.netty.NettyChannel.3
            @Override // herddb.network.SendResultCallback
            public void messageSent(Message message2, Throwable th) {
                if (th != null) {
                    NettyChannel.LOGGER.log(Level.SEVERE, this + ": error while sending reply message to " + message2, th);
                    NettyChannel nettyChannel = NettyChannel.this;
                    ReplyCallback replyCallback2 = replyCallback;
                    Message message3 = message;
                    nettyChannel.submitCallback(() -> {
                        replyCallback2.replyReceived(message3, null, new Exception(this + ": error while sending reply message to " + message2, th));
                    });
                }
            }
        });
    }

    @Override // herddb.network.Channel
    public boolean isValid() {
        io.netty.channel.Channel channel = this.socket;
        return (channel == null || !channel.isOpen() || this.ioErrors) ? false : true;
    }

    @Override // herddb.network.Channel, java.lang.AutoCloseable
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        LOGGER.log(Level.SEVERE, this + ": closing");
        String str = this.socket + "";
        if (this.socket != null) {
            try {
                this.socket.close().await2();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                this.socket = null;
            }
        }
        failPendingMessages(str);
    }

    private void failPendingMessages(String str) {
        this.pendingReplyMessages.forEach((j, replyCallback) -> {
            Message remove = this.pendingReplyMessagesSource.remove(j);
            LOGGER.log(Level.SEVERE, this + " message " + j + " was not replied (" + remove + ") callback:" + replyCallback);
            if (remove != null) {
                submitCallback(() -> {
                    replyCallback.replyReceived(remove, null, new IOException("comunication channel is closed. Cannot wait for pending messages, socket=" + str));
                });
            }
        });
        this.pendingReplyMessages.clear();
        this.pendingReplyMessagesSource.clear();
        this.pendingReplyMessagesDeadline.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void exceptionCaught(Throwable th) {
        LOGGER.log(Level.SEVERE, this + " io-error " + th, th);
        this.ioErrors = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void channelClosed() {
        failPendingMessages(this.socket + "");
        submitCallback(() -> {
            if (this.messagesReceiver != null) {
                this.messagesReceiver.channelClosed(this);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void submitCallback(Runnable runnable) {
        try {
            this.callbackexecutor.submit(runnable);
        } catch (RejectedExecutionException e) {
            LOGGER.log(Level.SEVERE, this + " rejected runnable " + runnable + BookKeeperConstants.COLON + e);
            try {
                runnable.run();
            } catch (Throwable th) {
                LOGGER.log(Level.SEVERE, this + " error on rejected runnable " + runnable + BookKeeperConstants.COLON + th);
            }
        }
    }

    @Override // herddb.network.Channel
    public String getRemoteAddress() {
        return this.remoteAddress;
    }

    @Override // herddb.network.Channel
    public void channelIdle() {
        LOGGER.log(Level.FINEST, "{0} channelIdle", this);
        processPendingReplyMessagesDeadline();
    }

    @Override // herddb.network.Channel
    public String getName() {
        return this.name;
    }

    @Override // herddb.network.Channel
    public void setName(String str) {
        this.name = str;
    }
}
