package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Pair;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/o2/proxima/beam/direct/io/BlockingQueueLogObserver.class */
/**
 * Observer that hands elements received through the {@link LogObserver} and {@link
 * BatchLogObserver} callbacks over to a consumer via a bounded in-memory queue.
 *
 * <p>The producing side calls the {@code onNext}/{@code onError}/{@code onCompleted} callbacks,
 * the consuming side calls {@link #take()} or {@link #takeBlocking()}. End of stream and errors
 * are signalled to the consumer by a marker pair whose element is {@code null}; the consumer then
 * sees {@code null} from {@code take*} and can inspect {@link #getError()}.
 */
public class BlockingQueueLogObserver implements LogObserver, BatchLogObserver {
    private static final Logger log = LoggerFactory.getLogger(BlockingQueueLogObserver.class);

    /** Maximum number of element/context pairs buffered between producer and consumer. */
    private static final int QUEUE_CAPACITY = 100;

    /** How long a single queue offer/poll waits before the {@code stopped} flag is re-checked. */
    private static final long POLL_TIMEOUT_MS = 50L;

    private final String name;
    private final AtomicLong watermark;

    /** Context passed to the most recent enqueue attempt; {@code null} for batch elements. */
    @Nullable
    private LogObserver.OnNextContext lastWrittenContext;

    /** Context of the most recent element handed out to the consumer. */
    @Nullable
    private LogObserver.OnNextContext lastReadContext;

    /** Remaining number of elements that may still be enqueued; decremented on every attempt. */
    private long limit;

    private final AtomicReference<Throwable> error = new AtomicReference<>();
    AtomicBoolean stopped = new AtomicBoolean();
    private final BlockingQueue<Pair<StreamElement, LogObserver.OnNextContext>> queue =
            new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /**
     * Creates an observer with an effectively unlimited element count.
     *
     * @param name name of the observer, used in log messages; must not be {@code null}
     * @param startingWatermark initial value returned by {@link #getWatermark()}
     * @return the new observer
     */
    public static BlockingQueueLogObserver create(String name, long startingWatermark) {
        return create(name, Long.MAX_VALUE, startingWatermark);
    }

    /**
     * Creates an observer that accepts at most {@code limit} elements.
     *
     * @param name name of the observer, used in log messages; must not be {@code null}
     * @param limit maximum number of elements that will be enqueued
     * @param startingWatermark initial value returned by {@link #getWatermark()}
     * @return the new observer
     */
    public static BlockingQueueLogObserver create(String name, long limit, long startingWatermark) {
        return new BlockingQueueLogObserver(name, limit, startingWatermark);
    }

    private BlockingQueueLogObserver(String name, long limit, long startingWatermark) {
        this.name = Objects.requireNonNull(name);
        this.watermark = new AtomicLong(startingWatermark);
        this.limit = limit;
    }

    /**
     * Records the error and enqueues the end-of-stream marker so that a waiting consumer wakes up.
     *
     * @param th the error reported by the reader
     * @return always {@code false}
     */
    public boolean onError(Throwable th) {
        this.error.set(th);
        // NOTE(review): ExceptionUtils.unchecked presumably rethrows the checked
        // InterruptedException of putToQueue as an unchecked exception - confirm.
        ExceptionUtils.unchecked(() -> {
            putToQueue(null, null);
        });
        return false;
    }

    /**
     * Updates the watermark from the context and enqueues the element.
     *
     * @param streamElement the received element
     * @param onNextContext context associated with the element
     * @return {@code true} if the element was enqueued, {@code false} otherwise
     */
    public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
        this.watermark.set(onNextContext.getWatermark());
        log.trace("Received next element {} at watermark {}", streamElement, this.watermark);
        return enqueue(streamElement, onNextContext);
    }

    /**
     * Enqueues an element received from a batch reader; such elements carry no context.
     *
     * @param streamElement the received element
     * @param partition partition the element was read from (used for logging only)
     * @return {@code true} if the element was enqueued, {@code false} otherwise
     */
    public boolean onNext(StreamElement streamElement, Partition partition) {
        log.trace("Received next element {} on partition {}", streamElement, partition);
        return enqueue(streamElement, null);
    }

    /**
     * Remembers the context and, while the element limit is not exhausted, puts the pair into the
     * queue.
     *
     * @param element the element to enqueue
     * @param context context of the element, {@code null} for batch elements
     * @return {@code true} if the pair was queued; {@code false} if the limit was reached, the
     *     observer was stopped, or the thread was interrupted while waiting
     */
    private boolean enqueue(StreamElement element, @Nullable LogObserver.OnNextContext context) {
        try {
            this.lastWrittenContext = context;
            // the limit is consumed by every attempt, including the ones that are rejected
            if (this.limit-- > 0) {
                return putToQueue(element, context);
            }
        } catch (InterruptedException ex) {
            // keep the interruption visible to the calling thread
            Thread.currentThread().interrupt();
        }
        return false;
    }

    /** Logs that the consumption was cancelled; nothing is enqueued. */
    public void onCancelled() {
        log.debug("Cancelled {} consumption by request.", this.name);
    }

    /** Enqueues the end-of-stream marker so that the consumer learns that reading finished. */
    public void onCompleted() {
        try {
            log.debug("Finished reading from observer {}", this.name);
            putToQueue(null, null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while passing end-of-stream.", e);
        }
    }

    /**
     * Offers the pair to the queue, retrying in short intervals until it is accepted or the
     * observer is stopped.
     *
     * @param streamElement element to enqueue, {@code null} for the end-of-stream marker
     * @param onNextContext context of the element, may be {@code null}
     * @return {@code true} if the pair was accepted, {@code false} if the observer was stopped
     * @throws InterruptedException if interrupted while waiting for free capacity
     */
    private boolean putToQueue(
            @Nullable StreamElement streamElement,
            @Nullable LogObserver.OnNextContext onNextContext)
            throws InterruptedException {
        Pair<StreamElement, LogObserver.OnNextContext> item = Pair.of(streamElement, onNextContext);
        while (!this.stopped.get()) {
            if (this.queue.offer(item, POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Advances the watermark while no elements are arriving.
     *
     * @param onIdleContext context carrying the current watermark
     */
    public void onIdle(LogObserver.OnIdleContext onIdleContext) {
        this.watermark.set(onIdleContext.getWatermark());
    }

    /**
     * Takes the next element without waiting.
     *
     * @return the next element, or {@code null} if the queue is empty, the observer is stopped, or
     *     the end-of-stream marker was taken
     */
    @Nullable
    public StreamElement take() {
        Pair<StreamElement, LogObserver.OnNextContext> taken = null;
        if (!this.stopped.get()) {
            taken = this.queue.poll();
        }
        return consumeTaken(taken);
    }

    /**
     * Takes the next element, waiting until one is available or the observer is stopped.
     *
     * @return the next element, or {@code null} if the observer was stopped or the end-of-stream
     *     marker was taken
     * @throws InterruptedException if interrupted while waiting
     */
    @Nullable
    public StreamElement takeBlocking() throws InterruptedException {
        while (!this.stopped.get()) {
            Pair<StreamElement, LogObserver.OnNextContext> taken =
                    this.queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (taken != null) {
                return consumeTaken(taken);
            }
        }
        return null;
    }

    /**
     * Unwraps a taken pair, remembering its context as the last read one.
     *
     * @param pair the pair taken from the queue, may be {@code null}
     * @return the element, or {@code null} for a missing pair or the end-of-stream marker
     */
    @Nullable
    private StreamElement consumeTaken(@Nullable Pair<StreamElement, LogObserver.OnNextContext> pair) {
        if (pair == null || pair.getFirst() == null) {
            return null;
        }
        this.lastReadContext = pair.getSecond();
        return pair.getFirst();
    }

    /**
     * @return the error passed to {@link #onError(Throwable)}, or {@code null} if none occurred
     */
    @Nullable
    public Throwable getError() {
        return this.error.get();
    }

    /** @return the most recently observed watermark */
    public long getWatermark() {
        return this.watermark.get();
    }

    /**
     * Stops the observer and nacks every element that is still waiting in the queue.
     *
     * <p>Pairs without a context (the end-of-stream marker and batch elements) are skipped, because
     * there is nothing to nack for them.
     */
    public void stop() {
        this.stopped.set(true);
        ArrayList<Pair<StreamElement, LogObserver.OnNextContext>> dropped = new ArrayList<>();
        this.queue.drainTo(dropped);
        for (Pair<StreamElement, LogObserver.OnNextContext> pair : dropped) {
            LogObserver.OnNextContext context = pair.getSecond();
            if (context != null) {
                context.nack();
            }
        }
    }

    /** Removes all queued pairs; the removed elements are neither acked nor nacked. */
    public void clearIncomingQueue() {
        this.queue.clear();
    }

    /** @return context passed to the most recent enqueue attempt, may be {@code null} */
    @Nullable
    public LogObserver.OnNextContext getLastWrittenContext() {
        return this.lastWrittenContext;
    }

    /** @return context of the element most recently handed out to the consumer, may be {@code null} */
    @Nullable
    public LogObserver.OnNextContext getLastReadContext() {
        return this.lastReadContext;
    }
}
