package io.vertx.spi.cluster.consul;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.impl.TaskQueue;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.ext.consul.CheckOptions;
import io.vertx.ext.consul.CheckStatus;
import io.vertx.ext.consul.ConsulClientOptions;
import io.vertx.ext.consul.KeyValueOptions;
import io.vertx.ext.consul.ServiceOptions;
import io.vertx.spi.cluster.consul.impl.ClusterManagerInternalContext;
import io.vertx.spi.cluster.consul.impl.ConsulAsyncMap;
import io.vertx.spi.cluster.consul.impl.ConsulAsyncMultiMap;
import io.vertx.spi.cluster.consul.impl.ConsulCounter;
import io.vertx.spi.cluster.consul.impl.ConsulLock;
import io.vertx.spi.cluster.consul.impl.ConsulMap;
import io.vertx.spi.cluster.consul.impl.ConsulMapListener;
import io.vertx.spi.cluster.consul.impl.ConsulSyncMap;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

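/**
 * Vert.x {@link ClusterManager} implementation that keeps cluster membership in Consul.
 *
 * <p>The manager is itself a {@code ConsulMap<String, String>} bound to the {@code __vertx.nodes}
 * map: every joined node stores its TCP address under its node id, and entry events on that map
 * are translated into {@link NodeListener} notifications.
 *
 * <p>Decompiled from: input_file:io/vertx/spi/cluster/consul/ConsulClusterManager.class
 */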
public class ConsulClusterManager extends ConsulMap<String, String> implements ClusterManager {
    private static final Logger log = LoggerFactory.getLogger(ConsulClusterManager.class);
    // Name of the sync map that clearHaInfoMap() wipes when no node keys are present on join.
    private static final String HA_INFO_MAP_NAME = "__vertx.haInfo";
    // Name handed to the ConsulMap super constructor; node entries are stored under this map.
    private static final String NODES_MAP_NAME = "__vertx.nodes";
    // Name (and id) of the Consul service registered by registerService().
    private static final String SERVICE_NAME = "vert.x-cluster-manager";
    // Interval of the Consul TCP health check, in milliseconds (converted to seconds on registration).
    private static final long TCP_CHECK_INTERVAL = 10000;
    // Locks handed out by getLockWithTimeout(), keyed by lock name; all are released in leave().
    private final Map<String, Lock> locks;
    // Caches of the shared-data structures created so far, keyed by their name.
    private final Map<String, Counter> counters;
    private final Map<String, Map<?, ?>> syncMaps;
    private final Map<String, AsyncMap<?, ?>> asyncMaps;
    private final Map<String, AsyncMultiMap<?, ?>> asyncMultiMaps;
    // Configuration given to the constructor; also read for the optional "nodeHost" entry.
    private final JsonObject consulClusterManagerConfig;
    // Ids of the nodes currently known to this manager; maintained by entryUpdated().
    private final Set<String> nodes;
    // Names of the locks taken while the local node entry is written / deleted.
    private static final String NODE_JOINING_LOCK_NAME = "nodeJoining";
    private static final String NODE_LEAVING_LOCK_NAME = "nodeLeaving";
    // Lock acquired by addLocalNode(); released once the local WRITE event is observed.
    private final AtomicReference<Lock> nodeJoiningLock;
    // Lock acquired by removeLocalNode(); released once the local REMOVE event is observed.
    private final AtomicReference<Lock> nodeLeavingLock;
    // Task queue passed to executeBlocking() when the node listener is notified.
    private final TaskQueue taskQueue;
    // Value of the "preferConsistency" config flag; forwarded to ConsulAsyncMultiMap.
    private final boolean preferConsistency;
    // Id of the Consul TCP check of this node; initialised to the node id in the constructor.
    private String checkId;
    // Local TCP server that the Consul TCP check connects to.
    private NetServer tcpServer;
    // "host" / "port" of the local TCP server; stored as this node's value in the nodes map.
    private JsonObject nodeTcpAddress;
    // True between the start of join() and the start of leave().
    private volatile boolean active;
    // Listener notified about nodes joining / leaving; may be null.
    private NodeListener nodeListener;

    /**
     * Creates a cluster manager configured from the given JSON.
     *
     * <p>The JSON is used to build the {@link ConsulClientOptions}; the optional boolean
     * {@code preferConsistency} entry is read as well (absent or {@code null} means {@code false}).
     * A random UUID is generated as node id and reused as the id of the TCP check.
     *
     * @param jsonObject cluster manager configuration, must not be {@code null}
     * @throws NullPointerException if {@code jsonObject} is {@code null}
     */
    public ConsulClusterManager(JsonObject jsonObject) {
        super(NODES_MAP_NAME, new ClusterManagerInternalContext());
        // Validate before touching any state so a bad argument fails as early as possible.
        Objects.requireNonNull(jsonObject, "Given cluster manager can't get initialized.");
        this.locks = new ConcurrentHashMap<>();
        this.counters = new ConcurrentHashMap<>();
        this.syncMaps = new ConcurrentHashMap<>();
        this.asyncMaps = new ConcurrentHashMap<>();
        this.asyncMultiMaps = new ConcurrentHashMap<>();
        // The node set is mutated by entryUpdated() and copied by getNodes(), potentially on
        // different threads, so it has to be a concurrent set.
        this.nodes = ConcurrentHashMap.newKeySet();
        this.nodeJoiningLock = new AtomicReference<>();
        this.nodeLeavingLock = new AtomicReference<>();
        this.taskQueue = new TaskQueue();
        this.nodeTcpAddress = new JsonObject();
        this.consulClusterManagerConfig = jsonObject;
        this.appContext.setConsulClientOptions(new ConsulClientOptions(jsonObject)).setNodeId(UUID.randomUUID().toString());
        this.checkId = this.appContext.getNodeId();
        // getBoolean() yields null for an absent key or an explicit JSON null; both mean "false".
        this.preferConsistency = Boolean.TRUE.equals(jsonObject.getBoolean("preferConsistency"));
    }

    /**
     * Creates a cluster manager from an empty configuration object.
     */
    public ConsulClusterManager() {
        this(new JsonObject());
    }

    /**
     * Stores the Vert.x instance in the shared application context.
     *
     * @param vertx the Vert.x instance this cluster manager works with
     */
    public void setVertx(Vertx vertx) {
        appContext.setVertx(vertx);
    }

    /**
     * Hands the async multi-map registered under the given name to the handler, creating and
     * caching a {@link ConsulAsyncMultiMap} on first use.
     *
     * @param str     name of the multi-map
     * @param handler receives the (always successful) result
     */
    public <K, V> void getAsyncMultiMap(String str, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
        // The cache is keyed by name only, so the element types cannot be checked here.
        @SuppressWarnings("unchecked")
        AsyncMultiMap<K, V> multiMap = (AsyncMultiMap<K, V>) asyncMultiMaps.computeIfAbsent(
            str, name -> new ConsulAsyncMultiMap(name, preferConsistency, appContext));
        Future<AsyncMultiMap<K, V>> result = Future.future();
        result.complete(multiMap);
        result.setHandler(handler);
    }

    /**
     * Hands the async map registered under the given name to the handler, creating and caching
     * a {@link ConsulAsyncMap} on first use.
     *
     * @param str     name of the map
     * @param handler receives the (always successful) result
     */
    public <K, V> void getAsyncMap(String str, Handler<AsyncResult<AsyncMap<K, V>>> handler) {
        // The cache is keyed by name only, so the element types cannot be checked here.
        @SuppressWarnings("unchecked")
        AsyncMap<K, V> asyncMap = (AsyncMap<K, V>) asyncMaps.computeIfAbsent(
            str, name -> new ConsulAsyncMap(name, appContext, this));
        Future<AsyncMap<K, V>> result = Future.future();
        result.complete(asyncMap);
        result.setHandler(handler);
    }

    /**
     * Returns the synchronous map registered under the given name, creating and caching a
     * {@link ConsulSyncMap} on first use.
     *
     * @param str name of the map
     * @return the cached or newly created map
     */
    public <K, V> Map<K, V> getSyncMap(String str) {
        // The cache is keyed by name only, so the element types cannot be checked here.
        @SuppressWarnings("unchecked")
        Map<K, V> syncMap = (Map<K, V>) syncMaps.computeIfAbsent(
            str, name -> new ConsulSyncMap(name, appContext));
        return syncMap;
    }

    /**
     * Tries to obtain the named distributed lock, retrying until the timeout has elapsed.
     *
     * <p>The attempts run on a worker thread via {@code executeBlocking} (unordered). At least one
     * attempt is always made, even for a non-positive timeout. On success the lock is remembered
     * in {@code locks} so that {@link #leave} can release it.
     *
     * @param str     name of the lock
     * @param j       timeout in milliseconds
     * @param handler receives the lock, or a failure once the timeout has expired
     */
    public void getLockWithTimeout(String str, long j, Handler<AsyncResult<Lock>> handler) {
        this.appContext.getVertx().executeBlocking(future -> {
            ConsulLock consulLock = new ConsulLock(str, this.checkId, j, this.appContext);
            // Measure against a fixed start time in nanoseconds. Subtracting per-attempt durations
            // truncated to whole milliseconds would never shrink the budget when attempts fail in
            // under a millisecond, turning the loop into an endless spin.
            long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(j);
            long startedAt = System.nanoTime();
            boolean acquired = false;
            do {
                try {
                    acquired = consulLock.tryToAcquire();
                } catch (VertxException e) {
                    // A failed attempt is retried until the timeout expires; keep a trace of the cause.
                    log.debug("[" + this.appContext.getNodeId() + "]: attempt to get a lock on: " + str + " failed, retrying.", e);
                }
            } while (!acquired && System.nanoTime() - startedAt < timeoutNanos);
            if (acquired) {
                this.locks.put(str, consulLock);
                future.complete(consulLock);
            } else {
                log.warn("[" + this.appContext.getNodeId() + "]: timed out to get a lock on: " + str);
                future.fail("Timed out waiting to get lock on: " + str);
            }
        }, false, handler);
    }

    /**
     * Hands the counter registered under the given name to the handler, creating and caching a
     * {@link ConsulCounter} on first use.
     *
     * @param str     name of the counter, must not be {@code null}
     * @param handler receives the (always successful) result
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public void getCounter(String str, Handler<AsyncResult<Counter>> handler) {
        Objects.requireNonNull(str);
        Counter counter = counters.computeIfAbsent(str, name -> new ConsulCounter(name, appContext));
        Future<Counter> result = Future.future();
        result.complete(counter);
        result.setHandler(handler);
    }

    /**
     * @return the id of the local node as stored in the application context
     */
    public String getNodeID() {
        return appContext.getNodeId();
    }

    /**
     * @return a fresh list holding the ids of all currently known nodes
     */
    public List<String> getNodes() {
        List<String> snapshot = new ArrayList<>(nodes);
        return snapshot;
    }

    /**
     * Sets the listener that is notified from {@code entryUpdated} when nodes are added or removed.
     *
     * @param nodeListener the listener to notify
     */
    public void nodeListener(NodeListener nodeListener) {
        this.nodeListener = nodeListener;
    }

    /**
     * Joins the cluster.
     *
     * <p>Steps, executed in order: deregister failing TCP checks, start the local TCP server,
     * register the service, register the TCP check, register the ephemeral session, start
     * listening for map events, clear the HA info map if the cluster is empty, and finally
     * publish the local node entry.
     *
     * <p>If the manager is already active the handler is completed successfully right away. If any
     * step fails, the resources created so far are torn down (best effort), the manager is marked
     * inactive again so that a later {@code join} can retry, and the handler receives the failure.
     *
     * @param handler notified once joining has finished
     */
    public synchronized void join(Handler<AsyncResult<Void>> handler) {
        log.debug(this.appContext.getNodeId() + " is trying to join the cluster.");
        if (this.active) {
            log.warn(this.appContext.getNodeId() + " is already active.");
            handler.handle(Future.succeededFuture());
            return;
        }
        this.active = true;
        this.appContext.initConsulClient();
        deregisterFailingTcpChecks()
            .compose(ignored -> createTcpServer())
            .compose(ignored -> registerService())
            .compose(ignored -> registerTcpCheck())
            .compose(ignored -> registerSessionAndSave())
            .compose(ignored -> {
                startListening();
                return clearHaInfoMap();
            })
            .compose(ignored -> addLocalNode(this.nodeTcpAddress))
            .setHandler(joined -> {
                if (joined.succeeded()) {
                    handler.handle(Future.succeededFuture());
                    return;
                }
                // The node never became a cluster member: reset the flag, otherwise every further
                // join() would short-circuit and report success without joining.
                this.active = false;
                try {
                    shutdownTcpServer();
                    if (Objects.nonNull(this.appContext.getEphemeralSessionId())) {
                        destroySession(this.appContext.getEphemeralSessionId());
                    }
                    deregisterTcpCheck();
                    // NOTE(review): the context is closed without waiting for the asynchronous
                    // clean-up calls above - confirm this does not cut them short.
                    this.appContext.close();
                } finally {
                    handler.handle(Future.failedFuture(joined.cause()));
                }
            });
    }

    /**
     * Leaves the cluster.
     *
     * <p>All locks handed out by this manager are released, then the local node entry is removed,
     * the ephemeral session is destroyed and the TCP check is deregistered. Regardless of whether
     * those Consul-side steps succeed, the local TCP server is shut down and the application
     * context is closed, so no local resources leak when Consul is unreachable. The handler
     * receives the first failure encountered, if any.
     *
     * <p>If the manager is not active the handler is completed successfully right away.
     *
     * @param handler notified once leaving has finished
     */
    public synchronized void leave(Handler<AsyncResult<Void>> handler) {
        log.debug(this.appContext.getNodeId() + " is trying to leave the cluster.");
        if (!this.active) {
            log.warn(this.appContext.getNodeId() + " is NOT active.");
            handler.handle(Future.succeededFuture());
            return;
        }
        this.active = false;
        this.locks.values().forEach(Lock::release);
        removeLocalNode()
            .compose(ignored -> destroySession(this.appContext.getEphemeralSessionId()))
            .compose(ignored -> deregisterTcpCheck())
            .setHandler(consulCleanup -> shutdownTcpServer().setHandler(serverClosed -> {
                // Report the earliest failure, but always run the remaining local clean-up first.
                Throwable failure = null;
                if (consulCleanup.failed()) {
                    failure = consulCleanup.cause();
                } else if (serverClosed.failed()) {
                    failure = serverClosed.cause();
                }
                try {
                    this.appContext.close();
                } catch (RuntimeException e) {
                    if (failure == null) {
                        failure = e;
                    }
                }
                if (failure != null) {
                    handler.handle(Future.failedFuture(failure));
                    return;
                }
                log.debug("[" + this.appContext.getNodeId() + "] has left the cluster.");
                handler.handle(Future.succeededFuture());
            }));
    }

    /**
     * @return {@code true} while the manager is marked active, i.e. after {@code join} has
     *         started and before {@code leave} has started
     */
    public boolean isActive() {
        return active;
    }

    /**
     * Registers a session tied to this node's check id and stores the resulting session id in
     * the application context as the ephemeral session id.
     *
     * @return a future completed once the session id has been stored
     */
    private Future<Void> registerSessionAndSave() {
        String sessionName = "Session for ephemeral keys for: " + appContext.getNodeId();
        return registerSession(sessionName, checkId).compose(sessionId -> {
            appContext.setEphemeralSessionId(sessionId);
            return Future.succeededFuture();
        });
    }

    /**
     * Publishes the local node in the nodes map while holding the "nodeJoining" lock.
     *
     * <p>The node's TCP address is written under the node id, bound to the ephemeral session. On
     * success the lock stays held; it is released by {@code entryUpdated} when the WRITE event for
     * the local node is observed. If the write is rejected or fails, the lock is released here.
     *
     * @param jsonObject the value to store for this node (its TCP address)
     * @return a future that fails if the lock cannot be obtained or the entry cannot be written
     */
    private Future<Void> addLocalNode(JsonObject jsonObject) {
        Future<Lock> lockFuture = Future.future();
        getLockWithTimeout(NODE_JOINING_LOCK_NAME, 10100L, lockFuture.completer());
        return lockFuture.compose(lock -> {
            this.nodeJoiningLock.set(lock);
            Future<Void> registered = Future.future();
            KeyValueOptions options = new KeyValueOptions().setAcquireSession(this.appContext.getEphemeralSessionId());
            putPlainValue(keyPath(this.appContext.getNodeId()), jsonObject.encode(), options).setHandler(written -> {
                if (written.succeeded() && Boolean.TRUE.equals(written.result())) {
                    registered.complete();
                    return;
                }
                // No WRITE event will arrive for this node, so nobody else would release the lock.
                lock.release();
                if (written.failed()) {
                    registered.fail(written.cause());
                } else {
                    registered.fail("Node: " + this.appContext.getNodeId() + " failed to join the cluster.");
                }
            });
            return registered;
        });
    }

    /**
     * Removes the local node from the nodes map while holding the "nodeLeaving" lock.
     *
     * <p>On success the lock stays held; it is released by {@code entryUpdated} when the REMOVE
     * event for the local node is observed. If the deletion is rejected or fails, the lock is
     * released here.
     *
     * @return a future that fails if the lock cannot be obtained or the entry cannot be deleted
     */
    private Future<Void> removeLocalNode() {
        Future<Lock> lockFuture = Future.future();
        getLockWithTimeout(NODE_LEAVING_LOCK_NAME, 10100L, lockFuture.completer());
        return lockFuture.compose(lock -> {
            this.nodeLeavingLock.set(lock);
            Future<Void> removed = Future.future();
            deleteValueByKeyPath(keyPath(this.appContext.getNodeId())).setHandler(deleted -> {
                if (deleted.succeeded() && Boolean.TRUE.equals(deleted.result())) {
                    removed.complete();
                    return;
                }
                // No REMOVE event will arrive for this node, so nobody else would release the lock.
                lock.release();
                if (deleted.failed()) {
                    removed.fail(deleted.cause());
                } else {
                    removed.fail("Node: " + this.appContext.getNodeId() + " failed to leave the cluster.");
                }
            });
            return removed;
        });
    }

    /**
     * Checks whether the nodes map currently holds no keys.
     *
     * @return a future holding {@code true} when {@code plainKeys()} yields an empty list
     */
    private Future<Boolean> isClusterEmpty() {
        return plainKeys().compose(keys -> {
            boolean noKeys = keys.isEmpty();
            return Future.succeededFuture(Boolean.valueOf(noKeys));
        });
    }

    /**
     * Clears the HA info sync map when the cluster is empty; does nothing otherwise.
     *
     * <p>The outcome reported by {@code clear} is not inspected: the returned future completes
     * successfully as soon as the clear callback fires.
     *
     * @return a future completed once the (optional) clearing is done
     */
    private Future<Void> clearHaInfoMap() {
        return isClusterEmpty().compose(empty -> {
            if (empty) {
                Future<Void> cleared = Future.future();
                ConsulSyncMap haInfo = (ConsulSyncMap) getSyncMap(HA_INFO_MAP_NAME);
                haInfo.clear(ignored -> cleared.complete());
                return cleared;
            }
            return Future.succeededFuture();
        });
    }

    /**
     * Reacts to changes of the nodes map.
     *
     * <p>WRITE adds the node to the local node set, REMOVE deletes it; other event types are
     * ignored. When the event concerns the local node, the corresponding joining / leaving lock
     * is released if one is currently held. While the manager is active and a listener is set,
     * the listener is notified through {@code executeBlocking} on the manager's task queue.
     *
     * @param entryEvent the map event to process
     */
    @Override // io.vertx.spi.cluster.consul.impl.ConsulMap, io.vertx.spi.cluster.consul.impl.ConsulMapListener
    public void entryUpdated(ConsulMapListener.EntryEvent entryEvent) {
        String actualKey = actualKey(entryEvent.getEntry().getKey());
        switch (entryEvent.getEventType()) {
            case WRITE: {
                boolean added = this.nodes.add(actualKey);
                if (log.isTraceEnabled() && added) {
                    log.trace("[" + this.appContext.getNodeId() + "] New node: " + actualKey + " has joined the cluster.");
                }
                if (actualKey.equals(this.appContext.getNodeId())) {
                    releaseHeldLock(this.nodeJoiningLock);
                }
                // Read the listener once so the async task cannot see it change to null.
                NodeListener listener = this.nodeListener;
                if (listener == null || !this.active) {
                    return;
                }
                this.appContext.getVertx().getOrCreateContext().executeBlocking(future -> {
                    listener.nodeAdded(actualKey);
                    future.complete();
                }, this.taskQueue, asyncResult -> {
                    if (log.isTraceEnabled()) {
                        log.trace("[" + this.appContext.getNodeId() + "] Node: " + actualKey + " has been added to nodeListener.");
                    }
                });
                return;
            }
            case REMOVE: {
                boolean removed = this.nodes.remove(actualKey);
                if (log.isTraceEnabled() && removed) {
                    log.trace("[" + this.appContext.getNodeId() + "] Node: " + actualKey + " has left the cluster.");
                }
                if (actualKey.equals(this.appContext.getNodeId())) {
                    // The local entry can disappear without leave() having taken the leaving lock,
                    // in which case there is nothing to release.
                    releaseHeldLock(this.nodeLeavingLock);
                }
                // Read the listener once so the async task cannot see it change to null.
                NodeListener listener = this.nodeListener;
                if (listener == null || !this.active) {
                    return;
                }
                this.appContext.getVertx().getOrCreateContext().executeBlocking(future -> {
                    listener.nodeLeft(actualKey);
                    future.complete();
                }, this.taskQueue, asyncResult -> {
                    if (log.isTraceEnabled()) {
                        log.trace("[" + this.appContext.getNodeId() + "] Node: " + actualKey + " has been removed from nodeListener.");
                    }
                });
                return;
            }
            default:
                return;
        }
    }

    /** Releases the lock stored in the reference, if any, and clears the reference. */
    private static void releaseHeldLock(AtomicReference<Lock> lockRef) {
        Lock held = lockRef.getAndSet(null);
        if (held != null) {
            held.release();
        }
    }

    /**
     * Starts the local TCP server that the Consul TCP check connects to.
     *
     * <p>The host is taken from the {@code nodeHost} config entry, falling back to the local host
     * address when it is missing or empty. The port actually bound is written to
     * {@code nodeTcpAddress} under {@code "port"} once listening has succeeded.
     *
     * @return a future completed when the server is listening; failed if the local host address
     *         cannot be resolved or the server cannot be started
     */
    private Future<Void> createTcpServer() {
        String configuredHost = this.consulClusterManagerConfig.getString("nodeHost");
        if (configuredHost == null || configuredHost.isEmpty()) {
            try {
                this.nodeTcpAddress.put("host", InetAddress.getLocalHost().getHostAddress());
            } catch (UnknownHostException e) {
                log.error(e);
                // Stop here: carrying on would start a server without a host and complete the
                // already failed future a second time.
                return Future.failedFuture(e);
            }
        } else {
            this.nodeTcpAddress.put("host", configuredHost);
        }
        Future<Void> future = Future.future();
        this.tcpServer = this.appContext.getVertx().createNetServer(new NetServerOptions(this.nodeTcpAddress));
        // Connections are accepted and otherwise ignored.
        this.tcpServer.connectHandler(netSocket -> {
        });
        this.tcpServer.listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                this.nodeTcpAddress.put("port", Integer.valueOf(asyncResult.result().actualPort()));
                future.complete();
            } else {
                future.fail(asyncResult.cause());
            }
        });
        return future;
    }

    /**
     * Closes the local TCP server, if one has been created.
     *
     * @return a future completed when the server is closed, or immediately when there is none
     */
    private Future<Void> shutdownTcpServer() {
        Future<Void> closed = Future.future();
        if (tcpServer == null) {
            closed.complete();
            return closed;
        }
        tcpServer.close(closed.completer());
        return closed;
    }

    /**
     * Registers the cluster manager service (name and id {@code SERVICE_NAME}, tag
     * {@code vertx-clustering}) with Consul.
     *
     * @return a future reflecting the outcome of the registration
     */
    private Future<Void> registerService() {
        ServiceOptions options = new ServiceOptions();
        options.setName(SERVICE_NAME);
        options.setTags(Collections.singletonList("vertx-clustering"));
        options.setId(SERVICE_NAME);

        Future<Void> registered = Future.future();
        appContext.getConsulClient().registerService(options, outcome -> {
            if (outcome.failed()) {
                log.error("[" + appContext.getNodeId() + "] - Failed to register vert.x cluster management service.", outcome.cause());
                registered.fail(outcome.cause());
                return;
            }
            registered.complete();
        });
        return registered;
    }

    /**
     * Registers a TCP health check against the local TCP server and attaches it to the cluster
     * manager service. The check starts in {@code PASSING} state and uses
     * {@code TCP_CHECK_INTERVAL} (expressed in seconds) as its interval.
     *
     * @return a future reflecting the outcome of the registration
     */
    private Future<Void> registerTcpCheck() {
        String tcpTarget = nodeTcpAddress.getString("host") + ":" + nodeTcpAddress.getInteger("port");
        String interval = TimeUnit.MILLISECONDS.toSeconds(TCP_CHECK_INTERVAL) + "s";
        CheckOptions check = new CheckOptions()
            .setName(checkId)
            .setNotes("This check is dedicated to service with id: " + appContext.getNodeId())
            .setId(checkId)
            .setTcp(tcpTarget)
            .setServiceId(SERVICE_NAME)
            .setInterval(interval)
            .setStatus(CheckStatus.PASSING);

        Future<Void> registered = Future.future();
        appContext.getConsulClient().registerCheck(check, outcome -> {
            if (outcome.failed()) {
                log.error("[" + appContext.getNodeId() + "] - Failed to register check: " + check.getId(), outcome.cause());
                registered.fail(outcome.cause());
                return;
            }
            registered.complete();
        });
        return registered;
    }

    /**
     * Deregisters this node's TCP health check from Consul.
     *
     * @return a future reflecting the outcome of the deregistration
     */
    private Future<Void> deregisterTcpCheck() {
        Future<Void> deregistered = Future.future();
        appContext.getConsulClient().deregisterCheck(checkId, outcome -> {
            if (outcome.failed()) {
                log.error("[" + appContext.getNodeId() + "] - Failed to deregister check: " + checkId, outcome.cause());
                deregistered.fail(outcome.cause());
                return;
            }
            deregistered.complete();
        });
        return deregistered;
    }

    /**
     * Looks up the health checks of the cluster manager service and deregisters every check whose
     * status is {@code CRITICAL}.
     *
     * @return a future that completes when all deregistrations have succeeded and fails if the
     *         lookup or any deregistration fails
     */
    private Future<Void> deregisterFailingTcpChecks() {
        Future<io.vertx.ext.consul.CheckList> lookup = Future.future();
        appContext.getConsulClient().healthChecks(SERVICE_NAME, lookup.completer());
        return lookup.compose(checkList -> {
            List<Future> deregistrations = new ArrayList<>();
            for (io.vertx.ext.consul.Check check : checkList.getList()) {
                if (check.getStatus() != CheckStatus.CRITICAL) {
                    continue;
                }
                Future<Void> deregistered = Future.future();
                appContext.getConsulClient().deregisterCheck(check.getId(), deregistered.completer());
                deregistrations.add(deregistered);
            }
            return CompositeFuture.all(deregistrations).compose(all -> Future.<Void>succeededFuture());
        });
    }
}
