package org.fiolino.common.processing.sink;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.fiolino.common.container.Container;

/* loaded from: input_file:org/fiolino/common/processing/sink/ThreadsafeAggregatingSink.class */
public final class ThreadsafeAggregatingSink<T> extends ThreadsafeChainedSink<T, List<T>> {
    private static final Logger logger = Logger.getLogger(ThreadsafeAggregatingSink.class.getName());
    private final BlockingQueue<T> queue;
    private final Lock lock;
    private volatile Container lastMetadata;

    public ThreadsafeAggregatingSink(ThreadsafeSink<List<T>> threadsafeSink, int i) {
        super(threadsafeSink);
        this.lock = new ReentrantLock();
        this.lastMetadata = Container.empty();
        this.queue = new ArrayBlockingQueue(i);
    }

    @Override // org.fiolino.common.processing.sink.Sink
    public void accept(T t, Container container) throws Exception {
        while (!this.queue.offer(t)) {
            if (this.lock.tryLock()) {
                try {
                    if (this.queue.offer(t)) {
                        this.lastMetadata = container;
                        this.lock.unlock();
                        return;
                    } else {
                        ArrayList arrayList = new ArrayList(this.queue);
                        this.queue.clear();
                        this.lock.unlock();
                        getTarget().accept(arrayList, container);
                    }
                } catch (Throwable th) {
                    this.lock.unlock();
                    throw th;
                }
            } else {
                try {
                    TimeUnit.MILLISECONDS.sleep(20L);
                } catch (InterruptedException e) {
                    logger.log(Level.WARNING, () -> {
                        return "Interrupted while waiting for queue. Discarding " + t;
                    });
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        this.lastMetadata = container;
    }

    @Override // org.fiolino.common.processing.sink.ChainedSink, org.fiolino.common.processing.sink.Sink
    public void commit(Container container) throws Exception {
        ArrayList arrayList = new ArrayList(this.queue);
        this.queue.removeAll(arrayList);
        if (!arrayList.isEmpty()) {
            getTarget().accept(arrayList, this.lastMetadata);
        }
        this.lastMetadata = Container.empty();
        super.commit(container);
    }
}
