package org.easybatch.tutorials.advanced.parallel;

import java.io.File;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.easybatch.core.api.Record;
import org.easybatch.core.dispatcher.PoisonRecordBroadcaster;
import org.easybatch.core.dispatcher.RoundRobinRecordDispatcher;
import org.easybatch.core.filter.PoisonRecordFilter;
import org.easybatch.core.impl.Engine;
import org.easybatch.core.impl.EngineBuilder;
import org.easybatch.core.reader.QueueRecordReader;
import org.easybatch.flatfile.FlatFileRecordReader;
import org.easybatch.tutorials.basic.helloworld.TweetProcessor;

/* loaded from: input_file:org/easybatch/tutorials/advanced/parallel/ParallelTutorialWithRecordDispatching.class */
/**
 * Tutorial entry point that wires one dispatching engine and two worker engines together
 * through in-memory queues and runs all three on a fixed-size thread pool.
 *
 * <p>The dispatching engine reads {@code tweets.csv} from the classpath with a
 * {@link FlatFileRecordReader} and uses a {@link RoundRobinRecordDispatcher} as its processor,
 * configured with the two queues. Each worker engine reads from one of those queues with a
 * {@link QueueRecordReader}, applies a {@link PoisonRecordFilter} and processes records with a
 * {@link TweetProcessor}.
 */
public class ParallelTutorialWithRecordDispatching {
    /** Pool size: one thread for the dispatching engine plus one per worker engine. */
    private static final int THREAD_POOL_SIZE = 3;

    /** Classpath location of the input file read by the dispatching engine. */
    private static final String TWEETS_RESOURCE = "/org/easybatch/tutorials/advanced/parallel/tweets.csv";

    /**
     * Builds the dispatching engine and the two worker engines, submits them to a thread pool
     * and requests an orderly pool shutdown.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if the tweets resource cannot be found on the classpath
     * @throws Exception if the resource URL cannot be converted to a file URI
     */
    public static void main(String[] args) throws Exception {
        // Fail with an explicit message instead of a bare NullPointerException when the
        // resource is missing from the classpath.
        java.net.URL tweetsUrl = ParallelTutorialWithRecordDispatching.class.getResource(TWEETS_RESOURCE);
        if (tweetsUrl == null) {
            throw new IllegalStateException("Classpath resource not found: " + TWEETS_RESOURCE);
        }
        File tweets = new File(tweetsUrl.toURI());

        // One queue per worker engine; typed so no unchecked conversion is needed below.
        BlockingQueue<Record> queue1 = new LinkedBlockingQueue<Record>();
        BlockingQueue<Record> queue2 = new LinkedBlockingQueue<Record>();

        RoundRobinRecordDispatcher dispatcher = new RoundRobinRecordDispatcher(Arrays.asList(queue1, queue2));

        // The same dispatcher instance is handed to the PoisonRecordBroadcaster listener.
        // NOTE(review): presumably this signals end-of-input to the workers via the queues,
        // which the workers' PoisonRecordFilter then drops - confirm against the library docs.
        Engine masterEngine = EngineBuilder.aNewEngine()
                .reader(new FlatFileRecordReader(tweets))
                .processor(dispatcher)
                .batchProcessEventListener(new PoisonRecordBroadcaster(dispatcher))
                .build();
        Engine workerEngine1 = buildWorkerEngine(queue1);
        Engine workerEngine2 = buildWorkerEngine(queue2);

        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        try {
            // The Callable view is kept explicit so the submit(Callable) overload is selected.
            executorService.submit((Callable<?>) masterEngine);
            executorService.submit((Callable<?>) workerEngine1);
            executorService.submit((Callable<?>) workerEngine2);
        } finally {
            // Always request shutdown, even if a submission is rejected, so that pool threads
            // already started do not linger once their tasks finish.
            executorService.shutdown();
        }
    }

    /**
     * Builds a worker engine that consumes records from the given queue.
     *
     * @param blockingQueue the queue the worker's {@link QueueRecordReader} reads from
     * @return an engine configured with a {@link PoisonRecordFilter} and a {@link TweetProcessor}
     */
    public static Engine buildWorkerEngine(BlockingQueue<Record> blockingQueue) {
        return EngineBuilder.aNewEngine()
                .reader(new QueueRecordReader(blockingQueue))
                .filter(new PoisonRecordFilter())
                .processor(new TweetProcessor())
                .build();
    }
}
