package io.atomix.cluster.messaging.impl;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.atomix.cluster.ClusterService;
import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.cluster.messaging.ClusterEventingService;
import io.atomix.cluster.messaging.ManagedClusterEventingService;
import io.atomix.cluster.messaging.Subscription;
import io.atomix.messaging.MessagingException;
import io.atomix.messaging.MessagingService;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.time.LogicalTimestamp;
import io.atomix.utils.time.WallClockTimestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventingService.class */
public class DefaultClusterEventingService implements ManagedClusterEventingService {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterEventingService.class);
    // Serializer shared by gossip payloads (collections of InternalSubscriptionInfo) and topic messages (InternalMessage).
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder().register(KryoNamespaces.BASIC).register(new Class[]{NodeId.class}).register(new Class[]{LogicalTimestamp.class}).register(new Class[]{WallClockTimestamp.class}).register(new Class[]{InternalSubscriptionInfo.class}).register(new Class[]{InternalMessage.class}).register(new Class[]{InternalMessage.Type.class}).build());
    // Messaging subject on which subscription updates are exchanged between nodes.
    private static final String GOSSIP_MESSAGE_SUBJECT = "ClusterEventingService-update";
    // Initial delay and period of the scheduled gossip task.
    private static final long GOSSIP_INTERVAL_MILLIS = 1000;
    // Initial delay and period of the scheduled tombstone purge task.
    private static final long TOMBSTONE_EXPIRATION_MILLIS = 60000;
    private final ClusterService clusterService;
    private final MessagingService messagingService;
    // Identifier of the local node, captured once in the constructor.
    private final NodeId localNodeId;
    // Created in start(); runs the gossip task, the purge task and the gossip message handler.
    private ScheduledExecutorService gossipExecutor;
    // Source of the logical timestamps stamped on locally created subscriptions.
    private final AtomicLong logicalTime = new AtomicLong();
    // Per node: wall-clock millis captured just before the last successfully acknowledged update was sent.
    private final Map<NodeId, Long> updateTimes = Maps.newConcurrentMap();
    // Topic name -> topic state (local subscribers plus all known subscription records).
    private final Map<String, InternalTopic> topics = Maps.newConcurrentMap();
    private final AtomicBoolean started = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventingService$InternalMessage.class */
    /**
     * Envelope wrapped around every topic message before it is handed to the messaging service.
     * It carries the delivery mode together with the already-encoded user payload.
     */
    public static class InternalMessage {

        /** Delivery mode of a message once it reaches a subscriber node. */
        public enum Type {
            /** Deliver to exactly one local subscription. */
            DIRECT,
            /** Deliver to every local subscription. */
            ALL
        }

        private final Type type;
        private final byte[] payload;

        /**
         * @param type delivery mode
         * @param bArr encoded user payload (stored as-is, not copied)
         */
        InternalMessage(Type type, byte[] bArr) {
            this.payload = bArr;
            this.type = type;
        }

        /** @return the delivery mode of this message */
        public Type type() {
            return type;
        }

        /** @return the encoded user payload */
        public byte[] payload() {
            return payload;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventingService$InternalSubscriber.class */
    public static class InternalSubscriber implements BiFunction<Address, byte[], CompletableFuture<byte[]>> {
        private final AtomicInteger counter;
        private InternalSubscription[] subscriptions;

        private InternalSubscriber() {
            this.counter = new AtomicInteger();
            this.subscriptions = new InternalSubscription[0];
        }

        List<InternalSubscription> subscriptions() {
            return ImmutableList.copyOf(this.subscriptions);
        }

        private InternalSubscription next() {
            InternalSubscription[] internalSubscriptionArr = this.subscriptions;
            return internalSubscriptionArr[this.counter.incrementAndGet() % internalSubscriptionArr.length];
        }

        @Override // java.util.function.BiFunction
        public CompletableFuture<byte[]> apply(Address address, byte[] bArr) {
            InternalMessage internalMessage = (InternalMessage) DefaultClusterEventingService.SERIALIZER.decode(bArr);
            switch (internalMessage.type()) {
                case DIRECT:
                    return (CompletableFuture) next().callback.apply(internalMessage.payload());
                case ALL:
                default:
                    for (InternalSubscription internalSubscription : this.subscriptions) {
                        internalSubscription.callback.apply(internalMessage.payload());
                    }
                    return CompletableFuture.completedFuture(null);
            }
        }

        void add(InternalSubscription internalSubscription) {
            ArrayList arrayList = new ArrayList(this.subscriptions.length + 1);
            arrayList.addAll(Arrays.asList(this.subscriptions));
            arrayList.add(internalSubscription);
            this.subscriptions = (InternalSubscription[]) arrayList.toArray(new InternalSubscription[arrayList.size()]);
        }

        void remove(InternalSubscription internalSubscription) {
            ArrayList newArrayList = Lists.newArrayList(this.subscriptions);
            newArrayList.remove(internalSubscription);
            this.subscriptions = (InternalSubscription[]) newArrayList.toArray(new InternalSubscription[newArrayList.size()]);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventingService$InternalSubscription.class */
    /**
     * A subscription created on the local node. It ties the user callback to the topic it belongs
     * to and to the metadata record that describes the subscription.
     */
    public class InternalSubscription implements Subscription {
        private final InternalTopic topic;
        private final InternalSubscriptionInfo metadata;
        private final Function<byte[], CompletableFuture<byte[]>> callback;

        /**
         * @param internalTopic topic this subscription belongs to
         * @param function      callback invoked with the raw payload of each delivered message
         */
        public InternalSubscription(InternalTopic internalTopic, Function<byte[], CompletableFuture<byte[]>> function) {
            // Each local subscription is stamped with the next value of the service-wide logical clock.
            LogicalTimestamp created = new LogicalTimestamp(logicalTime.incrementAndGet());
            this.topic = internalTopic;
            this.callback = function;
            this.metadata = new InternalSubscriptionInfo(localNodeId, internalTopic.topic, created);
        }

        /** @return the topic name recorded in this subscription's metadata */
        @Override // io.atomix.cluster.messaging.Subscription
        public String topic() {
            return metadata.topic();
        }

        /**
         * Removes this subscription from its topic.
         *
         * @return the future returned by the topic's removal of this subscription
         */
        @Override // io.atomix.cluster.messaging.Subscription
        public CompletableFuture<Void> close() {
            return topic.removeLocalSubscription(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventingService$InternalSubscriptionInfo.class */
    /**
     * Metadata record for one subscription: the owning node, the topic, the logical timestamp
     * assigned when the subscription was created, and whether the record is a tombstone.
     * Each instance also records the wall-clock time at which it was constructed.
     */
    public static class InternalSubscriptionInfo {
        private final NodeId nodeId;
        private final String topic;
        private final LogicalTimestamp logicalTimestamp;
        private final boolean tombstone;
        private final WallClockTimestamp timestamp = new WallClockTimestamp();

        /** Creates a live (non-tombstone) record. */
        InternalSubscriptionInfo(NodeId nodeId, String str, LogicalTimestamp logicalTimestamp) {
            this(nodeId, str, logicalTimestamp, false);
        }

        /**
         * @param nodeId           node that owns the subscription
         * @param str              topic name
         * @param logicalTimestamp logical creation time of the subscription
         * @param z                {@code true} if this record is a tombstone
         */
        InternalSubscriptionInfo(NodeId nodeId, String str, LogicalTimestamp logicalTimestamp, boolean z) {
            this.nodeId = nodeId;
            this.topic = str;
            this.logicalTimestamp = logicalTimestamp;
            this.tombstone = z;
        }

        /** @return the node that owns the subscription */
        public NodeId nodeId() {
            return nodeId;
        }

        /** @return the topic name */
        String topic() {
            return topic;
        }

        /** @return the logical creation time of the subscription */
        LogicalTimestamp logicalTimestamp() {
            return logicalTimestamp;
        }

        /** @return the wall-clock time at which this record object was constructed */
        WallClockTimestamp timestamp() {
            return timestamp;
        }

        /** @return {@code true} if this record is a tombstone */
        public boolean isTombstone() {
            return tombstone;
        }

        /**
         * @return a new tombstone record with the same node, topic and logical timestamp; being a
         *         new object, it carries a fresh wall-clock timestamp
         */
        InternalSubscriptionInfo asTombstone() {
            return new InternalSubscriptionInfo(nodeId, topic, logicalTimestamp, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventingService$InternalTopic.class */
    /**
     * State of a single topic: the local subscriber handler, the list of all known subscription
     * records (local and remote, live and tombstoned), and a round-robin iterator over the live
     * records that is rebuilt whenever the list changes.
     *
     * <p>All mutators are {@code synchronized} on the topic. The iterator reference is
     * {@code volatile} and never {@code null}, so readers that do not hold the lock always see a
     * usable iterator.
     */
    public class InternalTopic {
        private final String topic;
        private final InternalSubscriber subscribers = new InternalSubscriber();
        private final List<InternalSubscriptionInfo> subscriptions = Lists.newCopyOnWriteArrayList();
        // Starts as an empty iterator so a topic visible in the map before its first subscription
        // is recorded can be queried safely.
        private volatile TopicIterator iterator = new TopicIterator(this.subscriptions);

        InternalTopic(String str) {
            this.topic = str;
        }

        /** @return the handler holding this node's subscriptions for the topic */
        InternalSubscriber localSubscriber() {
            return this.subscribers;
        }

        /** @return the list of all known subscription records for the topic, tombstones included */
        List<InternalSubscriptionInfo> remoteSubscriptions() {
            return this.subscriptions;
        }

        /** @return the current round-robin iterator over live subscription records */
        TopicIterator iterator() {
            return this.iterator;
        }

        /**
         * Subscribes a synchronous request/reply handler that runs on the given executor.
         *
         * @param function  decodes the raw payload into a message
         * @param function2 handles the message and produces a reply
         * @param function3 encodes the reply
         * @param executor  executor on which decode, handle and encode run
         * @return a future completed with the subscription once the update round has finished
         */
        <M, R> CompletableFuture<Subscription> subscribe(Function<byte[], M> function, Function<M, R> function2, Function<R, byte[]> function3, Executor executor) {
            return addLocalSubscription(new InternalSubscription(this, payload -> {
                CompletableFuture<byte[]> reply = new CompletableFuture<>();
                executor.execute(() -> {
                    try {
                        reply.complete(function3.apply(function2.apply(function.apply(payload))));
                    } catch (Exception e) {
                        reply.completeExceptionally(e);
                    }
                });
                return reply;
            }));
        }

        /**
         * Subscribes an asynchronous request/reply handler.
         *
         * @param function  decodes the raw payload into a message
         * @param function2 handles the message and returns a future reply
         * @param function3 encodes the reply
         * @return a future completed with the subscription once the update round has finished
         */
        <M, R> CompletableFuture<Subscription> subscribe(Function<byte[], M> function, Function<M, CompletableFuture<R>> function2, Function<R, byte[]> function3) {
            return addLocalSubscription(new InternalSubscription(this,
                payload -> function2.apply(function.apply(payload)).thenApply(function3)));
        }

        /**
         * Subscribes a fire-and-forget consumer that runs on the given executor.
         *
         * @param function decodes the raw payload into a message
         * @param consumer consumes the message
         * @param executor executor on which decode and consume run
         * @return a future completed with the subscription once the update round has finished
         */
        <M> CompletableFuture<Subscription> subscribe(Function<byte[], M> function, Consumer<M> consumer, Executor executor) {
            return addLocalSubscription(new InternalSubscription(this, payload -> {
                executor.execute(() -> {
                    try {
                        consumer.accept(function.apply(payload));
                    } catch (Exception e) {
                        // There is no reply channel for consumers, so the failure is reported
                        // in the log instead of being dropped silently.
                        LOGGER.warn("Subscriber for topic {} failed to handle a message", this.topic, e);
                    }
                });
                return CompletableFuture.completedFuture(null);
            }));
        }

        /**
         * Records a new local subscription, (re-)registers the topic handler with the messaging
         * service and pushes the change to the other nodes.
         */
        private synchronized CompletableFuture<Subscription> addLocalSubscription(InternalSubscription internalSubscription) {
            this.subscribers.add(internalSubscription);
            this.subscriptions.add(internalSubscription.metadata);
            this.iterator = new TopicIterator(this.subscriptions);
            DefaultClusterEventingService.this.messagingService.registerHandler(internalSubscription.topic(), this.subscribers);
            return DefaultClusterEventingService.this.updateNodes().thenApply(ignored -> internalSubscription);
        }

        /**
         * Removes a local subscription, leaving a tombstone in its place, and pushes the change to
         * the other nodes. The topic handler is unregistered once no local subscription remains.
         * Removing a subscription that is no longer recorded is a no-op.
         */
        public synchronized CompletableFuture<Void> removeLocalSubscription(InternalSubscription internalSubscription) {
            this.subscribers.remove(internalSubscription);
            if (!this.subscriptions.remove(internalSubscription.metadata)) {
                // Already removed (e.g. close() called twice): do not pile up duplicate tombstones.
                return CompletableFuture.completedFuture(null);
            }
            this.subscriptions.add(internalSubscription.metadata.asTombstone());
            this.iterator = new TopicIterator(this.subscriptions);
            // The handler is only needed while at least one local subscription exists. Counting
            // tombstones here (as before) could never be zero, since one was just added.
            if (this.subscribers.subscriptions().isEmpty()) {
                DefaultClusterEventingService.this.messagingService.unregisterHandler(internalSubscription.topic());
            }
            return DefaultClusterEventingService.this.updateNodes();
        }

        /** Records a subscription learned from another node. */
        synchronized void addRemoteSubscription(InternalSubscriptionInfo internalSubscriptionInfo) {
            this.subscriptions.add(internalSubscriptionInfo);
            this.iterator = new TopicIterator(this.subscriptions);
        }

        /**
         * Replaces the record of a subscription with the given (tombstone) record.
         *
         * <p>Records are matched on node id and logical timestamp. Matching by object equality
         * does not work here because the incoming record is a freshly deserialized object and
         * {@code InternalSubscriptionInfo} does not define {@code equals}.
         */
        synchronized void removeRemoteSubscription(InternalSubscriptionInfo internalSubscriptionInfo) {
            this.subscriptions.removeIf(existing ->
                existing.nodeId().equals(internalSubscriptionInfo.nodeId())
                    && existing.logicalTimestamp().equals(internalSubscriptionInfo.logicalTimestamp()));
            this.subscriptions.add(internalSubscriptionInfo);
            this.iterator = new TopicIterator(this.subscriptions);
        }

        /**
         * Drops tombstones whose wall-clock timestamp is older than the given time.
         *
         * @param j cutoff in epoch milliseconds; tombstones strictly older than this are removed
         */
        synchronized void purgeTombstones(long j) {
            boolean removed = this.subscriptions.removeIf(info ->
                info.isTombstone() && info.timestamp().unixTimestamp() < j);
            if (removed) {
                this.iterator = new TopicIterator(this.subscriptions);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventingService$TopicIterator.class */
    /**
     * Endless round-robin iterator over the live (non-tombstone) subscription records of a topic.
     * The records are snapshotted, in reverse list order, when the iterator is constructed.
     * {@link #hasNext()} reports whether the snapshot is non-empty; it never becomes {@code false}
     * as a result of calling {@link #next()}.
     */
    public static class TopicIterator implements Iterator<InternalSubscriptionInfo> {
        private final AtomicInteger counter = new AtomicInteger();
        private final InternalSubscriptionInfo[] subscribers;

        /**
         * @param list all known subscription records of the topic; tombstones are filtered out
         */
        TopicIterator(List<InternalSubscriptionInfo> list) {
            List<InternalSubscriptionInfo> live = list.stream()
                .filter(info -> !info.isTombstone())
                .collect(Collectors.toList());
            Collections.reverse(live);
            this.subscribers = live.toArray(new InternalSubscriptionInfo[0]);
        }

        /** @return {@code true} if there is at least one live subscription record */
        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.subscribers.length > 0;
        }

        /**
         * @return the next live subscription record in round-robin order
         * @throws java.util.NoSuchElementException if there are no live subscription records
         */
        @Override // java.util.Iterator
        public InternalSubscriptionInfo next() {
            if (this.subscribers.length == 0) {
                // Honour the Iterator contract instead of failing with a division by zero.
                throw new java.util.NoSuchElementException("No live subscriptions");
            }
            // floorMod keeps the rotation going forward after the counter wraps around.
            return this.subscribers[Math.floorMod(this.counter.incrementAndGet(), this.subscribers.length)];
        }
    }

    public DefaultClusterEventingService(ClusterService clusterService, MessagingService messagingService) {
        this.clusterService = clusterService;
        this.messagingService = messagingService;
        this.localNodeId = clusterService.getLocalNode().id();
    }

    /**
     * Sends a message to every node that has a live subscription for the topic. Nodes that are
     * unknown to the cluster service or not in the ACTIVE state are skipped. The message is
     * encoded once, before the subscriber nodes are looked up.
     *
     * @param str      topic name
     * @param m        message to send
     * @param function encodes the message into bytes
     */
    @Override // io.atomix.cluster.messaging.ClusterEventingService
    public <M> void broadcast(String str, M m, Function<M, byte[]> function) {
        final InternalMessage envelope = new InternalMessage(InternalMessage.Type.ALL, function.apply(m));
        final byte[] encoded = SERIALIZER.encode(envelope);
        getSubscriberNodes(str)
            .map(nodeId -> this.clusterService.getNode(nodeId))
            .filter(node -> node != null && node.getState() == Node.State.ACTIVE)
            .forEach(node -> this.messagingService.sendAsync(node.address(), str, encoded));
    }

    /**
     * Sends a message to one subscriber of the topic, chosen round-robin. If there is no
     * subscriber, or the chosen node is unknown or not ACTIVE, nothing is sent and the message is
     * not encoded.
     *
     * @param str      topic name
     * @param m        message to send
     * @param function encodes the message into bytes
     * @return the future of the send, or an already completed future when nothing was sent
     */
    @Override // io.atomix.cluster.messaging.ClusterEventingService
    public <M> CompletableFuture<Void> unicast(String str, M m, Function<M, byte[]> function) {
        final NodeId targetId = getNextNodeId(str);
        if (targetId == null) {
            return CompletableFuture.completedFuture(null);
        }
        final Node target = this.clusterService.getNode(targetId);
        if (target == null || target.getState() != Node.State.ACTIVE) {
            return CompletableFuture.completedFuture(null);
        }
        final Address address = target.address();
        final byte[] encoded = SERIALIZER.encode(new InternalMessage(InternalMessage.Type.DIRECT, function.apply(m)));
        return this.messagingService.sendAsync(address, str, encoded);
    }

    /**
     * Sends a request to one subscriber of the topic, chosen round-robin, and decodes its reply.
     *
     * @param str       topic name
     * @param m         request message
     * @param function  encodes the request into bytes
     * @param function2 decodes the reply bytes
     * @param duration  timeout handed to the messaging service
     * @return the decoded reply, or a future failed with {@link MessagingException.NoRemoteHandler}
     *         when there is no subscriber or the chosen node is unknown or not ACTIVE
     */
    @Override // io.atomix.cluster.messaging.ClusterEventingService
    public <M, R> CompletableFuture<R> send(String str, M m, Function<M, byte[]> function, Function<byte[], R> function2, Duration duration) {
        final NodeId targetId = getNextNodeId(str);
        if (targetId == null) {
            return Futures.exceptionalFuture(new MessagingException.NoRemoteHandler());
        }
        final Node target = this.clusterService.getNode(targetId);
        if (target == null || target.getState() != Node.State.ACTIVE) {
            return Futures.exceptionalFuture(new MessagingException.NoRemoteHandler());
        }
        final Address address = target.address();
        final byte[] encoded = SERIALIZER.encode(new InternalMessage(InternalMessage.Type.DIRECT, function.apply(m)));
        return this.messagingService.sendAndReceive(address, str, encoded, duration).thenApply(function2);
    }

    /**
     * Lists the distinct nodes that hold at least one live (non-tombstone) subscription record
     * for the topic.
     *
     * @param str topic name
     * @return a stream of node ids; empty when the topic is unknown
     */
    private Stream<NodeId> getSubscriberNodes(String str) {
        final InternalTopic known = this.topics.get(str);
        if (known == null) {
            return Stream.empty();
        }
        return known.remoteSubscriptions().stream()
            .filter(info -> !info.isTombstone())
            .map(InternalSubscriptionInfo::nodeId)
            .distinct();
    }

    /**
     * Picks, round-robin, the node of the next live subscription record for the topic.
     *
     * @param str topic name
     * @return the selected node id, or {@code null} when the topic is unknown or has no live
     *         subscription
     */
    private NodeId getNextNodeId(String str) {
        InternalTopic internalTopic = this.topics.get(str);
        if (internalTopic == null) {
            return null;
        }
        TopicIterator candidates = internalTopic.iterator();
        // A topic becomes visible in the map before its first subscription is recorded, so the
        // iterator may not have been assigned yet; treat that the same as "no subscribers".
        if (candidates == null || !candidates.hasNext()) {
            return null;
        }
        return candidates.next().nodeId();
    }

    /**
     * Subscribes a synchronous request/reply handler to the topic, creating the topic state on
     * first use.
     *
     * @param str       topic name
     * @param function  decodes the raw payload into a message
     * @param function2 handles the message and produces a reply
     * @param function3 encodes the reply
     * @param executor  executor on which the handler runs
     * @return a future completed with the new subscription
     */
    @Override // io.atomix.cluster.messaging.ClusterEventingService
    public <M, R> CompletableFuture<Subscription> subscribe(String str, Function<byte[], M> function, Function<M, R> function2, Function<R, byte[]> function3, Executor executor) {
        final InternalTopic target = this.topics.computeIfAbsent(str, InternalTopic::new);
        return target.subscribe(function, function2, function3, executor);
    }

    /**
     * Subscribes an asynchronous request/reply handler to the topic, creating the topic state on
     * first use.
     *
     * @param str       topic name
     * @param function  decodes the raw payload into a message
     * @param function2 handles the message and returns a future reply
     * @param function3 encodes the reply
     * @return a future completed with the new subscription
     */
    @Override // io.atomix.cluster.messaging.ClusterEventingService
    public <M, R> CompletableFuture<Subscription> subscribe(String str, Function<byte[], M> function, Function<M, CompletableFuture<R>> function2, Function<R, byte[]> function3) {
        final InternalTopic target = this.topics.computeIfAbsent(str, InternalTopic::new);
        return target.subscribe(function, function2, function3);
    }

    /**
     * Subscribes a fire-and-forget consumer to the topic, creating the topic state on first use.
     *
     * @param str      topic name
     * @param function decodes the raw payload into a message
     * @param consumer consumes the message
     * @param executor executor on which the consumer runs
     * @return a future completed with the new subscription
     */
    @Override // io.atomix.cluster.messaging.ClusterEventingService
    public <M> CompletableFuture<Subscription> subscribe(String str, Function<byte[], M> function, Consumer<M> consumer, Executor executor) {
        final InternalTopic target = this.topics.computeIfAbsent(str, InternalTopic::new);
        return target.subscribe(function, consumer, executor);
    }

    /**
     * Returns this node's subscriptions for the topic.
     *
     * @param str topic name
     * @return an immutable list of local subscriptions; empty when the topic is unknown
     */
    @Override // io.atomix.cluster.messaging.ClusterEventingService
    public List<Subscription> getSubscriptions(String str) {
        final InternalTopic known = this.topics.get(str);
        if (known == null) {
            return ImmutableList.of();
        }
        return ImmutableList.copyOf(known.localSubscriber().subscriptions());
    }

    /**
     * Merges subscription records received from another node into the local topic state.
     * A record is considered known when a record with the same node id and logical timestamp is
     * already present. Unknown records are added; known records are replaced only when the
     * incoming record is a tombstone.
     *
     * @param collection subscription records received via gossip
     */
    private void update(Collection<InternalSubscriptionInfo> collection) {
        for (InternalSubscriptionInfo incoming : collection) {
            final InternalTopic target = this.topics.computeIfAbsent(incoming.topic(), InternalTopic::new);
            final boolean alreadyKnown = target.remoteSubscriptions().stream()
                .anyMatch(existing -> existing.nodeId().equals(incoming.nodeId())
                    && existing.logicalTimestamp().equals(incoming.logicalTimestamp()));
            if (!alreadyKnown) {
                target.addRemoteSubscription(incoming);
                continue;
            }
            if (incoming.isTombstone()) {
                target.removeRemoteSubscription(incoming);
            }
        }
    }

    /**
     * Sends the local view of subscriptions to one randomly chosen remote node that is in the
     * ACTIVE state. Does nothing when no such node exists.
     */
    private void gossip() {
        final List<Node> peers = new ArrayList<>();
        for (Node candidate : this.clusterService.getNodes()) {
            if (this.localNodeId.equals(candidate.id())) {
                continue;
            }
            if (candidate.getState() == Node.State.ACTIVE) {
                peers.add(candidate);
            }
        }
        if (!peers.isEmpty()) {
            // Shuffle and take the head to pick a random peer.
            Collections.shuffle(peers);
            updateNode(peers.get(0));
        }
    }

    /**
     * Sends the local view of subscriptions to every remote node, regardless of node state.
     *
     * @return a future that completes once every per-node update has completed
     */
    public CompletableFuture<Void> updateNodes() {
        final CompletableFuture<?>[] pending = this.clusterService.getNodes().stream()
            .filter(node -> !this.localNodeId.equals(node.id()))
            .map(this::updateNode)
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending);
    }

    /**
     * Sends to the given node every subscription record whose wall-clock timestamp is not older
     * than the last successful update of that node. On success the node's last-update time is
     * set to the time captured before the records were collected.
     *
     * @param node node to update
     * @return a future that completes normally whether or not the send succeeded
     */
    private CompletableFuture<Void> updateNode(Node node) {
        final long sendTime = System.currentTimeMillis();
        final long lastUpdate = this.updateTimes.getOrDefault(node.id(), 0L);

        final List<InternalSubscriptionInfo> changed = new ArrayList<>();
        for (InternalTopic topic : this.topics.values()) {
            for (InternalSubscriptionInfo info : topic.remoteSubscriptions()) {
                if (info.timestamp().unixTimestamp() >= lastUpdate) {
                    changed.add(info);
                }
            }
        }

        final CompletableFuture<Void> done = new CompletableFuture<>();
        this.messagingService
            .sendAndReceive(node.address(), GOSSIP_MESSAGE_SUBJECT, SERIALIZER.encode(changed))
            .whenComplete((reply, error) -> {
                if (error == null) {
                    this.updateTimes.put(node.id(), sendTime);
                }
                // Failures are not propagated; the records are sent again on a later update.
                done.complete(null);
            });
        return done;
    }

    /**
     * Removes tombstones that every remote node has had the chance to receive, i.e. tombstones
     * older than the earliest last-successful-update time across all remote nodes.
     *
     * <p>The local node is excluded from the minimum: update times are only ever recorded for
     * remote nodes, so including the local node would pin the cutoff at 0 and no tombstone would
     * ever be purged. A remote node that has never been updated successfully still counts as 0
     * and therefore still blocks purging, as does the absence of any remote node.
     */
    private void purgeTombstones() {
        final long cutoff = this.clusterService.getNodes().stream()
            .filter(node -> !this.localNodeId.equals(node.id()))
            .map(node -> this.updateTimes.getOrDefault(node.id(), 0L))
            .reduce(Math::min)
            .orElse(0L);
        for (InternalTopic topic : this.topics.values()) {
            topic.purgeTombstones(cutoff);
        }
    }

    /**
     * Starts the service: creates the single-threaded gossip executor, schedules the periodic
     * gossip and tombstone-purge tasks, and registers the handler for incoming gossip updates.
     * Calling it while already started has no effect.
     *
     * @return a future already completed with this service
     */
    public CompletableFuture<ClusterEventingService> start() {
        if (!this.started.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(this);
        }
        final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(Threads.namedThreads("atomix-cluster-event-executor-%d", LOGGER));
        this.gossipExecutor = executor;
        executor.scheduleAtFixedRate(this::gossip, GOSSIP_INTERVAL_MILLIS, GOSSIP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        executor.scheduleAtFixedRate(this::purgeTombstones, TOMBSTONE_EXPIRATION_MILLIS, TOMBSTONE_EXPIRATION_MILLIS, TimeUnit.MILLISECONDS);
        // Incoming updates are merged on the gossip executor and acknowledged with an empty reply.
        this.messagingService.registerHandler(GOSSIP_MESSAGE_SUBJECT, (address, bArr) -> {
            update((Collection) SERIALIZER.decode(bArr));
            return new byte[0];
        }, executor);
        LOGGER.info("Started");
        return CompletableFuture.completedFuture(this);
    }

    /**
     * @return {@code true} between a successful {@code start()} and the following {@code stop()}
     */
    public boolean isRunning() {
        final boolean running = this.started.get();
        return running;
    }

    /**
     * Stops the service: unregisters the gossip update handler and shuts down the gossip
     * executor. Calling it while not started has no effect.
     *
     * @return an already completed future
     */
    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            // Unregister first so that no incoming update is dispatched to an executor that has
            // already been shut down.
            this.messagingService.unregisterHandler(GOSSIP_MESSAGE_SUBJECT);
            if (this.gossipExecutor != null) {
                this.gossipExecutor.shutdown();
            }
            LOGGER.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }
}
