package org.onosproject.store.cluster.messaging.impl;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@Component(immediate = true)
/* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.class */
public class ClusterCommunicationManager implements ClusterCommunicationService {
    private final Logger log = LoggerFactory.getLogger(getClass());

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    private ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MessagingService messagingService;
    private NodeId localNodeId;

    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager$InternalClusterMessageHandler.class */
    private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
        private ClusterMessageHandler handler;

        public InternalClusterMessageHandler(ClusterMessageHandler clusterMessageHandler) {
            this.handler = clusterMessageHandler;
        }

        @Override // java.util.function.Function
        public byte[] apply(byte[] bArr) {
            ClusterMessage fromBytes = ClusterMessage.fromBytes(bArr);
            this.handler.handle(fromBytes);
            return fromBytes.response();
        }
    }

    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager$InternalMessageConsumer.class */
    private class InternalMessageConsumer<M> implements Consumer<byte[]> {
        private final Function<byte[], M> decoder;
        private final Consumer<M> consumer;

        public InternalMessageConsumer(Function<byte[], M> function, Consumer<M> consumer) {
            this.decoder = function;
            this.consumer = consumer;
        }

        @Override // java.util.function.Consumer
        public void accept(byte[] bArr) {
            this.consumer.accept(this.decoder.apply(ClusterMessage.fromBytes(bArr).payload()));
        }
    }

    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager$InternalMessageResponder.class */
    private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
        private final Function<byte[], M> decoder;
        private final Function<R, byte[]> encoder;
        private final Function<M, CompletableFuture<R>> handler;

        public InternalMessageResponder(Function<byte[], M> function, Function<R, byte[]> function2, Function<M, CompletableFuture<R>> function3) {
            this.decoder = function;
            this.encoder = function2;
            this.handler = function3;
        }

        @Override // java.util.function.Function
        public CompletableFuture<byte[]> apply(byte[] bArr) {
            return ((CompletableFuture) this.handler.apply(this.decoder.apply(ClusterMessage.fromBytes(bArr).payload()))).thenApply((Function) this.encoder);
        }
    }

    @Activate
    public void activate() {
        this.localNodeId = this.clusterService.getLocalNode().id();
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.log.info("Stopped");
    }

    public <M> void broadcast(M m, MessageSubject messageSubject, Function<M, byte[]> function) {
        multicast(m, messageSubject, function, (Set) this.clusterService.getNodes().stream().filter(controllerNode -> {
            return !Objects.equal(controllerNode, this.clusterService.getLocalNode());
        }).map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet()));
    }

    public <M> void broadcastIncludeSelf(M m, MessageSubject messageSubject, Function<M, byte[]> function) {
        multicast(m, messageSubject, function, (Set) this.clusterService.getNodes().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet()));
    }

    public <M> CompletableFuture<Void> unicast(M m, MessageSubject messageSubject, Function<M, byte[]> function, NodeId nodeId) {
        try {
            return doUnicast(messageSubject, new ClusterMessage(this.localNodeId, messageSubject, function.apply(m)).getBytes(), nodeId);
        } catch (Exception e) {
            return Tools.exceptionalFuture(e);
        }
    }

    public <M> void multicast(M m, MessageSubject messageSubject, Function<M, byte[]> function, Set<NodeId> set) {
        byte[] bytes = new ClusterMessage(this.localNodeId, messageSubject, function.apply(m)).getBytes();
        set.forEach(nodeId -> {
            doUnicast(messageSubject, bytes, nodeId);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <M, R> CompletableFuture<R> sendAndReceive(M m, MessageSubject messageSubject, Function<M, byte[]> function, Function<byte[], R> function2, NodeId nodeId) {
        try {
            return (CompletableFuture<R>) sendAndReceive(messageSubject, new ClusterMessage(this.clusterService.getLocalNode().id(), messageSubject, function.apply(m)).getBytes(), nodeId).thenApply((Function<? super byte[], ? extends U>) function2);
        } catch (Exception e) {
            return Tools.exceptionalFuture(e);
        }
    }

    private CompletableFuture<Void> doUnicast(MessageSubject messageSubject, byte[] bArr, NodeId nodeId) {
        ControllerNode node = this.clusterService.getNode(nodeId);
        Preconditions.checkArgument(node != null, "Unknown nodeId: %s", new Object[]{nodeId});
        return this.messagingService.sendAsync(new Endpoint(node.ip(), node.tcpPort()), messageSubject.value(), bArr);
    }

    private CompletableFuture<byte[]> sendAndReceive(MessageSubject messageSubject, byte[] bArr, NodeId nodeId) {
        ControllerNode node = this.clusterService.getNode(nodeId);
        Preconditions.checkArgument(node != null, "Unknown nodeId: %s", new Object[]{nodeId});
        return this.messagingService.sendAndReceive(new Endpoint(node.ip(), node.tcpPort()), messageSubject.value(), bArr);
    }

    public void addSubscriber(MessageSubject messageSubject, ClusterMessageHandler clusterMessageHandler, ExecutorService executorService) {
        this.messagingService.registerHandler(messageSubject.value(), new InternalClusterMessageHandler(clusterMessageHandler), executorService);
    }

    public void removeSubscriber(MessageSubject messageSubject) {
        this.messagingService.unregisterHandler(messageSubject.value());
    }

    public <M, R> void addSubscriber(MessageSubject messageSubject, Function<byte[], M> function, Function<M, R> function2, Function<R, byte[]> function3, Executor executor) {
        this.messagingService.registerHandler(messageSubject.value(), new InternalMessageResponder(function, function3, obj -> {
            CompletableFuture completableFuture = new CompletableFuture();
            executor.execute(() -> {
                try {
                    completableFuture.complete(function2.apply(obj));
                } catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                }
            });
            return completableFuture;
        }));
    }

    public <M, R> void addSubscriber(MessageSubject messageSubject, Function<byte[], M> function, Function<M, CompletableFuture<R>> function2, Function<R, byte[]> function3) {
        this.messagingService.registerHandler(messageSubject.value(), new InternalMessageResponder(function, function3, function2));
    }

    public <M> void addSubscriber(MessageSubject messageSubject, Function<byte[], M> function, Consumer<M> consumer, Executor executor) {
        this.messagingService.registerHandler(messageSubject.value(), new InternalMessageConsumer(function, consumer), executor);
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindMessagingService(MessagingService messagingService) {
        this.messagingService = messagingService;
    }

    protected void unbindMessagingService(MessagingService messagingService) {
        if (this.messagingService == messagingService) {
            this.messagingService = null;
        }
    }
}
