package fr.boreal.forward_chaining.chase.rule_applier;

import fr.boreal.forward_chaining.chase.RuleApplicationStepResult;
import fr.boreal.forward_chaining.chase.rule_applier.body_to_query_transformer.BodyToQueryTransformer;
import fr.boreal.forward_chaining.chase.rule_applier.trigger_applier.TriggerApplier;
import fr.boreal.forward_chaining.chase.rule_applier.trigger_checker.TriggerChecker;
import fr.boreal.forward_chaining.chase.rule_applier.trigger_computer.TriggerComputer;
import fr.boreal.model.formula.api.FOFormula;
import fr.boreal.model.kb.api.FactBase;
import fr.boreal.model.logicalElements.api.Atom;
import fr.boreal.model.logicalElements.api.Substitution;
import fr.boreal.model.query.api.FOQuery;
import fr.boreal.model.rule.api.FORule;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:fr/boreal/forward_chaining/chase/rule_applier/MultiThreadRuleApplier.class */
public class MultiThreadRuleApplier extends AbstractRuleApplier {
    private Map<FORule, BlockingQueue<Substitution>> checkingQueues;
    private Map<FORule, BlockingQueue<Substitution>> applicationQueues;
    private Map<FORule, Boolean> producerFinished;
    private Map<FORule, Boolean> checkingFinished;
    private static final int MAX_WORKERS = 32;

    public MultiThreadRuleApplier(BodyToQueryTransformer bodyToQueryTransformer, TriggerComputer triggerComputer, TriggerChecker triggerChecker, TriggerApplier triggerApplier) {
        super(bodyToQueryTransformer, triggerComputer, triggerChecker, triggerApplier);
    }

    @Override // fr.boreal.forward_chaining.chase.rule_applier.RuleApplier
    public RuleApplicationStepResult apply(Collection<FORule> collection, FactBase factBase) {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(Math.min(MAX_WORKERS, collection.size()));
        ExecutorService newFixedThreadPool2 = Executors.newFixedThreadPool(Math.min(MAX_WORKERS, collection.size()));
        this.checkingQueues = new ConcurrentHashMap();
        this.applicationQueues = new ConcurrentHashMap();
        this.producerFinished = new ConcurrentHashMap();
        this.checkingFinished = new ConcurrentHashMap();
        Map<FOQuery<?>, Collection<FORule>> groupRulesByBodyQuery = groupRulesByBodyQuery(collection);
        ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
        ConcurrentHashMap.KeySetView newKeySet2 = ConcurrentHashMap.newKeySet();
        for (FORule fORule : collection) {
            this.checkingQueues.put(fORule, new LinkedBlockingQueue());
            this.applicationQueues.put(fORule, new LinkedBlockingQueue());
            this.producerFinished.put(fORule, false);
            this.checkingFinished.put(fORule, false);
        }
        for (FOQuery<?> fOQuery : groupRulesByBodyQuery.keySet()) {
            newCachedThreadPool.submit(() -> {
                processBody(fOQuery, factBase, (Collection) groupRulesByBodyQuery.get(fOQuery));
            });
        }
        for (FORule fORule2 : collection) {
            newFixedThreadPool.submit(() -> {
                processChecking(factBase, fORule2);
            });
            newFixedThreadPool2.submit(() -> {
                processApplication(factBase, newKeySet2, newKeySet, fORule2);
            });
        }
        newCachedThreadPool.shutdown();
        try {
            if (!newCachedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                throw new RuntimeException("Timeout waiting for producers to finish.");
            }
            shutdownAndAwaitTermination(newFixedThreadPool);
            shutdownAndAwaitTermination(newFixedThreadPool2);
            return new RuleApplicationStepResult(newKeySet, newKeySet2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Producer execution interrupted.", e);
        }
    }

    private void processBody(FOQuery fOQuery, FactBase factBase, Collection<FORule> collection) {
        this.computer.compute(fOQuery, factBase).forEachRemaining(substitution -> {
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                try {
                    this.checkingQueues.get((FORule) it.next()).put(substitution);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Iterator<FORule> it = collection.iterator();
        while (it.hasNext()) {
            this.producerFinished.put(it.next(), true);
        }
    }

    private void processChecking(FactBase factBase, FORule fORule) {
        Substitution poll;
        BlockingQueue<Substitution> blockingQueue = this.checkingQueues.get(fORule);
        while (true) {
            try {
                poll = blockingQueue.poll(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (poll == null) {
                if (this.producerFinished.get(fORule).booleanValue() && blockingQueue.isEmpty()) {
                    this.checkingFinished.put(fORule, true);
                    return;
                }
            } else if (this.checker.check(fORule, poll, factBase)) {
                try {
                    this.applicationQueues.get(fORule).put(poll);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                }
            }
            Thread.currentThread().interrupt();
        }
    }

    private void processApplication(FactBase factBase, Set<Atom> set, Set<FORule> set2, FORule fORule) {
        FOFormula apply;
        BlockingQueue<Substitution> blockingQueue = this.applicationQueues.get(fORule);
        while (true) {
            try {
                Substitution poll = blockingQueue.poll(100L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    synchronized (this.applier) {
                        apply = this.applier.apply(fORule, poll, factBase);
                    }
                    if (apply != null) {
                        set.addAll(apply.asAtomSet());
                        set2.add(fORule);
                    }
                } else if (this.checkingFinished.get(fORule).booleanValue() && blockingQueue.isEmpty()) {
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void shutdownAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
            } else {
                throw new RuntimeException("Timeout waiting for worker to finish.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Worker execution interrupted.", e);
        }
    }
}
