package org.onosproject.store.service.impl;

import com.google.common.base.Verify;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/onosproject/store/service/impl/ClusterMessagingProtocolClient.class */
public class ClusterMessagingProtocolClient implements ProtocolClient {
    public static final long RETRY_INTERVAL_MILLIS = 2000;
    private final ClusterService clusterService;
    private final ClusterCommunicationService clusterCommunicator;
    private final ControllerNode localNode;
    private final TcpMember remoteMember;
    private ControllerNode remoteNode;
    private ExecutorService pool;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final AtomicBoolean connectionOK = new AtomicBoolean(true);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onosproject/store/service/impl/ClusterMessagingProtocolClient$RPCTask.class */
    public class RPCTask<I, O> implements Runnable {
        private final I request;
        private final ClusterMessage message;
        private final CompletableFuture<O> future;

        public RPCTask(I i, CompletableFuture<O> completableFuture) {
            this.request = i;
            this.message = new ClusterMessage(ClusterMessagingProtocolClient.this.localNode.id(), ClusterMessagingProtocolClient.this.messageType(i), (byte[]) Verify.verifyNotNull(ClusterMessagingProtocol.DB_SERIALIZER.encode(i)));
            this.future = completableFuture;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (ClusterMessagingProtocolClient.this.remoteNode == null) {
                    ClusterMessagingProtocolClient.this.remoteNode = ClusterMessagingProtocolClient.this.getControllerNode(ClusterMessagingProtocolClient.this.remoteMember);
                    if (ClusterMessagingProtocolClient.this.remoteNode == null) {
                        throw new IOException("Remote node is offline!");
                    }
                }
                byte[] bArr = (byte[]) ClusterMessagingProtocolClient.this.clusterCommunicator.sendAndReceive(this.message, ClusterMessagingProtocolClient.this.remoteNode.id()).get(ClusterMessagingProtocolClient.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                if (!ClusterMessagingProtocolClient.this.connectionOK.getAndSet(true)) {
                    ClusterMessagingProtocolClient.this.log.info("Connectivity to {} restored", ClusterMessagingProtocolClient.this.remoteNode);
                }
                this.future.complete(Verify.verifyNotNull(ClusterMessagingProtocol.DB_SERIALIZER.decode(bArr)));
            } catch (IOException | TimeoutException e) {
                if (ClusterMessagingProtocolClient.this.connectionOK.getAndSet(false)) {
                    ClusterMessagingProtocolClient.this.log.warn("Detected connectivity issues with {}. Reason: {}", ClusterMessagingProtocolClient.this.remoteNode, e.getMessage());
                }
                ClusterMessagingProtocolClient.this.log.debug("RPCTask for {} failed.", this.request, e);
                this.future.completeExceptionally(e);
            } catch (InterruptedException e2) {
                ClusterMessagingProtocolClient.this.log.warn("RPCTask for {} was interrupted: {}", this.request, e2.getMessage());
                ClusterMessagingProtocolClient.this.log.debug("RPCTask for {} was interrupted.", this.request, e2);
                this.future.completeExceptionally(e2);
                Thread.currentThread().interrupt();
            } catch (ExecutionException e3) {
                ClusterMessagingProtocolClient.this.log.warn("RPCTask execution for {} failed: {}", this.request, e3.getMessage());
                ClusterMessagingProtocolClient.this.log.debug("RPCTask execution for {} failed.", this.request, e3);
                this.future.completeExceptionally(e3);
            } catch (Exception e4) {
                ClusterMessagingProtocolClient.this.log.warn("RPCTask for {} terribly failed.", this.request, e4);
                this.future.completeExceptionally(e4);
            }
        }
    }

    public ClusterMessagingProtocolClient(ClusterService clusterService, ClusterCommunicationService clusterCommunicationService, ControllerNode controllerNode, TcpMember tcpMember) {
        this.clusterService = clusterService;
        this.clusterCommunicator = clusterCommunicationService;
        this.localNode = controllerNode;
        this.remoteMember = tcpMember;
    }

    public CompletableFuture<PingResponse> ping(PingRequest pingRequest) {
        return requestReply(pingRequest);
    }

    public CompletableFuture<SyncResponse> sync(SyncRequest syncRequest) {
        return requestReply(syncRequest);
    }

    public CompletableFuture<PollResponse> poll(PollRequest pollRequest) {
        return requestReply(pollRequest);
    }

    public CompletableFuture<SubmitResponse> submit(SubmitRequest submitRequest) {
        return requestReply(submitRequest);
    }

    public synchronized CompletableFuture<Void> connect() {
        if (this.pool == null || this.pool.isShutdown()) {
            this.pool = Executors.newCachedThreadPool(Tools.namedThreads("copycat-netty-messaging-client-%d"));
        }
        return CompletableFuture.completedFuture(null);
    }

    public synchronized CompletableFuture<Void> close() {
        if (this.pool != null) {
            this.pool.shutdownNow();
            this.pool = null;
        }
        return CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <I> MessageSubject messageType(I i) {
        Class<?> cls = i.getClass();
        if (cls.equals(PollRequest.class)) {
            return ClusterMessagingProtocol.COPYCAT_POLL;
        }
        if (cls.equals(SyncRequest.class)) {
            return ClusterMessagingProtocol.COPYCAT_SYNC;
        }
        if (cls.equals(SubmitRequest.class)) {
            return ClusterMessagingProtocol.COPYCAT_SUBMIT;
        }
        if (cls.equals(PingRequest.class)) {
            return ClusterMessagingProtocol.COPYCAT_PING;
        }
        throw new IllegalArgumentException("Unknown class " + cls.getName());
    }

    private <I, O> CompletableFuture<O> requestReply(I i) {
        CompletableFuture<O> completableFuture = new CompletableFuture<>();
        if (this.pool == null) {
            this.log.info("Attempted to use closed client, connecting now. {}", i);
            connect();
        }
        this.pool.submit(new RPCTask(i, completableFuture));
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ControllerNode getControllerNode(TcpMember tcpMember) {
        String host = tcpMember.host();
        int port = tcpMember.port();
        for (ControllerNode controllerNode : this.clusterService.getNodes()) {
            if (controllerNode.ip().toString().equals(host) && controllerNode.tcpPort() == port) {
                return controllerNode;
            }
        }
        return null;
    }
}
