/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.transaction;

import cz.o2.proxima.direct.LogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadPooledObserver
implements CommitLogObserver {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ThreadPooledObserver.class);
    private final CommitLogObserver delegate;
    private final List<BlockingQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>>> workQueues = new ArrayList<BlockingQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>>>();
    private final List<AtomicBoolean> processing = new ArrayList<AtomicBoolean>();
    private final List<Future<?>> futures = new ArrayList();

    public ThreadPooledObserver(ExecutorService executorService, CommitLogObserver requestObserver, int parallelism) {
        this.delegate = requestObserver;
        for (int i = 0; i < parallelism; ++i) {
            ArrayBlockingQueue queue = new ArrayBlockingQueue(50);
            this.workQueues.add(queue);
            AtomicBoolean processingFlag = new AtomicBoolean();
            this.processing.add(processingFlag);
            this.futures.add(executorService.submit(() -> this.processQueue(queue, processingFlag)));
        }
    }

    private void processQueue(BlockingQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>> queue, AtomicBoolean processingFlag) {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ExceptionUtils.ignoringInterrupted((ExceptionUtils.ThrowingRunnable & Serializable)() -> {
                    Pair polled = (Pair)queue.take();
                    processingFlag.set(true);
                    this.delegate.onNext((StreamElement)polled.getFirst(), (LogObserver.OnNextContext)polled.getSecond());
                    processingFlag.set(false);
                });
            }
        }
        catch (Throwable err) {
            log.error("Error processing input queue.", err);
            this.onError(err);
        }
    }

    @Override
    public void onCompleted() {
        this.waitTillQueueEmpty();
        this.exitThreads();
        if (!Thread.currentThread().isInterrupted()) {
            this.delegate.onCompleted();
        }
    }

    @Override
    public void onCancelled() {
        this.waitTillQueueEmpty();
        this.exitThreads();
        if (!Thread.currentThread().isInterrupted()) {
            this.delegate.onCancelled();
        }
    }

    @Override
    public boolean onError(Throwable error) {
        this.waitTillQueueEmpty();
        this.exitThreads();
        return this.delegate.onError(error);
    }

    private void exitThreads() {
        this.futures.forEach(f -> f.cancel(true));
    }

    @Override
    public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
        return !ExceptionUtils.ignoringInterrupted((ExceptionUtils.ThrowingRunnable & Serializable)() -> this.workQueues.get((ingest.getKey().hashCode() & Integer.MAX_VALUE) % this.workQueues.size()).put((Pair<StreamElement, CommitLogObserver.OnNextContext>)Pair.of((Object)ingest, (Object)context)));
    }

    @Override
    public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
        this.waitTillQueueEmpty();
        this.delegate.onRepartition(context);
    }

    @Override
    public void onIdle(CommitLogObserver.OnIdleContext context) {
        if (this.workQueueEmpty()) {
            this.delegate.onIdle(context);
        }
    }

    private boolean workQueueEmpty() {
        return this.workQueues.stream().allMatch(Collection::isEmpty);
    }

    private boolean anyInProgress() {
        return this.processing.stream().anyMatch(AtomicBoolean::get);
    }

    private void waitTillQueueEmpty() {
        while (this.anyInProgress() && !this.workQueueEmpty() && !Thread.currentThread().isInterrupted()) {
            ExceptionUtils.ignoringInterrupted((ExceptionUtils.ThrowingRunnable & Serializable)() -> TimeUnit.MILLISECONDS.sleep(100L));
        }
    }
}

