/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BlockingQueueLogObserver
implements LogObserver,
BatchLogObserver {
    private static final Logger log = LoggerFactory.getLogger(BlockingQueueLogObserver.class);
    private final String name;
    private final AtomicReference<Throwable> error = new AtomicReference();
    private final AtomicLong watermark;
    private final BlockingQueue<Pair<StreamElement, LogObserver.OnNextContext>> queue;
    private final AtomicBoolean stopped = new AtomicBoolean();
    @Nullable
    private LogObserver.OnNextContext lastWrittenContext;
    @Nullable
    private LogObserver.OnNextContext lastReadContext;
    private long limit;
    private boolean cancelled = false;

    static BlockingQueueLogObserver create(String name, long startingWatermark) {
        return BlockingQueueLogObserver.create(name, Long.MAX_VALUE, startingWatermark);
    }

    static BlockingQueueLogObserver create(String name, long limit, long startingWatermark) {
        return new BlockingQueueLogObserver(name, limit, startingWatermark);
    }

    private BlockingQueueLogObserver(String name, long limit, long startingWatermark) {
        this.name = Objects.requireNonNull(name);
        this.watermark = new AtomicLong(startingWatermark);
        this.limit = limit;
        this.queue = new ArrayBlockingQueue<Pair<StreamElement, LogObserver.OnNextContext>>(100);
        log.debug("Created {}", (Object)this);
    }

    public boolean onError(Throwable error) {
        this.error.set(error);
        ExceptionUtils.unchecked((ExceptionUtils.ThrowingRunnable & Serializable)() -> this.putToQueue(null, null));
        return false;
    }

    public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
        if (log.isDebugEnabled()) {
            log.debug("Received next element {} at watermark {} offset {}", new Object[]{ingest, context.getWatermark(), context.getOffset()});
        }
        return this.enqueue(ingest, context);
    }

    public boolean onNext(StreamElement element, Partition partition) {
        log.debug("Received next element {} on partition {}", (Object)element, (Object)partition);
        return this.enqueue(element, null);
    }

    private boolean enqueue(StreamElement element, @Nullable LogObserver.OnNextContext context) {
        try {
            this.lastWrittenContext = context;
            if (this.limit-- > 0L) {
                return this.putToQueue(element, context);
            }
            log.debug("Terminating consumption of {} due to limit {} while enqueing {}", new Object[]{this.name, this.limit, element});
        }
        catch (InterruptedException ex) {
            log.warn("Interrupted while putting element {} to queue", (Object)element, (Object)ex);
            Thread.currentThread().interrupt();
        }
        return false;
    }

    public void onCancelled() {
        this.cancelled = true;
        log.debug("Cancelled {} consumption by request.", (Object)this.name);
    }

    public void onCompleted() {
        try {
            log.debug("Finished reading from observer {}", (Object)this.name);
            this.putToQueue(null, null);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while passing end-of-stream.", (Throwable)ex);
        }
    }

    private boolean putToQueue(@Nullable StreamElement element, @Nullable LogObserver.OnNextContext context) throws InterruptedException {
        Pair p = Pair.of((Object)element, (Object)context);
        while (!this.stopped.get()) {
            if (!this.queue.offer((Pair<StreamElement, LogObserver.OnNextContext>)p, 50L, TimeUnit.MILLISECONDS)) continue;
            return true;
        }
        log.debug("Finishing consumption due to source being stopped");
        return false;
    }

    public void onIdle(LogObserver.OnIdleContext context) {
        if (this.queue.isEmpty()) {
            this.updateAndLogWatermark(context.getWatermark());
        }
    }

    @Nullable
    StreamElement take() {
        Pair taken = null;
        if (!this.stopped.get()) {
            taken = (Pair)this.queue.poll();
        }
        return this.consumeTaken(taken);
    }

    @Nullable
    StreamElement takeBlocking() throws InterruptedException {
        while (!this.stopped.get()) {
            Pair<StreamElement, LogObserver.OnNextContext> taken = this.queue.poll(50L, TimeUnit.MILLISECONDS);
            if (taken == null) continue;
            return this.consumeTaken(taken);
        }
        return null;
    }

    private StreamElement consumeTaken(@Nullable Pair<StreamElement, LogObserver.OnNextContext> taken) {
        if (taken != null && taken.getFirst() != null) {
            this.lastReadContext = (LogObserver.OnNextContext)taken.getSecond();
            if (this.lastReadContext != null) {
                this.updateAndLogWatermark(this.lastReadContext.getWatermark());
            }
            return (StreamElement)taken.getFirst();
        }
        return null;
    }

    @Nullable
    Throwable getError() {
        return this.error.get();
    }

    long getWatermark() {
        return this.watermark.get();
    }

    void stop() {
        this.stopped.set(true);
        ArrayList drop = new ArrayList();
        this.queue.drainTo(drop);
        drop.forEach(p -> Optional.ofNullable((LogObserver.OnNextContext)p.getSecond()).ifPresent(LogObserver.OffsetCommitter::nack));
    }

    void clearIncomingQueue() {
        this.queue.clear();
    }

    private void updateAndLogWatermark(long newWatermark) {
        if (!this.cancelled) {
            if (log.isDebugEnabled() && this.watermark.get() < newWatermark) {
                log.debug("Watermark updated from {} to {}", (Object)Instant.ofEpochMilli(this.watermark.get()), (Object)Instant.ofEpochMilli(newWatermark));
            }
            this.watermark.set(newWatermark);
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("name", (Object)this.name).add("limit", this.limit).toString();
    }

    public String getName() {
        return this.name;
    }

    @Nullable
    public LogObserver.OnNextContext getLastWrittenContext() {
        return this.lastWrittenContext;
    }

    @Nullable
    public LogObserver.OnNextContext getLastReadContext() {
        return this.lastReadContext;
    }
}

