package step.datapool;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import step.core.execution.ExecutionContext;

/* loaded from: input_file:step/datapool/DataSet.class */
public abstract class DataSet<T> {
    private static final Logger logger = LoggerFactory.getLogger(DataSet.class);
    protected final T configuration;
    protected ExecutionContext context;
    protected LinkedBlockingQueue<DataPoolRow> writeQueue;
    protected ExecutorService writeQueueProcessor;
    protected boolean isRowCommitEnabled = false;
    protected volatile boolean closing;

    public DataSet(T t) {
        this.configuration = t;
    }

    public void enableRowCommit(boolean z) {
        this.isRowCommitEnabled = z;
    }

    public void init() {
        this.closing = false;
        this.writeQueue = new LinkedBlockingQueue<>(this.context.getConfiguration().getPropertyAsInteger("datasets.write.queue.maxsize", 1000).intValue());
        this.writeQueueProcessor = Executors.newFixedThreadPool(1, new BasicThreadFactory.Builder().namingPattern("dataset-write-thread-%d").build());
        this.writeQueueProcessor.submit(new Runnable() { // from class: step.datapool.DataSet.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        DataPoolRow poll = DataSet.this.writeQueue.poll(100L, TimeUnit.MILLISECONDS);
                        if (poll != null) {
                            if (DataSet.this.isRowCommitEnabled) {
                                poll.waitForCommit();
                            }
                            try {
                                DataSet.this.writeRow(poll);
                            } catch (IOException e) {
                                DataSet.logger.error("Error while writing row" + poll.toString(), e);
                            }
                        } else if (DataSet.this.closing) {
                            return;
                        }
                    } catch (InterruptedException e2) {
                        DataSet.logger.error("Error while running queue processor thread", e2);
                        return;
                    }
                }
            }
        });
    }

    public abstract void reset();

    public void close() {
        this.closing = true;
        this.writeQueueProcessor.shutdown();
        try {
            if (!this.writeQueueProcessor.awaitTermination(1L, TimeUnit.MINUTES)) {
                logger.error("Timeout while waiting for write queue processor to terminate");
            }
        } catch (InterruptedException e) {
            logger.error("Error while waiting for write queue processor to terminate", e);
        }
    }

    public final synchronized DataPoolRow next() {
        Object next_ = next_();
        DataPoolRow dataPoolRow = next_ != null ? new DataPoolRow(next_) : null;
        if (dataPoolRow != null) {
            this.writeQueue.offer(dataPoolRow);
        }
        return dataPoolRow;
    }

    public abstract Object next_();

    public abstract void addRow(Object obj);

    public void save() {
    }

    protected void setContext(ExecutionContext executionContext) {
        this.context = executionContext;
    }

    public void writeRow(DataPoolRow dataPoolRow) throws IOException {
    }
}
