package org.neo4j.causalclustering.messaging;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import org.neo4j.causalclustering.messaging.Inbound;

/* loaded from: input_file:org/neo4j/causalclustering/messaging/TestNetwork.class */
public class TestNetwork<T> {
    private final Map<T, TestNetwork<T>.Inbound> inboundChannels = new HashMap();
    private final Map<T, TestNetwork<T>.Outbound> outboundChannels = new HashMap();
    private final AtomicLong seqGen = new AtomicLong();
    private final BiFunction<T, T, Long> latencySpecMillis;

    /* loaded from: input_file:org/neo4j/causalclustering/messaging/TestNetwork$Inbound.class */
    public class Inbound implements org.neo4j.causalclustering.messaging.Inbound<Message> {
        private Inbound.MessageHandler<Message> handler;
        private TestNetwork<T>.Inbound.NetworkThread networkThread;
        private final BlockingQueue<Message> Q = new ArrayBlockingQueue(64, true);
        private volatile boolean disconnected = false;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/neo4j/causalclustering/messaging/TestNetwork$Inbound$NetworkThread.class */
        public class NetworkThread extends Thread {
            private volatile boolean done = false;

            NetworkThread() {
            }

            public void kill() throws InterruptedException {
                this.done = true;
                interrupt();
                join();
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!this.done) {
                    try {
                        Message message = (Message) Inbound.this.Q.poll(1L, TimeUnit.SECONDS);
                        if (message != null && Inbound.this.handler != null) {
                            Inbound.this.handler.handle(message);
                        }
                    } catch (InterruptedException e) {
                        this.done = true;
                    }
                }
            }
        }

        public Inbound(T t) {
            TestNetwork.this.inboundChannels.put(t, this);
        }

        public void start() {
            this.networkThread = new NetworkThread();
            this.networkThread.start();
        }

        public void stop() throws InterruptedException {
            this.networkThread.kill();
        }

        public synchronized void deliver(Message message) {
            if (this.disconnected) {
                return;
            }
            this.Q.offer(message);
        }

        public void registerHandler(Inbound.MessageHandler<Message> messageHandler) {
            this.handler = messageHandler;
        }

        public void disconnect() {
            this.disconnected = true;
        }

        public void reconnect() {
            this.disconnected = false;
        }
    }

    /* loaded from: input_file:org/neo4j/causalclustering/messaging/TestNetwork$Outbound.class */
    public class Outbound implements org.neo4j.causalclustering.messaging.Outbound<T, Message> {
        private TestNetwork<T>.Outbound.NetworkThread networkThread;
        private volatile boolean disconnected = false;
        private T me;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/neo4j/causalclustering/messaging/TestNetwork$Outbound$NetworkThread.class */
        public class NetworkThread extends Thread {
            private volatile boolean done = false;
            private final TreeSet<TestNetwork<T>.Outbound.NetworkThread.MessageContext> msgQueue = new TreeSet<>((messageContext, messageContext2) -> {
                int compare = Long.compare(messageContext.atMillis, messageContext2.atMillis);
                if (compare == 0 && messageContext != messageContext2) {
                    compare = messageContext.seqNum < messageContext2.seqNum ? -1 : 1;
                }
                return compare;
            });

            /* JADX INFO: Access modifiers changed from: private */
            /* loaded from: input_file:org/neo4j/causalclustering/messaging/TestNetwork$Outbound$NetworkThread$MessageContext.class */
            public class MessageContext {
                private final T destination;
                private final Message message;
                private long atMillis;
                private long seqNum;

                private MessageContext(T t, Message message, long j) {
                    this.destination = t;
                    this.message = message;
                    this.atMillis = j;
                    this.seqNum = TestNetwork.this.seqGen.getAndIncrement();
                }

                public boolean equals(Object obj) {
                    if (this == obj) {
                        return true;
                    }
                    return obj != null && getClass() == obj.getClass() && this.seqNum == ((MessageContext) obj).seqNum;
                }

                public int hashCode() {
                    return Objects.hash(Long.valueOf(this.seqNum));
                }
            }

            NetworkThread() {
            }

            public void kill() throws InterruptedException {
                this.done = true;
                interrupt();
                join();
            }

            public synchronized void scheduleDelivery(T t, Message message, long j) {
                if (Outbound.this.disconnected) {
                    return;
                }
                this.msgQueue.add(new MessageContext(t, message, j));
                notifyAll();
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public synchronized void run() {
                while (!this.done) {
                    long currentTimeMillis = System.currentTimeMillis();
                    Iterator<TestNetwork<T>.Outbound.NetworkThread.MessageContext> it = this.msgQueue.iterator();
                    while (it.hasNext()) {
                        TestNetwork<T>.Outbound.NetworkThread.MessageContext next = it.next();
                        if (((MessageContext) next).atMillis <= currentTimeMillis) {
                            it.remove();
                            Inbound inbound = (Inbound) TestNetwork.this.inboundChannels.get(((MessageContext) next).destination);
                            if (inbound != null) {
                                inbound.deliver(((MessageContext) next).message);
                            }
                        }
                    }
                    try {
                        try {
                            long currentTimeMillis2 = ((MessageContext) this.msgQueue.first()).atMillis - System.currentTimeMillis();
                            if (currentTimeMillis2 > 0) {
                                wait(currentTimeMillis2);
                            }
                        } catch (NoSuchElementException e) {
                            wait(1000L);
                        }
                    } catch (InterruptedException e2) {
                        this.done = true;
                    }
                }
            }
        }

        public Outbound(T t) {
            this.me = t;
            TestNetwork.this.outboundChannels.put(t, this);
        }

        public void start() {
            this.networkThread = new NetworkThread();
            this.networkThread.start();
        }

        public void stop() throws InterruptedException {
            this.networkThread.kill();
        }

        public void send(T t, Message message) {
            doSend(t, message, System.currentTimeMillis());
        }

        private void doSend(T t, Message message, long j) {
            this.networkThread.scheduleDelivery(t, message, j + ((Long) TestNetwork.this.latencySpecMillis.apply(this.me, t)).longValue());
        }

        public void disconnect() {
            this.disconnected = true;
        }

        public void reconnect() {
            this.disconnected = false;
        }
    }

    public TestNetwork(BiFunction<T, T, Long> biFunction) {
        this.latencySpecMillis = biFunction;
    }

    public void disconnect(T t) {
        disconnectOutbound(t);
        disconnectInbound(t);
    }

    public void reconnect(T t) {
        reconnectInbound(t);
        reconnectOutbound(t);
    }

    public void reset() {
        this.inboundChannels.values().forEach((v0) -> {
            v0.reconnect();
        });
        this.outboundChannels.values().forEach((v0) -> {
            v0.reconnect();
        });
    }

    public void disconnectInbound(T t) {
        this.inboundChannels.get(t).disconnect();
    }

    public void reconnectInbound(T t) {
        this.inboundChannels.get(t).reconnect();
    }

    public void disconnectOutbound(T t) {
        this.outboundChannels.get(t).disconnect();
    }

    public void reconnectOutbound(T t) {
        this.outboundChannels.get(t).reconnect();
    }

    public void start() {
        Iterator<TestNetwork<T>.Inbound> it = this.inboundChannels.values().iterator();
        while (it.hasNext()) {
            it.next().start();
        }
        Iterator<TestNetwork<T>.Outbound> it2 = this.outboundChannels.values().iterator();
        while (it2.hasNext()) {
            it2.next().start();
        }
    }

    public void stop() {
        Iterator<TestNetwork<T>.Outbound> it = this.outboundChannels.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().stop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        Iterator<TestNetwork<T>.Inbound> it2 = this.inboundChannels.values().iterator();
        while (it2.hasNext()) {
            try {
                it2.next().stop();
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
