package io.bosonnetwork.kademlia;

import com.google.common.base.Preconditions;
import io.bosonnetwork.Id;
import io.bosonnetwork.Network;
import io.bosonnetwork.kademlia.NetworkEngine;
import io.bosonnetwork.kademlia.Throttle;
import io.bosonnetwork.kademlia.exceptions.CryptoError;
import io.bosonnetwork.kademlia.exceptions.IOError;
import io.bosonnetwork.kademlia.messages.ErrorMessage;
import io.bosonnetwork.kademlia.messages.Message;
import io.bosonnetwork.kademlia.messages.MessageException;
import io.bosonnetwork.utils.AddressUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Formatter;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/bosonnetwork/kademlia/RPCServer.class */
public class RPCServer implements NetworkEngine.Selectable {
    private static final int WRITE_STATE_INITIAL = -1;
    private static final int WRITE_STATE_IDLE = 0;
    private static final int WRITE_STATE_WRITING = 1;
    private static final int WRITE_STATE_AWAITING = 2;
    private static final int WRITE_STATE_CLOSED = 3;
    private DHT dht;
    private InetSocketAddress addr;
    private DatagramChannel channel;
    private Instant startTime;
    private State state;
    private AtomicInteger writeState;
    private AtomicLong receivedMessages;
    private AtomicLong sentMessages;
    private volatile boolean isReachable;
    private long messagesAtLastReachableCheck;
    private long lastReachableCheck;
    private AtomicInteger callQueueGuard;
    private Map<Integer, RPCCall> calls;
    private Queue<RPCCall> callQueue;
    private Queue<Message> pipeline;
    private Throttle inboundThrottle;
    private Throttle outboundThrottle;
    private TimeoutSampler timeoutSampler;
    private RPCStatistics stats;
    private ExponentialWeightendMovingAverage unverifiedLossrate;
    private ExponentialWeightendMovingAverage verifiedEntryLossrate;
    private static final ThreadLocal<ByteBuffer> writeBuffer;
    private static final ThreadLocal<ByteBuffer> readBuffer;
    private int nextTxid;
    private static final Logger log;
    private final RPCCallListener callListener;
    private static final int MIN_PACKET_SIZE = 61;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/bosonnetwork/kademlia/RPCServer$State.class */
    public enum State {
        INITIAL,
        RUNNING,
        STOPPED
    }

    public RPCServer(DHT dht, InetSocketAddress inetSocketAddress, boolean z) {
        this.nextTxid = ThreadLocalRandom.current().nextInt(1, 32768);
        this.callListener = new RPCCallListener() { // from class: io.bosonnetwork.kademlia.RPCServer.1
            @Override // io.bosonnetwork.kademlia.RPCCallListener
            public void onTimeout(RPCCall rPCCall) {
                RPCServer.this.calls.remove(Integer.valueOf(rPCCall.getRequest().getTxid()), rPCCall);
                RPCServer.this.stats.timeoutMessage(rPCCall.getRequest());
                if (rPCCall.knownReachableAtCreationTime()) {
                    RPCServer.this.verifiedEntryLossrate.updateAverage(1.0d);
                } else {
                    RPCServer.this.unverifiedLossrate.updateAverage(1.0d);
                }
                RPCServer.this.dht.onTimeout(rPCCall);
                RPCServer.this.processCallQueue();
            }

            @Override // io.bosonnetwork.kademlia.RPCCallListener
            public void onResponse(RPCCall rPCCall, Message message) {
                if (rPCCall.knownReachableAtCreationTime()) {
                    RPCServer.this.verifiedEntryLossrate.updateAverage(0.0d);
                } else {
                    RPCServer.this.unverifiedLossrate.updateAverage(0.0d);
                }
            }
        };
        Preconditions.checkArgument(inetSocketAddress != null, "Invalid socket address");
        this.dht = dht;
        this.addr = inetSocketAddress;
        this.state = State.INITIAL;
        this.writeState = new AtomicInteger(WRITE_STATE_INITIAL);
        this.callQueueGuard = new AtomicInteger(WRITE_STATE_IDLE);
        this.calls = new ConcurrentHashMap(Constants.MAX_ACTIVE_CALLS);
        this.callQueue = new ConcurrentLinkedQueue();
        this.pipeline = new ConcurrentLinkedQueue();
        this.outboundThrottle = z ? new Throttle.Eanbled() : new Throttle.Disabled();
        this.inboundThrottle = z ? new Throttle.Eanbled() : new Throttle.Disabled();
        this.timeoutSampler = new TimeoutSampler();
        this.stats = new RPCStatistics();
        this.receivedMessages = new AtomicLong();
        this.sentMessages = new AtomicLong();
        this.unverifiedLossrate = new ExponentialWeightendMovingAverage(0.01d, 0.5d);
        this.verifiedEntryLossrate = new ExponentialWeightendMovingAverage(0.01d, 0.5d);
        dht.setRPCServer(this);
    }

    public RPCServer(DHT dht, InetSocketAddress inetSocketAddress) {
        this(dht, inetSocketAddress, true);
    }

    public RPCServer(DHT dht, InetAddress inetAddress, int i, boolean z) {
        this(dht, new InetSocketAddress(inetAddress, i), z);
    }

    public RPCServer(DHT dht, InetAddress inetAddress, int i) {
        this(dht, inetAddress, i, true);
    }

    public InetSocketAddress getAddress() {
        return this.addr;
    }

    public int getPort() {
        return this.addr.getPort();
    }

    public State getState() {
        return this.state;
    }

    public Node getNode() {
        return this.dht.getNode();
    }

    public DHT getDHT() {
        return this.dht;
    }

    public long getNumberOfReceivedMessages() {
        return this.receivedMessages.get();
    }

    public long getNumberOfSentMessages() {
        return this.sentMessages.get();
    }

    public int getNumberOfActiveRPCCalls() {
        return this.calls.size();
    }

    public RPCStatistics getStats() {
        return this.stats;
    }

    public boolean isReachable() {
        return this.isReachable;
    }

    public void checkReachability(long j) {
        if (this.receivedMessages.get() != this.messagesAtLastReachableCheck) {
            this.isReachable = true;
            this.lastReachableCheck = j;
            this.messagesAtLastReachableCheck = this.receivedMessages.get();
        } else if (j - this.lastReachableCheck > 60000) {
            this.isReachable = false;
            this.timeoutSampler.reset();
        }
    }

    public synchronized void start() throws IOError {
        Preconditions.checkState(this.state == State.INITIAL, "already started");
        try {
            this.channel = DatagramChannel.open(StandardProtocolFamily.INET);
            this.channel.configureBlocking(false);
            this.channel.setOption((SocketOption<SocketOption>) StandardSocketOptions.SO_RCVBUF, (SocketOption) 2097152);
            this.channel.setOption((SocketOption<SocketOption>) StandardSocketOptions.SO_REUSEADDR, (SocketOption) true);
            this.channel.bind((SocketAddress) this.addr);
            this.writeState.set(WRITE_STATE_IDLE);
            this.state = State.RUNNING;
            this.startTime = Instant.now();
            getNode().getNetworkEngine().register(this);
            log.info("Started RPC server {}", AddressUtils.toString(this.addr));
        } catch (IOException e) {
            throw new IOError("Open and bing UDP socket error.", e);
        }
    }

    public synchronized void stop() {
        if (this.state == State.STOPPED) {
            return;
        }
        this.state = State.STOPPED;
        this.writeState.set(3);
        if (this.channel != null) {
            try {
                this.channel.close();
            } catch (IOException e) {
            }
        }
        Stream.of((Object[]) new Stream[]{this.calls.values().stream(), this.callQueue.stream(), this.pipeline.stream().map(message -> {
            return message.getAssociatedCall();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        })}).flatMap(stream -> {
            return stream;
        }).forEach(rPCCall -> {
            rPCCall.cancel();
        });
        this.pipeline.clear();
        log.info("Stopped RPC Server {}", this.addr);
    }

    public void sendCall(RPCCall rPCCall) {
        this.callQueue.add(rPCCall);
        processCallQueue();
    }

    private void processCallQueue() {
        RPCCall poll;
        if (this.callQueueGuard.compareAndSet(WRITE_STATE_IDLE, 1)) {
            this.outboundThrottle.decay();
            int size = Constants.MAX_ACTIVE_CALLS - this.calls.size();
            while (size > 0 && (poll = this.callQueue.poll()) != null) {
                int estimateDeplayAndInc = this.outboundThrottle.estimateDeplayAndInc(poll.getRequest().getRemoteAddress().getAddress());
                if (estimateDeplayAndInc > 0) {
                    int nextInt = estimateDeplayAndInc + ThreadLocalRandom.current().nextInt(10, 50);
                    log.info("Throttled(delay {}ms) the RPCCall to remote peer {}@{}, {}", new Object[]{Integer.valueOf(nextInt), poll.getTargetId(), AddressUtils.toString(poll.getRequest().getRemoteAddress()), poll.getRequest()});
                    getScheduler().schedule(() -> {
                        this.callQueue.add(poll);
                        processCallQueue();
                        this.outboundThrottle.saturatingDec(poll.getRequest().getRemoteAddress().getAddress());
                    }, nextInt, TimeUnit.MILLISECONDS);
                } else {
                    int i = this.nextTxid;
                    this.nextTxid = i + 1;
                    int i2 = i;
                    if (i2 == 0) {
                        int i3 = this.nextTxid;
                        this.nextTxid = i3 + 1;
                        i2 = i3;
                    }
                    poll.getRequest().setTxid(i2);
                    if (this.calls.putIfAbsent(Integer.valueOf(i2), poll) == null) {
                        size += WRITE_STATE_INITIAL;
                        dispatchCall(poll);
                    } else {
                        this.callQueue.add(poll);
                        log.error("!!! Should never happen - put the call to call queue failed.");
                    }
                }
            }
            this.callQueueGuard.set(WRITE_STATE_IDLE);
            if (size <= 0 || this.callQueue.peek() == null) {
                return;
            }
            getDHT().getNode().getScheduler().execute(this::processCallQueue);
        }
    }

    private void dispatchCall(RPCCall rPCCall) {
        Message request = rPCCall.getRequest();
        if (!$assertionsDisabled && request.getRemoteAddress() == null) {
            throw new AssertionError();
        }
        rPCCall.addListener(this.callListener);
        if (!rPCCall.knownReachableAtCreationTime()) {
            this.timeoutSampler.registerCall(rPCCall);
        }
        if (rPCCall.getExpectedRTT() == -1) {
            rPCCall.setExpectedRTT(this.timeoutSampler.getStallTimeout());
        }
        request.setAssociatedCall(rPCCall);
        fillPipeline(request);
    }

    public void sendMessage(Message message) {
        Preconditions.checkArgument(message.getRemoteAddress() != null, "message destination can not be null");
        fillPipeline(message);
    }

    private void fillPipeline(Message message) {
        if (message.getId() == null) {
            message.setId(getNode().getId());
        }
        message.setServer(this);
        message.setVersion(Constants.VERSION);
        RPCCall associatedCall = message.getAssociatedCall();
        if (associatedCall != null) {
            this.dht.onSend(associatedCall);
        }
        this.pipeline.add(message);
        processPipeline();
    }

    private void processPipeline() {
        int send;
        if (this.writeState.compareAndSet(WRITE_STATE_IDLE, 1)) {
            while (true) {
                Message poll = this.pipeline.poll();
                if (poll == null) {
                    break;
                }
                try {
                    ByteBuffer byteBuffer = writeBuffer.get();
                    byteBuffer.clear();
                    byte[] encrypt = getNode().encrypt(poll.getRemoteId(), poll.serialize());
                    byteBuffer.put(poll.getId().bytes());
                    byteBuffer.put(encrypt);
                    byteBuffer.flip();
                    send = this.channel.send(byteBuffer, poll.getRemoteAddress());
                } catch (CryptoError e) {
                    log.error("Failed to encrypt message {}/{} to {}: {}", new Object[]{poll.getMethod(), poll.getType(), AddressUtils.toString(poll.getRemoteAddress()), poll});
                } catch (IOException e2) {
                    if (!this.channel.isOpen()) {
                        return;
                    }
                    if (e2.getMessage().equals("No buffer space available")) {
                        log.debug("Awaiting the socket available to send the messages.");
                        this.pipeline.add(poll);
                        this.writeState.set(2);
                        getNode().getNetworkEngine().updateInterestOps(this);
                        return;
                    }
                    log.error("Failed while attempting to send {}/{} to {}: {}", new Object[]{poll.getMethod(), poll.getType(), AddressUtils.toString(poll.getRemoteAddress()), poll});
                    log.error("Stack trace", e2);
                    if (poll.getAssociatedCall() != null) {
                        poll.getAssociatedCall().failed();
                    }
                }
                if (send == 0) {
                    log.debug("Awaiting the socket available to send the messages.");
                    this.pipeline.add(poll);
                    this.writeState.set(2);
                    getNode().getNetworkEngine().updateInterestOps(this);
                    return;
                }
                log.trace("sent {}/{} to {}: [{}]{}", new Object[]{poll.getMethod(), poll.getType(), AddressUtils.toString(poll.getRemoteAddress()), Integer.valueOf(send), poll});
                if (poll.getAssociatedCall() != null) {
                    poll.getAssociatedCall().sent(this);
                    this.inboundThrottle.clear(poll.getRemoteAddress().getAddress());
                }
                this.sentMessages.incrementAndGet();
                this.stats.sentMessage(poll);
                this.stats.sentBytes(send + this.dht.getType().protocolHeaderSize());
            }
            this.writeState.compareAndSet(1, WRITE_STATE_IDLE);
            if (this.pipeline.peek() != null) {
                getNode().getScheduler().execute(this::processPipeline);
            }
        }
    }

    private void processPackets() throws IOException {
        ByteBuffer byteBuffer = readBuffer.get();
        this.inboundThrottle.decay();
        while (true) {
            byteBuffer.clear();
            InetSocketAddress inetSocketAddress = (InetSocketAddress) this.channel.receive(byteBuffer);
            if (inetSocketAddress == null) {
                return;
            }
            byteBuffer.flip();
            if (byteBuffer.limit() == 0) {
                return;
            }
            this.stats.receivedBytes(byteBuffer.limit() + this.dht.getType().protocolHeaderSize());
            if (byteBuffer.limit() < MIN_PACKET_SIZE || inetSocketAddress.getPort() == 0 || !this.dht.getType().canUseSocketAddress(inetSocketAddress)) {
                log.warn("Dropped an invalid packet from {}.", AddressUtils.toString(inetSocketAddress));
                this.stats.droppedPacket(byteBuffer.limit() + this.dht.getType().protocolHeaderSize());
            } else if (this.inboundThrottle.saturatingInc(inetSocketAddress.getAddress())) {
                log.warn("Throttled an packet from {}", AddressUtils.toString(inetSocketAddress));
                this.stats.droppedPacket(byteBuffer.limit() + this.dht.getType().protocolHeaderSize());
            } else {
                byte[] bArr = new byte[byteBuffer.limit()];
                byteBuffer.get(bArr);
                getNode().getScheduler().execute(() -> {
                    handlePacket(bArr, inetSocketAddress);
                });
            }
        }
    }

    private void handlePacket(byte[] bArr, InetSocketAddress inetSocketAddress) {
        Id of = Id.of(bArr, WRITE_STATE_IDLE);
        Blacklist blacklist = getNode().getBlacklist();
        if (blacklist.isBanned(inetSocketAddress)) {
            log.warn("Ignored the message from banned address {}", AddressUtils.toString(inetSocketAddress));
            return;
        }
        if (blacklist.isBanned(of)) {
            log.warn("Ignored the message from banned node {}", of);
            return;
        }
        try {
            Message parse = Message.parse(getNode().decrypt(of, Arrays.copyOfRange(bArr, 32, bArr.length)));
            parse.setId(of);
            blacklist.observe(inetSocketAddress, of);
            log.trace("Received {}/{} from {}: [{}]{}", new Object[]{parse.getMethod(), parse.getType(), AddressUtils.toString(inetSocketAddress), Integer.valueOf(bArr.length), parse});
            this.receivedMessages.incrementAndGet();
            this.stats.receivedMessage(parse);
            parse.setOrigin(inetSocketAddress);
            if (parse.getType() != Message.Type.ERROR && parse.getTxid() == 0) {
                log.warn("Received a message with invalid transaction id.");
                ErrorMessage errorMessage = new ErrorMessage(parse.getMethod(), WRITE_STATE_IDLE, ErrorCode.ProtocolError.value(), "Received a message with an invalid transaction id, expected a non-zero transaction id");
                errorMessage.setRemote(parse.getId(), parse.getOrigin());
                sendMessage(errorMessage);
                return;
            }
            if (parse.getType() == Message.Type.REQUEST) {
                handleMessage(parse);
                return;
            }
            RPCCall rPCCall = this.calls.get(Integer.valueOf(parse.getTxid()));
            if (rPCCall == null) {
                if (parse.getType() != Message.Type.RESPONSE || Duration.between(this.startTime, Instant.now()).getSeconds() <= 120) {
                    if (parse.getType() == Message.Type.ERROR) {
                        handleMessage(parse);
                        return;
                    } else {
                        log.debug("Ignored message: {}", parse);
                        return;
                    }
                }
                log.warn("Cannot find RPC call for {} {}", parse.getType() == Message.Type.RESPONSE ? "response" : "error", Integer.valueOf(parse.getTxid()));
                ErrorMessage errorMessage2 = new ErrorMessage(parse.getMethod(), parse.getTxid(), ErrorCode.ProtocolError.value(), "Received a response message whose transaction ID did not match a pending request or transaction expired");
                errorMessage2.setRemote(parse.getId(), parse.getOrigin());
                sendMessage(errorMessage2);
                return;
            }
            if (rPCCall.getRequest().getRemoteAddress().getAddress().equals(parse.getOrigin().getAddress())) {
                if (this.calls.remove(Integer.valueOf(parse.getTxid()), rPCCall)) {
                    parse.setAssociatedCall(rPCCall);
                    rPCCall.responsed(parse);
                    processCallQueue();
                    handleMessage(parse);
                    return;
                }
                return;
            }
            log.warn("Transaction id matched, socket address did not, ignoring message, request: {} -> response: {}, version: {}", new Object[]{rPCCall.getRequest().getRemoteAddress(), parse.getOrigin(), parse.getReadableVersion()});
            if (parse.getType() == Message.Type.RESPONSE && this.dht.getType() == Network.IPv6) {
                ErrorMessage errorMessage3 = new ErrorMessage(parse.getMethod(), parse.getTxid(), ErrorCode.ProtocolError.value(), "A request was sent to " + rPCCall.getRequest().getRemoteAddress() + " and a response with matching transaction id was received from " + parse.getOrigin() + " . Multihomed nodes should ensure that sockets are properly bound and responses are sent with the correct source socket address. See BEPs 32 and 45.");
                errorMessage3.setRemote(parse.getId(), rPCCall.getRequest().getRemoteAddress());
                sendMessage(errorMessage3);
            }
            rPCCall.responseSocketMismatch();
            rPCCall.stall();
        } catch (CryptoError e) {
            log.warn("Decrypt packet error from {}, ignored.", AddressUtils.toString(inetSocketAddress));
            this.stats.droppedPacket(bArr.length);
            blacklist.observeInvalidMessage(inetSocketAddress);
        } catch (MessageException e2) {
            log.warn("Got a wrong packet from {}, ignored.", AddressUtils.toString(inetSocketAddress));
            this.stats.droppedPacket(bArr.length);
            blacklist.observeInvalidMessage(inetSocketAddress);
        }
    }

    public void handleMessage(Message message) {
        this.dht.onMessage(message);
    }

    Duration age() {
        Instant instant = this.startTime;
        return instant == null ? Duration.ZERO : Duration.between(instant, Instant.now());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ScheduledExecutorService getScheduler() {
        return getNode().getScheduler();
    }

    @Override // io.bosonnetwork.kademlia.NetworkEngine.Selectable
    public SelectableChannel getChannel() {
        return this.channel;
    }

    @Override // io.bosonnetwork.kademlia.NetworkEngine.Selectable
    public void selectEvent(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isValid()) {
            if (selectionKey.isWritable()) {
                this.writeState.set(WRITE_STATE_IDLE);
                getNode().getNetworkEngine().updateInterestOps(this);
                getNode().getScheduler().execute(this::processPipeline);
            } else if (selectionKey.isReadable()) {
                processPackets();
            }
        }
    }

    @Override // io.bosonnetwork.kademlia.NetworkEngine.Selectable
    public void checkState() throws IOException {
        if (!this.channel.isOpen() || this.channel.socket().isClosed()) {
            stop();
        }
    }

    @Override // io.bosonnetwork.kademlia.NetworkEngine.Selectable
    public int interestOps() {
        int i = 1;
        if (this.writeState.get() == 2) {
            i = 1 | 4;
        }
        return i;
    }

    public String toString() {
        Formatter formatter = new Formatter();
        formatter.format("%s @ %s%n", getNode().getId(), AddressUtils.toString(getAddress()));
        formatter.format("rx: %d tx: %d active: %d baseRTT: %d loss: %f  loss (verified): %f uptime: %s%n", Long.valueOf(getNumberOfReceivedMessages()), Long.valueOf(getNumberOfSentMessages()), Integer.valueOf(getNumberOfActiveRPCCalls()), Long.valueOf(this.timeoutSampler.getStallTimeout()), Double.valueOf(this.unverifiedLossrate.getAverage()), Double.valueOf(this.verifiedEntryLossrate.getAverage()), age());
        formatter.format("RTT stats (%dsamples) %s", Long.valueOf(this.timeoutSampler.getSampleCount()), this.timeoutSampler.getStats());
        return formatter.toString();
    }

    static {
        $assertionsDisabled = !RPCServer.class.desiredAssertionStatus();
        writeBuffer = ThreadLocal.withInitial(() -> {
            return ByteBuffer.allocateDirect(1500);
        });
        readBuffer = ThreadLocal.withInitial(() -> {
            return ByteBuffer.allocateDirect(Constants.RECEIVE_BUFFER_SIZE);
        });
        log = LoggerFactory.getLogger(RPCServer.class);
    }
}
