package technology.dice.dicewhere.lineprocessing;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import technology.dice.dicewhere.api.exceptions.LineParsingException;
import technology.dice.dicewhere.parsing.LineParser;
import technology.dice.dicewhere.parsing.ParsedLine;
import technology.dice.dicewhere.reading.RawLine;

/* loaded from: input_file:technology/dice/dicewhere/lineprocessing/LineProcessor.class */
/**
 * Drains raw lines from an internal bounded queue, parses them in batches on the supplied
 * executor, and puts the resulting {@link SerializedLine}s on a destination queue.
 *
 * <p>Producers feed the processor through {@link #addLine(RawLine)} and signal the end of the
 * input with {@link #markDataComplete()}. {@link #run()} keeps dispatching batches until the end
 * of input has been signalled and the internal queue is empty. Progress and failures are reported
 * to the {@link LineprocessorListenerForProvider} given at construction time.
 */
public class LineProcessor implements Runnable {
    /** Maximum number of raw lines handed to a single parsing task. */
    private static final int WORKER_BATCH_SIZE = 10000;

    private final ExecutorService executorService;
    private final ArrayBlockingQueue<RawLine> lines;
    private final LineParser parser;
    private final boolean retainOriginalLine;
    private final BlockingQueue<SerializedLine> destination;
    private final LineprocessorListenerForProvider progressListener;
    private final AtomicBoolean expectingMore = new AtomicBoolean(true);
    private final int workersCount;

    /**
     * Creates a processor whose internal queue holds up to
     * {@code (workersCount + 1) * WORKER_BATCH_SIZE} raw lines.
     *
     * @param executorService executor on which parsing batches are run
     * @param destination queue that receives every successfully serialized line
     * @param parser parser applied to each raw line
     * @param retainOriginalLine flag passed through to {@link LineParser#parse}
     * @param progressListener listener notified of progress and errors
     * @param workersCount number of batches dispatched per processing round
     */
    public LineProcessor(
            ExecutorService executorService,
            BlockingQueue<SerializedLine> destination,
            LineParser parser,
            boolean retainOriginalLine,
            LineprocessorListenerForProvider progressListener,
            int workersCount) {
        this.lines = new ArrayBlockingQueue<>((workersCount + 1) * WORKER_BATCH_SIZE);
        this.destination = destination;
        this.executorService = executorService;
        this.parser = parser;
        this.retainOriginalLine = retainOriginalLine;
        this.progressListener = progressListener;
        this.workersCount = workersCount;
    }

    /** Signals that no further lines will be added; {@link #run()} ends once the queue is empty. */
    public void markDataComplete() {
        this.expectingMore.set(false);
    }

    /**
     * Enqueues a raw line for processing, blocking while the internal queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the line is not enqueued, the
     * listener's {@code enqueueError} is invoked and the thread's interrupt status is restored.
     *
     * @param rawLine the line to enqueue
     */
    public void addLine(RawLine rawLine) {
        try {
            this.lines.put(rawLine);
        } catch (InterruptedException e) {
            // Restore the flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            this.progressListener.enqueueError(rawLine, e);
        }
    }

    /**
     * Processes queued lines until {@link #markDataComplete()} has been called and the internal
     * queue is empty, then reports the total through the listener's {@code finished} callback.
     *
     * <p>Each round drains up to {@code workersCount} batches, parses them on the executor and
     * then forwards the results to the destination queue in batch order.
     *
     * @throws RuntimeException if the thread is interrupted while draining the internal queue;
     *     the interrupt status is restored before the exception is thrown
     */
    @Override
    public void run() {
        final long started = System.currentTimeMillis();
        final AtomicLong processedLines = new AtomicLong();
        final List<CompletableFuture<ImmutableList<SerializedLine>>> pendingBatches =
                new ArrayList<>(Math.max(this.workersCount, 0));

        while (this.expectingMore.get() || !this.lines.isEmpty()) {
            pendingBatches.clear();
            for (int worker = 0; worker < this.workersCount; worker++) {
                try {
                    final List<RawLine> batch = new ArrayList<>(WORKER_BATCH_SIZE);
                    // The 1 ns timeout makes this effectively non-blocking: a batch holds
                    // whatever is available right now, possibly nothing.
                    Queues.drain(this.lines, batch, WORKER_BATCH_SIZE, 1L, TimeUnit.NANOSECONDS);
                    pendingBatches.add(
                            CompletableFuture.supplyAsync(
                                    () -> buildSerializedLineBatch(started, batch),
                                    this.executorService));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.progressListener.processorInterrupted(e);
                    throw new RuntimeException("Line processor interrupted", e);
                }
            }
            // join() waits for each batch in submission order, so output order follows batch order.
            for (CompletableFuture<ImmutableList<SerializedLine>> pending : pendingBatches) {
                pending.join().forEach(line -> publish(line, processedLines, started));
            }
        }
        this.progressListener.finished(processedLines.get(), System.currentTimeMillis() - started);
    }

    /**
     * Puts one serialized line on the destination queue and reports it to the listener.
     *
     * <p>On interruption the line is reported through {@code dequeueError} and the interrupt
     * status is restored, so the interruption is not silently lost.
     */
    private void publish(SerializedLine line, AtomicLong processedLines, long started) {
        try {
            this.destination.put(line);
            processedLines.getAndIncrement();
            this.progressListener.lineProcessed(line, System.currentTimeMillis() - started);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.progressListener.dequeueError(line, e);
        }
    }

    /** Parses and serializes every raw line of a batch, omitting lines that fail. */
    private ImmutableList<SerializedLine> buildSerializedLineBatch(long started, Collection<RawLine> batch) {
        return batch.stream()
                .flatMap(rawLine -> attemptParse(rawLine, started))
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Parses one raw line and serializes each parsed result.
     *
     * <p>Exceptions thrown by the parse call are reported through {@code parseError} (wrapped in
     * a {@link LineParsingException} when they are of another type) and yield an empty stream.
     */
    private Stream<SerializedLine> attemptParse(RawLine rawLine, long started) {
        try {
            final Stream<ParsedLine> parsedLines = this.parser.parse(rawLine, this.retainOriginalLine);
            final long parsedAt = System.currentTimeMillis();
            return parsedLines.flatMap(parsedLine -> {
                this.progressListener.lineParsed(parsedLine, parsedAt - started);
                return attemptSerialize(parsedLine);
            });
        } catch (LineParsingException e) {
            this.progressListener.parseError(rawLine, e);
            return Stream.empty();
        } catch (Exception e) {
            this.progressListener.parseError(rawLine, new LineParsingException(e, rawLine));
            return Stream.empty();
        }
    }

    /**
     * Wraps a parsed line in a {@link SerializedLine} keyed by its start IP; failures are
     * reported through {@code serializeError} and yield an empty stream.
     */
    private Stream<SerializedLine> attemptSerialize(ParsedLine parsedLine) {
        try {
            return Stream.of(new SerializedLine(parsedLine.getStartIp(), parsedLine));
        } catch (Exception e) {
            this.progressListener.serializeError(parsedLine, e);
            return Stream.empty();
        }
    }
}
