package io.atomix.cluster.messaging.impl;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.ManagedClusterEventService;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.Subscription;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.FallbackNamespace;
import io.atomix.utils.serializer.NamespaceImpl;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.time.LogicalTimestamp;
import io.atomix.utils.time.WallClockTimestamp;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventService.class */
/**
 * {@link ManagedClusterEventService} implementation that delivers topic messages through a
 * {@link MessagingService} and exchanges topic information through cluster member properties.
 *
 * <p>The local topic set is written to the local member's {@code event-service-topics-subscribed}
 * property, and the same property is read from other members whenever a membership event is
 * received.
 */
public class DefaultClusterEventService implements ManagedClusterEventService, ClusterMembershipEventListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterEventService.class);
    // Serializer for the set of topic names that is stored (Base64-encoded) in the member property.
    private static final Serializer SERIALIZER = Serializer.using(new FallbackNamespace(new NamespaceImpl.Builder().register(Namespaces.BASIC).register(new Class[]{MemberId.class}).register(new Class[]{LogicalTimestamp.class}).register(new Class[]{WallClockTimestamp.class})));
    // Key of the member property that carries the encoded topic set.
    private static final String SUBSCRIPTION_PROPERTY_NAME = "event-service-topics-subscribed";
    private final ClusterMembershipService membershipService;
    private final MessagingService messagingService;
    // Id of the local member, captured once at construction time.
    private final MemberId localMemberId;
    // Topic name -> per-topic state (local handlers plus ids of subscribed members).
    private final Map<String, InternalTopic> topics = Maps.newConcurrentMap();
    // Member id -> topic names most recently decoded from that member's subscription property.
    private final Map<MemberId, Set<String>> remoteMemberSubscriptions = Maps.newConcurrentMap();
    // Lifecycle flag toggled by start() and stop().
    private final AtomicBoolean started = new AtomicBoolean();
    // Single-threaded executor created in start(); membership events are processed on it.
    private ScheduledExecutorService eventServiceExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventService$InternalSubscriber.class */
    /**
     * Messaging handler for one topic that hands every incoming payload to all local
     * subscriptions currently registered with it.
     */
    public static class InternalSubscriber implements BiFunction<Address, byte[], CompletableFuture<byte[]>> {
        private final List<InternalSubscription> subscriptions = new CopyOnWriteArrayList<>();

        private InternalSubscriber() {
        }

        /** Registers a local subscription with this handler. */
        void add(InternalSubscription internalSubscription) {
            this.subscriptions.add(internalSubscription);
        }

        /** Unregisters a local subscription from this handler. */
        void remove(InternalSubscription internalSubscription) {
            this.subscriptions.remove(internalSubscription);
        }

        /** Returns an immutable snapshot of the registered subscriptions. */
        List<InternalSubscription> subscriptions() {
            return ImmutableList.copyOf(this.subscriptions);
        }

        /**
         * Invokes the callback of every registered subscription with the payload. The futures
         * returned by the callbacks are not awaited; the result is always an already completed
         * future holding {@code null}.
         */
        @Override
        public CompletableFuture<byte[]> apply(Address address, byte[] bArr) {
            for (InternalSubscription subscription : this.subscriptions) {
                subscription.callback.apply(bArr);
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventService$InternalSubscription.class */
    /**
     * A single local subscription: the topic it belongs to plus the callback that receives the
     * raw message payload.
     */
    public class InternalSubscription implements Subscription {
        private final InternalTopic topic;
        private final Function<byte[], CompletableFuture<byte[]>> callback;

        InternalSubscription(InternalTopic internalTopic, Function<byte[], CompletableFuture<byte[]>> function) {
            this.callback = function;
            this.topic = internalTopic;
        }

        /** Removes this subscription from its topic. */
        @Override
        public CompletableFuture<Void> close() {
            InternalTopic owner = this.topic;
            return owner.removeLocalSubscription(this);
        }

        /** Returns the name of the topic this subscription is attached to. */
        @Override
        public String topic() {
            InternalTopic owner = this.topic;
            return owner.topic;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/DefaultClusterEventService$InternalTopic.class */
    public class InternalTopic {
        private final String topic;
        private final InternalSubscriber localSubscribers = new InternalSubscriber();
        private final Set<MemberId> subscriptions = Sets.newCopyOnWriteArraySet();

        InternalTopic(String str) {
            this.topic = str;
        }

        InternalSubscriber localSubscriber() {
            return this.localSubscribers;
        }

        Set<MemberId> remoteSubscriptions() {
            return this.subscriptions;
        }

        <M, R> CompletableFuture<Subscription> subscribe(Function<byte[], M> function, Function<M, R> function2, Function<R, byte[]> function3, Executor executor) {
            return addLocalSubscription(new InternalSubscription(this, bArr -> {
                CompletableFuture completableFuture = new CompletableFuture();
                executor.execute(() -> {
                    try {
                        completableFuture.complete((byte[]) function3.apply(function2.apply(function.apply(bArr))));
                    } catch (Exception e) {
                        completableFuture.completeExceptionally(e);
                    }
                });
                return completableFuture;
            }));
        }

        <M, R> CompletableFuture<Subscription> subscribe(Function<byte[], M> function, Function<M, CompletableFuture<R>> function2, Function<R, byte[]> function3) {
            return addLocalSubscription(new InternalSubscription(this, bArr -> {
                return ((CompletableFuture) function2.apply(function.apply(bArr))).thenApply(function3);
            }));
        }

        <M> CompletableFuture<Subscription> subscribe(Function<byte[], M> function, Consumer<M> consumer, Executor executor) {
            return addLocalSubscription(new InternalSubscription(this, bArr -> {
                executor.execute(() -> {
                    try {
                        Object apply = function.apply(bArr);
                        try {
                            consumer.accept(apply);
                        } catch (RuntimeException e) {
                            DefaultClusterEventService.LOGGER.error("Failed to handle message {} for topic {}", new Object[]{apply, this.topic, e});
                        }
                    } catch (RuntimeException e2) {
                        DefaultClusterEventService.LOGGER.error("Failed to decode message payload for topic {}", this.topic, e2);
                    }
                });
                return CompletableFuture.completedFuture(null);
            }));
        }

        private synchronized CompletableFuture<Subscription> addLocalSubscription(InternalSubscription internalSubscription) {
            if (this.localSubscribers.subscriptions.isEmpty()) {
                DefaultClusterEventService.this.messagingService.registerHandler(internalSubscription.topic(), this.localSubscribers);
            }
            this.localSubscribers.add(internalSubscription);
            this.subscriptions.add(DefaultClusterEventService.this.localMemberId);
            return DefaultClusterEventService.this.updateNodes().thenApply(r3 -> {
                return internalSubscription;
            });
        }

        private synchronized CompletableFuture<Void> removeLocalSubscription(InternalSubscription internalSubscription) {
            this.localSubscribers.remove(internalSubscription);
            if (this.localSubscribers.subscriptions.isEmpty()) {
                this.subscriptions.remove(DefaultClusterEventService.this.localMemberId);
                DefaultClusterEventService.this.messagingService.unregisterHandler(internalSubscription.topic());
            }
            return DefaultClusterEventService.this.updateNodes();
        }

        void addRemoteSubscription(MemberId memberId) {
            this.subscriptions.add(memberId);
        }

        void removeRemoteSubscription(MemberId memberId) {
            this.subscriptions.remove(memberId);
        }
    }

    /**
     * Creates the event service and captures the id of the local member.
     *
     * @param clusterMembershipService source of cluster members and membership events
     * @param messagingService transport used to send and receive topic messages
     */
    public DefaultClusterEventService(ClusterMembershipService clusterMembershipService, MessagingService messagingService) {
        this.messagingService = messagingService;
        this.membershipService = clusterMembershipService;
        this.localMemberId = this.membershipService.getLocalMember().id();
    }

    /**
     * Encodes the message once and sends it to every member recorded as subscribed to the topic
     * that is currently known and reachable.
     *
     * @param str topic name
     * @param m message to broadcast
     * @param function encoder turning the message into bytes
     */
    @Override
    public <M> void broadcast(String str, M m, Function<M, byte[]> function) {
        byte[] payload = function.apply(m);
        getSubscriberNodes(str)
            .map(subscriberId -> this.membershipService.getMember(subscriberId))
            // Skip members that have left the cluster or cannot be reached right now.
            .filter(member -> member != null && member.isReachable())
            .forEach(member -> this.messagingService.sendAsync(member.address(), str, payload));
    }

    /**
     * Subscribes a request/reply handler that is executed on the given executor, creating the
     * topic state on first use.
     *
     * @param str topic name
     * @param function payload decoder
     * @param function2 message handler producing the reply
     * @param function3 reply encoder
     * @param executor executor on which the handler runs
     * @return a future completed with the subscription
     */
    @Override
    public <M, R> CompletableFuture<Subscription> subscribe(String str, Function<byte[], M> function, Function<M, R> function2, Function<R, byte[]> function3, Executor executor) {
        InternalTopic target = this.topics.computeIfAbsent(str, name -> new InternalTopic(name));
        return target.subscribe(function, function2, function3, executor);
    }

    /**
     * Subscribes a handler that replies asynchronously, creating the topic state on first use.
     *
     * @param str topic name
     * @param function payload decoder
     * @param function2 message handler returning a future reply
     * @param function3 reply encoder
     * @return a future completed with the subscription
     */
    @Override
    public <M, R> CompletableFuture<Subscription> subscribe(String str, Function<byte[], M> function, Function<M, CompletableFuture<R>> function2, Function<R, byte[]> function3) {
        InternalTopic target = this.topics.computeIfAbsent(str, name -> new InternalTopic(name));
        return target.subscribe(function, function2, function3);
    }

    /**
     * Subscribes a fire-and-forget consumer that is executed on the given executor, creating the
     * topic state on first use.
     *
     * @param str topic name
     * @param function payload decoder
     * @param consumer message handler
     * @param executor executor on which the consumer runs
     * @return a future completed with the subscription
     */
    @Override
    public <M> CompletableFuture<Subscription> subscribe(String str, Function<byte[], M> function, Consumer<M> consumer, Executor executor) {
        InternalTopic target = this.topics.computeIfAbsent(str, name -> new InternalTopic(name));
        return target.subscribe(function, consumer, executor);
    }

    /**
     * Returns an immutable snapshot of the local subscriptions of a topic.
     *
     * @param str topic name
     * @return the local subscriptions, or an empty list when the topic is unknown
     */
    @Override
    public List<Subscription> getSubscriptions(String str) {
        InternalTopic known = this.topics.get(str);
        if (known != null) {
            return ImmutableList.copyOf(known.localSubscriber().subscriptions());
        }
        return ImmutableList.of();
    }

    /**
     * Returns the ids of the members recorded as subscribed to a topic.
     *
     * @param str topic name
     * @return the topic's subscriber set, or an empty set when the topic is unknown
     */
    @Override
    public Set<MemberId> getSubscribers(String str) {
        InternalTopic known = this.topics.get(str);
        if (known != null) {
            return known.remoteSubscriptions();
        }
        return Set.of();
    }

    /** Streams the ids of the members recorded as subscribed to the given topic. */
    private Stream<MemberId> getSubscriberNodes(String str) {
        Set<MemberId> subscriberIds = getSubscribers(str);
        return subscriberIds.stream();
    }

    /**
     * Publishes the topics this node is subscribed to through the local member's
     * {@code event-service-topics-subscribed} property.
     *
     * <p>Only topics that currently have at least one local subscription are advertised. The
     * topic map also holds entries that exist solely to track other members' subscriptions, and
     * entries whose local subscriptions have all been closed; advertising those would make other
     * members send messages for which no handler is registered here.
     *
     * @return an already completed future
     */
    private CompletableFuture<Void> updateNodes() {
        Set<String> locallySubscribed = new HashSet<>();
        this.topics.forEach((name, topic) -> {
            if (!topic.localSubscriber().subscriptions().isEmpty()) {
                locallySubscribed.add(name);
            }
        });
        this.membershipService.getLocalMember().properties().setProperty(SUBSCRIPTION_PROPERTY_NAME, topicsAsString(locallySubscribed));
        return CompletableFuture.completedFuture(null);
    }

    /** Serializes the topic names and returns the Base64 text form of the resulting bytes. */
    private String topicsAsString(Set<String> set) {
        byte[] serialized = SERIALIZER.encode(set);
        byte[] base64 = Base64.getEncoder().encode(serialized);
        return new String(base64, StandardCharsets.UTF_8);
    }

    /** Decodes the Base64 text and deserializes it back into the set of topic names. */
    @SuppressWarnings("unchecked") // the property is only ever written by topicsAsString(Set<String>)
    private Set<String> topicsFromString(String str) {
        byte[] base64 = str.getBytes(StandardCharsets.UTF_8);
        byte[] serialized = Base64.getDecoder().decode(base64);
        return (Set<String>) SERIALIZER.decode(serialized);
    }

    /**
     * Starts the service if it is not already running: creates the event executor, registers
     * this instance as a membership listener and replays every currently known member as a
     * {@code MEMBER_ADDED} event.
     *
     * @return an already completed future holding this service
     */
    @Override
    public CompletableFuture<ClusterEventService> start() {
        boolean wasStopped = this.started.compareAndSet(false, true);
        if (!wasStopped) {
            return CompletableFuture.completedFuture(this);
        }
        this.eventServiceExecutor = Executors.newSingleThreadScheduledExecutor(Threads.namedThreads("atomix-cluster-event-executor-%d", LOGGER));
        this.membershipService.addListener(this);
        this.membershipService.getMembers().forEach(
            member -> event(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_ADDED, member)));
        LOGGER.info("Started");
        return CompletableFuture.completedFuture(this);
    }

    /** Returns whether the service has been started and not yet stopped. */
    @Override
    public boolean isRunning() {
        final boolean running = this.started.get();
        return running;
    }

    /**
     * Stops the service if it is running: unregisters the membership listener and shuts down the
     * event executor.
     *
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            // Unregister before shutting the executor down; otherwise later membership events
            // would be submitted to a terminated executor and be rejected.
            this.membershipService.removeListener(this);
            if (this.eventServiceExecutor != null) {
                this.eventServiceExecutor.shutdown();
            }
            LOGGER.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Handles a membership event on the event executor: added or changed members have their
     * subscription property re-read, removed members lose all recorded subscriptions, and
     * reachability changes are ignored.
     *
     * @param clusterMembershipEvent the membership event to process
     */
    @Override
    public void event(ClusterMembershipEvent clusterMembershipEvent) {
        this.eventServiceExecutor.execute(() -> {
            ClusterMembershipEvent.Type kind = (ClusterMembershipEvent.Type) clusterMembershipEvent.type();
            switch (kind) {
                case MEMBER_ADDED:
                case METADATA_CHANGED:
                    updateRemoteSubscription(clusterMembershipEvent);
                    break;
                case MEMBER_REMOVED:
                    removeAllSubscription(((Member) clusterMembershipEvent.subject()).id());
                    break;
                case REACHABILITY_CHANGED:
                    // Reachability is evaluated at broadcast time; nothing to update here.
                    break;
                default:
                    LOGGER.warn("Unexpected membership event type {} from {}", clusterMembershipEvent.type(), clusterMembershipEvent.subject());
                    break;
            }
        });
    }

    /** Forgets every topic recorded for the member and removes it from those topics' subscribers. */
    private void removeAllSubscription(MemberId memberId) {
        Set<String> recordedTopics = this.remoteMemberSubscriptions.remove(memberId);
        if (recordedTopics == null) {
            return;
        }
        for (String topicName : recordedTopics) {
            this.topics.get(topicName).removeRemoteSubscription(memberId);
        }
    }

    /**
     * Synchronizes the recorded subscriptions of the event's member with the topic set found in
     * its {@code event-service-topics-subscribed} property.
     *
     * <p>A missing property clears everything recorded for the member. Otherwise the member is
     * added to every advertised topic and removed from topics that were recorded earlier but are
     * no longer advertised, so that broadcasts stop targeting it for topics it has left.
     *
     * @param clusterMembershipEvent event whose subject is the member to synchronize
     */
    private void updateRemoteSubscription(ClusterMembershipEvent clusterMembershipEvent) {
        Member member = (Member) clusterMembershipEvent.subject();
        MemberId memberId = member.id();
        String property = member.properties().getProperty(SUBSCRIPTION_PROPERTY_NAME);
        if (property == null) {
            removeAllSubscription(memberId);
            return;
        }
        Set<String> advertised = topicsFromString(property);
        for (String topicName : advertised) {
            this.topics.computeIfAbsent(topicName, name -> new InternalTopic(name)).addRemoteSubscription(memberId);
        }
        Set<String> previous = this.remoteMemberSubscriptions.put(memberId, advertised);
        if (previous == null) {
            return;
        }
        // Drop the member from topics it advertised before but no longer does.
        for (String topicName : previous) {
            if (advertised.contains(topicName)) {
                continue;
            }
            InternalTopic stale = this.topics.get(topicName);
            if (stale != null) {
                stale.removeRemoteSubscription(memberId);
            }
        }
    }
}
