package dragon.network.messages.service.termtopo;

import dragon.DragonInvalidStateException;
import dragon.network.DragonTopologyException;
import dragon.network.Node;
import dragon.network.comms.IComms;
import dragon.network.messages.service.ServiceMessage;
import dragon.network.operations.DragonInvalidContext;
import dragon.network.operations.Ops;
import dragon.network.operations.RemoveTopoGroupOp;
import dragon.network.operations.TermTopoGroupOp;
import dragon.topology.DragonTopology;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dragon/network/messages/service/termtopo/TermTopoSMsg.class */
public class TermTopoSMsg extends ServiceMessage {
    private static final long serialVersionUID = -7620075913134267391L;
    private static final Logger log = LogManager.getLogger(TermTopoSMsg.class);
    public String topologyId;
    public boolean purge;

    public TermTopoSMsg(String str, boolean z) {
        super(ServiceMessage.ServiceMessageType.TERMINATE_TOPOLOGY);
        this.topologyId = str;
        this.purge = z;
    }

    @Override // dragon.network.messages.Message
    public void process() {
        Node inst = Node.inst();
        IComms comms = inst.getComms();
        if (!inst.getLocalClusters().containsKey(this.topologyId)) {
            client(new TermTopoErrorSMsg(this.topologyId, "topology does not exist"));
            return;
        }
        DragonTopology topology = inst.getLocalClusters().get(this.topologyId).getTopology();
        if (!this.purge) {
            try {
                Ops.inst().newTermTopoGroupOp(this.topologyId, op -> {
                    progress("stopping spouts and waiting up to [" + (inst.getConf().getDragonServiceTimeoutMs() / 1000) + "] seconds for bolts to finish...");
                }, op2 -> {
                    try {
                        Ops.inst().newRemoveTopoGroupOp(this, topology, op2 -> {
                            progress("waiting for up to [" + (inst.getConf().getDragonServiceTimeoutMs() / 1000) + "] seconds...");
                        }, op3 -> {
                            client(new TopoTermdSMsg(this.topologyId));
                        }, (op4, str) -> {
                            client(new TermTopoErrorSMsg(this.topologyId, str));
                        }).onRunning(op5 -> {
                            try {
                                inst.removeTopo(this.topologyId, this.purge);
                                ((RemoveTopoGroupOp) op5).receiveSuccess(comms.getMyNodeDesc());
                            } catch (DragonTopologyException e) {
                                ((RemoveTopoGroupOp) op5).receiveError(comms.getMyNodeDesc(), e.getMessage());
                            }
                            progress("removing topology from memory");
                        }).onTimeout(inst.getTimer(), inst.getConf().getDragonServiceTimeoutMs(), TimeUnit.MILLISECONDS, op6 -> {
                            op6.fail("timed out removing topology from memory");
                        });
                    } catch (DragonInvalidContext e) {
                        client(new TermTopoErrorSMsg(this.topologyId, e.getMessage()));
                    }
                }, (op3, str) -> {
                    client(new TermTopoErrorSMsg(this.topologyId, str));
                }).onRunning(op4 -> {
                    try {
                        inst.terminateTopology(this.topologyId, (TermTopoGroupOp) op4);
                    } catch (DragonInvalidStateException | DragonTopologyException e) {
                        ((TermTopoGroupOp) op4).fail(e.getMessage());
                    }
                }).onTimeout(inst.getTimer(), inst.getConf().getDragonServiceTimeoutMs(), TimeUnit.MILLISECONDS, op5 -> {
                    progress("topology is not finishing, possibly mis-behaving user code... will try purging...");
                    op5.cancel();
                    this.purge = true;
                    try {
                        Ops.inst().newRemoveTopoGroupOp(this, topology, op5 -> {
                            progress("waiting for up to [" + (inst.getConf().getDragonServiceTimeoutMs() / 1000) + "] seconds...");
                        }, op6 -> {
                            client(new TopoTermdSMsg(this.topologyId));
                        }, (op7, str2) -> {
                            client(new TermTopoErrorSMsg(this.topologyId, str2));
                        }).onRunning(op8 -> {
                            try {
                                inst.removeTopo(this.topologyId, false);
                                ((RemoveTopoGroupOp) op8).receiveSuccess(comms.getMyNodeDesc());
                            } catch (DragonTopologyException e) {
                                ((RemoveTopoGroupOp) op8).receiveError(comms.getMyNodeDesc(), e.getMessage());
                            }
                            progress("removing topology from memory");
                        }).onTimeout(inst.getTimer(), inst.getConf().getDragonServiceTimeoutMs(), TimeUnit.MILLISECONDS, op9 -> {
                            op9.fail("timed out removing topology from memory");
                        });
                    } catch (DragonInvalidContext e) {
                        log.error("should not have an invalid context when purging: " + e.getMessage());
                        e.printStackTrace();
                        client(new TermTopoErrorSMsg(this.topologyId, e.getMessage()));
                    }
                });
                return;
            } catch (DragonInvalidContext e) {
                client(new TermTopoErrorSMsg(this.topologyId, e.getMessage()));
                return;
            }
        }
        try {
            Ops.inst().newRemoveTopoGroupOp(this, topology, op6 -> {
                progress("waiting for up to [" + (inst.getConf().getDragonServiceTimeoutMs() / 1000) + "] seconds...");
            }, op7 -> {
                client(new TopoTermdSMsg(this.topologyId));
            }, (op8, str2) -> {
                client(new TermTopoErrorSMsg(this.topologyId, str2));
            }).onRunning(op9 -> {
                try {
                    inst.removeTopo(this.topologyId, this.purge);
                    ((RemoveTopoGroupOp) op9).receiveSuccess(comms.getMyNodeDesc());
                } catch (DragonTopologyException e2) {
                    ((RemoveTopoGroupOp) op9).receiveError(comms.getMyNodeDesc(), e2.getMessage());
                }
            }).onTimeout(inst.getTimer(), inst.getConf().getDragonServiceTimeoutMs(), TimeUnit.MILLISECONDS, op10 -> {
                op10.fail("timed out removing topology from memory, possibly some machines are overloaded");
            });
        } catch (DragonInvalidContext e2) {
            log.error("should not have an invalid context when purging: " + e2.getMessage());
            e2.printStackTrace();
            client(new TermTopoErrorSMsg(this.topologyId, e2.getMessage()));
        }
    }
}
