package dragon.topology.base;

import dragon.Constants;
import dragon.LocalCluster;
import dragon.grouping.AbstractGrouping;
import dragon.network.Router;
import dragon.topology.DestComponentMap;
import dragon.topology.GroupingsSet;
import dragon.topology.StreamMap;
import dragon.tuple.Fields;
import dragon.tuple.NetworkTask;
import dragon.tuple.Tuple;
import dragon.tuple.Values;
import dragon.utils.ComponentTaskBuffer;
import dragon.utils.NetworkTaskBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dragon/topology/base/Collector.class */
public class Collector {
    // Class-wide logger.
    private static final Logger log = LogManager.getLogger((Class<?>) Collector.class);
    // Output buffers, one created per (destination component, stream) pair in the constructor.
    private final ComponentTaskBuffer outputQueues;
    // Cluster this collector belongs to; used to reach the topology, the bolts and the configuration.
    private final LocalCluster localCluster;
    // Component on whose behalf tuples are emitted (its id and task id are stamped on each tuple).
    private final Component component;
    // Sum of the per-queue buffer size over every (destination component, stream) pair.
    private final int totalBufferSpace;
    // Set by setEmit() (called at the end of emit/emitDirect), cleared by resetEmit().
    private boolean emitted;
    // Router used for task ids without a local bolt; null when localCluster.getNode() is null.
    private final Router router;
    // Scratch set reused by transmit() to record the tasks whose input queue accepted a bundle directly.
    private final HashSet<Integer> doneTaskIds;
    // Added to the current epoch-millisecond time to obtain a bundle's expire time.
    private final long linger_ms;
    // Capacity of the tuple array allocated for each bundle.
    private final int bundleSize;
    // Epoch-millisecond time of the next expected bundle expiry.
    private long nextExpire;
    // Open bundles keyed by destination component id, then stream id, then destination task id set.
    private HashMap<String, HashMap<String, HashMap<HashSet<Integer>, TupleBundle>>> bundleMap;
    // Open bundles ordered by expireTime (earliest first).
    private PriorityQueue<TupleBundle> bundleQueue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dragon/topology/base/Collector$TupleBundle.class */
    /**
     * A batch of tuples that share a destination component, stream and set of task ids.
     *
     * <p>The bundle is created with a fixed-capacity array of {@code bundleSize} slots and an
     * expire time of "now + linger_ms". Creating a bundle also pulls the enclosing collector's
     * {@code nextExpire} forward when this bundle is due earlier.
     */
    public class TupleBundle {
        /** Epoch-millisecond time at which this bundle is due; fixed at construction. */
        public Long expireTime;
        /** Backing storage; slots at index {@code size} and beyond have not been filled. */
        public Tuple[] tuples;
        /** Number of tuples added so far. */
        public int size = 0;
        /** Destination component id. */
        public String componentId;
        /** Stream id the tuples were emitted on. */
        public String streamId;
        /** Destination task ids. */
        public HashSet<Integer> taskIds;

        /**
         * Creates an empty bundle.
         *
         * @param str destination component id
         * @param str2 stream id
         * @param hashSet destination task ids (stored as given, not copied)
         */
        public TupleBundle(String str, String str2, HashSet<Integer> hashSet) {
            final long deadline = Instant.now().toEpochMilli() + linger_ms;
            expireTime = Long.valueOf(deadline);
            // Keep the collector's next wake-up time no later than this bundle's deadline.
            if (deadline < nextExpire) {
                nextExpire = deadline;
            }
            tuples = new Tuple[bundleSize];
            componentId = str;
            streamId = str2;
            taskIds = hashSet;
        }

        /**
         * Appends a tuple in the next free slot.
         *
         * @param tuple the tuple to store
         */
        public void add(Tuple tuple) {
            tuples[size++] = tuple;
        }
    }

    /**
     * Builds the collector for one component instance.
     *
     * <p>Reads the bundle linger time and bundle size from the cluster configuration, and creates
     * one output queue and one (initially empty) bundle table for every destination component and
     * stream found in the topology's destination map for this component.
     *
     * @param component the component that emits through this collector
     * @param localCluster the cluster the component runs in
     * @param i buffer size used for each output queue
     */
    public Collector(Component component, LocalCluster localCluster, int i) {
        this.component = component;
        this.localCluster = localCluster;
        // No node means no router is available.
        this.router = (localCluster.getNode() == null) ? null : localCluster.getNode().getRouter();
        this.linger_ms = localCluster.getConf().getDragonTupleBundleLingerMS();
        this.bundleSize = localCluster.getConf().getDragonTupleBundleSize();
        this.outputQueues = new ComponentTaskBuffer(i);
        this.bundleMap = new HashMap<>();
        this.nextExpire = Instant.now().toEpochMilli() + this.linger_ms;
        // Earliest-expiring bundle sits at the head of the queue.
        this.bundleQueue = new PriorityQueue<>((first, second) -> first.expireTime.compareTo(second.expireTime));

        int bufferSpace = 0;
        DestComponentMap destinations = localCluster.getTopology().getDestComponentMap(component.getComponentId());
        if (destinations != null) {
            for (String destComponentId : destinations.keySet()) {
                HashMap<String, HashMap<HashSet<Integer>, TupleBundle>> bundlesByStream = new HashMap<>();
                this.bundleMap.put(destComponentId, bundlesByStream);
                for (String streamId : ((StreamMap) destinations.get(destComponentId)).keySet()) {
                    this.outputQueues.create(destComponentId, streamId);
                    bundlesByStream.put(streamId, new HashMap<>());
                    bufferSpace += i;
                }
            }
        }
        this.totalBufferSpace = bufferSpace;
        this.doneTaskIds = new HashSet<>();
    }

    /**
     * @return the buffer size summed over every (destination component, stream) queue created
     *         by the constructor
     */
    public int getTotalBufferSpace() {
        return totalBufferSpace;
    }

    /**
     * Looks up the output queue for a destination.
     *
     * @param str destination component id
     * @param str2 stream id
     * @return the queue registered for that component and stream
     */
    public NetworkTaskBuffer getQueue(String str, String str2) {
        return outputQueues.get(str).get(str2);
    }

    /**
     * @return the buffer holding all of this collector's output queues
     */
    public ComponentTaskBuffer getComponentTaskBuffer() {
        return outputQueues;
    }

    /**
     * Emits on the default stream; the {@code tuple} argument is not used.
     *
     * @param tuple ignored
     * @param values the values to emit
     * @return the task ids chosen for the tuple
     * @deprecated use {@link #emit(Values)}
     */
    @Deprecated
    public synchronized List<Integer> emit(Tuple tuple, Values values) {
        return this.emit(values);
    }

    /**
     * Emits on the given stream; the {@code tuple} argument is not used.
     *
     * @param str stream id
     * @param tuple ignored
     * @param values the values to emit
     * @return the task ids chosen for the tuple
     * @deprecated use {@link #emit(String, Values)}
     */
    @Deprecated
    public synchronized List<Integer> emit(String str, Tuple tuple, Values values) {
        return this.emit(str, values);
    }

    /**
     * Emits the values on {@code Constants.DEFAULT_STREAM}.
     *
     * @param values the values to emit
     * @return the task ids chosen for the tuple
     */
    public synchronized List<Integer> emit(Values values) {
        return this.emit(Constants.DEFAULT_STREAM, values);
    }

    /**
     * Transmits every bundle whose expire time is at or before the current time, then
     * resets {@code nextExpire} to the earliest remaining deadline, or to "now + linger_ms"
     * when no bundle remains.
     */
    public synchronized void expireTupleBundles() {
        final long now = Instant.now().toEpochMilli();
        TupleBundle head = bundleQueue.peek();
        while (head != null && head.expireTime.longValue() <= now) {
            TupleBundle due = bundleQueue.poll();
            transmit(due.tuples, due.taskIds, due.componentId, due.streamId);
            // The bundle is no longer open, so drop it from the lookup table too.
            bundleMap.get(due.componentId).get(due.streamId).remove(due.taskIds);
            head = bundleQueue.peek();
        }
        nextExpire = (head == null) ? now + linger_ms : head.expireTime.longValue();
    }

    /**
     * Transmits every open bundle regardless of its expire time and resets
     * {@code nextExpire} to "now + linger_ms".
     */
    public synchronized void expireAllTupleBundles() {
        log.debug("expiring all tuple bundles");
        for (TupleBundle bundle = bundleQueue.poll(); bundle != null; bundle = bundleQueue.poll()) {
            transmit(bundle.tuples, bundle.taskIds, bundle.componentId, bundle.streamId);
            bundleMap.get(bundle.componentId).get(bundle.streamId).remove(bundle.taskIds);
        }
        nextExpire = Instant.now().toEpochMilli() + linger_ms;
    }

    /**
     * Transmits bundles in expire-time order until the given bundle itself has been
     * transmitted, then resets {@code nextExpire} to "now + linger_ms".
     *
     * @param tupleBundle the bundle to stop at; it must currently be in the bundle queue
     */
    private synchronized void expireAllUpTo(TupleBundle tupleBundle) {
        while (true) {
            TupleBundle flushed = bundleQueue.poll();
            transmit(flushed.tuples, flushed.taskIds, flushed.componentId, flushed.streamId);
            bundleMap.get(flushed.componentId).get(flushed.streamId).remove(flushed.taskIds);
            // Identity comparison: stop once the requested bundle has gone out.
            if (flushed == tupleBundle) {
                break;
            }
        }
        nextExpire = Instant.now().toEpochMilli() + linger_ms;
    }

    /**
     * Sends a bundle of tuples to the given task ids of a destination component.
     *
     * <p>Task ids without a bolt in this cluster are handed to the router as one
     * {@link NetworkTask}. Task ids with a local bolt are first offered the bundle directly on
     * the bolt's input queue, but only when the output queue for this destination is empty;
     * whatever could not be delivered that way is placed on the output queue.
     *
     * <p>If the thread is interrupted while blocked on the router or the output queue, the
     * interrupt status is restored so callers further up the stack can observe it, and the
     * method returns without delivering the remainder.
     *
     * @param tupleArr the bundle's tuple array (passed on as is)
     * @param hashSet destination task ids; not modified
     * @param str destination component id
     * @param str2 stream id, used to select the output queue
     */
    private void transmit(Tuple[] tupleArr, HashSet<Integer> hashSet, String str, String str2) {
        // Split the destinations into those with a bolt in this cluster and those without.
        HashMap<Integer, Bolt> localBolts = this.localCluster.getBolts().get(str);
        HashSet<Integer> remoteTaskIds = new HashSet<>();
        HashSet<Integer> localTaskIds = new HashSet<>();
        for (Integer taskId : hashSet) {
            if (localBolts != null && localBolts.containsKey(taskId)) {
                localTaskIds.add(taskId);
            } else {
                remoteTaskIds.add(taskId);
            }
        }

        if (!remoteTaskIds.isEmpty()) {
            NetworkTask remoteTask = new NetworkTask();
            remoteTask.init(tupleArr, remoteTaskIds, str, this.localCluster.getTopologyId());
            try {
                this.router.put(remoteTask);
            } catch (InterruptedException e) {
                log.info("interrupted");
                // Do not swallow the interrupt: restore the flag for the calling thread.
                Thread.currentThread().interrupt();
                return;
            }
        }
        if (localTaskIds.isEmpty()) {
            return;
        }

        NetworkTaskBuffer queue = getQueue(str, str2);
        // Direct hand-off is attempted only when nothing is already queued for this destination
        // (presumably to keep delivery order; confirm against the queue consumer).
        if (queue.isEmpty()) {
            this.doneTaskIds.clear();
            for (Integer taskId : localTaskIds) {
                if (localBolts.get(taskId).getInputCollector().getQueue().offer(tupleArr)) {
                    this.doneTaskIds.add(taskId);
                }
            }
            localTaskIds.removeAll(this.doneTaskIds);
        }
        if (localTaskIds.isEmpty()) {
            return;
        }

        NetworkTask pendingTask = new NetworkTask();
        pendingTask.init(tupleArr, localTaskIds, str, this.localCluster.getTopologyId());
        try {
            queue.put(pendingTask);
            // Signal the cluster only on the empty -> non-empty transition.
            if (queue.size() == 1) {
                this.localCluster.outputPending(queue);
            }
        } catch (InterruptedException e2) {
            log.info("interrupted");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds one tuple to the open bundle for its destination, creating and queueing a new
     * bundle when none is open. When the bundle becomes full, it and every bundle ahead of
     * it in the expiry queue are transmitted.
     *
     * @param tuple the tuple to bundle
     * @param list destination task ids
     * @param str destination component id
     * @param str2 stream id
     */
    private void transmit(Tuple tuple, List<Integer> list, String str, String str2) {
        HashSet<Integer> destTaskIds = new HashSet<>(list);
        HashMap<HashSet<Integer>, TupleBundle> openBundles = bundleMap.get(str).get(str2);
        TupleBundle bundle = openBundles.get(destTaskIds);
        if (bundle == null) {
            bundle = new TupleBundle(str, str2, destTaskIds);
            openBundles.put(destTaskIds, bundle);
            bundleQueue.add(bundle);
        }
        bundle.add(tuple);
        if (bundle.size == bundle.tuples.length) {
            expireAllUpTo(bundle);
        }
    }

    /**
     * Emits a tuple carrying {@code values} on the given stream.
     *
     * <p>For every destination component of this component's stream, and for every grouping
     * registered for it, the grouping chooses the receiving task ids and the tuple is added to
     * the matching bundle. The component's "emitted" counter is increased by one and its
     * "transferred" counter by the number of tasks each grouping chose.
     *
     * @param str stream id
     * @param values the values to emit; their count must equal the number of declared fields
     * @return all task ids chosen across the groupings; empty when the component is closed
     * @throws DragonEmitRuntimeException if no fields are declared for the stream, or the number
     *         of values does not match the number of declared fields
     */
    public synchronized List<Integer> emit(String str, Values values) {
        ArrayList<Integer> arrayList = new ArrayList<>();
        if (this.component.isClosed()) {
            log.error("spontaneous tuple emission after close, topology may not terminate properly");
            return arrayList;
        }
        Fields fields = this.component.getOutputFieldsDeclarer().getFields(str);
        if (fields == null) {
            throw new DragonEmitRuntimeException("no fields have been declared for [" + this.component.getComponentId() + "] on stream [" + str + "] however it is attempting to emit on that stream");
        }
        if (values.size() != fields.getFieldNames().length) {
            throw new DragonEmitRuntimeException("the number of values in [" + values + "] does not match the number of fields [" + fields.getFieldNamesAsString() + "]");
        }
        Tuple tuple = new Tuple();
        tuple.setFields(fields.copy());
        tuple.setValues(values);
        tuple.setSourceComponent(this.component.getComponentId());
        tuple.setSourceTaskId(Integer.valueOf(this.component.getTaskId()));
        tuple.setSourceStreamId(str);
        this.component.incEmitted(1L);
        this.localCluster.getTopology().getComponentDestSet(this.component.getComponentId(), str).forEach((str2, groupingsSet) -> {
            groupingsSet.forEach(abstractGrouping -> {
                List<Integer> chooseTasks = abstractGrouping.chooseTasks(this.component.getTaskId(), values);
                arrayList.addAll(chooseTasks);
                // Count only the tasks chosen by this grouping; the accumulated list would
                // re-count the tasks of every earlier grouping.
                this.component.incTransferred(chooseTasks.size());
                transmit(tuple, chooseTasks, str2, str);
            });
        });
        setEmit();
        return arrayList;
    }

    /**
     * Emits the values directly to one task on {@code Constants.DEFAULT_STREAM}.
     *
     * @param i destination task id
     * @param values the values to emit
     */
    public synchronized void emitDirect(int i, Values values) {
        this.emitDirect(i, Constants.DEFAULT_STREAM, values);
    }

    /**
     * Emits a tuple carrying {@code values} on the given direct stream to a single task id.
     *
     * <p>The tuple is bundled for task {@code i} once per destination component registered for
     * this component's stream. The "emitted" counter is increased by one and the "transferred"
     * counter by one per destination component.
     *
     * @param i destination task id
     * @param str stream id
     * @param values the values to emit; their count must equal the number of declared fields
     * @throws DragonEmitRuntimeException if no direct fields are declared for the stream, or the
     *         number of values does not match the number of declared fields
     */
    public synchronized void emitDirect(int i, String str, Values values) {
        if (this.component.isClosed()) {
            log.error("spontaneous tuple emission after close, topology may not terminate properly");
            return;
        }
        Fields fieldsDirect = this.component.getOutputFieldsDeclarer().getFieldsDirect(str);
        if (fieldsDirect == null) {
            throw new DragonEmitRuntimeException("no fields have been declared for [" + this.component.getComponentId() + "] on stream [" + str + "] however it is attempting to emit on that stream");
        }
        if (values.size() != fieldsDirect.getFieldNames().length) {
            throw new DragonEmitRuntimeException("the number of values in [" + values + "] does not match the number of fields [" + fieldsDirect.getFieldNamesAsString() + "]");
        }
        Tuple tuple = new Tuple();
        tuple.setFields(fieldsDirect.copy());
        tuple.setValues(values);
        tuple.setSourceComponent(this.component.getComponentId());
        tuple.setSourceTaskId(Integer.valueOf(this.component.getTaskId()));
        tuple.setSourceStreamId(str);
        this.component.incEmitted(1L);
        this.localCluster.getTopology().getComponentDestSet(this.component.getComponentId(), str).forEach((str2, groupingsSet) -> {
            // A fresh list per destination: transmit() copies it into the bundle key.
            ArrayList<Integer> arrayList = new ArrayList<>();
            arrayList.add(Integer.valueOf(i));
            this.component.incTransferred(1L);
            transmit(tuple, arrayList, str2, str);
        });
        setEmit();
    }

    /**
     * Emits the values directly to one task on the given stream; the {@code tuple} argument
     * is not used.
     *
     * @param i destination task id
     * @param str stream id
     * @param tuple ignored
     * @param values the values to emit
     * @deprecated use {@link #emitDirect(int, String, Values)}
     */
    @Deprecated
    public synchronized void emitDirect(int i, String str, Tuple tuple, Values values) {
        // Forward the caller's stream id, as the deprecated emit(String, Tuple, Values) does.
        emitDirect(i, str, values);
    }

    /** Clears the "emitted" flag. */
    public void resetEmit() {
        emitted = false;
    }

    /**
     * @return {@code true} if {@link #setEmit()} has been called since the last
     *         {@link #resetEmit()}
     */
    public boolean didEmit() {
        return emitted;
    }

    /** Raises the "emitted" flag; called at the end of each successful emit. */
    public void setEmit() {
        emitted = true;
    }

    /**
     * Bundles a tuple of type {@code Tuple.Type.TERMINATE} for every destination component
     * and stream listed for this component in the topology.
     *
     * <p>Does nothing when the topology has no entry for this component. The tuples go through
     * the normal bundling path, so they are only sent once their bundle fills or expires.
     *
     * <p>NOTE(review): unlike the emit methods this method is not {@code synchronized}, although
     * it reaches the same bundle map and queue through transmit() — confirm callers serialize access.
     */
    public void emitTerminateTuple() {
        if (this.localCluster.getTopology().getTopology().get(this.component.getComponentId()) == null) {
            return;
        }
        // Outer loop: destination component ids; inner loop: stream ids towards that destination.
        for (String str : this.localCluster.getTopology().getTopology().get(this.component.getComponentId()).keySet()) {
            StreamMap streamMap = this.localCluster.getTopology().getTopology().get(this.component.getComponentId()).get(str);
            for (String str2 : streamMap.keySet()) {
                // NOTE(review): the groupings are always looked up under SYSTEM_STREAM_ID, not str2;
                // this throws a NullPointerException below if that stream has no entry — confirm intended.
                GroupingsSet groupingsSet = streamMap.get(Constants.SYSTEM_STREAM_ID);
                Tuple tuple = new Tuple();
                tuple.setFields(new Fields(Constants.SYSTEM_TUPLE_FIELDS));
                tuple.setSourceComponent(this.component.getComponentId());
                tuple.setSourceStreamId(str2);
                tuple.setSourceTaskId(Integer.valueOf(this.component.getTaskId()));
                tuple.setType(Tuple.Type.TERMINATE);
                Iterator<AbstractGrouping> it = groupingsSet.iterator();
                while (it.hasNext()) {
                    // No values are supplied to the grouping for a terminate tuple.
                    transmit(tuple, it.next().chooseTasks(this.component.getTaskId(), null), str, str2);
                }
            }
        }
    }

    /**
     * @return the linger time in milliseconds read from the configuration at construction
     */
    public long getLinger_ms() {
        return linger_ms;
    }

    /**
     * @return the number of tuple slots allocated per bundle
     */
    public int getBundleSize() {
        return bundleSize;
    }

    /**
     * @return the epoch-millisecond time of the next expected bundle expiry
     */
    public long getNextExpire() {
        return nextExpire;
    }
}
