package hu.akarnokd.reactive4java.scheduler;

import hu.akarnokd.reactive4java.base.Action1;
import hu.akarnokd.reactive4java.base.Scheduler;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:hu/akarnokd/reactive4java/scheduler/SingleLaneExecutor.class */
/**
 * Queues values and hands them, one at a time, to a single action that is
 * run on a {@link Scheduler}.
 * <p>
 * Values are stored in a queue and counted in {@link #wip}. A processor run
 * is scheduled only when that counter moves from zero to one; the run then
 * keeps draining the queue in batches until the counter drops back to zero.
 * The processor's {@code run()} method is {@code synchronized}, so two runs
 * of the same processor never execute at the same time.
 *
 * @param <T> the type of the values passed to the action
 */
public final class SingleLaneExecutor<T> implements Closeable {
    /** The scheduler on which the processor is run. */
    final Scheduler pool;
    /** The action invoked for each queued value. */
    final Action1<? super T> action;
    /** Number of values added but not yet accounted for by a processor run or by {@link #close()}. */
    final AtomicInteger wip = new AtomicInteger();
    /** The values waiting to be processed. */
    final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    /** Drains the queue in batches and invokes the action for every drained value. */
    final Runnable processor = new Runnable() {
        @Override
        public synchronized void run() {
            int size;
            do {
                LinkedList<T> batch = new LinkedList<>();
                queue.drainTo(batch);
                size = batch.size();
                boolean completed = false;
                try {
                    for (T item : batch) {
                        if (Thread.currentThread().isInterrupted()) {
                            // Abandon the rest of the batch; the finally block
                            // below subtracts the whole batch from wip.
                            return;
                        }
                        action.invoke(item);
                    }
                    completed = true;
                } finally {
                    // Interrupted, or the action threw: the batch has already left
                    // the queue, so it must still be subtracted from wip. Otherwise
                    // the counter could never return to zero and add() would never
                    // schedule the processor again.
                    if (!completed) {
                        wip.addAndGet(-size);
                    }
                }
            } while (wip.addAndGet(-size) > 0);
        }
    };
    /** Handle returned by the most recent {@code pool.schedule(processor)} call, or null. */
    final AtomicReference<Closeable> future = new AtomicReference<>();

    /**
     * Creates an executor that runs the given action on the given scheduler.
     *
     * @param scheduler the scheduler used to run the processor, not null
     * @param action1 the action invoked for each added value, not null
     * @throws IllegalArgumentException if either argument is null
     */
    public SingleLaneExecutor(Scheduler scheduler, Action1<? super T> action1) {
        if (scheduler == null) {
            throw new IllegalArgumentException("pool is null");
        }
        if (action1 == null) {
            throw new IllegalArgumentException("action is null");
        }
        this.action = action1;
        this.pool = scheduler;
    }

    /**
     * Queues a single value and schedules the processor if this value moved
     * the work-in-progress counter from zero to one.
     *
     * @param t the value to queue
     */
    public void add(T t) {
        this.queue.add(t);
        if (this.wip.incrementAndGet() == 1) {
            this.future.set(this.pool.schedule(this.processor));
        }
    }

    /**
     * Queues every value of the given sequence, in iteration order, exactly
     * as if {@link #add(Object)} had been called for each of them.
     *
     * @param iterable the values to queue
     */
    public void add(Iterable<? extends T> iterable) {
        for (T value : iterable) {
            add(value);
        }
    }

    /**
     * Closes the handle of the most recently scheduled processor run (if one
     * is stored), then removes every value still in the queue and subtracts
     * their number from the work-in-progress counter.
     */
    @Override
    public void close() {
        Closeable scheduled = this.future.getAndSet(null);
        if (scheduled != null) {
            try {
                scheduled.close();
            } catch (IOException ignored) {
                // Best effort: close() declares no checked exception, and the
                // queue must still be cleared below.
            }
        }
        LinkedList<T> discarded = new LinkedList<>();
        // drainTo reports how many values it removed from the queue.
        int removed = this.queue.drainTo(discarded);
        this.wip.addAndGet(-removed);
    }

    /**
     * Factory method equivalent to calling the constructor.
     *
     * @param <T> the type of the values passed to the action
     * @param scheduler the scheduler used to run the processor, not null
     * @param action1 the action invoked for each added value, not null
     * @return the new executor
     * @throws IllegalArgumentException if either argument is null
     */
    public static <T> SingleLaneExecutor<T> create(Scheduler scheduler, Action1<? super T> action1) {
        return new SingleLaneExecutor<>(scheduler, action1);
    }
}
