/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols;

import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.PhysicalAddress;
import org.jgroups.View;
import org.jgroups.ViewId;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.conf.ConfiguratorFactory;
import org.jgroups.protocols.PingData;
import org.jgroups.protocols.PingHeader;
import org.jgroups.protocols.TCP;
import org.jgroups.protocols.TP;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.ByteArray;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.NameCache;
import org.jgroups.util.Responses;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Tuple;
import org.jgroups.util.UUID;
import org.jgroups.util.Util;

@MBean
public abstract class Discovery
extends Protocol {
    @Property(description="Return from the discovery phase as soon as we have 1 coordinator response")
    protected boolean break_on_coord_rsp = true;
    @Property(description="Whether or not to return the entire logical-physical address cache mappings on a discovery request, or not.")
    protected boolean return_entire_cache;
    @Property(description="If greater than 0, we'll wait a random number of milliseconds in range [0..stagger_timeout] before sending a discovery response. This prevents traffic spikes in large clusters when everyone sends their discovery response at the same time", type=AttributeType.TIME)
    protected long stagger_timeout;
    @Property(description="If a persistent disk cache (PDC) is present, combine the discovery results with the contents of the disk cache before returning the results")
    protected boolean use_disk_cache;
    @Property(description="Max size of the member list shipped with a discovery request. If we have more, the mbrs field in the discovery request header is nulled and members return the entire membership, not individual members")
    protected int max_members_in_discovery_request = 500;
    @Property(description="Expiry time of discovery responses in ms", type=AttributeType.TIME)
    protected long discovery_rsp_expiry_time = 60000L;
    @Property(description="If true then the discovery is done on a separate timer thread. Should be set to true when discovery is blocking and/or takes more than a few milliseconds")
    protected boolean async_discovery;
    @Property(description="If enabled, use a separate thread for every discovery request. Can be used with or without async_discovery")
    protected boolean async_discovery_use_separate_thread_per_request;
    @Property(description="When a new node joins, and we have a static discovery protocol (TCPPING), then send the contents of the discovery cache to new and existing members if true (and we're the coord). Addresses JGRP-1903")
    protected boolean send_cache_on_join = true;
    @Property(description="The max rank of this member to respond to discovery requests, e.g. if max_rank_to_reply=2 in {A,B,C,D,E}, only A (rank 1) and B (rank 2) will reply. A value <= 0 means everybody will reply. This attribute is ignored if TP.use_ip_addrs is false.")
    protected int max_rank_to_reply;
    @Property(description="The number of times a discovery process is executed when finding initial members (https://issues.redhat.com/browse/JGRP-2317)")
    protected int num_discovery_runs = 1;
    @ManagedAttribute(description="Total number of discovery requests sent ")
    protected int num_discovery_requests;
    protected volatile boolean is_server;
    protected volatile boolean is_leaving;
    protected TimeScheduler timer;
    protected volatile View view;
    @ManagedAttribute(description="Whether this member is the current coordinator")
    protected volatile boolean is_coord;
    protected volatile Address current_coord;
    protected String cluster_name;
    protected TP transport;
    protected final Map<Long, Responses> ping_responses = new HashMap<Long, Responses>();
    protected final List<Future<?>> discovery_req_futures = new ArrayList();
    @ManagedAttribute(description="Whether the transport supports multicasting")
    protected boolean transport_supports_multicasting = true;
    @ManagedAttribute(description="True if sending a message can block at the transport level")
    protected boolean sends_can_block = true;
    protected Consumer<PingData> discovery_rsp_callback;
    protected static final byte[] WHITESPACE = " \t".getBytes();

    @Override
    public void init() throws Exception {
        this.transport = this.getTransport();
        if (this.stagger_timeout < 0L) {
            throw new IllegalArgumentException("stagger_timeout cannot be negative");
        }
        if (this.num_discovery_runs < 1) {
            throw new IllegalArgumentException("num_discovery_runs must be >= 1");
        }
        this.transport_supports_multicasting = this.transport.supportsMulticasting();
        this.sends_can_block = this.transport instanceof TCP;
    }

    @Override
    public void start() throws Exception {
        super.start();
        this.timer = this.transport.getTimer();
        if (this.timer == null) {
            throw new Exception("timer cannot be retrieved from protocol stack");
        }
    }

    @Override
    public void stop() {
        this.is_coord = false;
        this.is_server = false;
        this.clearRequestFutures();
    }

    public abstract boolean isDynamic();

    public void handleDisconnect() {
    }

    public void handleConnect() {
    }

    public void discoveryRequestReceived(Address sender, String logical_name, PhysicalAddress physical_addr) {
    }

    public String getClusterName() {
        return this.cluster_name;
    }

    public <T extends Discovery> T setClusterName(String n) {
        this.cluster_name = n;
        return (T)this;
    }

    public int getNumberOfDiscoveryRequestsSent() {
        return this.num_discovery_requests;
    }

    public boolean breakOnCoordResponse() {
        return this.break_on_coord_rsp;
    }

    public <T extends Discovery> T breakOnCoordResponse(boolean flag) {
        this.break_on_coord_rsp = flag;
        return (T)this;
    }

    public boolean returnEntireCache() {
        return this.return_entire_cache;
    }

    public <T extends Discovery> T returnEntireCache(boolean flag) {
        this.return_entire_cache = flag;
        return (T)this;
    }

    public long staggerTimeout() {
        return this.stagger_timeout;
    }

    public <T extends Discovery> T staggerTimeout(long timeout) {
        this.stagger_timeout = timeout;
        return (T)this;
    }

    public boolean useDiskCache() {
        return this.use_disk_cache;
    }

    public <T extends Discovery> T useDiskCache(boolean flag) {
        this.use_disk_cache = flag;
        return (T)this;
    }

    public <T extends Discovery> T discoveryRspExpiryTime(long t2) {
        this.discovery_rsp_expiry_time = t2;
        return (T)this;
    }

    @ManagedAttribute
    public String getView() {
        return this.view != null ? this.view.getViewId().toString() : "null";
    }

    public ViewId getViewId() {
        return this.view != null ? this.view.getViewId() : null;
    }

    @ManagedAttribute(description="The address of the current coordinator")
    public String getCurrentCoord() {
        return this.current_coord != null ? this.current_coord.toString() : "n/a";
    }

    protected boolean isMergeRunning() {
        Object retval = this.up_prot.up(new Event(100));
        return retval instanceof Boolean && (Boolean)retval != false;
    }

    @ManagedOperation(description="Sends information about my cache to everyone but myself")
    public void sendCacheInformation() {
        ArrayList<Address> current_members = new ArrayList<Address>(this.view.getMembers());
        this.disseminateDiscoveryInformation(current_members, null, current_members);
    }

    @Override
    public List<Integer> providedUpServices() {
        return Arrays.asList(12, 87, 11);
    }

    @Override
    public void resetStats() {
        super.resetStats();
        this.num_discovery_requests = 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addResponse(Responses rsp) {
        Map<Long, Responses> map = this.ping_responses;
        synchronized (map) {
            this.ping_responses.put(System.nanoTime(), rsp);
        }
    }

    protected abstract void findMembers(List<Address> var1, boolean var2, Responses var3);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Responses findMembers(List<Address> members, boolean initial_discovery, boolean async, long timeout) {
        ++this.num_discovery_requests;
        int num_expected = members != null ? members.size() : 0;
        int capacity = members != null ? members.size() : 16;
        Responses rsps = new Responses(num_expected, initial_discovery && this.break_on_coord_rsp, capacity);
        this.addResponse(rsps);
        if (async || this.async_discovery || this.num_discovery_runs > 1 && initial_discovery) {
            Runnable find_method = () -> this.callFindMembersInAllDiscoveryProtocols(members, initial_discovery, rsps);
            this.timer.execute(find_method);
            if (this.num_discovery_runs > 1 && initial_discovery) {
                int num_reqs_to_send = this.num_discovery_runs - 1;
                long last_send = timeout - timeout / (long)this.num_discovery_runs;
                long interval = last_send / (long)num_reqs_to_send;
                long i = 0L;
                long delay = interval;
                while (i < (long)num_reqs_to_send) {
                    Future<?> future = this.timer.schedule(find_method, delay, TimeUnit.MILLISECONDS);
                    List<Future<?>> list = this.discovery_req_futures;
                    synchronized (list) {
                        this.discovery_req_futures.add(future);
                    }
                    ++this.num_discovery_requests;
                    ++i;
                    delay += interval;
                }
            }
        } else {
            this.callFindMembersInAllDiscoveryProtocols(members, initial_discovery, rsps);
        }
        this.weedOutCompletedDiscoveryResponses();
        return rsps;
    }

    @ManagedOperation(description="Runs the discovery protocol to find initial members")
    public String findInitialMembersAsString() {
        Responses rsps = this.findMembers(null, false, false, 0L);
        if (!rsps.isDone()) {
            rsps.waitFor(300L);
        }
        if (rsps.isEmpty()) {
            return "<empty>";
        }
        StringBuilder sb = new StringBuilder();
        for (PingData rsp : rsps) {
            sb.append(rsp).append("\n");
        }
        return sb.toString();
    }

    @ManagedOperation(description="Reads logical-physical address mappings and logical name mappings from a file (or URL) and adds them to the local caches")
    public void addToCache(String filename) throws Exception {
        InputStream in = ConfiguratorFactory.getConfigStream(filename);
        List<PingData> list = this.read(in);
        if (list != null) {
            for (PingData data : list) {
                this.addDiscoveryResponseToCaches(data.getAddress(), data.getLogicalName(), data.getPhysicalAddr());
            }
        }
    }

    @ManagedOperation(description="Reads data from local caches and dumps them to a file")
    public void dumpCache(String output_filename) throws Exception {
        Map cache_contents = (Map)this.down_prot.down(new Event(88, false));
        ArrayList<PingData> list = new ArrayList<PingData>(cache_contents.size());
        for (Map.Entry entry : cache_contents.entrySet()) {
            Address addr = (Address)entry.getKey();
            PhysicalAddress phys_addr = (PhysicalAddress)entry.getValue();
            PingData data = new PingData(addr, true, NameCache.get(addr), phys_addr).coord(addr.equals(this.local_addr));
            list.add(data);
        }
        FileOutputStream out = new FileOutputStream(output_filename);
        this.write(list, out);
    }

    @Override
    public Object up(Event evt) {
        if (evt.getType() == 11) {
            Discovery d = this.findTopmostDiscoveryProtocol();
            return d.findMembers((List)evt.getArg(), false, true, 0L);
        }
        return this.up_prot.up(evt);
    }

    @Override
    public Object up(Message msg) {
        PingHeader hdr = (PingHeader)msg.getHeader(this.id);
        if (hdr == null) {
            return this.up_prot.up(msg);
        }
        if (this.is_leaving) {
            return null;
        }
        return this.handle(hdr, msg);
    }

    @Override
    public void up(MessageBatch batch) {
        if (this.is_leaving) {
            batch.removeIf(m4 -> m4.getHeader(this.id) != null, true);
        } else {
            Iterator<Message> it = batch.iterator();
            while (it.hasNext()) {
                Message msg = it.next();
                PingHeader hdr = (PingHeader)msg.getHeader(this.id);
                if (hdr == null) continue;
                it.remove();
                this.handle(hdr, msg);
            }
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    protected Object handle(PingHeader hdr, Message msg) {
        PingData data = this.readPingData(msg.getArray(), msg.getOffset(), msg.getLength());
        Address logical_addr = data != null ? data.getAddress() : msg.getSrc();
        switch (hdr.type) {
            case 1: {
                boolean drop_because_of_rank;
                if (this.cluster_name == null || hdr.cluster_name == null) {
                    this.log.warn("cluster_name (%s) or cluster_name of header (%s) is null; passing up discovery request from %s, but this should not be the case", this.cluster_name, hdr.cluster_name, msg.getSrc());
                } else if (!this.cluster_name.equals(hdr.cluster_name)) {
                    this.log.warn("%s: discarding discovery request for cluster '%s' from %s; our cluster name is '%s'. Please separate your clusters properly", logical_addr, hdr.cluster_name, msg.getSrc(), this.cluster_name);
                    return null;
                }
                if (data != null) {
                    this.addDiscoveryResponseToCaches(logical_addr, data.getLogicalName(), data.getPhysicalAddr());
                    this.discoveryRequestReceived(msg.getSrc(), data.getLogicalName(), data.getPhysicalAddr());
                    this.addResponse(data, false);
                }
                if (this.return_entire_cache) {
                    Map cache = (Map)this.down(new Event(88));
                    if (cache != null) {
                        for (Map.Entry entry : cache.entrySet()) {
                            Address addr = (Address)entry.getKey();
                            if (!addr.equals(this.local_addr) && (this.view == null || !this.view.containsMember(addr))) continue;
                            PhysicalAddress physical_addr = (PhysicalAddress)entry.getValue();
                            this.sendDiscoveryResponse(addr, physical_addr, NameCache.get(addr), msg.getSrc(), this.isCoord(addr));
                        }
                    }
                    return null;
                }
                Collection<? extends Address> mbrs = data != null ? data.mbrs() : null;
                boolean bl = drop_because_of_rank = this.max_rank_to_reply > 0 && hdr.initialDiscovery() && Util.getRank(this.view, this.local_addr) > this.max_rank_to_reply;
                if (drop_because_of_rank || mbrs != null && !mbrs.contains(this.local_addr)) {
                    return null;
                }
                PhysicalAddress physical_addr = (PhysicalAddress)this.down(new Event(87, this.local_addr));
                this.sendDiscoveryResponse(this.local_addr, physical_addr, NameCache.get(this.local_addr), msg.getSrc(), this.is_coord);
                return null;
            }
            case 2: {
                if (data != null) {
                    Discovery d = this.findTopmostDiscoveryProtocol();
                    this.log.trace("%s: received GET_MBRS_RSP from %s: %s%s", this.local_addr, msg.getSrc(), data, d != this ? ", delivering it to " + d.getClass().getSimpleName() : "");
                    d.handleDiscoveryResponse(data, msg.getSrc());
                }
                return null;
            }
        }
        this.log.warn("got PING header with unknown type %d", hdr.type);
        return null;
    }

    protected void callFindMembersInAllDiscoveryProtocols(List<Address> mbrs, boolean initial_discovery, Responses rsps) {
        for (Discovery p = this; p != null; p = p.getDownProtocol()) {
            if (!(p instanceof Discovery)) continue;
            p.findMembers(mbrs, initial_discovery, rsps);
        }
    }

    protected Discovery findTopmostDiscoveryProtocol() {
        Discovery ret = this;
        for (Discovery p = this; p != null; p = p.getUpProtocol()) {
            if (!(p instanceof Discovery)) continue;
            ret = p;
        }
        return ret;
    }

    protected void handleDiscoveryResponse(PingData data, Address sender) {
        Address logical_addr = data.getAddress() != null ? data.getAddress() : sender;
        this.addDiscoveryResponseToCaches(logical_addr, data.getLogicalName(), data.getPhysicalAddr());
        boolean overwrite = Objects.equals(logical_addr, sender);
        this.addResponse(data, overwrite);
    }

    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 12: {
                long timeout = (Long)evt.getArg();
                return this.findMembers(null, true, false, timeout);
            }
            case 13: {
                this.discovery_rsp_callback = (Consumer)evt.arg();
                return this.findMembers(null, false, false, 0L);
            }
            case 6: {
                View old_view = this.view;
                this.view = (View)evt.getArg();
                this.current_coord = this.view.getCoord();
                this.is_coord = Objects.equals(this.current_coord, this.local_addr);
                Object retval = this.down_prot.down(evt);
                if (this.send_cache_on_join && !this.isDynamic() && this.is_coord) {
                    ArrayList<Address> curr_mbrs = new ArrayList<Address>(this.view.getMembers());
                    List<Address> left_mbrs = View.leftMembers(old_view, this.view);
                    List<Address> new_mbrs = View.newMembers(old_view, this.view);
                    this.startCacheDissemination(curr_mbrs, left_mbrs, new_mbrs);
                }
                return retval;
            }
            case 16: {
                this.down_prot.down(evt);
                this.is_server = true;
                return null;
            }
            case 2: 
            case 80: 
            case 92: 
            case 93: {
                this.is_leaving = false;
                this.cluster_name = (String)evt.getArg();
                Object ret = this.down_prot.down(evt);
                this.handleConnect();
                return ret;
            }
            case 4: {
                this.is_leaving = true;
                this.handleDisconnect();
                return this.down_prot.down(evt);
            }
        }
        return this.down_prot.down(evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected List<PingData> read(InputStream in) {
        ArrayList<PingData> retval = null;
        try {
            block7: while (true) {
                try {
                    while (true) {
                        boolean is_coordinator;
                        String name_str = Util.readToken(in);
                        String uuid_str = Util.readToken(in);
                        String addr_str = Util.readToken(in);
                        String coord_str = Util.readToken(in);
                        if (name_str == null || uuid_str == null || addr_str == null || coord_str == null) break block7;
                        UUID uuid = null;
                        try {
                            long tmp = Long.parseLong(uuid_str);
                            uuid = new UUID(0L, tmp);
                        }
                        catch (Throwable t2) {
                            uuid = UUID.fromString(uuid_str);
                        }
                        IpAddress phys_addr = new IpAddress(addr_str);
                        boolean bl = is_coordinator = coord_str.trim().equals("T") || coord_str.trim().equals("t");
                        if (retval == null) {
                            retval = new ArrayList<PingData>();
                        }
                        retval.add(new PingData(uuid, true, name_str, phys_addr).coord(is_coordinator));
                    }
                }
                catch (Throwable t3) {
                    this.log.error(Util.getMessage("FailedReadingLineOfInputStream"), t3);
                    continue;
                }
                break;
            }
            ArrayList<PingData> arrayList = retval;
            return arrayList;
        }
        finally {
            Util.close((Closeable)in);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void write(List<PingData> list, OutputStream out) throws Exception {
        try {
            for (PingData data : list) {
                String logical_name = data.getLogicalName();
                Address addr = data.getAddress();
                PhysicalAddress phys_addr = data.getPhysicalAddr();
                if (logical_name == null || addr == null || phys_addr == null) continue;
                out.write(logical_name.getBytes());
                out.write(WHITESPACE);
                out.write(Discovery.addressAsString(addr).getBytes());
                out.write(WHITESPACE);
                out.write(phys_addr.toString().getBytes());
                out.write(WHITESPACE);
                out.write(data.isCoord() ? String.format("T%n", new Object[0]).getBytes() : String.format("F%n", new Object[0]).getBytes());
            }
        }
        finally {
            Util.close((Closeable)out);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addResponse(PingData rsp, boolean overwrite) {
        if (this.discovery_rsp_callback != null) {
            try {
                this.discovery_rsp_callback.accept(rsp);
            }
            catch (Throwable t2) {
                this.log.error("%s: failed invoking callback for discovery response: %s", this.local_addr, t2);
            }
        }
        Map<Long, Responses> map = this.ping_responses;
        synchronized (map) {
            Iterator<Map.Entry<Long, Responses>> it = this.ping_responses.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Responses> entry = it.next();
                long timestamp = entry.getKey();
                Responses rsps = entry.getValue();
                rsps.addResponse(rsp, overwrite);
                if (!rsps.isDone() && TimeUnit.MILLISECONDS.convert(System.nanoTime() - timestamp, TimeUnit.NANOSECONDS) <= this.discovery_rsp_expiry_time) continue;
                it.remove();
                rsps.done();
                this.clearRequestFutures();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ManagedOperation(description="Removes expired or completed responses")
    public void weedOutCompletedDiscoveryResponses() {
        Map<Long, Responses> map = this.ping_responses;
        synchronized (map) {
            Iterator<Map.Entry<Long, Responses>> it = this.ping_responses.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Responses> entry = it.next();
                long timestamp = entry.getKey();
                Responses rsps = entry.getValue();
                if (!rsps.isDone() && TimeUnit.MILLISECONDS.convert(System.nanoTime() - timestamp, TimeUnit.NANOSECONDS) <= this.discovery_rsp_expiry_time) continue;
                it.remove();
                rsps.done();
                this.clearRequestFutures();
            }
        }
    }

    protected boolean addDiscoveryResponseToCaches(Address mbr, String logical_name, PhysicalAddress physical_addr) {
        if (mbr == null) {
            return false;
        }
        if (logical_name != null) {
            NameCache.add(mbr, logical_name);
        }
        if (physical_addr != null) {
            return (Boolean)this.down(new Event(89, new Tuple<Address, PhysicalAddress>(mbr, physical_addr)));
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void clearRequestFutures() {
        List<Future<?>> list = this.discovery_req_futures;
        synchronized (list) {
            this.discovery_req_futures.forEach(f -> f.cancel(true));
            this.discovery_req_futures.clear();
        }
    }

    protected synchronized void startCacheDissemination(List<Address> curr_mbrs, List<Address> left_mbrs, List<Address> new_mbrs) {
        this.timer.execute(new DiscoveryCacheDisseminationTask(curr_mbrs, left_mbrs, new_mbrs), this.sends_can_block);
    }

    protected byte[] serializeWithoutView(PingData data) {
        PingData clone = new PingData(data.getAddress(), data.isServer(), data.getLogicalName(), data.getPhysicalAddr()).coord(data.isCoord());
        try {
            return Util.streamableToByteBuffer(clone);
        }
        catch (Exception e) {
            this.log.error(Util.getMessage("ErrorSerializingPingData"), e);
            return null;
        }
    }

    protected static PingData deserialize(byte[] data) throws Exception {
        return Util.streamableFromByteBuffer(PingData::new, data);
    }

    public static ByteArray marshal(PingData data) {
        try {
            return Util.streamableToBuffer(data);
        }
        catch (Exception e) {
            return null;
        }
    }

    protected PingData readPingData(byte[] buffer, int offset, int length) {
        try {
            return buffer != null ? Util.streamableFromBuffer(PingData::new, buffer, offset, length) : null;
        }
        catch (Exception ex) {
            this.log.error("%s: failed reading PingData from message: %s", this.local_addr, ex);
            return null;
        }
    }

    protected void sendDiscoveryResponse(Address logical_addr, PhysicalAddress physical_addr, String logical_name, Address sender, boolean coord) {
        PingData data = new PingData(logical_addr, this.is_server, logical_name, physical_addr).coord(coord);
        Message rsp_msg = new BytesMessage(sender).setFlag(Message.Flag.OOB, Message.Flag.DONT_BUNDLE).putHeader(this.id, new PingHeader(2)).setArray(Discovery.marshal(data));
        if (this.stagger_timeout > 0L) {
            int view_size = this.view != null ? this.view.size() : 10;
            int rank = Util.getRank(this.view, this.local_addr);
            long sleep_time = rank == 0 ? Util.random(this.stagger_timeout) : this.stagger_timeout * (long)rank / (long)view_size - this.stagger_timeout / (long)view_size;
            this.timer.schedule(() -> {
                this.log.trace("%s: received GET_MBRS_REQ from %s, sending staggered response %s", this.local_addr, sender, data);
                this.down_prot.down(rsp_msg);
            }, sleep_time, TimeUnit.MILLISECONDS, this.sends_can_block);
            return;
        }
        this.log.trace("%s: received GET_MBRS_REQ from %s, sending response %s", this.local_addr, sender, data);
        this.down_prot.down(rsp_msg);
    }

    protected static String addressAsString(Address address) {
        if (address == null) {
            return "";
        }
        if (address instanceof UUID) {
            return ((UUID)address).toStringLong();
        }
        return address.toString();
    }

    protected boolean isCoord(Address member) {
        return member.equals(this.current_coord);
    }

    protected void disseminateDiscoveryInformation(List<Address> current_mbrs, List<Address> left_mbrs, List<Address> new_mbrs) {
        if (new_mbrs == null || new_mbrs.isEmpty()) {
            return;
        }
        if (this.local_addr != null) {
            current_mbrs.remove(this.local_addr);
        }
        if (left_mbrs != null) {
            current_mbrs.removeAll(left_mbrs);
        }
        HashSet<Address> info = new HashSet<Address>(current_mbrs);
        for (Address addr : info) {
            PhysicalAddress phys_addr = (PhysicalAddress)this.down_prot.down(new Event(87, addr));
            if (phys_addr == null) continue;
            boolean is_coordinator = this.isCoord(addr);
            for (Address target : new_mbrs) {
                this.sendDiscoveryResponse(addr, phys_addr, NameCache.get(addr), target, is_coordinator);
            }
        }
        HashSet<Address> targets = new HashSet<Address>(current_mbrs);
        targets.removeAll(new_mbrs);
        if (!targets.isEmpty()) {
            for (Address addr : new_mbrs) {
                PhysicalAddress phys_addr = (PhysicalAddress)this.down_prot.down(new Event(87, addr));
                if (phys_addr == null) continue;
                boolean is_coordinator = this.isCoord(addr);
                for (Address target : targets) {
                    this.sendDiscoveryResponse(addr, phys_addr, NameCache.get(addr), target, is_coordinator);
                }
            }
        }
    }

    protected class DiscoveryCacheDisseminationTask
    implements Runnable {
        protected final List<Address> curr_mbrs;
        protected final List<Address> left_mbrs;
        protected final List<Address> new_mbrs;

        public DiscoveryCacheDisseminationTask(List<Address> curr_mbrs, List<Address> left_mbrs, List<Address> new_mbrs) {
            this.curr_mbrs = curr_mbrs;
            this.left_mbrs = left_mbrs;
            this.new_mbrs = new_mbrs;
        }

        @Override
        public void run() {
            Discovery.this.disseminateDiscoveryInformation(this.curr_mbrs, this.left_mbrs, this.new_mbrs);
        }
    }
}

