package io.streamthoughts.azkarra.api.events;

import io.streamthoughts.azkarra.api.events.callback.LimitQueueCallback;
import io.streamthoughts.azkarra.api.events.callback.LimitedQueueCallback;
import io.streamthoughts.azkarra.api.events.callback.QueueCallback;
import io.streamthoughts.azkarra.api.model.KV;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:io/streamthoughts/azkarra/api/events/BasicBlockingRecordQueue.class */
public class BasicBlockingRecordQueue<K, V> implements BlockingRecordQueue<K, V> {
    private static final int DEFAULT_QUEUE_SIZE_LIMIT = 10000;
    private final BlockingQueue<KV<K, V>> blockingQueue;
    private final Duration offerWaitDuration;
    private final AtomicBoolean isClosed;
    private LimitQueueCallback callback;

    public BasicBlockingRecordQueue() {
        this(DEFAULT_QUEUE_SIZE_LIMIT);
    }

    public BasicBlockingRecordQueue(int i) {
        this(i, Duration.ofMillis(100L), LimitHandlers.NO_OP);
    }

    private BasicBlockingRecordQueue(int i, Duration duration, LimitHandler limitHandler) {
        this.isClosed = new AtomicBoolean(false);
        if (i <= 0) {
            throw new IllegalArgumentException("queueSizeLimit must be superior to 0, was :" + i);
        }
        this.blockingQueue = new LinkedBlockingQueue(i);
        this.offerWaitDuration = duration;
        this.callback = new LimitedQueueCallback(i);
        this.callback.setQueue(this);
        this.callback.setLimitHandler(limitHandler);
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public void setLimitHandler(LimitHandler limitHandler) {
        Objects.requireNonNull(limitHandler, "limitHandler cannot be null");
        this.callback.setLimitHandler(limitHandler);
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public void setQueueCallback(final QueueCallback queueCallback) {
        Objects.requireNonNull(queueCallback, "callback cannot be null");
        final LimitQueueCallback limitQueueCallback = this.callback;
        this.callback = new LimitQueueCallback() { // from class: io.streamthoughts.azkarra.api.events.BasicBlockingRecordQueue.1
            @Override // io.streamthoughts.azkarra.api.events.callback.LimitQueueCallback
            public void setLimitHandler(LimitHandler limitHandler) {
                limitQueueCallback.setLimitHandler(limitHandler);
            }

            @Override // io.streamthoughts.azkarra.api.events.callback.LimitQueueCallback
            public void setQueue(BlockingRecordQueue blockingRecordQueue) {
                limitQueueCallback.setQueue(blockingRecordQueue);
            }

            @Override // io.streamthoughts.azkarra.api.events.callback.QueueCallback
            public void onQueued() {
                queueCallback.onQueued();
                limitQueueCallback.onQueued();
            }

            @Override // io.streamthoughts.azkarra.api.events.callback.QueueCallback
            public void onClosed() {
                queueCallback.onClosed();
                limitQueueCallback.onClosed();
            }
        };
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public KV<K, V> poll(Duration duration) throws InterruptedException {
        return this.blockingQueue.poll(duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public KV<K, V> poll() {
        return this.blockingQueue.poll();
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public void drainTo(Collection<? super KV<K, V>> collection) {
        this.blockingQueue.drainTo(collection);
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public int size() {
        return this.blockingQueue.size();
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public boolean isEmpty() {
        return this.blockingQueue.isEmpty();
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public void open() {
        this.isClosed.set(false);
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public void close() {
        this.isClosed.set(true);
        if (this.callback != null) {
            this.callback.onClosed();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:12:0x002a, code lost:
    
        if (r6.callback == null) goto L12;
     */
    /* JADX WARN: Code restructure failed: missing block: B:13:0x002d, code lost:
    
        r6.callback.onQueued();
     */
    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void send(io.streamthoughts.azkarra.api.model.KV<K, V> r7) {
        /*
            r6 = this;
            r0 = r7
            if (r0 != 0) goto L5
            return
        L5:
            r0 = r6
            java.util.concurrent.atomic.AtomicBoolean r0 = r0.isClosed     // Catch: java.lang.InterruptedException -> L3c
            boolean r0 = r0.get()     // Catch: java.lang.InterruptedException -> L3c
            if (r0 != 0) goto L39
            r0 = r6
            java.util.concurrent.BlockingQueue<io.streamthoughts.azkarra.api.model.KV<K, V>> r0 = r0.blockingQueue     // Catch: java.lang.InterruptedException -> L3c
            r1 = r7
            r2 = r6
            java.time.Duration r2 = r2.offerWaitDuration     // Catch: java.lang.InterruptedException -> L3c
            long r2 = r2.toMillis()     // Catch: java.lang.InterruptedException -> L3c
            java.util.concurrent.TimeUnit r3 = java.util.concurrent.TimeUnit.MILLISECONDS     // Catch: java.lang.InterruptedException -> L3c
            boolean r0 = r0.offer(r1, r2, r3)     // Catch: java.lang.InterruptedException -> L3c
            if (r0 == 0) goto L5
            r0 = r6
            io.streamthoughts.azkarra.api.events.callback.LimitQueueCallback r0 = r0.callback     // Catch: java.lang.InterruptedException -> L3c
            if (r0 == 0) goto L39
            r0 = r6
            io.streamthoughts.azkarra.api.events.callback.LimitQueueCallback r0 = r0.callback     // Catch: java.lang.InterruptedException -> L3c
            r0.onQueued()     // Catch: java.lang.InterruptedException -> L3c
            goto L39
        L39:
            goto L43
        L3c:
            r8 = move-exception
            java.lang.Thread r0 = java.lang.Thread.currentThread()
            r0.interrupt()
        L43:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: io.streamthoughts.azkarra.api.events.BasicBlockingRecordQueue.send(io.streamthoughts.azkarra.api.model.KV):void");
    }

    @Override // io.streamthoughts.azkarra.api.events.BlockingRecordQueue
    public void clear() {
        this.blockingQueue.clear();
    }
}
