package org.easybatch.tutorials.advanced.cbrd.fruits;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.easybatch.core.api.Record;
import org.easybatch.core.dispatcher.ContentBasedRecordDispatcher;
import org.easybatch.core.dispatcher.ContentBasedRecordDispatcherBuilder;
import org.easybatch.core.dispatcher.PoisonRecordBroadcaster;
import org.easybatch.core.filter.PoisonRecordFilter;
import org.easybatch.core.impl.Engine;
import org.easybatch.core.impl.EngineBuilder;
import org.easybatch.core.reader.QueueRecordReader;
import org.easybatch.core.reader.StringRecordReader;

/* loaded from: input_file:org/easybatch/tutorials/advanced/cbrd/fruits/FruitsParallelProcessingTutorial.class */
public class FruitsParallelProcessingTutorial {
    private static final int THREAD_POOL_SIZE = 4;

    public static void main(String[] strArr) throws Exception {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
        LinkedBlockingQueue linkedBlockingQueue3 = new LinkedBlockingQueue();
        ContentBasedRecordDispatcher build = new ContentBasedRecordDispatcherBuilder().when(new AppleRecordPredicate()).dispatchTo(linkedBlockingQueue).when(new OrangeRecordPredicate()).dispatchTo(linkedBlockingQueue2).otherwise(linkedBlockingQueue3).build();
        Engine build2 = EngineBuilder.aNewEngine().reader(new StringRecordReader("1,apple\n2,orange\n3,banana\n4,apple\n5,pear")).processor(build).batchProcessEventListener(new PoisonRecordBroadcaster(build)).build();
        Engine buildWorkerEngine = buildWorkerEngine(linkedBlockingQueue);
        Engine buildWorkerEngine2 = buildWorkerEngine(linkedBlockingQueue2);
        Engine buildWorkerEngine3 = buildWorkerEngine(linkedBlockingQueue3);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        newFixedThreadPool.submit((Callable) build2);
        newFixedThreadPool.submit((Callable) buildWorkerEngine);
        newFixedThreadPool.submit((Callable) buildWorkerEngine2);
        newFixedThreadPool.submit((Callable) buildWorkerEngine3);
        newFixedThreadPool.shutdown();
    }

    public static Engine buildWorkerEngine(BlockingQueue<Record> blockingQueue) {
        return EngineBuilder.aNewEngine().reader(new QueueRecordReader(blockingQueue)).filter(new PoisonRecordFilter()).build();
    }
}
