/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.messagebus.app.impl;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.Identifier;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventSourceTopic
implements DataTreeChangeListener<Node>,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
    private final NotificationPattern notificationPattern;
    private final EventSourceService sourceService;
    private final Pattern nodeIdPattern;
    private final TopicId topicId;
    private ListenerRegistration<?> listenerRegistration;
    private final CopyOnWriteArraySet<InstanceIdentifier<?>> joinedEventSources = new CopyOnWriteArraySet();

    public static EventSourceTopic create(NotificationPattern notificationPattern, String nodeIdRegexPattern, EventSourceTopology eventSourceTopology) {
        EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, eventSourceTopology.getEventSourceService());
        est.registerListner(eventSourceTopology);
        est.notifyExistingNodes(eventSourceTopology);
        return est;
    }

    private EventSourceTopic(NotificationPattern notificationPattern, String nodeIdRegexPattern, EventSourceService sourceService) {
        this.notificationPattern = (NotificationPattern)Preconditions.checkNotNull((Object)notificationPattern);
        this.sourceService = (EventSourceService)Preconditions.checkNotNull((Object)sourceService);
        this.nodeIdPattern = Pattern.compile(nodeIdRegexPattern);
        this.topicId = new TopicId(EventSourceTopic.getUUIDIdent());
        this.listenerRegistration = null;
        LOG.info("EventSourceTopic created - topicId {}", (Object)this.topicId.getValue());
    }

    public TopicId getTopicId() {
        return this.topicId;
    }

    public void onDataTreeChanged(Collection<DataTreeModification<Node>> changes) {
        for (DataTreeModification<Node> change : changes) {
            DataObjectModification rootNode = change.getRootNode();
            switch (rootNode.getModificationType()) {
                case WRITE: 
                case SUBTREE_MODIFIED: {
                    Node node = (Node)rootNode.getDataAfter();
                    if (!this.getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) break;
                    this.notifyNode(change.getRootPath().getRootIdentifier());
                    break;
                }
            }
        }
    }

    public void notifyNode(InstanceIdentifier<?> nodeId) {
        LOG.debug("Notify node: {}", nodeId);
        try {
            RpcResult rpcResultJoinTopic = (RpcResult)this.sourceService.joinTopic(this.getJoinTopicInputArgument(nodeId)).get();
            if (!rpcResultJoinTopic.isSuccessful()) {
                for (RpcError err : rpcResultJoinTopic.getErrors()) {
                    LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}", new Object[]{this.getTopicId().getValue(), nodeId.toString(), err.toString()});
                }
            } else {
                this.joinedEventSources.add(nodeId);
            }
        }
        catch (Exception e) {
            LOG.error("Could not invoke join topic for node {}", nodeId);
        }
    }

    private void notifyExistingNodes(EventSourceTopology eventSourceTopology) {
        LOG.debug("Notify existing nodes");
        final Pattern nodeRegex = this.nodeIdPattern;
        final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction();
        CheckedFuture future = tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
        Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<Optional<Topology>>(){

            public void onSuccess(Optional<Topology> data) {
                List nodes;
                if (data.isPresent() && (nodes = ((Topology)data.get()).getNode()) != null) {
                    for (Node node : nodes) {
                        if (!nodeRegex.matcher(node.getNodeId().getValue()).matches()) continue;
                        EventSourceTopic.this.notifyNode((InstanceIdentifier<?>)EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, (Identifier)node.getKey()));
                    }
                }
                tx.close();
            }

            public void onFailure(Throwable t) {
                LOG.error("Can not notify existing nodes", t);
                tx.close();
            }
        });
    }

    private JoinTopicInput getJoinTopicInputArgument(InstanceIdentifier<?> path) {
        NodeRef nodeRef = new NodeRef(path);
        JoinTopicInput jti = new JoinTopicInputBuilder().setNode(nodeRef.getValue()).setTopicId(this.topicId).setNotificationPattern(this.notificationPattern).build();
        return jti;
    }

    public Pattern getNodeIdRegexPattern() {
        return this.nodeIdPattern;
    }

    private DisJoinTopicInput getDisJoinTopicInputArgument(InstanceIdentifier<?> eventSourceNodeId) {
        NodeRef nodeRef = new NodeRef(eventSourceNodeId);
        DisJoinTopicInput dji = new DisJoinTopicInputBuilder().setNode(nodeRef.getValue()).setTopicId(this.topicId).build();
        return dji;
    }

    private void registerListner(EventSourceTopology eventSourceTopology) {
        this.listenerRegistration = eventSourceTopology.getDataBroker().registerDataTreeChangeListener(new DataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class)), (DataTreeChangeListener)this);
    }

    @Override
    public void close() {
        if (this.listenerRegistration != null) {
            this.listenerRegistration.close();
        }
        for (InstanceIdentifier<?> eventSourceNodeId : this.joinedEventSources) {
            try {
                RpcResult result = (RpcResult)this.sourceService.disJoinTopic(this.getDisJoinTopicInputArgument(eventSourceNodeId)).get();
                if (result.isSuccessful()) continue;
                for (RpcError err : result.getErrors()) {
                    LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}", new Object[]{this.getTopicId().getValue(), eventSourceNodeId, err.toString()});
                }
            }
            catch (InterruptedException | ExecutionException ex) {
                LOG.error("Can not close event source topic / destroy topic {} on node {}.", new Object[]{this.topicId.getValue(), eventSourceNodeId, ex});
            }
        }
        this.joinedEventSources.clear();
    }

    private static String getUUIDIdent() {
        UUID uuid = UUID.randomUUID();
        return uuid.toString();
    }
}

