package io.atomix.messaging;

import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.messaging.state.TopicCommands;
import io.atomix.messaging.state.TopicState;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
import io.atomix.resource.WriteConsistency;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * Topic resource that publishes values of type {@code T} and fans values received through the
 * client's {@code "message"} event out to locally registered consumers.
 *
 * <p>The local consumer set is guarded by this instance's monitor: {@link #subscribe(Consumer)},
 * listener {@code close()} and message dispatch all take the same lock before touching it.
 *
 * @param <T> type of the values published to and received from the topic
 */
@ResourceTypeInfo(id = -31, stateMachine = TopicState.class, typeResolver = TopicCommands.TypeResolver.class)
public class DistributedTopic<T> extends Resource<DistributedTopic<T>> {
    /** Locally registered consumers; only accessed while holding this topic's monitor. */
    private final Set<Consumer<T>> listeners;

    /**
     * Handle returned to subscribers: forwards values to the wrapped consumer and, when closed,
     * removes that consumer from the enclosing topic.
     */
    private class TopicListener implements Listener<T> {
        private final Consumer<T> listener;

        private TopicListener(Consumer<T> consumer) {
            this.listener = consumer;
        }

        /** Passes the value straight through to the wrapped consumer. */
        public void accept(T t) {
            this.listener.accept(t);
        }

        /**
         * Removes the wrapped consumer from the topic. An {@code Unlisten} command is submitted
         * only when this call actually removed the last remaining consumer, so closing the same
         * handle twice does not submit a second, redundant command.
         */
        public void close() {
            synchronized (DistributedTopic.this) {
                boolean removed = DistributedTopic.this.listeners.remove(this.listener);
                if (removed && DistributedTopic.this.listeners.isEmpty()) {
                    DistributedTopic.this.submit(new TopicCommands.Unlisten());
                }
            }
        }
    }

    /**
     * Creates a fresh options object for this resource.
     *
     * @return a new {@code Resource.Options} instance
     */
    public static Resource.Options options() {
        return new Resource.Options();
    }

    /**
     * Creates a fresh configuration object for this resource.
     *
     * @return a new {@code Resource.Config} instance
     */
    public static Resource.Config config() {
        return new Resource.Config();
    }

    /**
     * Creates a topic bound to the given client, starting with no local consumers.
     *
     * @param copycatClient client handed to the {@code Resource} superclass
     * @param options       resource options handed to the {@code Resource} superclass
     */
    public DistributedTopic(CopycatClient copycatClient, Resource.Options options) {
        super(copycatClient, options);
        this.listeners = new HashSet<>();
    }

    /**
     * Opens the resource and then registers a handler for the client's {@code "message"} event
     * that delivers each received value to every locally registered consumer.
     *
     * <p>Delivery iterates over a snapshot of the consumer set taken under the topic's monitor,
     * so a consumer may subscribe or close a listener from inside its callback without causing a
     * {@code ConcurrentModificationException}. A consumer removed while a dispatch is already in
     * progress may still receive that in-flight value.
     *
     * @return a future completed with the value produced by the superclass {@code open()} once
     *         the handler has been registered
     */
    public CompletableFuture<DistributedTopic<T>> open() {
        return super.open().thenApply(distributedTopic -> {
            this.client.onEvent("message", obj -> {
                Set<Consumer<T>> snapshot;
                // Copy under the lock; invoke callbacks outside it so user code never runs while
                // the monitor is held.
                synchronized (this) {
                    snapshot = new HashSet<>(this.listeners);
                }
                for (Consumer<T> consumer : snapshot) {
                    consumer.accept(obj);
                }
            });
            return distributedTopic;
        });
    }

    /**
     * Returns the result of {@code with(WriteConsistency.ATOMIC)}.
     *
     * @return the topic configured with {@code WriteConsistency.ATOMIC}
     */
    public DistributedTopic<T> sync() {
        return (DistributedTopic<T>) with(WriteConsistency.ATOMIC);
    }

    /**
     * Returns the result of {@code with(WriteConsistency.SEQUENTIAL_EVENT)}.
     *
     * @return the topic configured with {@code WriteConsistency.SEQUENTIAL_EVENT}
     */
    public DistributedTopic<T> async() {
        return (DistributedTopic<T>) with(WriteConsistency.SEQUENTIAL_EVENT);
    }

    /**
     * Submits a {@code Publish} command carrying the given value.
     *
     * @param t value to publish
     * @return the future returned by submitting the command
     */
    public CompletableFuture<Void> publish(T t) {
        return submit(new TopicCommands.Publish(t));
    }

    /**
     * Registers a consumer for values delivered to this topic.
     *
     * <p>When the consumer is the first local one, a {@code Listen} command is submitted and the
     * returned future completes with the listener handle after that command completes. Otherwise
     * the consumer is added locally and an already-completed future is returned.
     *
     * <p>NOTE(review): if the {@code Listen} command fails, the consumer stays in the local set
     * and later subscriptions will not re-submit {@code Listen} — confirm whether a rollback is
     * wanted.
     *
     * @param consumer callback invoked with each received value; must not be {@code null}
     * @return a future completed with a {@link Listener} whose {@code close()} removes the consumer
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public synchronized CompletableFuture<Listener<T>> subscribe(Consumer<T> consumer) {
        // Fail fast: a null entry would otherwise only blow up later, inside event dispatch.
        if (consumer == null) {
            throw new NullPointerException("consumer cannot be null");
        }
        boolean firstSubscriber = this.listeners.isEmpty();
        this.listeners.add(consumer);
        if (!firstSubscriber) {
            return CompletableFuture.completedFuture(new TopicListener(consumer));
        }
        return submit(new TopicCommands.Listen()).thenApply(r8 -> {
            return new TopicListener(consumer);
        });
    }
}
