package io.atomix.messaging;

import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.util.concurrent.Futures;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.messaging.state.MessageBusCommands;
import io.atomix.messaging.state.MessageBusState;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;

@ResourceTypeInfo(id = -30, stateMachine = MessageBusState.class)
/* loaded from: input_file:io/atomix/messaging/DistributedMessageBus.class */
public class DistributedMessageBus extends Resource<DistributedMessageBus, Resource.Options> {
    private Client client;
    private Server server;
    private final Map<Integer, Connection> connections;
    private volatile CompletableFuture<DistributedMessageBus> openFuture;
    private volatile CompletableFuture<Void> closeFuture;
    private final Map<String, RemoteConsumers> remotes;
    private final Map<String, InternalMessageConsumer> consumers;
    private volatile boolean open;

    /* loaded from: input_file:io/atomix/messaging/DistributedMessageBus$InternalMessageConsumer.class */
    private class InternalMessageConsumer<T> implements MessageConsumer<T> {
        private final String topic;
        private Function<T, ?> consumer;

        private InternalMessageConsumer(String str, Function<T, ?> function) {
            this.topic = str;
            this.consumer = function;
        }

        @Override // io.atomix.messaging.MessageConsumer
        public String topic() {
            return this.topic;
        }

        @Override // io.atomix.messaging.MessageConsumer
        public MessageConsumer<T> onMessage(Function<T, ?> function) {
            this.consumer = function;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CompletableFuture<Object> consume(Object obj) {
            Object apply = this.consumer.apply(obj);
            return apply instanceof CompletableFuture ? (CompletableFuture) apply : CompletableFuture.completedFuture(apply);
        }

        @Override // io.atomix.messaging.MessageConsumer
        public CompletableFuture<Void> close() {
            return DistributedMessageBus.this.submit(new MessageBusCommands.Unregister(this.topic));
        }
    }

    /* loaded from: input_file:io/atomix/messaging/DistributedMessageBus$InternalMessageProducer.class */
    private class InternalMessageProducer<T> implements MessageProducer<T> {
        private final String topic;

        private InternalMessageProducer(String str) {
            this.topic = str;
        }

        @Override // io.atomix.messaging.MessageProducer
        public String topic() {
            return this.topic;
        }

        @Override // io.atomix.messaging.MessageProducer
        public <U> CompletableFuture<U> send(T t) {
            return DistributedMessageBus.this.next(this.topic).thenCompose((Function) connection -> {
                return connection == null ? Futures.exceptionalFuture(new IllegalStateException("no handlers")) : connection.send(new Message(this.topic, t));
            });
        }

        @Override // io.atomix.messaging.MessageProducer
        public CompletableFuture<Void> close() {
            return CompletableFuture.completedFuture(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/messaging/DistributedMessageBus$RemoteConsumers.class */
    public static class RemoteConsumers {
        private final List<Address> consumers;
        private Iterator<Address> iterator;

        private RemoteConsumers(Set<Address> set) {
            this.consumers = new ArrayList(set);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void add(Address address) {
            this.consumers.add(address);
            this.iterator = this.consumers.iterator();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean remove(Address address) {
            this.consumers.remove(address);
            return this.consumers.isEmpty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Address next() {
            if (this.consumers.isEmpty()) {
                return null;
            }
            if (this.iterator == null || !this.iterator.hasNext()) {
                this.iterator = this.consumers.iterator();
            }
            return this.iterator.next();
        }
    }

    public DistributedMessageBus(CopycatClient copycatClient, Resource.Options options) {
        super(copycatClient, options);
        this.connections = new HashMap();
        this.remotes = new ConcurrentHashMap();
        this.consumers = new ConcurrentHashMap();
    }

    public synchronized CompletableFuture<DistributedMessageBus> open(Address address) {
        if (this.openFuture != null) {
            return this.openFuture;
        }
        this.client = ((Resource) this).client.transport().client();
        this.server = ((Resource) this).client.transport().server();
        this.openFuture = new CompletableFuture<>();
        ((Resource) this).client.context().execute(() -> {
            this.server.listen(address, this::connectListener).whenComplete((r4, th) -> {
                synchronized (this) {
                    if (th == null) {
                        this.open = true;
                        CompletableFuture<DistributedMessageBus> completableFuture = this.openFuture;
                        if (completableFuture != null) {
                            this.openFuture = null;
                            completableFuture.complete(null);
                        }
                    } else {
                        this.open = false;
                        CompletableFuture<DistributedMessageBus> completableFuture2 = this.openFuture;
                        if (completableFuture2 != null) {
                            this.openFuture = null;
                            completableFuture2.completeExceptionally(th);
                        }
                    }
                }
            });
        });
        return this.openFuture.thenCompose(distributedMessageBus -> {
            CompletableFuture completableFuture = new CompletableFuture();
            submit(new MessageBusCommands.Join(address)).whenComplete((map, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                    return;
                }
                for (Map.Entry entry : map.entrySet()) {
                    this.remotes.put(entry.getKey(), new RemoteConsumers((Set) entry.getValue()));
                }
                ((Resource) this).client.onEvent("register", this::registerConsumer);
                ((Resource) this).client.onEvent("unregister", this::unregisterConsumer);
                completableFuture.complete(null);
            });
            return completableFuture;
        }).thenApply((Function<? super U, ? extends U>) r3 -> {
            return this;
        });
    }

    public boolean isOpen() {
        return this.open;
    }

    private void connectListener(Connection connection) {
        connection.handler(Message.class, this::handleMessage);
    }

    private void registerConsumer(MessageBusCommands.ConsumerInfo consumerInfo) {
        RemoteConsumers remoteConsumers = this.remotes.get(consumerInfo.topic());
        if (remoteConsumers != null) {
            remoteConsumers.add(consumerInfo.address());
        } else {
            this.remotes.put(consumerInfo.topic(), new RemoteConsumers(Collections.singleton(consumerInfo.address())));
        }
    }

    private void unregisterConsumer(MessageBusCommands.ConsumerInfo consumerInfo) {
        RemoteConsumers remoteConsumers = this.remotes.get(consumerInfo.topic());
        if (remoteConsumers == null || !remoteConsumers.remove(consumerInfo.address())) {
            return;
        }
        this.remotes.remove(consumerInfo.topic());
    }

    private CompletableFuture<Void> handleMessage(Message message) {
        InternalMessageConsumer internalMessageConsumer = this.consumers.get(message.topic());
        return internalMessageConsumer == null ? Futures.exceptionalFuture(new IllegalStateException("unknown topic " + message.topic())) : internalMessageConsumer.consume(message.body());
    }

    public <T> CompletableFuture<MessageProducer<T>> producer(String str) {
        return CompletableFuture.completedFuture(new InternalMessageProducer(str));
    }

    public <T> CompletableFuture<MessageConsumer<T>> consumer(String str) {
        return consumer(str, (Function) null);
    }

    public <T> CompletableFuture<MessageConsumer<T>> consumer(String str, Function<T, ?> function) {
        CompletableFuture<MessageConsumer<T>> completableFuture = new CompletableFuture<>();
        submit(new MessageBusCommands.Register(str)).whenComplete((BiConsumer) (r11, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            InternalMessageConsumer internalMessageConsumer = new InternalMessageConsumer(str, function);
            this.consumers.put(str, internalMessageConsumer);
            completableFuture.complete(internalMessageConsumer);
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<Connection> next(String str) {
        Address next;
        RemoteConsumers remoteConsumers = this.remotes.get(str);
        if (remoteConsumers != null && (next = remoteConsumers.next()) != null) {
            return getConnection(next);
        }
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Connection> getConnection(Address address) {
        Connection connection = this.connections.get(Integer.valueOf(address.hashCode()));
        return connection == null ? createConnection(address) : CompletableFuture.completedFuture(connection);
    }

    private CompletableFuture<Connection> createConnection(Address address) {
        return this.client.connect(address).thenApply(connection -> {
            this.connections.put(Integer.valueOf(address.hashCode()), connection);
            connection.closeListener(connection -> {
                this.connections.remove(Integer.valueOf(address.hashCode()));
            });
            return connection;
        });
    }

    public synchronized CompletableFuture<Void> close() {
        if (this.closeFuture != null) {
            return this.closeFuture;
        }
        if (this.server == null) {
            return Futures.exceptionalFuture(new IllegalStateException("message bus not open"));
        }
        this.closeFuture = new CompletableFuture<>();
        ((Resource) this).client.context().execute(() -> {
            this.server.close().whenComplete((r4, th) -> {
                synchronized (this) {
                    this.open = false;
                    if (th == null) {
                        CompletableFuture<Void> completableFuture = this.closeFuture;
                        if (completableFuture != null) {
                            this.closeFuture = null;
                            completableFuture.complete(null);
                        }
                    } else {
                        this.open = false;
                        CompletableFuture<Void> completableFuture2 = this.closeFuture;
                        if (completableFuture2 != null) {
                            this.closeFuture = null;
                            completableFuture2.completeExceptionally(th);
                        }
                    }
                }
            });
        });
        return this.closeFuture;
    }

    public boolean isClosed() {
        return !this.open;
    }
}
