package tech.ydb.topic.read.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.impl.GrpcStreamRetrier;
import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.PartitionSessionImpl;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
import tech.ydb.topic.settings.TopicReadSettings;
import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;

/* loaded from: input_file:tech/ydb/topic/read/impl/ReaderImpl.class */
public abstract class ReaderImpl extends GrpcStreamRetrier {
    private static final Logger logger = LoggerFactory.getLogger(ReaderImpl.class);
    private static final int DEFAULT_DECOMPRESSION_THREAD_COUNT = 4;
    private volatile ReadSessionImpl session;
    private final ReaderSettings settings;
    private final TopicRpc topicRpc;
    private final Executor decompressionExecutor;
    private final ExecutorService defaultDecompressionExecutorService;
    private final AtomicReference<CompletableFuture<Void>> initResultFutureRef;
    private final AtomicLong seqNumberCounter;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:tech/ydb/topic/read/impl/ReaderImpl$ReadSessionImpl.class */
    public class ReadSessionImpl extends ReadSession {
        protected String sessionId;
        private final String fullId;
        private final AtomicLong sizeBytesToRequest;
        private final Map<Long, PartitionSessionImpl> partitionSessions;

        private ReadSessionImpl() {
            super(ReaderImpl.this.topicRpc);
            this.sessionId = "";
            this.sizeBytesToRequest = new AtomicLong(0L);
            this.partitionSessions = new ConcurrentHashMap();
            this.fullId = ReaderImpl.this.id + '.' + ReaderImpl.this.seqNumberCounter.incrementAndGet();
        }

        @Override // tech.ydb.topic.impl.Session
        public void startAndInitialize() {
            ReaderImpl.logger.debug("[{}] Session {} startAndInitialize called", this.fullId, this.sessionId);
            start(this::processMessage).whenComplete(this::closeDueToError);
            YdbTopic.StreamReadMessage.InitRequest.Builder newBuilder = YdbTopic.StreamReadMessage.InitRequest.newBuilder();
            if (ReaderImpl.this.settings.getConsumerName() != null) {
                newBuilder.setConsumer(ReaderImpl.this.settings.getConsumerName());
            }
            ReaderImpl.this.settings.getTopics().forEach(topicReadSettings -> {
                YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.Builder path = YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.newBuilder().setPath(topicReadSettings.getPath());
                if (topicReadSettings.getPartitionIds() != null && !topicReadSettings.getPartitionIds().isEmpty()) {
                    path.addAllPartitionIds(topicReadSettings.getPartitionIds());
                }
                if (topicReadSettings.getMaxLag() != null) {
                    path.setMaxLag(ProtobufUtils.durationToProto(topicReadSettings.getMaxLag()));
                }
                if (topicReadSettings.getReadFrom() != null) {
                    path.setReadFrom(ProtobufUtils.instantToProto(topicReadSettings.getReadFrom()));
                }
                newBuilder.addTopicsReadSettings(path);
            });
            if (ReaderImpl.this.settings.getReaderName() != null && !ReaderImpl.this.settings.getReaderName().isEmpty()) {
                newBuilder.setReaderName(ReaderImpl.this.settings.getReaderName());
            }
            send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setInitRequest(newBuilder).build());
        }

        private void sendReadRequest() {
            long andSet = this.sizeBytesToRequest.getAndSet(0L);
            if (andSet <= 0) {
                ReaderImpl.logger.debug("[{}] Nothing to request in DataRequest. sizeBytesToRequest == {}", this.fullId, Long.valueOf(andSet));
            } else {
                ReaderImpl.logger.debug("[{}] Sending DataRequest with {} bytes", this.fullId, Long.valueOf(andSet));
                send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setReadRequest(YdbTopic.StreamReadMessage.ReadRequest.newBuilder().setBytesSize(andSet).build()).build());
            }
        }

        private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSessionImpl, StartPartitionSessionSettings startPartitionSessionSettings) {
            if (!this.isWorking.get()) {
                ReaderImpl.logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {}), but reading session is already closed", new Object[]{this.fullId, Long.valueOf(partitionSessionImpl.getId()), Long.valueOf(partitionSessionImpl.getPartitionId())});
                return;
            }
            if (!this.partitionSessions.containsKey(Long.valueOf(partitionSessionImpl.getId()))) {
                ReaderImpl.logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {}), but have no such partition session active", new Object[]{this.fullId, Long.valueOf(partitionSessionImpl.getId()), Long.valueOf(partitionSessionImpl.getPartitionId())});
                return;
            }
            YdbTopic.StreamReadMessage.StartPartitionSessionResponse.Builder partitionSessionId = YdbTopic.StreamReadMessage.StartPartitionSessionResponse.newBuilder().setPartitionSessionId(partitionSessionImpl.getId());
            Long l = null;
            Long l2 = null;
            if (startPartitionSessionSettings != null) {
                l = startPartitionSessionSettings.getReadOffset();
                if (l != null) {
                    partitionSessionId.setReadOffset(l.longValue());
                    partitionSessionImpl.setLastReadOffset(l.longValue());
                }
                l2 = startPartitionSessionSettings.getCommitOffset();
                if (l2 != null) {
                    partitionSessionId.setCommitOffset(l2.longValue());
                    partitionSessionImpl.setLastCommittedOffset(l2.longValue());
                }
            }
            ReaderImpl.logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {}) with readOffset {} and commitOffset {}", new Object[]{this.fullId, Long.valueOf(partitionSessionImpl.getId()), Long.valueOf(partitionSessionImpl.getPartitionId()), l, l2});
            send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setStartPartitionSessionResponse(partitionSessionId.build()).build());
        }

        private void sendStopPartitionSessionResponse(long j) {
            if (!this.isWorking.get()) {
                ReaderImpl.logger.info("[{}] Need to send StopPartitionSessionResponse for partition session {}, but reading session is already closed", this.fullId, Long.valueOf(j));
                return;
            }
            PartitionSessionImpl remove = this.partitionSessions.remove(Long.valueOf(j));
            if (remove != null) {
                remove.shutdown();
                ReaderImpl.logger.info("[{}] Sending StopPartitionSessionResponse for partition session {} (partition {})", new Object[]{this.fullId, Long.valueOf(j), Long.valueOf(remove.getPartitionId())});
            } else {
                ReaderImpl.logger.warn("[{}] Sending StopPartitionSessionResponse for partition session {}, but have no such partition session active", this.fullId, Long.valueOf(j));
            }
            send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setStopPartitionSessionResponse(YdbTopic.StreamReadMessage.StopPartitionSessionResponse.newBuilder().setPartitionSessionId(j).build()).build());
        }

        private void sendCommitOffsetRequest(long j, long j2, List<OffsetsRange> list) {
            if (this.isWorking.get()) {
                YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.Builder partitionSessionId = YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder().setPartitionSessionId(j);
                list.forEach(offsetsRange -> {
                    partitionSessionId.addOffsets(YdbTopic.OffsetsRange.newBuilder().setStart(offsetsRange.getStart()).setEnd(offsetsRange.getEnd()));
                });
                send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setCommitOffsetRequest(YdbTopic.StreamReadMessage.CommitOffsetRequest.newBuilder().addCommitOffsets(partitionSessionId)).build());
            } else if (ReaderImpl.logger.isInfoEnabled()) {
                StringBuilder append = new StringBuilder("[").append(this.fullId).append("] Need to send CommitRequest for partition session ").append(j).append(" (partition ").append(j2).append(") with offset ranges ");
                for (int i = 0; i < list.size(); i++) {
                    if (i > 0) {
                        append.append(", ");
                    }
                    OffsetsRange offsetsRange2 = list.get(i);
                    append.append("[").append(offsetsRange2.getStart()).append(",").append(offsetsRange2.getEnd()).append(")");
                }
                append.append(", but reading session is already closed");
                ReaderImpl.logger.info(append.toString());
            }
        }

        private void closePartitionSessions() {
            this.partitionSessions.values().forEach(this::closePartitionSession);
            this.partitionSessions.clear();
        }

        private void closePartitionSession(PartitionSessionImpl partitionSessionImpl) {
            partitionSessionImpl.shutdown();
            ReaderImpl.this.handleClosePartitionSession(partitionSessionImpl.getSessionInfo());
        }

        private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse initResponse) {
            this.sessionId = initResponse.getSessionId();
            if (ReaderImpl.this.initResultFutureRef.get() != null) {
                ((CompletableFuture) ReaderImpl.this.initResultFutureRef.get()).complete(null);
            }
            this.sizeBytesToRequest.set(ReaderImpl.this.settings.getMaxMemoryUsageBytes());
            ReaderImpl.logger.info("[{}] Session {} initialized. Requesting {} bytes...", new Object[]{this.fullId, this.sessionId, Long.valueOf(ReaderImpl.this.settings.getMaxMemoryUsageBytes())});
            sendReadRequest();
        }

        private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest startPartitionSessionRequest) {
            long partitionSessionId = startPartitionSessionRequest.getPartitionSession().getPartitionSessionId();
            long partitionId = startPartitionSessionRequest.getPartitionSession().getPartitionId();
            ReaderImpl.logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {}) with committedOffset {} and partitionOffsets [{}-{})", new Object[]{this.fullId, Long.valueOf(partitionSessionId), Long.valueOf(partitionId), Long.valueOf(startPartitionSessionRequest.getCommittedOffset()), Long.valueOf(startPartitionSessionRequest.getPartitionOffsets().getStart()), Long.valueOf(startPartitionSessionRequest.getPartitionOffsets().getEnd())});
            PartitionSessionImpl.Builder decompressionExecutor = PartitionSessionImpl.newBuilder().setId(partitionSessionId).setPath(startPartitionSessionRequest.getPartitionSession().getPath()).setPartitionId(partitionId).setCommittedOffset(startPartitionSessionRequest.getCommittedOffset()).setPartitionOffsets(new OffsetsRangeImpl(startPartitionSessionRequest.getPartitionOffsets().getStart(), startPartitionSessionRequest.getPartitionOffsets().getEnd())).setDecompressionExecutor(ReaderImpl.this.decompressionExecutor);
            ReaderImpl readerImpl = ReaderImpl.this;
            PartitionSessionImpl build = decompressionExecutor.setDataEventCallback(readerImpl::handleDataReceivedEvent).setCommitFunction(list -> {
                sendCommitOffsetRequest(partitionSessionId, partitionId, list);
            }).build();
            this.partitionSessions.put(Long.valueOf(build.getId()), build);
            ReaderImpl.this.handleStartPartitionSessionRequest(startPartitionSessionRequest, build.getSessionInfo(), startPartitionSessionSettings -> {
                sendStartPartitionSessionResponse(build, startPartitionSessionSettings);
            });
        }

        protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPartitionSessionRequest stopPartitionSessionRequest) {
            if (!stopPartitionSessionRequest.getGraceful()) {
                PartitionSessionImpl remove = this.partitionSessions.remove(Long.valueOf(stopPartitionSessionRequest.getPartitionSessionId()));
                if (remove == null) {
                    ReaderImpl.logger.info("[{}] Received force StopPartitionSessionRequest for partition session {}, but have no such partition session running", this.fullId, Long.valueOf(stopPartitionSessionRequest.getPartitionSessionId()));
                    return;
                } else {
                    ReaderImpl.logger.info("[{}] Received force StopPartitionSessionRequest for partition session {} (partition {})", new Object[]{this.fullId, Long.valueOf(remove.getId()), Long.valueOf(remove.getPartitionId())});
                    closePartitionSession(remove);
                    return;
                }
            }
            PartitionSessionImpl partitionSessionImpl = this.partitionSessions.get(Long.valueOf(stopPartitionSessionRequest.getPartitionSessionId()));
            if (partitionSessionImpl != null) {
                ReaderImpl.logger.info("[{}] Received graceful StopPartitionSessionRequest for partition session {} (partition {})", new Object[]{this.fullId, Long.valueOf(partitionSessionImpl.getId()), Long.valueOf(partitionSessionImpl.getPartitionId())});
                ReaderImpl.this.handleStopPartitionSession(stopPartitionSessionRequest, partitionSessionImpl.getSessionInfo(), () -> {
                    sendStopPartitionSessionResponse(stopPartitionSessionRequest.getPartitionSessionId());
                });
            } else {
                ReaderImpl.logger.error("[{}] Received graceful StopPartitionSessionRequest for partition session {}, but have no such partition session active", this.fullId, Long.valueOf(stopPartitionSessionRequest.getPartitionSessionId()));
                closeDueToError(null, new RuntimeException("Restarting read session due to receiving StopPartitionSessionRequest with PartitionSessionId " + stopPartitionSessionRequest.getPartitionSessionId() + " that SDK knows nothing about"));
            }
        }

        private void onReadResponse(YdbTopic.StreamReadMessage.ReadResponse readResponse) {
            long bytesSize = readResponse.getBytesSize();
            ReaderImpl.logger.trace("[{}] Received ReadResponse of {} bytes", this.fullId, Long.valueOf(bytesSize));
            ArrayList arrayList = new ArrayList();
            readResponse.getPartitionDataList().forEach(partitionData -> {
                long partitionSessionId = partitionData.getPartitionSessionId();
                PartitionSessionImpl partitionSessionImpl = this.partitionSessions.get(Long.valueOf(partitionSessionId));
                if (partitionSessionImpl != null) {
                    arrayList.add(partitionSessionImpl.addBatches(partitionData.getBatchesList()));
                } else {
                    ReaderImpl.logger.info("[{}] Received PartitionData for unknown(most likely already closed) PartitionSessionId={}", this.fullId, Long.valueOf(partitionSessionId));
                }
            });
            CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).whenComplete((r9, th) -> {
                if (th != null) {
                    ReaderImpl.logger.error("[{}] Exception while waiting for batches to be read:", this.fullId, th);
                }
                if (!this.isWorking.get()) {
                    ReaderImpl.logger.trace("[{}] Finished handling ReadResponse of {} bytes. Read session is already closed -- no need to send ReadRequest", this.fullId, Long.valueOf(bytesSize));
                    return;
                }
                ReaderImpl.logger.trace("[{}] Finished handling ReadResponse of {} bytes. Sending ReadRequest...", this.fullId, Long.valueOf(bytesSize));
                this.sizeBytesToRequest.addAndGet(bytesSize);
                sendReadRequest();
            });
        }

        protected void onCommitOffsetResponse(YdbTopic.StreamReadMessage.CommitOffsetResponse commitOffsetResponse) {
            ReaderImpl.logger.trace("[{}] Received CommitOffsetResponse", this.fullId);
            for (YdbTopic.StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset partitionCommittedOffset : commitOffsetResponse.getPartitionsCommittedOffsetsList()) {
                PartitionSessionImpl partitionSessionImpl = this.partitionSessions.get(Long.valueOf(partitionCommittedOffset.getPartitionSessionId()));
                if (partitionSessionImpl != null) {
                    partitionSessionImpl.handleCommitResponse(partitionCommittedOffset.getCommittedOffset());
                    ReaderImpl.this.handleCommitResponse(partitionCommittedOffset.getCommittedOffset(), partitionSessionImpl.getSessionInfo());
                } else {
                    ReaderImpl.logger.info("[{}] Received CommitOffsetResponse for unknown (most likely already closed) partition session with id={}", this.fullId, Long.valueOf(partitionCommittedOffset.getPartitionSessionId()));
                }
            }
        }

        protected void onPartitionSessionStatusResponse(YdbTopic.StreamReadMessage.PartitionSessionStatusResponse partitionSessionStatusResponse) {
            PartitionSessionImpl partitionSessionImpl = this.partitionSessions.get(Long.valueOf(partitionSessionStatusResponse.getPartitionSessionId()));
            Logger logger = ReaderImpl.logger;
            Object[] objArr = new Object[6];
            objArr[0] = this.fullId;
            objArr[1] = Long.valueOf(partitionSessionStatusResponse.getPartitionSessionId());
            objArr[2] = partitionSessionImpl == null ? "unknown" : Long.valueOf(partitionSessionImpl.getPartitionId());
            objArr[3] = Long.valueOf(partitionSessionStatusResponse.getPartitionOffsets().getStart());
            objArr[ReaderImpl.DEFAULT_DECOMPRESSION_THREAD_COUNT] = Long.valueOf(partitionSessionStatusResponse.getPartitionOffsets().getEnd());
            objArr[5] = Long.valueOf(partitionSessionStatusResponse.getCommittedOffset());
            logger.info("[{}] Received PartitionSessionStatusResponse: partition session {} (partition {}). Partition offsets: [{}, {}). Committed offset: {}", objArr);
        }

        private void processMessage(YdbTopic.StreamReadMessage.FromServer fromServer) {
            if (!this.isWorking.get()) {
                ReaderImpl.logger.debug("[{}] processMessage called, but read session is already closed", this.fullId);
                return;
            }
            ReaderImpl.logger.debug("[{}] processMessage called", this.fullId);
            if (fromServer.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
                Status of = Status.of(StatusCode.fromProto(fromServer.getStatus()), Issue.fromPb(fromServer.getIssuesList()));
                ReaderImpl.logger.warn("[{}] Got non-success status in processMessage method: {}", this.fullId, of);
                closeDueToError(of, null);
                return;
            }
            ReaderImpl.this.reconnectCounter.set(0);
            if (fromServer.hasInitResponse()) {
                onInitResponse(fromServer.getInitResponse());
                return;
            }
            if (fromServer.hasStartPartitionSessionRequest()) {
                onStartPartitionSessionRequest(fromServer.getStartPartitionSessionRequest());
                return;
            }
            if (fromServer.hasStopPartitionSessionRequest()) {
                onStopPartitionSessionRequest(fromServer.getStopPartitionSessionRequest());
                return;
            }
            if (fromServer.hasReadResponse()) {
                onReadResponse(fromServer.getReadResponse());
                return;
            }
            if (fromServer.hasCommitOffsetResponse()) {
                onCommitOffsetResponse(fromServer.getCommitOffsetResponse());
                return;
            }
            if (fromServer.hasPartitionSessionStatusResponse()) {
                onPartitionSessionStatusResponse(fromServer.getPartitionSessionStatusResponse());
            } else if (fromServer.hasUpdateTokenResponse()) {
                ReaderImpl.logger.debug("[{}] Received UpdateTokenResponse", this.fullId);
            } else {
                ReaderImpl.logger.error("[{}] Unhandled message from server: {}", this.fullId, fromServer);
            }
        }

        protected void closeDueToError(Status status, Throwable th) {
            ReaderImpl.logger.info("[{}] Session {} closeDueToError called", this.fullId, this.sessionId);
            if (shutdown()) {
                ReaderImpl.this.onSessionClosed(status, th);
            }
        }

        @Override // tech.ydb.topic.impl.SessionBase
        protected void onStop() {
            ReaderImpl.logger.debug("[{}] Session {} onStop called", this.fullId, this.sessionId);
            closePartitionSessions();
        }
    }

    public ReaderImpl(TopicRpc topicRpc, ReaderSettings readerSettings) {
        super(topicRpc.getScheduler());
        this.initResultFutureRef = new AtomicReference<>(null);
        this.seqNumberCounter = new AtomicLong(0L);
        this.topicRpc = topicRpc;
        this.settings = readerSettings;
        this.session = new ReadSessionImpl();
        if (readerSettings.getDecompressionExecutor() != null) {
            this.defaultDecompressionExecutorService = null;
            this.decompressionExecutor = readerSettings.getDecompressionExecutor();
        } else {
            this.defaultDecompressionExecutorService = Executors.newFixedThreadPool(DEFAULT_DECOMPRESSION_THREAD_COUNT);
            this.decompressionExecutor = this.defaultDecompressionExecutorService;
        }
        StringBuilder sb = new StringBuilder("Reader");
        if (readerSettings.getReaderName() != null && !readerSettings.getReaderName().isEmpty()) {
            sb.append(" \"").append(readerSettings.getReaderName()).append("\"");
        }
        sb.append(" (generated id ").append(this.id).append(")");
        sb.append(" created for topic(s): ");
        for (TopicReadSettings topicReadSettings : readerSettings.getTopics()) {
            if (topicReadSettings != readerSettings.getTopics().get(0)) {
                sb.append(", ");
            }
            sb.append("\"").append(topicReadSettings.getPath()).append("\"");
        }
        if (readerSettings.getConsumerName() != null) {
            sb.append(" and Consumer: \"").append(readerSettings.getConsumerName()).append("\"");
        } else {
            sb.append(" without a Consumer");
        }
        logger.info(sb.toString());
    }

    @Override // tech.ydb.topic.impl.GrpcStreamRetrier
    protected Logger getLogger() {
        return logger;
    }

    @Override // tech.ydb.topic.impl.GrpcStreamRetrier
    protected String getStreamName() {
        return "Reader";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> initImpl() {
        logger.info("[{}] initImpl called", this.id);
        if (this.initResultFutureRef.compareAndSet(null, new CompletableFuture<>())) {
            this.session.startAndInitialize();
        } else {
            logger.warn("[{}] Init is called on this reader more than once. Nothing is done", this.id);
        }
        return this.initResultFutureRef.get();
    }

    protected abstract CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent dataReceivedEvent);

    protected abstract void handleCommitResponse(long j, PartitionSession partitionSession);

    protected abstract void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest startPartitionSessionRequest, PartitionSession partitionSession, Consumer<StartPartitionSessionSettings> consumer);

    protected abstract void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest stopPartitionSessionRequest, PartitionSession partitionSession, Runnable runnable);

    protected abstract void handleClosePartitionSession(PartitionSession partitionSession);

    @Override // tech.ydb.topic.impl.GrpcStreamRetrier
    protected void onStreamReconnect() {
        this.session = new ReadSessionImpl();
        this.session.startAndInitialize();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // tech.ydb.topic.impl.GrpcStreamRetrier
    public void onShutdown(String str) {
        this.session.shutdown();
        if (this.initResultFutureRef.get() != null && !this.initResultFutureRef.get().isDone()) {
            this.initResultFutureRef.get().completeExceptionally(new RuntimeException(str));
        }
        if (this.defaultDecompressionExecutorService != null) {
            this.defaultDecompressionExecutorService.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Status> sendUpdateOffsetsInTransaction(YdbTransaction ydbTransaction, Map<String, List<PartitionOffsets>> map, UpdateOffsetsInTransactionSettings updateOffsetsInTransactionSettings) {
        if (map.isEmpty()) {
            throw new IllegalArgumentException("Empty topic list to update in transaction");
        }
        if (logger.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder("Updating ");
            boolean z = true;
            for (Map.Entry<String, List<PartitionOffsets>> entry : map.entrySet()) {
                if (entry.getValue().isEmpty()) {
                    throw new IllegalArgumentException("Empty offsets range to update in transaction");
                }
                for (PartitionOffsets partitionOffsets : entry.getValue()) {
                    if (z) {
                        z = false;
                    } else {
                        sb.append(", ");
                    }
                    sb.append("offsets [").append(partitionOffsets.getOffsets().get(0).getStart()).append("..").append(partitionOffsets.getOffsets().get(partitionOffsets.getOffsets().size() - 1).getEnd()).append(") for partition ").append(partitionOffsets.getPartitionSession().getPartitionId()).append(" [topic ").append(entry.getKey()).append("]");
                }
            }
            logger.debug(sb.toString());
        }
        ReadSessionImpl readSessionImpl = this.session;
        ydbTransaction.getStatusFuture().whenComplete((status, th) -> {
            if (th != null) {
                readSessionImpl.closeDueToError(null, new RuntimeException("Restarting read session due to transaction " + ydbTransaction.getId() + " with partition offsets from read session " + readSessionImpl.fullId + " was not committed with reason: " + th));
            } else {
                if (status.isSuccess()) {
                    return;
                }
                readSessionImpl.closeDueToError(null, new RuntimeException("Restarting read session due to transaction " + ydbTransaction.getId() + " with partition offsets from read session " + readSessionImpl.fullId + " was not committed with status: " + status));
            }
        });
        YdbTopic.UpdateOffsetsInTransactionRequest.Builder consumer = YdbTopic.UpdateOffsetsInTransactionRequest.newBuilder().setTx(YdbTopic.TransactionIdentity.newBuilder().setId(ydbTransaction.getId()).setSession(ydbTransaction.getSessionId())).setConsumer(this.settings.getConsumerName());
        map.forEach((str, list) -> {
            YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.Builder path = YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.newBuilder().setPath(str);
            list.forEach(partitionOffsets2 -> {
                YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.Builder partitionId = YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.newBuilder().setPartitionId(partitionOffsets2.getPartitionSession().getPartitionId());
                partitionOffsets2.getOffsets().forEach(offsetsRange -> {
                    partitionId.addPartitionOffsets(YdbTopic.OffsetsRange.newBuilder().setStart(offsetsRange.getStart()).setEnd(offsetsRange.getEnd()).build());
                });
                path.addPartitions(partitionId);
            });
            consumer.addTopics(path);
        });
        return this.topicRpc.updateOffsetsInTransaction(consumer.build(), GrpcRequestSettings.newBuilder().withDeadline(updateOffsetsInTransactionSettings.getRequestTimeout()).withTraceId(updateOffsetsInTransactionSettings.getTraceId() == null ? UUID.randomUUID().toString() : updateOffsetsInTransactionSettings.getTraceId()).build());
    }
}
