/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.jgroups.commandhandling;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.CommandBusConnector;
import org.axonframework.commandhandling.distributed.CommandBusConnectorCommunicationException;
import org.axonframework.commandhandling.distributed.CommandCallbackRepository;
import org.axonframework.commandhandling.distributed.CommandCallbackWrapper;
import org.axonframework.commandhandling.distributed.CommandRouter;
import org.axonframework.commandhandling.distributed.ConsistentHash;
import org.axonframework.commandhandling.distributed.ConsistentHashChangeListener;
import org.axonframework.commandhandling.distributed.Member;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.commandhandling.distributed.ServiceRegistryException;
import org.axonframework.commandhandling.distributed.SimpleMember;
import org.axonframework.commandhandling.distributed.commandfilter.DenyAll;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.Registration;
import org.axonframework.jgroups.commandhandling.ConnectionFailedException;
import org.axonframework.jgroups.commandhandling.JGroupsDispatchMessage;
import org.axonframework.jgroups.commandhandling.JGroupsReplyMessage;
import org.axonframework.jgroups.commandhandling.JoinMessage;
import org.axonframework.jgroups.commandhandling.MembershipUpdateFailedException;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.serialization.Serializer;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command bus connector and command router that uses a JGroups {@link JChannel} to exchange commands, command
 * replies and membership information with the other nodes of a cluster.
 * <p>
 * Each node announces its load factor and command filter by sending a {@link JoinMessage}. Received join messages
 * are folded into a {@link ConsistentHash}, which {@link #findDestination(CommandMessage)} consults to pick the
 * member that should handle a command. Commands are sent as {@link JGroupsDispatchMessage}s and their results
 * travel back as {@link JGroupsReplyMessage}s.
 * <p>
 * Incoming messages are handed to an {@link ExecutorService}. When none is passed to the constructor, a cached
 * thread pool is created on {@link #connect()} and shut down on {@link #disconnect()}; a supplied executor is never
 * shut down by this class.
 */
public class JGroupsConnector implements CommandRouter, Receiver, CommandBusConnector {

    private static final Logger logger = LoggerFactory.getLogger(JGroupsConnector.class);
    private static final boolean LOCAL_MEMBER = true;
    private static final boolean NON_LOCAL_MEMBER = false;

    /** Guards the combined "register member + update consistent hash" step of join processing. */
    private final Object monitor = new Object();
    private final CommandBus localSegment;
    private final CommandCallbackRepository<Address> callbackRepository = new CommandCallbackRepository<>();
    private final Serializer serializer;
    private final JoinCondition joinedCondition = new JoinCondition();
    private final Map<Address, VersionedMember> members = new ConcurrentHashMap<>();
    private final String clusterName;
    private final RoutingStrategy routingStrategy;
    private final ConsistentHashChangeListener consistentHashChangeListener;
    private final JChannel channel;
    private final AtomicReference<ConsistentHash> consistentHash = new AtomicReference<>(new ConsistentHash());
    private final AtomicInteger membershipVersion = new AtomicInteger(0);
    private volatile View currentView;
    private volatile int loadFactor = 0;
    private volatile Predicate<? super CommandMessage<?>> commandFilter = DenyAll.INSTANCE;
    // volatile: assigned by the thread calling connect(), read by the channel threads that invoke receive().
    private volatile ExecutorService executorService;
    private final boolean executorProvided;

    /**
     * Creates a connector that routes with an {@link AnnotationRoutingStrategy}, ignores consistent hash changes
     * and manages its own executor.
     *
     * @param localSegment the command bus that handles commands destined for this node
     * @param channel      the JGroups channel used to communicate with the cluster
     * @param clusterName  the name of the cluster to join
     * @param serializer   the serializer used for commands and replies
     */
    public JGroupsConnector(CommandBus localSegment, JChannel channel, String clusterName, Serializer serializer) {
        this(localSegment, channel, clusterName, serializer, new AnnotationRoutingStrategy());
    }

    /**
     * Creates a connector that ignores consistent hash changes and manages its own executor.
     *
     * @param localSegment    the command bus that handles commands destined for this node
     * @param channel         the JGroups channel used to communicate with the cluster
     * @param clusterName     the name of the cluster to join
     * @param serializer      the serializer used for commands and replies
     * @param routingStrategy the strategy that provides the routing key of a command
     */
    public JGroupsConnector(CommandBus localSegment, JChannel channel, String clusterName, Serializer serializer,
                            RoutingStrategy routingStrategy) {
        this(localSegment, channel, clusterName, serializer, routingStrategy, ch -> {});
    }

    /**
     * Creates a connector that manages its own executor.
     *
     * @param localSegment                 the command bus that handles commands destined for this node
     * @param channel                      the JGroups channel used to communicate with the cluster
     * @param clusterName                  the name of the cluster to join
     * @param serializer                   the serializer used for commands and replies
     * @param routingStrategy              the strategy that provides the routing key of a command
     * @param consistentHashChangeListener listener notified each time the consistent hash is updated
     */
    public JGroupsConnector(CommandBus localSegment, JChannel channel, String clusterName, Serializer serializer,
                            RoutingStrategy routingStrategy,
                            ConsistentHashChangeListener consistentHashChangeListener) {
        this(localSegment, channel, clusterName, serializer, routingStrategy, consistentHashChangeListener, null);
    }

    /**
     * Creates a connector.
     *
     * @param localSegment                 the command bus that handles commands destined for this node
     * @param channel                      the JGroups channel used to communicate with the cluster
     * @param clusterName                  the name of the cluster to join
     * @param serializer                   the serializer used for commands and replies
     * @param routingStrategy              the strategy that provides the routing key of a command
     * @param consistentHashChangeListener listener notified each time the consistent hash is updated
     * @param executorService              executor that processes incoming messages, or {@code null} to have the
     *                                     connector create (and later shut down) its own
     */
    public JGroupsConnector(CommandBus localSegment, JChannel channel, String clusterName, Serializer serializer,
                            RoutingStrategy routingStrategy,
                            ConsistentHashChangeListener consistentHashChangeListener,
                            ExecutorService executorService) {
        this.localSegment = localSegment;
        this.serializer = serializer;
        this.channel = channel;
        this.clusterName = clusterName;
        this.routingStrategy = routingStrategy;
        this.consistentHashChangeListener = consistentHashChangeListener;
        this.executorService = executorService;
        this.executorProvided = executorService != null;
    }

    /**
     * Records the new load factor and command filter of this node and broadcasts them to the cluster, without
     * asking the other members to reply. The broadcast carries the current membership version, which is
     * incremented afterwards.
     *
     * @param loadFactor    the load factor of this node
     * @param commandFilter the filter describing the commands this node accepts
     */
    @Override
    public void updateMembership(int loadFactor, Predicate<? super CommandMessage<?>> commandFilter) {
        this.loadFactor = loadFactor;
        this.commandFilter = commandFilter;
        broadCastMembership(membershipVersion.getAndIncrement(), false);
    }

    /**
     * Sends this node's configuration to all nodes, provided the channel is connected. Does nothing otherwise.
     *
     * @param updateVersion the membership version to tag the broadcast with
     * @param expectReply   whether receivers should answer with their own configuration
     * @throws ServiceRegistryException when preparing the broadcast fails
     */
    protected void broadCastMembership(int updateVersion, boolean expectReply) throws ServiceRegistryException {
        try {
            if (channel.isConnected()) {
                Address localAddress = channel.getAddress();
                logger.info("Broadcasting membership from {}", localAddress);
                // A null destination addresses the message to all nodes.
                sendMyConfigurationTo(null, expectReply, updateVersion);
            }
        } catch (Exception e) {
            throw new ServiceRegistryException("Could not broadcast local membership details to the cluster", e);
        }
    }

    /**
     * Connects the channel to the configured cluster and registers this node as a local member.
     * <p>
     * The executor is created <em>before</em> this connector is installed as the channel's receiver, so that
     * messages delivered while the connection is still being established can be processed. Because the local
     * address is not known at that point, threads of the default executor are named after the cluster.
     *
     * @throws ConnectionFailedException when the channel has already joined a differently named cluster
     * @throws Exception                 when connecting the channel fails
     */
    public void connect() throws Exception {
        String joinedCluster = channel.getClusterName();
        if (joinedCluster != null && !clusterName.equals(joinedCluster)) {
            throw new ConnectionFailedException("Already joined cluster: " + joinedCluster);
        }
        if (!executorProvided) {
            executorService = Executors.newCachedThreadPool(
                    new AxonThreadFactory("JGroupsConnector - " + clusterName));
        }
        try {
            channel.setReceiver(this);
            channel.connect(clusterName);
        } catch (Exception e) {
            // Do not leave the executor we just created behind when the connection attempt fails.
            if (!executorProvided) {
                executorService.shutdown();
            }
            throw e;
        }

        Address localAddress = channel.getAddress();
        String localName = localAddress.toString();
        SimpleMember<Address> localMember = new SimpleMember<>(localName, localAddress, LOCAL_MEMBER, null);
        members.put(localAddress, new VersionedMember(localMember, membershipVersion.getAndIncrement()));
        updateConsistentHash(ch -> ch.with(localMember, loadFactor, commandFilter));
    }

    /**
     * Disconnects the channel and shuts down the executor if it was created by this connector. Safe to call when
     * {@link #connect()} was never invoked.
     */
    public void disconnect() {
        channel.disconnect();
        ExecutorService executor = executorService;
        if (!executorProvided && executor != null) {
            executor.shutdown();
        }
    }

    /**
     * No-op: this connector does not provide state to the channel.
     *
     * @param ostream ignored
     */
    @Override
    public void getState(OutputStream ostream) {
    }

    /**
     * No-op: this connector does not accept state from the channel.
     *
     * @param istream ignored
     */
    @Override
    public void setState(InputStream istream) {
    }

    /**
     * Handles a new cluster view.
     * <p>
     * The first view marks the moment this node joined: the local configuration is broadcast (requesting replies)
     * and threads blocked in {@code awaitJoined} are released. For a later view that differs from the current
     * one, members that left are removed from the consistent hash and the member map, and this node's
     * configuration is sent to every newly joined member other than itself.
     *
     * @param view the view reported by the channel
     * @throws MembershipUpdateFailedException when the initial broadcast fails
     */
    @Override
    public synchronized void viewAccepted(View view) {
        if (currentView == null) {
            currentView = view;
            logger.info("Local segment ({}) joined the cluster. Broadcasting configuration.", channel.getAddress());
            try {
                broadCastMembership(membershipVersion.get(), true);
                joinedCondition.markJoined();
            } catch (Exception e) {
                throw new MembershipUpdateFailedException("Failed to broadcast my settings", e);
            }
        } else if (!view.equals(currentView)) {
            Address[][] diff = View.diff(currentView, view);
            Address[] joined = diff[0];
            Address[] left = diff[1];
            currentView = view;
            Address localAddress = channel.getAddress();

            // First drop every departed member from the hash, then forget them; the hash update needs the
            // member entry to still be present.
            for (Address departed : left) {
                updateConsistentHash(ch -> {
                    VersionedMember member = members.get(departed);
                    return member == null ? ch : ch.without(member);
                });
            }
            for (Address departed : left) {
                members.remove(departed);
            }
            for (Address newcomer : joined) {
                if (!newcomer.equals(localAddress)) {
                    sendMyConfigurationTo(newcomer, true, membershipVersion.get());
                }
            }
        }
        currentView = view;
    }

    /**
     * Logs a warning that the given member is suspected; takes no further action.
     *
     * @param suspected_mbr the address of the suspected member
     */
    @Override
    public void suspect(Address suspected_mbr) {
        logger.warn("Member is suspect: {}", suspected_mbr.toString());
    }

    /** No-op. */
    @Override
    public void block() {
    }

    /** No-op. */
    @Override
    public void unblock() {
    }

    /**
     * Schedules processing of an incoming message on the executor. Join, dispatch and reply messages are handled;
     * any other payload (including {@code null}) is logged and ignored.
     *
     * @param msg the message received from the channel
     */
    @Override
    public void receive(Message msg) {
        executorService.execute(() -> {
            Object payload = msg.getObject();
            if (payload instanceof JoinMessage) {
                processJoinMessage(msg, (JoinMessage) payload);
            } else if (payload instanceof JGroupsDispatchMessage) {
                processDispatchMessage(msg, (JGroupsDispatchMessage) payload);
            } else if (payload instanceof JGroupsReplyMessage) {
                processReplyMessage((JGroupsReplyMessage) payload);
            } else {
                logger.warn("Received unknown message: {}",
                            payload == null ? "null" : payload.getClass().getName());
            }
        });
    }

    /**
     * Completes the callback registered for the command the reply refers to, with either the returned value or
     * the reported error. Replies without a registered callback are logged and ignored.
     */
    private void processReplyMessage(JGroupsReplyMessage message) {
        CommandCallbackWrapper<Object, Object, Object> callbackWrapper =
                callbackRepository.fetchAndRemove(message.getCommandIdentifier());
        if (callbackWrapper == null) {
            logger.warn("Received a callback for a message that has either already received a callback, or which was not sent through this node. Ignoring.");
        } else if (message.isSuccess()) {
            callbackWrapper.success(message.getReturnValue(serializer));
        } else {
            Throwable exception = ObjectUtils.getOrDefault(
                    message.getError(serializer),
                    new IllegalStateException(String.format("Unknown execution failure for command [%s]",
                                                            message.getCommandIdentifier())));
            callbackWrapper.fail(exception);
        }
    }

    /**
     * Dispatches a received command on the local segment. When the sender expects a reply, the outcome (or any
     * exception raised while dispatching) is sent back to it; otherwise dispatch failures are only logged.
     */
    private <C, R> void processDispatchMessage(final Message msg, final JGroupsDispatchMessage message) {
        if (message.isExpectReply()) {
            try {
                @SuppressWarnings("unchecked")
                CommandMessage<C> commandMessage = (CommandMessage<C>) message.getCommandMessage(serializer);
                localSegment.dispatch(commandMessage, new CommandCallback<C, R>() {

                    @Override
                    public void onSuccess(CommandMessage<? extends C> commandMessage, R result) {
                        sendReply(msg.getSrc(), message.getCommandIdentifier(), result, null);
                    }

                    @Override
                    public void onFailure(CommandMessage<? extends C> commandMessage, Throwable cause) {
                        sendReply(msg.getSrc(), message.getCommandIdentifier(), null, cause);
                    }
                });
            } catch (Exception e) {
                sendReply(msg.getSrc(), message.getCommandIdentifier(), null, e);
            }
        } else {
            try {
                localSegment.dispatch(message.getCommandMessage(serializer));
            } catch (Exception e) {
                logger.error("Could not dispatch command", e);
            }
        }
    }

    /**
     * Sends the outcome of a command back to {@code address}. A {@code null} cause means success. If the result
     * or cause cannot be serialized, a reply with a {@code null} payload is sent instead. Failures to send are
     * logged, not propagated.
     */
    private <R> void sendReply(Address address, String commandIdentifier, R result, Throwable cause) {
        boolean success = cause == null;
        Object payload = success ? result : cause;
        JGroupsReplyMessage reply;
        try {
            reply = new JGroupsReplyMessage(commandIdentifier, success, payload, serializer);
        } catch (Exception e) {
            logger.warn(String.format("Could not serialize command reply [%s]. Sending back NULL.", payload), e);
            reply = new JGroupsReplyMessage(commandIdentifier, success, null, serializer);
        }
        try {
            channel.send(address, reply);
        } catch (Exception e) {
            logger.error("Could not send reply", e);
        }
    }

    /**
     * Processes membership information announced by a node.
     * <p>
     * The announcement is ignored when its sender is not part of the channel's current view, or when a newer
     * announcement of the same sender was already registered. Otherwise the sender is (re-)registered in the
     * consistent hash with the announced load factor and command filter and, if requested and the sender is not
     * this node, this node's own configuration is sent back.
     */
    private void processJoinMessage(Message message, JoinMessage joinMessage) {
        Address source = message.getSrc();
        if (!channel.getView().containsMember(source)) {
            logger.warn("Received join message from '{}', but a connection with the sender has been lost.",
                        source.toString());
            return;
        }

        String joinedMember = source.toString();
        int announcedLoadFactor = joinMessage.getLoadFactor();
        Predicate<? super CommandMessage<?>> announcedFilter = joinMessage.messageFilter();
        SimpleMember<Address> member = new SimpleMember<>(joinedMember, source, NON_LOCAL_MEMBER, s -> {});

        synchronized (monitor) {
            // Keep whichever registration carries the highest order; ties are won by the incoming message.
            int registeredOrder = members.compute(member.endpoint(), (address, existing) -> {
                if (existing == null || existing.order() <= joinMessage.getOrder()) {
                    return new VersionedMember(member, joinMessage.getOrder());
                }
                return existing;
            }).order();
            if (joinMessage.getOrder() != registeredOrder) {
                logger.info("Received outdated update. Discarding it.");
                return;
            }
            updateConsistentHash(ch -> ch.with(member, announcedLoadFactor, announcedFilter));
        }

        // Comparing from the (non-null) source keeps this safe if the channel has no local address.
        boolean fromOtherNode = !source.equals(channel.getAddress());
        if (joinMessage.isExpectReply() && fromOtherNode) {
            sendMyConfigurationTo(member.endpoint(), false, membershipVersion.get());
        }
        if (fromOtherNode) {
            logger.info("{} joined with load factor: {}", joinedMember, announcedLoadFactor);
        } else {
            logger.debug("Got my own ({}) join message for load factor: {}", joinedMember, announcedLoadFactor);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Got a network of members: {}", members.values());
        }
    }

    /** Atomically applies the update to the consistent hash and notifies the change listener of the result. */
    private void updateConsistentHash(UnaryOperator<ConsistentHash> consistentHashUpdate) {
        consistentHashChangeListener.onConsistentHashChanged(consistentHash.updateAndGet(consistentHashUpdate));
    }

    /**
     * Sends this node's load factor and command filter to {@code endpoint}, or to all nodes when {@code endpoint}
     * is {@code null}. This is best-effort: failures are logged (including the exception) and not propagated.
     */
    private void sendMyConfigurationTo(Address endpoint, boolean expectReply, int order) {
        try {
            logger.info("Sending my configuration to {}.", ObjectUtils.getOrDefault(endpoint, "all nodes"));
            Message returnJoinMessage =
                    new Message(endpoint, new JoinMessage(loadFactor, commandFilter, order, expectReply));
            returnJoinMessage.setFlag(Message.Flag.OOB);
            channel.send(returnJoinMessage);
        } catch (Exception e) {
            // The trailing throwable is logged with its stack trace; it does not consume the placeholder.
            logger.warn("An exception occurred while sending membership information to newly joined member: {}",
                        endpoint, e);
        }
    }

    /**
     * Blocks until this node has joined the cluster.
     *
     * @return {@code true} once the node has joined
     * @throws InterruptedException when the waiting thread is interrupted
     */
    public boolean awaitJoined() throws InterruptedException {
        joinedCondition.await();
        return joinedCondition.isJoined();
    }

    /**
     * Blocks until this node has joined the cluster or the timeout elapses.
     *
     * @param timeout  the maximum time to wait
     * @param timeUnit the unit of {@code timeout}
     * @return {@code true} if the node has joined, {@code false} if it had not yet joined when waiting ended
     * @throws InterruptedException when the waiting thread is interrupted
     */
    public boolean awaitJoined(long timeout, TimeUnit timeUnit) throws InterruptedException {
        joinedCondition.await(timeout, timeUnit);
        return joinedCondition.isJoined();
    }

    /**
     * @return the name reported by the underlying channel
     */
    public String getNodeName() {
        return channel.getName();
    }

    /**
     * @return the consistent hash currently used to route commands
     */
    protected ConsistentHash getConsistentHash() {
        return consistentHash.get();
    }

    /**
     * Sends the command to the destination without expecting a reply.
     *
     * @param destination the member that should handle the command
     * @param command     the command to send
     * @throws Exception when the destination has no JGroups endpoint or sending fails
     */
    @Override
    public <C> void send(Member destination, CommandMessage<? extends C> command) throws Exception {
        channel.send(resolveAddress(destination), new JGroupsDispatchMessage(command, serializer, false));
    }

    /**
     * Sends the command to the destination and registers the callback to be invoked when the reply arrives.
     * <p>
     * The callback is registered only after the destination address is resolved and the command is serialized,
     * and is removed again if sending fails, so a failed send does not leave a callback behind.
     *
     * @param destination the member that should handle the command
     * @param command     the command to send
     * @param callback    the callback to notify of the command's outcome
     * @throws Exception when the destination has no JGroups endpoint or sending fails
     */
    @Override
    public <C, R> void send(Member destination, CommandMessage<C> command,
                            CommandCallback<? super C, R> callback) throws Exception {
        Address address = resolveAddress(destination);
        JGroupsDispatchMessage dispatchMessage = new JGroupsDispatchMessage(command, serializer, true);
        // Register before sending so that a fast reply always finds its callback.
        callbackRepository.store(command.getIdentifier(),
                                 new CommandCallbackWrapper<>(destination, command, callback));
        try {
            channel.send(address, dispatchMessage);
        } catch (Exception e) {
            callbackRepository.fetchAndRemove(command.getIdentifier());
            throw e;
        }
    }

    /**
     * Subscribes the handler to the local segment.
     *
     * @param commandName the name of the command to handle
     * @param handler     the handler to subscribe
     * @return the registration returned by the local segment
     */
    @Override
    public Registration subscribe(String commandName, MessageHandler<? super CommandMessage<?>> handler) {
        return localSegment.subscribe(commandName, handler);
    }

    /**
     * Resolves the JGroups address of the given member.
     *
     * @param destination the member to resolve the address of
     * @return the JGroups address of the member
     * @throws CommandBusConnectorCommunicationException when the member exposes no JGroups endpoint
     */
    protected Address resolveAddress(Member destination) {
        return destination.getConnectionEndpoint(Address.class)
                          .orElseThrow(() -> new CommandBusConnectorCommunicationException(
                                  "The target member doesn't expose a JGroups endpoint"));
    }

    /**
     * Looks up the member that should handle the given command, based on its routing key and the current
     * consistent hash.
     *
     * @param message the command to find a destination for
     * @return the destination member, or an empty optional if the consistent hash yields none
     */
    @Override
    public Optional<Member> findDestination(CommandMessage<?> message) {
        String routingKey = routingStrategy.getRoutingKey(message);
        return consistentHash.get().getMember(routingKey, message);
    }

    /**
     * Registers the interceptor on the local segment.
     *
     * @param handlerInterceptor the interceptor to register
     * @return the registration returned by the local segment
     */
    public Registration registerHandlerInterceptor(
            MessageHandlerInterceptor<? super CommandMessage<?>> handlerInterceptor) {
        return localSegment.registerHandlerInterceptor(handlerInterceptor);
    }

    /**
     * A member together with the order (version) of the join message that registered it. All {@link Member}
     * operations delegate to the wrapped member.
     */
    private static class VersionedMember implements Member {

        private final SimpleMember<Address> member;
        private final int version;

        public VersionedMember(SimpleMember<Address> member, int version) {
            this.member = member;
            this.version = version;
        }

        /** @return the version this member was registered with */
        public int order() {
            return version;
        }

        @Override
        public String name() {
            return member.name();
        }

        @Override
        public <T> Optional<T> getConnectionEndpoint(Class<T> protocol) {
            return member.getConnectionEndpoint(protocol);
        }

        @Override
        public boolean local() {
            return member.local();
        }

        @Override
        public void suspect() {
            member.suspect();
        }

        /** Readable form for the debug log that lists the known members. */
        @Override
        public String toString() {
            return member.name() + "(" + version + ")";
        }
    }

    /** One-shot latch that is released when the node has joined the cluster. */
    private static final class JoinCondition {

        private final CountDownLatch joinCountDown = new CountDownLatch(1);
        private volatile boolean success;

        private JoinCondition() {
        }

        public void await() throws InterruptedException {
            joinCountDown.await();
        }

        public void await(long timeout, TimeUnit timeUnit) throws InterruptedException {
            // The latch result is intentionally ignored; callers query isJoined() afterwards.
            joinCountDown.await(timeout, timeUnit);
        }

        private void markJoined() {
            // Set the flag before releasing waiters so they observe the successful join.
            success = true;
            joinCountDown.countDown();
        }

        public boolean isJoined() {
            return success;
        }
    }
}

