package io.castled.core;

import io.castled.exceptions.CastledRuntimeException;
import io.castled.utils.ThreadUtils;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/castled/core/CastledBlockingQueue.class */
/**
 * Bounded hand-off queue that feeds payloads to a fixed pool of worker threads.
 *
 * <p>Producers enqueue items with {@link #writePayload(Object, int, TimeUnit)}; each worker
 * repeatedly polls the queue and passes every item it obtains to the configured consumer.
 * {@link #flush(long)} waits until the queue is empty and then stops the workers.
 *
 * <p>When constructed with {@code exitOnError = true}, the first exception raised on a worker
 * is remembered, that worker stops, and the exception is rethrown (wrapped in a
 * {@link CastledRuntimeException}) from the next {@code writePayload} or {@code flush} call.
 * Otherwise worker exceptions are only logged and the worker keeps polling.
 *
 * @param <T> type of the payloads handed to the consumer
 */
public class CastledBlockingQueue<T> {
    private static final Logger log = LoggerFactory.getLogger(CastledBlockingQueue.class);

    private final Consumer<T> consumer;
    private final ExecutorService executorService;
    private final BlockingQueue<T> blockingQueue;
    private final boolean exitOnError;
    private volatile boolean shutDown = false;
    private volatile Exception failureException;

    /**
     * Creates the queue and immediately starts the worker threads.
     *
     * <p>NOTE(review): {@link #decorateConsumer(Consumer)} is overridable and is invoked here,
     * before any subclass constructor body has run; overrides must not rely on subclass state.
     *
     * @param consumer    callback invoked on a worker thread for every dequeued payload
     * @param parallelism number of worker threads (size of the fixed thread pool)
     * @param maxCapacity capacity of the backing {@link ArrayBlockingQueue}
     * @param exitOnError whether a worker exception stops that worker and is surfaced to callers
     */
    public CastledBlockingQueue(Consumer<T> consumer, int parallelism, int maxCapacity, boolean exitOnError) {
        this.consumer = decorateConsumer(consumer);
        this.executorService = Executors.newFixedThreadPool(parallelism);
        log.info("Parallelism {} , Max Capacity {}", parallelism, maxCapacity);
        this.blockingQueue = new ArrayBlockingQueue<>(maxCapacity);
        this.exitOnError = exitOnError;
        // Every field is assigned before the first worker is submitted.
        for (int worker = 0; worker < parallelism; worker++) {
            this.executorService.execute(this::runWorker);
        }
    }

    /**
     * Hook that lets subclasses wrap the consumer before it is used by the workers.
     * The base implementation returns the consumer unchanged.
     *
     * @param consumer the consumer passed to the constructor
     * @return the consumer the workers will invoke
     */
    public Consumer<T> decorateConsumer(Consumer<T> consumer) {
        return consumer;
    }

    /**
     * Enqueues a payload, waiting up to the given time for free capacity.
     *
     * @param t        payload to enqueue
     * @param i        maximum time to wait for space in the queue
     * @param timeUnit unit of {@code i}
     * @throws TimeoutException        if no space became available within the wait time
     * @throws CastledRuntimeException if a worker failure was recorded earlier (the workers are
     *                                 shut down first), or if the calling thread is interrupted
     *                                 while waiting (its interrupt status is restored)
     */
    public void writePayload(T t, int i, TimeUnit timeUnit) throws TimeoutException {
        failIfWorkerFailed();
        try {
            if (!this.blockingQueue.offer(t, i, timeUnit)) {
                throw new TimeoutException();
            }
        } catch (InterruptedException e) {
            log.error("Blocking queue write payload interrupted", e);
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new CastledRuntimeException(e);
        }
    }

    /**
     * Waits for the queue to drain, then stops the workers and waits up to 60 seconds for them
     * to finish the payloads they are currently processing.
     *
     * <p>The queue is checked every 500 ms. If a worker failure has been recorded, the workers
     * are shut down immediately and the failure is rethrown.
     *
     * @param j not read by this implementation. NOTE(review): presumably intended as a flush
     *          timeout (the method declares {@link TimeoutException}) — confirm before relying
     *          on it; as written the drain loop has no upper bound.
     * @throws TimeoutException        declared for callers; never thrown by this implementation
     * @throws CastledRuntimeException if a worker failure was recorded
     */
    public void flush(long j) throws TimeoutException {
        while (!this.blockingQueue.isEmpty()) {
            failIfWorkerFailed();
            ThreadUtils.interruptIgnoredSleep(500L);
        }
        // Queue is drained: let the workers leave their polling loop and finish in-flight items.
        this.shutDown = true;
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                log.warn("Blocking queue workers did not terminate within 60 seconds");
            }
        } catch (InterruptedException e) {
            this.executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        // A worker may have failed while processing the last in-flight payloads.
        failIfWorkerFailed();
    }

    /**
     * Worker loop: polls with a one-second timeout so the shutdown flag is re-checked regularly,
     * and hands every dequeued payload to the consumer.
     */
    private void runWorker() {
        while (!this.shutDown) {
            try {
                T payload = this.blockingQueue.poll(1L, TimeUnit.SECONDS);
                if (payload != null) {
                    this.consumer.accept(payload);
                }
            } catch (InterruptedException e) {
                if (this.shutDown) {
                    // Expected: shutdownNow() interrupts idle workers. This is not a consumer
                    // failure and must not replace a previously recorded root cause.
                    Thread.currentThread().interrupt();
                    return;
                }
                log.error("Blocking queue Consumer failed", e);
                if (this.exitOnError) {
                    recordFailure(e);
                    Thread.currentThread().interrupt();
                    return;
                }
            } catch (Exception e) {
                log.error("Blocking queue Consumer failed", e);
                if (this.exitOnError) {
                    recordFailure(e);
                    return;
                }
            }
        }
    }

    /** Remembers the first worker failure; later failures are logged by the caller only. */
    private synchronized void recordFailure(Exception failure) {
        if (this.failureException == null) {
            this.failureException = failure;
        }
    }

    /** Shuts the workers down and rethrows the recorded worker failure, if there is one. */
    private void failIfWorkerFailed() {
        // Snapshot before shutting down so the exception thrown is the one that was observed.
        Exception failure = this.failureException;
        if (failure != null) {
            shutdownNow();
            throw new CastledRuntimeException(failure);
        }
    }

    /** Stops the polling loops and interrupts all worker threads. */
    private void shutdownNow() {
        // Set the flag first so interrupted workers recognise the interrupt as a shutdown.
        this.shutDown = true;
        this.executorService.shutdownNow();
    }
}
