package tech.ydb.topic.read.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Status;
import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.TransactionMessageAccumulator;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;
import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;

/* loaded from: input_file:tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.class */
/**
 * Collects the offset ranges of read messages, grouped by topic path and partition session,
 * and hands them to the reader so that they can be attached to a transaction.
 *
 * <p>The outer and inner maps are {@link ConcurrentHashMap}s and every access to a partition's
 * range set is guarded by a lock on that set, so messages may be added from several threads.
 */
public class TransactionMessageAccumulatorImpl implements TransactionMessageAccumulator {
    // Use this class for the logger name so that log events are attributed to the accumulator.
    private static final Logger logger = LoggerFactory.getLogger(TransactionMessageAccumulatorImpl.class);

    private final AsyncReader reader;
    /** Accumulated ranges: topic path -> partition session -> ranges collected for that session. */
    private final Map<String, Map<PartitionSession, PartitionRanges>> rangesByTopic = new ConcurrentHashMap<>();

    /**
     * Offset ranges accumulated for a single partition session.
     * All access to the underlying range set is synchronized on the set itself.
     */
    public static class PartitionRanges {
        private final PartitionSession partitionSession;
        private final DisjointOffsetRangeSet ranges;

        private PartitionRanges(PartitionSession partitionSession) {
            this.ranges = new DisjointOffsetRangeSet();
            this.partitionSession = partitionSession;
        }

        /**
         * Adds one offset range to this partition's set.
         *
         * @param offsetsRange range to add
         * @throws RuntimeException if the underlying set rejects the range; the thrown exception
         *         carries the partition session details and the original exception as its cause
         */
        public void add(OffsetsRange offsetsRange) {
            try {
                synchronized (this.ranges) {
                    this.ranges.add(offsetsRange);
                }
            } catch (RuntimeException e) {
                String errorMessage = "Error adding new offset range to TransactionMessageAccumulator"
                        + " for partition session " + this.partitionSession.getId()
                        + " (partition " + this.partitionSession.getPartitionId() + "): " + e.getMessage();
                // Pass the exception to the logger so the stack trace is not lost.
                logger.error(errorMessage, e);
                throw new RuntimeException(errorMessage, e);
            }
        }

        /**
         * Returns the ranges collected so far, obtained through
         * {@code DisjointOffsetRangeSet.getRangesAndClear()}.
         * NOTE(review): judging by that method's name the set is emptied by this call - confirm.
         *
         * @return the ranges reported by the underlying set
         */
        public List<OffsetsRange> getOffsetsRanges() {
            synchronized (this.ranges) {
                return this.ranges.getRangesAndClear();
            }
        }
    }

    /**
     * Creates an accumulator bound to the given reader.
     *
     * @param asyncReader reader that will receive the accumulated offsets
     */
    TransactionMessageAccumulatorImpl(AsyncReader asyncReader) {
        this.reader = asyncReader;
    }

    /**
     * Records the offsets of a single message.
     *
     * @param message message whose offsets should be accumulated; must be a {@code MessageImpl}
     * @throws ClassCastException if {@code message} is not a {@code MessageImpl}
     */
    @Override
    public void add(Message message) {
        rangesFor(message.getPartitionSession())
                .add(((MessageImpl) message).getOffsetsToCommit());
    }

    /**
     * Records the offsets of a whole received-data event.
     *
     * @param dataReceivedEvent event whose offsets should be accumulated; must be a
     *        {@link DataReceivedEventImpl}
     * @throws ClassCastException if {@code dataReceivedEvent} is not a {@link DataReceivedEventImpl}
     */
    @Override
    public void add(DataReceivedEvent dataReceivedEvent) {
        rangesFor(dataReceivedEvent.getPartitionSession())
                .add(((DataReceivedEventImpl) dataReceivedEvent).getOffsetsToCommit());
    }

    /**
     * Passes the accumulated offsets of every known topic and partition session to
     * {@link AsyncReader#updateOffsetsInTransaction}.
     *
     * <p>NOTE(review): partition sessions stay registered after this call, so a later call
     * reports them again with whatever ranges were added since - confirm that this is intended.
     *
     * @param ydbTransaction transaction the offsets should be attached to
     * @param updateOffsetsInTransactionSettings settings forwarded to the reader
     * @return the future returned by the reader
     */
    @Override
    public CompletableFuture<Status> updateOffsetsInTransaction(YdbTransaction ydbTransaction,
            UpdateOffsetsInTransactionSettings updateOffsetsInTransactionSettings) {
        Map<String, List<PartitionOffsets>> offsetsByTopic = new HashMap<>();
        this.rangesByTopic.forEach((topicPath, sessionRanges) -> {
            List<PartitionOffsets> topicOffsets = sessionRanges.entrySet().stream()
                    .map(entry -> new PartitionOffsets(entry.getKey(), entry.getValue().getOffsetsRanges()))
                    .collect(Collectors.toList());
            offsetsByTopic.put(topicPath, topicOffsets);
        });
        return this.reader.updateOffsetsInTransaction(ydbTransaction, offsetsByTopic,
                updateOffsetsInTransactionSettings);
    }

    /** Returns the range holder for the given partition session, creating it on first use. */
    private PartitionRanges rangesFor(PartitionSession session) {
        return this.rangesByTopic
                .computeIfAbsent(session.getPath(), topicPath -> new ConcurrentHashMap<>())
                .computeIfAbsent(session, PartitionRanges::new);
    }
}
