package tech.ydb.topic.read.impl;

import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.SyncReader;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.settings.ReaderSettings;

/* loaded from: input_file:tech/ydb/topic/read/impl/SyncReaderImpl.class */
/**
 * Blocking, pull-style {@link SyncReader} built on top of {@link ReaderImpl}.
 *
 * <p>Batches delivered through {@link #handleDataReceivedEvent(DataReceivedEvent)} are appended to an
 * internal FIFO queue. {@link #receive(long, TimeUnit)} hands their messages out one at a time and
 * completes the future associated with a batch once the last message of that batch has been taken.
 *
 * <p>The queue and the read cursor ({@code currentMessageIndex}) are only touched while holding the
 * queue's monitor, which is also used for {@code wait}/{@code notifyAll} signalling between the
 * delivering thread and callers blocked in {@code receive}.
 */
public class SyncReaderImpl extends ReaderImpl implements SyncReader {
    private static final Logger logger = LoggerFactory.getLogger(SyncReaderImpl.class);

    /** Length of a single polling round used by the untimed {@link #receive()}. */
    private static final int POLL_INTERVAL_SECONDS = 5;

    /** Batches that still contain unread messages, oldest first. Guarded by its own monitor. */
    private final Queue<MessageBatchWrapper> batchesInQueue;

    /** Index of the next message to hand out from the head batch. Guarded by {@code batchesInQueue}. */
    private int currentMessageIndex;

    /**
     * Pairs the messages of one received batch with the future that is completed once all of them
     * have been handed out by {@code receive}.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally {@code private}; the
     * visibility is left as found so that nothing depending on it breaks.
     */
    public static class MessageBatchWrapper {
        private final List<Message> messages;
        private final CompletableFuture<Void> future;

        private MessageBatchWrapper(List<Message> messages, CompletableFuture<Void> future) {
            this.messages = messages;
            this.future = future;
        }
    }

    /**
     * Creates a reader with an empty batch queue.
     *
     * @param topicRpc       transport passed through to {@link ReaderImpl}
     * @param readerSettings settings passed through to {@link ReaderImpl}
     */
    public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings readerSettings) {
        super(topicRpc, readerSettings);
        this.batchesInQueue = new LinkedList<>();
        this.currentMessageIndex = 0;
    }

    /** Starts initialization via {@code initImpl()} without waiting for it to finish. */
    @Override // tech.ydb.topic.read.SyncReader
    public void init() {
        initImpl();
    }

    /** Starts initialization via {@code initImpl()} and blocks until the returned future completes. */
    @Override // tech.ydb.topic.read.SyncReader
    public void initAndWait() {
        initImpl().join();
    }

    /**
     * Returns the next message, waiting up to the given timeout for one to arrive.
     *
     * @param timeout maximum time to wait when no message is queued
     * @param unit    unit of {@code timeout}
     * @return the next message, or {@code null} if none arrived before the timeout elapsed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws RuntimeException     if the reader was already stopped when the call was made
     */
    @Override // tech.ydb.topic.read.SyncReader
    @Nullable
    public Message receive(long timeout, TimeUnit unit) throws InterruptedException {
        if (isStopped.get()) {
            throw new RuntimeException("Reader was stopped");
        }

        final Message message;
        // Set only when this call consumed the last message of the head batch.
        CompletableFuture<Void> drainedBatchFuture = null;

        synchronized (batchesInQueue) {
            if (batchesInQueue.isEmpty()) {
                Instant deadline = Instant.now().plusMillis(TimeUnit.MILLISECONDS.convert(timeout, unit));
                // Loop guards against spurious wake-ups and against another receiver draining the queue first.
                while (batchesInQueue.isEmpty()) {
                    Instant now = Instant.now();
                    if (now.isAfter(deadline)) {
                        break;
                    }
                    // Never pass 0 to wait(): Object.wait(0) means "wait forever".
                    long waitMillis = Math.max(1L, Duration.between(now, deadline).toMillis());
                    logger.trace("No messages in queue. Waiting for {} ms...", waitMillis);
                    batchesInQueue.wait(waitMillis);
                }
                if (batchesInQueue.isEmpty()) {
                    logger.trace("Still no messages in queue. Returning null");
                    return null;
                }
            }

            logger.trace("Taking a message with index {} from batch", currentMessageIndex);
            MessageBatchWrapper batch = batchesInQueue.element();
            message = batch.messages.get(currentMessageIndex);
            currentMessageIndex++;
            if (currentMessageIndex >= batch.messages.size()) {
                logger.debug("Batch is read. signalling core reader impl");
                batchesInQueue.remove();
                currentMessageIndex = 0;
                drainedBatchFuture = batch.future;
            }
        }

        // Complete outside the monitor: dependent actions of a CompletableFuture may run synchronously
        // on this thread, and they must not execute while the queue lock is held.
        if (drainedBatchFuture != null) {
            drainedBatchFuture.complete(null);
        }
        return message;
    }

    /**
     * Returns the next message, polling in {@value #POLL_INTERVAL_SECONDS}-second rounds until one arrives.
     *
     * <p>Because each round goes through {@link #receive(long, TimeUnit)}, the stopped flag is
     * re-checked at the start of every round.
     *
     * @return the next message, never {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws RuntimeException     if the reader is found stopped at the start of a polling round
     */
    @Override // tech.ydb.topic.read.SyncReader
    public Message receive() throws InterruptedException {
        Message message;
        do {
            message = receive(POLL_INTERVAL_SECONDS, TimeUnit.SECONDS);
        } while (message == null);
        return message;
    }

    /**
     * Queues the batch carried by the event and wakes up threads blocked in {@code receive}.
     *
     * @param dataReceivedEvent event carrying the received messages
     * @return a future completed normally once every message of the batch has been handed out by
     *         {@code receive}; completed exceptionally right away if the reader is stopped or the
     *         batch is empty
     */
    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent dataReceivedEvent) {
        CompletableFuture<Void> batchFuture = new CompletableFuture<>();
        if (isStopped.get()) {
            batchFuture.completeExceptionally(new RuntimeException("Reader was stopped"));
            return batchFuture;
        }
        List<Message> messages = dataReceivedEvent.getMessages();
        if (messages.isEmpty()) {
            // An empty batch could never be drained by receive(), so its future would never complete.
            batchFuture.completeExceptionally(new RuntimeException("Batch has no messages"));
            return batchFuture;
        }
        synchronized (batchesInQueue) {
            logger.debug("Putting a message batch into queue and notifying in case receive method is waiting");
            batchesInQueue.add(new MessageBatchWrapper(messages, batchFuture));
            // A batch may hold several messages, so wake every waiter; each re-checks the queue itself.
            batchesInQueue.notifyAll();
        }
        return batchFuture;
    }

    /** Answers the start request immediately, passing {@code null} as the second response argument. */
    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected void handleStartPartitionSessionRequest(
            YdbTopic.StreamReadMessage.StartPartitionSessionRequest startPartitionSessionRequest) {
        sendStartPartitionSessionResponse(startPartitionSessionRequest, null);
    }

    /**
     * Answers the stop request immediately using the partition session id from the request.
     *
     * @param stopPartitionSessionRequest the request to acknowledge
     * @param unused                      not used by this implementation
     */
    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected void handleStopPartitionSession(
            YdbTopic.StreamReadMessage.StopPartitionSessionRequest stopPartitionSessionRequest,
            @Nullable Long unused) {
        sendStopPartitionSessionResponse(stopPartitionSessionRequest.getPartitionSessionId());
    }

    /** Only logs the event; the synchronous reader takes no action on partition session close. */
    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected void handleClosePartitionSession(tech.ydb.topic.read.PartitionSession partitionSession) {
        logger.debug("ClosePartitionSession event received. Ignoring.");
    }

    /** Only logs the event; the synchronous reader takes no action on reader close. */
    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected void handleCloseReader() {
        logger.debug("CloseReader event received. Ignoring.");
    }

    /**
     * Delegates to the superclass shutdown.
     *
     * <p>NOTE(review): the decompiler reports that this override was originally {@code protected};
     * the visibility is left as found so that nothing depending on it breaks.
     *
     * @return the future returned by the superclass implementation
     */
    @Override // tech.ydb.topic.impl.GrpcStreamRetrier
    public CompletableFuture<Void> shutdownImpl() {
        return super.shutdownImpl();
    }

    /** Shuts the reader down and blocks until the future returned by {@link #shutdownImpl()} completes. */
    @Override // tech.ydb.topic.read.SyncReader
    public void shutdown() {
        shutdownImpl().join();
    }
}
