package org.onosproject.store.cluster.messaging.impl;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.security.AppGuard;
import org.onosproject.security.AppPermission;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.utils.MeteringAgent;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ClusterCommunicationService} implementation layered on top of {@link MessagingService}.
 *
 * <p>Outgoing payloads are encoded with the caller-supplied encoder, wrapped in a
 * {@link ClusterMessage} envelope and handed to the messaging service, addressed to the
 * {@link Endpoint} built from the target node's IP and TCP port. Incoming bytes are unwrapped
 * from the envelope and decoded before being passed to the registered handler. Encoding,
 * decoding and send operations are timed through {@link MeteringAgent} instances.
 */
@Component(immediate = true, service = {ClusterCommunicationService.class})
public class ClusterCommunicationManager implements ClusterCommunicationService {
    private static final String PRIMITIVE_NAME = "clusterCommunication";
    private static final String SUBJECT_PREFIX = "subject";
    private static final String ENDPOINT_PREFIX = "endpoint";
    private static final String SERIALIZING = "serialization";
    private static final String DESERIALIZING = "deserialization";
    private static final String NODE_PREFIX = "node:";
    private static final String ROUND_TRIP_SUFFIX = ".rtt";
    private static final String ONE_WAY_SUFFIX = ".oneway";

    private final Logger log = LoggerFactory.getLogger(getClass());
    private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, false);
    private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, false);

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected MessagingService messagingService;

    /** Identifier of the local node, captured in {@link #activate()}. */
    private NodeId localNodeId;

    /**
     * Adapts a {@link ClusterMessageHandler} to the byte-oriented handler shape accepted by the
     * messaging service: the raw bytes are parsed into a {@link ClusterMessage}, the handler is
     * invoked, and the message's response bytes are returned.
     */
    private static class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
        private final ClusterMessageHandler handler;

        public InternalClusterMessageHandler(ClusterMessageHandler handler) {
            this.handler = handler;
        }

        @Override
        public byte[] apply(Endpoint sender, byte[] bytes) {
            ClusterMessage message = ClusterMessage.fromBytes(bytes);
            handler.handle(message);
            return message.response();
        }
    }

    /**
     * One-way handler: decodes the envelope payload (timed as "deserialization") and passes the
     * decoded message to the consumer.
     *
     * @param <M> decoded message type
     */
    private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
        private final Function<byte[], M> decoder;
        private final Consumer<M> consumer;

        public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
            this.decoder = decoder;
            this.consumer = consumer;
        }

        @Override
        public void accept(Endpoint sender, byte[] bytes) {
            M decoded = timeFunction(decoder, subjectMeteringAgent, DESERIALIZING)
                    .apply(ClusterMessage.fromBytes(bytes).payload());
            consumer.accept(decoded);
        }
    }

    /**
     * Request/reply handler: decodes the envelope payload, applies the asynchronous handler and
     * encodes its result once the handler's future completes.
     *
     * @param <M> decoded request type
     * @param <R> reply type produced by the handler
     */
    private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
        private final Function<byte[], M> decoder;
        private final Function<R, byte[]> encoder;
        private final Function<M, CompletableFuture<R>> handler;

        public InternalMessageResponder(Function<byte[], M> decoder,
                                        Function<R, byte[]> encoder,
                                        Function<M, CompletableFuture<R>> handler) {
            this.decoder = decoder;
            this.encoder = encoder;
            this.handler = handler;
        }

        @Override
        public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
            M request = timeFunction(decoder, subjectMeteringAgent, DESERIALIZING)
                    .apply(ClusterMessage.fromBytes(bytes).payload());
            return handler.apply(request)
                    .thenApply(reply -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(reply));
        }
    }

    /** Captures the local node identifier from the cluster service and logs start-up. */
    @Activate
    public void activate() {
        localNodeId = clusterService.getLocalNode().id();
        log.info("Started");
    }

    /** Logs shutdown. */
    @Deactivate
    public void deactivate() {
        log.info("Stopped");
    }

    /**
     * Sends a message to every node reported by the cluster service except the local node.
     *
     * @param m              message to send
     * @param messageSubject subject under which the message is sent
     * @param function       encoder turning the message into bytes
     * @param <M>            message type
     */
    public <M> void broadcast(M m, MessageSubject messageSubject, Function<M, byte[]> function) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        Set<ControllerNode> allNodes = clusterService.getNodes();
        // Looked up once instead of once per node inside the filter.
        ControllerNode localNode = clusterService.getLocalNode();
        Set<NodeId> peers = allNodes.stream()
                .filter(node -> !Objects.equal(node, localNode))
                .map(ControllerNode::id)
                .collect(Collectors.toSet());
        multicast(m, messageSubject, function, peers);
    }

    /**
     * Sends a message to every node reported by the cluster service, the local node included.
     *
     * @param m              message to send
     * @param messageSubject subject under which the message is sent
     * @param function       encoder turning the message into bytes
     * @param <M>            message type
     */
    public <M> void broadcastIncludeSelf(M m, MessageSubject messageSubject, Function<M, byte[]> function) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        Set<NodeId> everyone = clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .collect(Collectors.toSet());
        multicast(m, messageSubject, function, everyone);
    }

    /**
     * Sends a message to a single node.
     *
     * @param m              message to send
     * @param messageSubject subject under which the message is sent
     * @param function       encoder turning the message into bytes
     * @param nodeId         destination node
     * @param <M>            message type
     * @return future tracking the send; any exception raised while encoding or dispatching
     *         (including an unknown {@code nodeId}) is reported through an exceptionally
     *         completed future rather than thrown
     */
    public <M> CompletableFuture<Void> unicast(M m, MessageSubject messageSubject, Function<M, byte[]> function, NodeId nodeId) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        try {
            byte[] payload = timeFunction(function, subjectMeteringAgent, SERIALIZING).apply(m);
            byte[] envelope = new ClusterMessage(localNodeId, messageSubject, payload).getBytes();
            return doUnicast(messageSubject, envelope, nodeId);
        } catch (Exception e) {
            return Tools.exceptionalFuture(e);
        }
    }

    /**
     * Sends the same encoded message to each node in the given set. The message is encoded once.
     *
     * <p>Per-node send futures are not returned. An unknown node identifier makes the underlying
     * send throw {@link IllegalArgumentException}, which propagates out of this method and stops
     * iteration over the remaining nodes.
     *
     * @param m              message to send
     * @param messageSubject subject under which the message is sent
     * @param function       encoder turning the message into bytes
     * @param set            destination nodes
     * @param <M>            message type
     */
    public <M> void multicast(M m, MessageSubject messageSubject, Function<M, byte[]> function, Set<NodeId> set) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        byte[] payload = timeFunction(function, subjectMeteringAgent, SERIALIZING).apply(m);
        byte[] envelope = new ClusterMessage(localNodeId, messageSubject, payload).getBytes();
        set.forEach(target -> doUnicast(messageSubject, envelope, target));
    }

    /**
     * Sends a request to a node and decodes its reply.
     *
     * @param m              request message
     * @param messageSubject subject under which the request is sent
     * @param function       encoder turning the request into bytes
     * @param function2      decoder turning the reply bytes into the reply object
     * @param nodeId         destination node
     * @param duration       timeout passed through to the messaging service
     * @param <M>            request type
     * @param <R>            reply type
     * @return future completed with the decoded reply; any exception raised while encoding or
     *         dispatching is reported through an exceptionally completed future
     */
    public <M, R> CompletableFuture<R> sendAndReceive(M m, MessageSubject messageSubject, Function<M, byte[]> function, Function<byte[], R> function2, NodeId nodeId, Duration duration) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        try {
            // The sender id is resolved before encoding, matching the original evaluation order.
            NodeId sender = clusterService.getLocalNode().id();
            byte[] payload = timeFunction(function, subjectMeteringAgent, SERIALIZING).apply(m);
            byte[] envelope = new ClusterMessage(sender, messageSubject, payload).getBytes();
            return sendAndReceive(messageSubject, envelope, nodeId, duration)
                    .thenApply(reply -> timeFunction(function2, subjectMeteringAgent, DESERIALIZING).apply(reply));
        } catch (Exception e) {
            return Tools.exceptionalFuture(e);
        }
    }

    /**
     * Dispatches already-enveloped bytes to one node and times the send under
     * {@code <subject>.oneway}.
     *
     * @throws IllegalArgumentException if the cluster service does not know {@code nodeId}
     */
    private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] envelope, NodeId nodeId) {
        ControllerNode node = clusterService.getNode(nodeId);
        Preconditions.checkArgument(node != null, "Unknown nodeId: %s", nodeId);
        Endpoint destination = new Endpoint(node.ip(), node.tcpPort());
        MeteringAgent.Context timer = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
        return messagingService.sendAsync(destination, subject.toString(), envelope)
                .whenComplete((ignored, error) -> timer.stop(error));
    }

    /**
     * Dispatches already-enveloped request bytes to one node and returns the raw reply bytes.
     * The round trip is timed both per node ({@code node:<id>.rtt}) and per subject
     * ({@code <subject>.rtt}).
     *
     * @throws IllegalArgumentException if the cluster service does not know {@code nodeId}
     */
    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] envelope, NodeId nodeId, Duration timeout) {
        ControllerNode node = clusterService.getNode(nodeId);
        Preconditions.checkArgument(node != null, "Unknown nodeId: %s", nodeId);
        Endpoint destination = new Endpoint(node.ip(), node.tcpPort());
        MeteringAgent.Context nodeTimer =
                endpointMeteringAgent.startTimer(NODE_PREFIX + nodeId.toString() + ROUND_TRIP_SUFFIX);
        MeteringAgent.Context subjectTimer =
                subjectMeteringAgent.startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
        return messagingService.sendAndReceive(destination, subject.toString(), envelope, timeout)
                .whenComplete((reply, error) -> {
                    subjectTimer.stop(error);
                    nodeTimer.stop(error);
                });
    }

    /**
     * Registers a {@link ClusterMessageHandler} for a subject; the handler is run on the given
     * executor service by the messaging service.
     *
     * @param messageSubject        subject to subscribe to
     * @param clusterMessageHandler handler invoked with each parsed {@link ClusterMessage}
     * @param executorService       executor handed to the messaging service
     */
    public void addSubscriber(MessageSubject messageSubject, ClusterMessageHandler clusterMessageHandler, ExecutorService executorService) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.registerHandler(
                messageSubject.toString(),
                new InternalClusterMessageHandler(clusterMessageHandler),
                executorService);
    }

    /**
     * Unregisters whatever handler is registered for the subject.
     *
     * @param messageSubject subject to unsubscribe from
     */
    public void removeSubscriber(MessageSubject messageSubject) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.unregisterHandler(messageSubject.toString());
    }

    /**
     * Registers a synchronous request/reply handler. Each request is decoded, then the handler
     * is run on {@code executor}; its return value (or the exception it throws) completes the
     * reply future, whose value is then encoded.
     *
     * @param messageSubject subject to subscribe to
     * @param function       decoder for incoming request bytes
     * @param function2      handler producing the reply
     * @param function3      encoder for the reply
     * @param executor       executor on which the handler runs
     * @param <M>            request type
     * @param <R>            reply type
     */
    public <M, R> void addSubscriber(MessageSubject messageSubject, Function<byte[], M> function, Function<M, R> function2, Function<R, byte[]> function3, Executor executor) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.registerHandler(
                messageSubject.toString(),
                new InternalMessageResponder<M, R>(function, function3, request -> {
                    CompletableFuture<R> response = new CompletableFuture<>();
                    executor.execute(() -> {
                        try {
                            response.complete(function2.apply(request));
                        } catch (Exception e) {
                            response.completeExceptionally(e);
                        }
                    });
                    return response;
                }));
    }

    /**
     * Registers an asynchronous request/reply handler whose returned future supplies the reply.
     *
     * @param messageSubject subject to subscribe to
     * @param function       decoder for incoming request bytes
     * @param function2      handler returning a future of the reply
     * @param function3      encoder for the reply
     * @param <M>            request type
     * @param <R>            reply type
     */
    public <M, R> void addSubscriber(MessageSubject messageSubject, Function<byte[], M> function, Function<M, CompletableFuture<R>> function2, Function<R, byte[]> function3) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.registerHandler(
                messageSubject.toString(),
                new InternalMessageResponder<M, R>(function, function3, function2));
    }

    /**
     * Registers a one-way consumer for a subject; the messaging service runs it on the given
     * executor.
     *
     * @param messageSubject subject to subscribe to
     * @param function       decoder for incoming message bytes
     * @param consumer       consumer of decoded messages
     * @param executor       executor handed to the messaging service
     * @param <M>            message type
     */
    public <M> void addSubscriber(MessageSubject messageSubject, Function<byte[], M> function, Consumer<M> consumer, Executor executor) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.registerHandler(
                messageSubject.toString(),
                new InternalMessageConsumer<M>(function, consumer),
                executor);
    }

    /**
     * Wraps a function so that each invocation is timed under {@code opName} on the given meter.
     *
     * <p>When the wrapped function throws an {@link Exception}, the timer is stopped with that
     * exception. If the root cause is unchecked it is rethrown as is; otherwise an
     * {@link IllegalStateException} is thrown wrapping the exception's direct cause, or the
     * exception itself when it has no cause, so the failure is never lost.
     *
     * @param function function to time
     * @param meter    metering agent that records the timing
     * @param opName   timer name
     * @param <A>      input type
     * @param <B>      result type
     * @return timing wrapper around {@code function}
     */
    private <A, B> Function<A, B> timeFunction(Function<A, B> function, MeteringAgent meter, String opName) {
        Preconditions.checkNotNull(function);
        Preconditions.checkNotNull(meter);
        Preconditions.checkNotNull(opName);
        return input -> {
            MeteringAgent.Context timer = meter.startTimer(opName);
            try {
                B result = function.apply(input);
                timer.stop((Throwable) null);
                return result;
            } catch (Exception e) {
                timer.stop(e);
                Throwables.throwIfUnchecked(Throwables.getRootCause(e));
                // Reached only when the root cause is a checked exception. Fall back to e itself
                // when it carries no cause, instead of wrapping null and discarding the failure.
                Throwable cause = e.getCause();
                throw new IllegalStateException(cause != null ? cause : e);
            }
        };
    }
}
