package org.easybatch.core.reader;

import java.util.concurrent.BlockingQueue;
import org.easybatch.core.record.PoisonRecord;
import org.easybatch.core.record.Record;

/* loaded from: input_file:org/easybatch/core/reader/BlockingQueueRecordReader.class */
/**
 * Record reader that takes its records from a {@link BlockingQueue}.
 *
 * <p>Reading continues until a configured number of {@link PoisonRecord}
 * instances has been taken from the queue; from that point on
 * {@link #hasNextRecord()} returns {@code false}. Poison records themselves
 * are returned to the caller like any other record.
 *
 * <p>The reader keeps its counters in plain (non-volatile, unsynchronized)
 * fields.
 *
 * @param <T> the type of records held by the queue
 */
public class BlockingQueueRecordReader<T extends Record> implements RecordReader {
    /** Set once the expected number of poison records has been read. */
    private boolean stop;
    /** Number of poison records taken from the queue since the last {@link #open()}. */
    private int poisonRecords;
    /** Number of poison records after which the reader reports no further records. */
    private final int totalPoisonRecords;
    /** Source of records. */
    private final BlockingQueue<T> queue;

    /**
     * Creates a reader that stops after a single poison record.
     *
     * @param blockingQueue the queue to read records from
     */
    public BlockingQueueRecordReader(BlockingQueue<T> blockingQueue) {
        this(blockingQueue, 1);
    }

    /**
     * Creates a reader that stops after the given number of poison records.
     *
     * @param blockingQueue the queue to read records from
     * @param i the number of poison records to read before stopping
     */
    public BlockingQueueRecordReader(BlockingQueue<T> blockingQueue, int i) {
        this.queue = blockingQueue;
        this.totalPoisonRecords = i;
    }

    /**
     * Resets the reader state so that reading starts from a clean slate.
     */
    @Override // org.easybatch.core.reader.RecordReader
    public void open() {
        this.poisonRecords = 0;
        // Reset the stop flag together with the counter; otherwise a reader that
        // already reached its poison-record limit would stay stopped after re-opening.
        this.stop = false;
    }

    /**
     * @return {@code true} until the configured number of poison records has been read
     */
    @Override // org.easybatch.core.reader.RecordReader
    public boolean hasNextRecord() {
        return !this.stop;
    }

    /**
     * Takes the next record from the queue, blocking until one is available.
     *
     * @return the record taken from the queue (possibly a {@link PoisonRecord})
     * @throws RecordReadingException if the thread is interrupted while waiting;
     *         the thread's interrupt status is restored before throwing
     */
    @Override // org.easybatch.core.reader.RecordReader
    public T readNextRecord() throws RecordReadingException {
        try {
            T take = this.queue.take();
            if (take instanceof PoisonRecord) {
                this.poisonRecords++;
            }
            this.stop = this.poisonRecords == this.totalPoisonRecords;
            return take;
        } catch (InterruptedException e) {
            // take() clears the interrupt flag when it throws; restore it so code
            // further up the call stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RecordReadingException("Unable to read next record from the queue", e);
        }
    }

    /**
     * @return always {@code null}; presumably because the number of records a
     *         queue will deliver is not known in advance (confirm against the
     *         {@code RecordReader} contract)
     */
    @Override // org.easybatch.core.reader.RecordReader
    public Long getTotalRecords() {
        return null;
    }

    /**
     * @return the fixed data source name {@code "In-Memory Queue"}
     */
    @Override // org.easybatch.core.reader.RecordReader
    public String getDataSourceName() {
        return "In-Memory Queue";
    }

    /**
     * Does nothing; this reader performs no cleanup on close.
     */
    @Override // org.easybatch.core.reader.RecordReader
    public void close() {
    }
}
