package io.annot8.common.implementations.pipelines;

import io.annot8.common.implementations.factories.ItemCreator;
import io.annot8.common.implementations.factories.NotifyingItemFactory;
import io.annot8.core.components.Processor;
import io.annot8.core.components.Resource;
import io.annot8.core.components.Source;
import io.annot8.core.components.responses.ProcessorResponse;
import io.annot8.core.components.responses.SourceResponse;
import io.annot8.core.data.Item;
import io.annot8.core.exceptions.Annot8Exception;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/annot8/common/implementations/pipelines/SimplePipeline.class */
public class SimplePipeline implements Pipeline {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimplePipeline.class);
    private final String id = UUID.randomUUID().toString();
    private final NotifyingItemFactory itemFactory;
    private final ItemQueue itemQueue;
    private final Map<String, Resource> resources;
    private final List<Source> sources;
    private final List<Processor> processors;

    public SimplePipeline(ItemCreator itemCreator, ItemQueue itemQueue, Map<String, Resource> map, List<Source> list, List<Processor> list2) {
        this.itemFactory = new NotifyingItemFactory(itemCreator);
        this.itemQueue = itemQueue;
        this.resources = map;
        this.sources = list;
        this.processors = list2;
        NotifyingItemFactory notifyingItemFactory = this.itemFactory;
        Objects.requireNonNull(itemQueue);
        notifyingItemFactory.registerListener(itemQueue::add);
    }

    public String getId() {
        return this.id;
    }

    @Override // io.annot8.common.implementations.pipelines.Pipeline
    public void run() {
        Iterator<Source> it = this.sources.iterator();
        while (it.hasNext()) {
            process(it.next());
        }
    }

    private void process(Source source) {
        while (true) {
            SourceResponse.Status status = source.read(this.itemFactory).getStatus();
            processItemQueue();
            if (status != SourceResponse.Status.OK && status != SourceResponse.Status.EMPTY) {
                close();
                return;
            }
        }
    }

    private void processItemQueue() {
        if (this.itemQueue == null) {
            return;
        }
        while (this.itemQueue.hasItems()) {
            processItem(this.itemQueue.next());
        }
    }

    private void processItem(Item item) {
        for (Processor processor : this.processors) {
            try {
                ProcessorResponse.Status status = processor.process(item).getStatus();
                if (status == ProcessorResponse.Status.OK) {
                    if (item.isDiscarded()) {
                        LOGGER.warn("Item discarded, stopping processing");
                        return;
                    }
                } else if (status == ProcessorResponse.Status.PROCESSOR_ERROR) {
                    LOGGER.error("Pipeline problem, exiting");
                    System.exit(1);
                } else if (status == ProcessorResponse.Status.ITEM_ERROR) {
                    LOGGER.error("Item problem, skipping rest of pipeline");
                    return;
                }
            } catch (Annot8Exception e) {
                LOGGER.error("Failed to process data item with processor {}", processor.getClass().getName(), e);
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.sources.forEach((v0) -> {
            v0.close();
        });
        this.processors.forEach((v0) -> {
            v0.close();
        });
        this.resources.values().forEach((v0) -> {
            v0.close();
        });
    }
}
