package org.axonframework.commandhandling.distributed.jgroups;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.distributed.CommandBusConnector;
import org.axonframework.commandhandling.distributed.CommandDispatchException;
import org.axonframework.commandhandling.distributed.ConsistentHash;
import org.axonframework.commandhandling.distributed.RemoteCommandHandlingException;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.serializer.MessageSerializer;
import org.axonframework.serializer.Serializer;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/commandhandling/distributed/jgroups/JGroupsConnector.class */
public class JGroupsConnector implements CommandBusConnector {
    // Logger used by this connector and by its nested receiver/callback classes.
    private static final Logger logger = LoggerFactory.getLogger(JGroupsConnector.class);
    // JGroups channel over which join, dispatch and reply messages are sent.
    private final JChannel channel;
    // Cluster name handed to the channel on connect and compared against an already-connected channel.
    private final String clusterName;
    // Command bus to which commands received from the channel are dispatched and handlers are subscribed.
    private final CommandBus localSegment;
    // Wraps the Serializer given to the constructor; passed to dispatch and reply messages.
    private final MessageSerializer serializer;
    // Load factor given to connect(int); included in every JoinMessage this connector sends.
    private volatile int currentLoadFactor;
    // Current hash ring; starts empty and is replaced (never mutated in place) on state transfer,
    // view changes and join messages.
    private volatile ConsistentHash consistentHash = ConsistentHash.emptyRing();
    // Signals whether this connector's own join message was received (true) or connecting failed (false).
    private final JoinCondition joinedCondition = new JoinCondition();
    // Callbacks awaiting a reply, keyed by command identifier.
    private final ConcurrentMap<String, MemberAwareCommandCallback> callbacks = new ConcurrentHashMap();
    // Command names subscribed through this connector; a copy is sent in each JoinMessage.
    private final Set<String> supportedCommandNames = new CopyOnWriteArraySet();
    // Receiver installed on the channel by connect(int).
    private final MessageReceiver messageReceiver = new MessageReceiver();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/commandhandling/distributed/jgroups/JGroupsConnector$JoinCondition.class */
    /**
     * One-shot condition recording the outcome of a join attempt. Waiters block on a
     * single-count latch that is released by {@link #markJoined(boolean)}; the outcome
     * itself is exposed through {@link #isJoined()}.
     */
    public static final class JoinCondition {
        /** Released on the first call to {@link #markJoined(boolean)}. */
        private final CountDownLatch latch = new CountDownLatch(1);
        /** Last outcome passed to {@link #markJoined(boolean)}; {@code false} until then. */
        private volatile boolean joined;

        private JoinCondition() {
        }

        /**
         * Blocks until the condition has been marked.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void await() throws InterruptedException {
            latch.await();
        }

        /**
         * Blocks until the condition has been marked or the timeout elapses. Whether the
         * wait timed out is not reported; callers consult {@link #isJoined()} afterwards.
         *
         * @param j        maximum time to wait
         * @param timeUnit unit of {@code j}
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void await(long j, TimeUnit timeUnit) throws InterruptedException {
            latch.await(j, timeUnit);
        }

        /**
         * Records the outcome and releases all waiters. The outcome is stored before the
         * latch is released so that woken waiters see it.
         *
         * @param z {@code true} when the join succeeded, {@code false} otherwise
         */
        public void markJoined(boolean z) {
            joined = z;
            latch.countDown();
        }

        /**
         * @return the most recently recorded outcome, or {@code false} if none was recorded
         */
        public boolean isJoined() {
            return joined;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/commandhandling/distributed/jgroups/JGroupsConnector$MemberAwareCommandCallback.class */
    /**
     * Command callback that pairs a delegate callback with an {@link Address}, so that it can
     * be checked whether that address is still part of a given {@link View}. Results and
     * failures are forwarded to the delegate unchanged.
     *
     * @param <R> type of result the delegate callback accepts
     */
    public static class MemberAwareCommandCallback<R> implements CommandCallback<R> {
        private final Address destination;
        private final CommandCallback<R> delegate;

        /**
         * @param address         address this callback is associated with
         * @param commandCallback callback that receives the forwarded result or failure
         */
        public MemberAwareCommandCallback(Address address, CommandCallback<R> commandCallback) {
            this.destination = address;
            this.delegate = commandCallback;
        }

        /**
         * @param view view to inspect
         * @return {@code true} if the associated address is a member of {@code view}
         */
        public boolean isMemberLive(View view) {
            return view.containsMember(destination);
        }

        /** Forwards the result to the delegate callback. */
        public void onSuccess(R r) {
            delegate.onSuccess(r);
        }

        /** Forwards the failure to the delegate callback. */
        public void onFailure(Throwable th) {
            delegate.onFailure(th);
        }
    }

    /* loaded from: input_file:org/axonframework/commandhandling/distributed/jgroups/JGroupsConnector$MessageReceiver.class */
    private class MessageReceiver extends ReceiverAdapter {
        private volatile View currentView;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/axonframework/commandhandling/distributed/jgroups/JGroupsConnector$MessageReceiver$ReplyingCallback.class */
        public class ReplyingCallback implements CommandCallback<Object> {
            private final Message msg;
            private final CommandMessage commandMessage;

            public ReplyingCallback(Message message, CommandMessage commandMessage) {
                this.msg = message;
                this.commandMessage = commandMessage;
            }

            public void onSuccess(Object obj) {
                try {
                    JGroupsConnector.this.channel.send(this.msg.getSrc(), new ReplyMessage(this.commandMessage.getIdentifier(), obj, null, JGroupsConnector.this.serializer));
                } catch (Exception e) {
                    JGroupsConnector.logger.error("Unable to send reply to command [name: {}, id: {}]. ", new Object[]{this.commandMessage.getCommandName(), this.commandMessage.getIdentifier(), e});
                }
            }

            public void onFailure(Throwable th) {
                try {
                    JGroupsConnector.this.channel.send(this.msg.getSrc(), new ReplyMessage(this.commandMessage.getIdentifier(), null, th, JGroupsConnector.this.serializer));
                } catch (Exception e) {
                    JGroupsConnector.logger.error("Unable to send reply:", e);
                }
            }
        }

        private MessageReceiver() {
        }

        public void getState(OutputStream outputStream) throws Exception {
            Util.objectToStream(JGroupsConnector.this.consistentHash, new DataOutputStream(outputStream));
        }

        public void setState(InputStream inputStream) throws Exception {
            JGroupsConnector.this.consistentHash = (ConsistentHash) Util.objectFromStream(new DataInputStream(inputStream));
        }

        public void viewAccepted(View view) {
            MemberAwareCommandCallback memberAwareCommandCallback;
            ConsistentHash withExclusively = JGroupsConnector.this.consistentHash.withExclusively(JGroupsConnector.this.getMemberNames(view));
            if (!JGroupsConnector.this.consistentHash.equals(withExclusively)) {
                int i = 0;
                for (Map.Entry entry : JGroupsConnector.this.callbacks.entrySet()) {
                    if (!((MemberAwareCommandCallback) entry.getValue()).isMemberLive(view) && (memberAwareCommandCallback = (MemberAwareCommandCallback) JGroupsConnector.this.callbacks.remove(entry.getKey())) != null) {
                        i++;
                        memberAwareCommandCallback.onFailure(new RemoteCommandHandlingException("The connection with the destination was lost before the result was reported."));
                    }
                }
                JGroupsConnector.this.consistentHash = withExclusively;
                JGroupsConnector.logger.info("Membership has changed. Rebuilt consistent hash ring.");
                JGroupsConnector.logger.debug("New distributed hash: {}", JGroupsConnector.this.consistentHash.toString());
                if (i > 0 && JGroupsConnector.logger.isWarnEnabled()) {
                    JGroupsConnector.logger.warn("A member was disconnected while waiting for a reply. {} messages are lost without reply.", Integer.valueOf(i));
                }
            }
            if (!view.equals(this.currentView)) {
                for (Address address : view.getMembers()) {
                    if (this.currentView == null || !this.currentView.containsMember(address)) {
                        if (!address.equals(JGroupsConnector.this.channel.getAddress())) {
                            if (JGroupsConnector.logger.isInfoEnabled()) {
                                JGroupsConnector.logger.info("New member detected: [{}]. Sending it my configuration.", address.toString());
                            }
                            JGroupsConnector.this.sendMembershipUpdate(address);
                        }
                    }
                }
            }
            this.currentView = view;
        }

        public void suspect(Address address) {
            if (JGroupsConnector.logger.isWarnEnabled()) {
                JGroupsConnector.logger.warn("Suspect member: {}.", JGroupsConnector.this.channel.getName(address));
            }
        }

        public void receive(Message message) {
            Object object = message.getObject();
            if (object instanceof JoinMessage) {
                processJoinMessage(message, (JoinMessage) object);
            } else if (object instanceof DispatchMessage) {
                processDispatchMessage(message, (DispatchMessage) object);
            } else if (object instanceof ReplyMessage) {
                processReplyMessage((ReplyMessage) object);
            }
        }

        private void processDispatchMessage(Message message, DispatchMessage dispatchMessage) {
            CommandMessage<?> commandMessage = dispatchMessage.getCommandMessage(JGroupsConnector.this.serializer);
            if (dispatchMessage.isExpectReply()) {
                JGroupsConnector.this.localSegment.dispatch(commandMessage, new ReplyingCallback(message, commandMessage));
            } else {
                JGroupsConnector.this.localSegment.dispatch(commandMessage);
            }
        }

        private void processJoinMessage(Message message, JoinMessage joinMessage) {
            String name = JGroupsConnector.this.channel.getName(message.getSrc());
            JGroupsConnector.this.consistentHash = JGroupsConnector.this.consistentHash.withAdditionalNode(name, joinMessage.getLoadFactor(), joinMessage.getCommandNames());
            if (JGroupsConnector.logger.isInfoEnabled() && !message.getSrc().equals(JGroupsConnector.this.channel.getAddress())) {
                JGroupsConnector.logger.info("{} joined with load factor: {}", message.getSrc(), Integer.valueOf(joinMessage.getLoadFactor()));
            }
            if (message.getSrc().equals(JGroupsConnector.this.channel.getAddress())) {
                JGroupsConnector.this.joinedCondition.markJoined(true);
                JGroupsConnector.logger.info("Local segment successfully joined the distributed command bus");
            }
        }

        private void processReplyMessage(ReplyMessage replyMessage) {
            MemberAwareCommandCallback memberAwareCommandCallback = (MemberAwareCommandCallback) JGroupsConnector.this.callbacks.remove(replyMessage.getCommandIdentifier());
            if (memberAwareCommandCallback != null) {
                if (replyMessage.isSuccess()) {
                    memberAwareCommandCallback.onSuccess(replyMessage.getReturnValue(JGroupsConnector.this.serializer));
                } else {
                    memberAwareCommandCallback.onFailure(replyMessage.getError(JGroupsConnector.this.serializer));
                }
            }
        }
    }

    public JGroupsConnector(JChannel jChannel, String str, CommandBus commandBus, Serializer serializer) {
        this.channel = jChannel;
        this.clusterName = str;
        this.localSegment = commandBus;
        this.serializer = new MessageSerializer(serializer);
    }

    /**
     * Installs this connector's receiver on the channel, connects the channel to the cluster
     * (or fetches state if it is already connected to that cluster) and announces this node
     * with a join message.
     *
     * @param i load factor to announce; must be zero or greater
     * @throws ConnectionFailedException if receiver setup, connecting, state transfer or the
     *                                   membership update fails; the join condition is then
     *                                   marked as failed and the channel is disconnected
     */
    public synchronized void connect(int i) throws ConnectionFailedException {
        // Validate before touching any state, so a rejected load factor is never recorded
        // (and later broadcast by subscribe/unsubscribe).
        Assert.isTrue(i >= 0, "Load Factor must be a positive integer value.");
        Assert.isTrue(this.channel.getReceiver() == null || this.channel.getReceiver() == this.messageReceiver, "The given channel already has a receiver configured. Has the channel been reused with other Connectors?");
        this.currentLoadFactor = i;
        try {
            this.channel.setReceiver(this.messageReceiver);
            if (this.channel.isConnected() && !this.clusterName.equals(this.channel.getClusterName())) {
                throw new AxonConfigurationException("The Channel that has been configured with this JGroupsConnector is already connected, but not through this cluster");
            }
            if (this.channel.isConnected()) {
                // Already in the right cluster: only fetch the shared state.
                this.channel.getState((Address) null, 10000L);
            } else {
                this.channel.connect(this.clusterName, (Address) null, 10000L);
            }
            sendMembershipUpdate(null);
        } catch (Exception e) {
            // Release threads blocked in awaitJoined() with a negative outcome.
            this.joinedCondition.markJoined(false);
            this.channel.disconnect();
            throw new ConnectionFailedException("Failed to connect to JGroupsConnectorFactoryBean", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a join message with the current load factor and a copy of the supported command
     * names. Nothing is sent while the channel is not connected.
     *
     * @param address destination of the message; callers in this class pass {@code null}
     *                when announcing to the cluster rather than to one member
     * @throws MembershipUpdateFailedException if building or sending the message fails
     */
    public void sendMembershipUpdate(Address address) throws MembershipUpdateFailedException {
        try {
            if (!this.channel.isConnected()) {
                return;
            }
            HashSet<String> commandNames = new HashSet<String>(this.supportedCommandNames);
            JoinMessage joinMessage = new JoinMessage(this.currentLoadFactor, commandNames);
            Message flagged = new Message(address, joinMessage).setFlag(new Message.Flag[]{Message.Flag.RSVP});
            this.channel.send(flagged);
        } catch (Exception e) {
            throw new MembershipUpdateFailedException("Failed to dispatch Join message to Distributed Command Bus Members", e);
        }
    }

    /**
     * Blocks until the join outcome is known.
     *
     * @return {@code true} if the join succeeded, {@code false} if it was marked as failed
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean awaitJoined() throws InterruptedException {
        final JoinCondition condition = this.joinedCondition;
        condition.await();
        return condition.isJoined();
    }

    /**
     * Blocks until the join outcome is known or the timeout elapses.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the join is recorded as successful when the wait ends;
     *         {@code false} on a failed join or when no outcome was recorded in time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean awaitJoined(long j, TimeUnit timeUnit) throws InterruptedException {
        final JoinCondition condition = this.joinedCondition;
        condition.await(j, timeUnit);
        return condition.isJoined();
    }

    /**
     * Sends the command to the member selected by the consistent hash for the routing key,
     * and registers the callback to be notified when that member replies.
     *
     * @param str             routing key used to select the destination member
     * @param commandMessage  command to send
     * @param commandCallback callback to notify with the reply
     * @throws CommandDispatchException if the hash knows no member for the command
     * @throws Exception                if the connector has not joined within 5 seconds, the
     *                                  selected member is not in the current view, or sending fails
     */
    @Override // org.axonframework.commandhandling.distributed.CommandBusConnector
    public <R> void send(String str, CommandMessage<?> commandMessage, CommandCallback<R> commandCallback) throws Exception {
        Assert.isTrue(awaitJoined(5L, TimeUnit.SECONDS), "This Connector did not properly join the Cluster yet.");
        String payloadTypeName = commandMessage.getPayloadType().getName();
        String member = this.consistentHash.getMember(str, payloadTypeName);
        if (member == null) {
            throw new CommandDispatchException("No node known to accept " + payloadTypeName);
        }
        Address address = getAddress(member);
        String commandId = commandMessage.getIdentifier();
        MemberAwareCommandCallback<R> pending = new MemberAwareCommandCallback<R>(address, commandCallback);
        // Register before sending so a fast reply cannot arrive ahead of the registration.
        this.callbacks.put(commandId, pending);
        try {
            this.channel.send(address, new DispatchMessage(commandMessage, this.serializer, true));
        } catch (Exception e) {
            // The command never left this node, so no reply will come: drop the registration
            // instead of leaking it. Two-argument remove only removes this exact instance.
            this.callbacks.remove(commandId, pending);
            throw e;
        }
    }

    /**
     * Sends the command to the member selected by the consistent hash for the routing key,
     * without asking for a reply.
     *
     * @param str            routing key used to select the destination member
     * @param commandMessage command to send
     * @throws CommandDispatchException if the hash knows no member for the command
     * @throws Exception                if the connector has not joined within 5 seconds, the
     *                                  selected member is not in the current view, or sending fails
     */
    @Override // org.axonframework.commandhandling.distributed.CommandBusConnector
    public void send(String str, CommandMessage<?> commandMessage) throws Exception {
        final boolean joined = awaitJoined(5L, TimeUnit.SECONDS);
        Assert.isTrue(joined, "This Connector did not properly join the Cluster yet.");
        final String payloadTypeName = commandMessage.getPayloadType().getName();
        final String member = this.consistentHash.getMember(str, payloadTypeName);
        if (member == null) {
            throw new CommandDispatchException("No node known to accept " + payloadTypeName);
        }
        final Address destination = getAddress(member);
        this.channel.send(destination, new DispatchMessage(commandMessage, this.serializer, false));
    }

    /**
     * Subscribes the handler on the local segment and, when the command name was not yet
     * recorded as supported, sends a membership update.
     *
     * @param str            name of the command the handler handles
     * @param commandHandler handler to subscribe
     */
    @Override // org.axonframework.commandhandling.distributed.CommandBusConnector
    public synchronized <C> void subscribe(String str, CommandHandler<? super C> commandHandler) {
        this.localSegment.subscribe(str, commandHandler);
        final boolean newlySupported = this.supportedCommandNames.add(str);
        if (!newlySupported) {
            return;
        }
        sendMembershipUpdate(null);
    }

    /**
     * Unsubscribes the handler from the local segment. If that succeeds and the command name
     * was recorded as supported, the name is removed and a membership update is sent.
     *
     * @param str            name of the command the handler handles
     * @param commandHandler handler to unsubscribe
     * @return the result of unsubscribing from the local segment
     */
    @Override // org.axonframework.commandhandling.distributed.CommandBusConnector
    public synchronized <C> boolean unsubscribe(String str, CommandHandler<? super C> commandHandler) {
        final boolean unsubscribed = this.localSegment.unsubscribe(str, commandHandler);
        // Short-circuit keeps the name set untouched when the local unsubscribe failed.
        if (unsubscribed && this.supportedCommandNames.remove(str)) {
            sendMembershipUpdate(null);
        }
        return unsubscribed;
    }

    /**
     * Finds the address in the channel's current view whose name equals the given name.
     *
     * @param str name to look up
     * @return the first matching address
     * @throws IllegalArgumentException if no address in the view has that name
     */
    private Address getAddress(String str) {
        for (Address candidate : this.channel.getView()) {
            final String candidateName = this.channel.getName(candidate);
            if (candidateName.equals(str)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Given node doesn't seem to be a member of the DistributedCommandBus");
    }

    /**
     * @return the consistent hash this connector currently uses to select destinations
     */
    public ConsistentHash getConsistentHash() {
        return consistentHash;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves the channel name of every member of the given view.
     *
     * @param view view whose members are resolved
     * @return the member names, in the order the view lists its members
     */
    public List<String> getMemberNames(View view) {
        final List<String> names = new ArrayList<String>(view.size());
        for (Address member : view.getMembers()) {
            names.add(this.channel.getName(member));
        }
        return names;
    }
}
