package tech.ydb.topic.read.impl;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Status;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.events.ReadEventHandler;
import tech.ydb.topic.read.events.ReaderClosedEvent;
import tech.ydb.topic.read.impl.events.CommitOffsetAcknowledgementEventImpl;
import tech.ydb.topic.read.impl.events.PartitionSessionClosedEventImpl;
import tech.ydb.topic.read.impl.events.StartPartitionSessionEventImpl;
import tech.ydb.topic.read.impl.events.StopPartitionSessionEventImpl;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;

/* loaded from: input_file:tech/ydb/topic/read/impl/AsyncReaderImpl.class */
public class AsyncReaderImpl extends ReaderImpl implements AsyncReader {
    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderImpl.class);
    private static final int DEFAULT_HANDLER_THREAD_COUNT = 4;
    private final Executor handlerExecutor;
    private final ExecutorService defaultHandlerExecutorService;
    private final ReadEventHandler eventHandler;

    public AsyncReaderImpl(TopicRpc topicRpc, ReaderSettings readerSettings, ReadEventHandlersSettings readEventHandlersSettings) {
        super(topicRpc, readerSettings);
        this.eventHandler = readEventHandlersSettings.getEventHandler();
        if (readEventHandlersSettings.getExecutor() != null) {
            logger.debug("Using handler executor provided by user");
            this.defaultHandlerExecutorService = null;
            this.handlerExecutor = readEventHandlersSettings.getExecutor();
        } else {
            logger.debug("Using default handler executor");
            this.defaultHandlerExecutorService = Executors.newFixedThreadPool(DEFAULT_HANDLER_THREAD_COUNT);
            this.handlerExecutor = this.defaultHandlerExecutorService;
        }
    }

    @Override // tech.ydb.topic.read.AsyncReader
    public CompletableFuture<Void> init() {
        return initImpl();
    }

    @Override // tech.ydb.topic.read.AsyncReader
    public CompletableFuture<Status> updateOffsetsInTransaction(YdbTransaction ydbTransaction, Map<String, List<PartitionOffsets>> map, UpdateOffsetsInTransactionSettings updateOffsetsInTransactionSettings) {
        if (ydbTransaction.isActive()) {
            return sendUpdateOffsetsInTransaction(ydbTransaction, map, updateOffsetsInTransactionSettings);
        }
        throw new IllegalArgumentException("Transaction is not active. Can only read topic messages in already running transactions from other services");
    }

    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent dataReceivedEvent) {
        return CompletableFuture.runAsync(() -> {
            try {
                this.eventHandler.onMessages(dataReceivedEvent);
            } catch (Exception e) {
                String str = "Error in user DataReceivedEvent callback: " + e;
                logger.error(str);
                shutdownImpl(str).join();
                throw e;
            }
        }, this.handlerExecutor);
    }

    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected void handleCommitResponse(long j, PartitionSession partitionSession) {
        this.handlerExecutor.execute(() -> {
            this.eventHandler.onCommitResponse(new CommitOffsetAcknowledgementEventImpl(partitionSession, j));
        });
    }

    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest startPartitionSessionRequest, PartitionSession partitionSession, Consumer<StartPartitionSessionSettings> consumer) {
        this.handlerExecutor.execute(() -> {
            YdbTopic.OffsetsRange partitionOffsets = startPartitionSessionRequest.getPartitionOffsets();
            this.eventHandler.onStartPartitionSession(new StartPartitionSessionEventImpl(partitionSession, startPartitionSessionRequest.getCommittedOffset(), new OffsetsRangeImpl(partitionOffsets.getStart(), partitionOffsets.getEnd()), consumer));
        });
    }

    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest stopPartitionSessionRequest, PartitionSession partitionSession, Runnable runnable) {
        StopPartitionSessionEventImpl stopPartitionSessionEventImpl = new StopPartitionSessionEventImpl(partitionSession, stopPartitionSessionRequest.getCommittedOffset(), runnable);
        this.handlerExecutor.execute(() -> {
            this.eventHandler.onStopPartitionSession(stopPartitionSessionEventImpl);
        });
    }

    @Override // tech.ydb.topic.read.impl.ReaderImpl
    protected void handleClosePartitionSession(PartitionSession partitionSession) {
        PartitionSessionClosedEventImpl partitionSessionClosedEventImpl = new PartitionSessionClosedEventImpl(partitionSession);
        this.handlerExecutor.execute(() -> {
            this.eventHandler.onPartitionSessionClosed(partitionSessionClosedEventImpl);
        });
    }

    protected void handleReaderClosed() {
        this.handlerExecutor.execute(() -> {
            this.eventHandler.onReaderClosed(new ReaderClosedEvent());
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // tech.ydb.topic.read.impl.ReaderImpl, tech.ydb.topic.impl.GrpcStreamRetrier
    public void onShutdown(String str) {
        super.onShutdown(str);
        handleReaderClosed();
        if (this.defaultHandlerExecutorService != null) {
            logger.debug("Shutting down default handler executor");
            this.defaultHandlerExecutorService.shutdown();
        }
    }

    @Override // tech.ydb.topic.read.AsyncReader
    public CompletableFuture<Void> shutdown() {
        return shutdownImpl();
    }
}
