package tech.ydb.topic.write.impl;

import java.util.Deque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.impl.GrpcStreamRetrier;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.Encoder;
import tech.ydb.topic.write.InitResult;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.QueueOverflowException;
import tech.ydb.topic.write.WriteAck;

/* loaded from: input_file:tech/ydb/topic/write/impl/WriterImpl.class */
public abstract class WriterImpl extends GrpcStreamRetrier {
    private static final Logger logger = LoggerFactory.getLogger(WriterImpl.class);
    private WriteSessionImpl session;
    private final WriterSettings settings;
    private final TopicRpc topicRpc;
    private final AtomicReference<CompletableFuture<InitResult>> initResultFutureRef;
    private final Queue<IncomingMessage> incomingQueue;
    private final Queue<EnqueuedMessage> encodingMessages;
    private final Queue<EnqueuedMessage> sendingQueue;
    private final Deque<EnqueuedMessage> sentMessages;
    private final AtomicBoolean writeRequestInProgress;
    private final Executor compressionExecutor;
    private final long maxSendBufferMemorySize;
    private final AtomicLong seqNumberCounter;
    private Boolean isSeqNoProvided;
    private int currentInFlightCount;
    private long availableSizeBytes;
    private CompletableFuture<WriteAck> lastAcceptedMessageFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: tech.ydb.topic.write.impl.WriterImpl$1, reason: invalid class name */
    /* loaded from: input_file:tech/ydb/topic/write/impl/WriterImpl$1.class */
    /**
     * Compiler-generated switch-map holder, as reconstructed by the decompiler.
     *
     * <p>Each array is sized to the number of constants of one protobuf enum and is indexed by a
     * constant's {@code ordinal()}. The stored value is a small positive integer that identifies
     * the constant; slots that are never assigned keep the array default of {@code 0}.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Skipped.Reason: REASON_ALREADY_WRITTEN -> 1, REASON_UNSPECIFIED -> 2 (filled in the static block).
        static final /* synthetic */ int[] $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$Skipped$Reason;
        // MessageWriteStatusCase: WRITTEN -> 1, SKIPPED -> 2 (filled in the static block).
        static final /* synthetic */ int[] $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$MessageWriteStatusCase = new int[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.MessageWriteStatusCase.values().length];

        static {
            // Every assignment is individually guarded: a NoSuchFieldError for one constant is
            // swallowed and leaves that slot at 0 instead of failing class initialization.
            try {
                $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$MessageWriteStatusCase[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.MessageWriteStatusCase.WRITTEN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$MessageWriteStatusCase[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.MessageWriteStatusCase.SKIPPED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$Skipped$Reason = new int[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Skipped.Reason.values().length];
            try {
                $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$Skipped$Reason[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Skipped.Reason.REASON_ALREADY_WRITTEN.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$Skipped$Reason[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Skipped.Reason.REASON_UNSPECIFIED.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:tech/ydb/topic/write/impl/WriterImpl$IncomingMessage.class */
    /**
     * Pairs a message that could not be accepted into the send buffer right away with the
     * future handed back to the caller. The future is created uncompleted together with the
     * instance.
     */
    public static class IncomingMessage {
        private final EnqueuedMessage message;
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        private IncomingMessage(EnqueuedMessage enqueuedMessage) {
            this.message = enqueuedMessage;
        }

        /**
         * Synthetic accessor constructor; the second argument is ignored and exists only to
         * give this constructor a distinct signature.
         */
        /* synthetic */ IncomingMessage(EnqueuedMessage enqueuedMessage, AnonymousClass1 anonymousClass1) {
            this(enqueuedMessage);
        }
    }

    /* loaded from: input_file:tech/ydb/topic/write/impl/WriterImpl$WriteSessionImpl.class */
    private class WriteSessionImpl extends WriteSession {
        protected String sessionId;
        private final String fullId;
        private final MessageSender messageSender;
        private final AtomicBoolean isInitialized;

        private WriteSessionImpl() {
            super(WriterImpl.this.topicRpc);
            this.sessionId = "";
            this.isInitialized = new AtomicBoolean(false);
            this.fullId = WriterImpl.this.id + '.' + WriterImpl.this.seqNumberCounter.incrementAndGet();
            this.messageSender = new MessageSender(WriterImpl.this.settings);
        }

        /**
         * Starts the underlying stream and sends the init request built from the writer settings.
         *
         * <p>The producer id is set when present. The message group id and the partition id are
         * mutually exclusive: at most one of them is put into the request.
         *
         * @throws IllegalArgumentException if both a message group id and a partition id are
         *         configured (thrown after the stream has already been started)
         */
        @Override // tech.ydb.topic.impl.Session
        public void startAndInitialize() {
            WriterImpl.logger.debug("[{}] Session {} startAndInitialize called", this.fullId, this.sessionId);
            start(this::processMessage).whenComplete(this::onSessionClosing);

            final WriterSettings writerSettings = WriterImpl.this.settings;
            final YdbTopic.StreamWriteMessage.InitRequest.Builder initRequest =
                    YdbTopic.StreamWriteMessage.InitRequest.newBuilder().setPath(writerSettings.getTopicPath());

            final String producerId = writerSettings.getProducerId();
            if (producerId != null) {
                initRequest.setProducerId(producerId);
            }

            final String messageGroupId = writerSettings.getMessageGroupId();
            final Long partitionId = writerSettings.getPartitionId();
            if (messageGroupId != null && partitionId != null) {
                throw new IllegalArgumentException("Both MessageGroupId and PartitionId are set in WriterSettings");
            }
            if (messageGroupId != null) {
                initRequest.setMessageGroupId(messageGroupId);
            } else if (partitionId != null) {
                initRequest.setPartitionId(partitionId.longValue());
            }

            send(YdbTopic.StreamWriteMessage.FromClient.newBuilder().setInitRequest(initRequest).build());
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Moves everything currently in the writer's sending queue to the server.
         *
         * <p>The method loops until there is nothing left to do. Each round returns early when
         * the session is not initialized, is no longer working, the sending queue is empty, or
         * another thread already holds the {@code writeRequestInProgress} flag. Otherwise it
         * snapshots the sending queue, moves the snapshot into {@code sentMessages} (messages
         * awaiting an ack) and passes it to the message sender.
         *
         * <p>{@code writeRequestInProgress} acts as a non-blocking lock. It is released in a
         * {@code finally} block so that an exception thrown while sending cannot leave the flag
         * set forever, which would make every later call return with "already in progress".
         */
        public void sendDataRequestIfNeeded() {
            while (true) {
                if (!this.isInitialized.get()) {
                    WriterImpl.logger.debug("[{}] Can't send data: current session is not yet initialized", this.fullId);
                    return;
                }
                if (!this.isWorking.get()) {
                    WriterImpl.logger.debug("[{}] Can't send data: current session has been already stopped", this.fullId);
                    return;
                }
                if (WriterImpl.this.sendingQueue.isEmpty()) {
                    WriterImpl.logger.trace("[{}] Nothing to send -- sendingQueue is empty", this.fullId);
                    return;
                }
                if (!WriterImpl.this.writeRequestInProgress.compareAndSet(false, true)) {
                    WriterImpl.logger.debug("[{}] Send request is already in progress", this.fullId);
                    return;
                }
                try {
                    // Snapshot taken while holding the flag; the queue may have been drained by
                    // the previous holder between the emptiness check above and this point.
                    LinkedList<EnqueuedMessage> batch = new LinkedList<>(WriterImpl.this.sendingQueue);
                    if (batch.isEmpty()) {
                        WriterImpl.logger.debug("[{}] Nothing to send -- sendingQueue is empty #2", this.fullId);
                    } else {
                        WriterImpl.this.sendingQueue.removeAll(batch);
                        WriterImpl.this.sentMessages.addAll(batch);
                        this.messageSender.sendMessages(batch);
                        WriterImpl.logger.debug("[{}] Sent {} messages to server", this.fullId, batch.size());
                    }
                } finally {
                    if (!WriterImpl.this.writeRequestInProgress.compareAndSet(true, false)) {
                        WriterImpl.logger.error("[{}] Couldn't turn off writeRequestInProgress. Should not happen", this.fullId);
                    }
                }
                // Loop again: messages may have been queued while this batch was being sent.
            }
        }

        /**
         * Handles the server's init response for this session.
         *
         * <p>Stores the session id, seeds the message sender with the highest known sequence
         * number (the server's last seqNo, or the seqNo of the newest un-acked message if that is
         * larger), re-sends every message still waiting for an ack, completes the writer's init
         * future if one exists, and finally marks the session initialized and flushes the
         * sending queue.
         *
         * @param initResponse init response received from the server
         */
        private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse initResponse) {
            this.sessionId = initResponse.getSessionId();
            WriterImpl.logger.info("[{}] Session {} initialized", this.fullId, this.sessionId);

            final Deque<EnqueuedMessage> unacked = WriterImpl.this.sentMessages;
            final long serverLastSeqNo = initResponse.getLastSeqNo();
            final long startSeqNo = unacked.isEmpty()
                    ? serverLastSeqNo
                    : Math.max(serverLastSeqNo, unacked.getLast().getSeqNo().longValue());

            this.messageSender.setSession(this);
            this.messageSender.setSeqNo(startSeqNo);

            if (!unacked.isEmpty()) {
                WriterImpl.logger.info("Resending {} messages that haven't received ack's yet into new session...", unacked.size());
                this.messageSender.sendMessages(unacked);
            }

            final CompletableFuture<InitResult> initFuture = WriterImpl.this.initResultFutureRef.get();
            if (initFuture != null) {
                initFuture.complete(new InitResult(serverLastSeqNo));
            }

            // Only now may regular sends go out: the resend above has to precede new data.
            this.isInitialized.set(true);
            sendDataRequestIfNeeded();
        }

        /**
         * Matches the acks of a write response against the messages waiting for an ack.
         *
         * <p>For every ack the oldest waiting message is inspected:
         * <ul>
         *   <li>no message is waiting: the ack is ignored;</li>
         *   <li>same seqNo: the message is completed via {@code processWriteAck} and removed;</li>
         *   <li>waiting message is newer than the ack: the ack is stale and is ignored;</li>
         *   <li>waiting message is older than the ack: the message was skipped by the server,
         *       its future is failed, it is removed and the next waiting message is checked.</li>
         * </ul>
         * The number of removed messages and their total size are then returned to the send
         * buffer with a single {@code free} call.
         *
         * @param writeResponse write response received from the server
         */
        private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse writeResponse) {
            int freedMessages = 0;
            long freedBytes = 0;
            for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : writeResponse.getAcksList()) {
                while (true) {
                    EnqueuedMessage oldest = WriterImpl.this.sentMessages.peek();
                    if (oldest == null) {
                        // Nothing is waiting for an ack. Without this exit the loop would spin
                        // forever, because peek() keeps returning null.
                        WriterImpl.logger.debug("[{}] Received an ack with seqNo {}, but there are no messages waiting for ack", this.fullId, ack.getSeqNo());
                        break;
                    }
                    long oldestSeqNo = oldest.getSeqNo().longValue();
                    if (oldestSeqNo == ack.getSeqNo()) {
                        processWriteAck(oldest, ack);
                        freedMessages++;
                        freedBytes += oldest.getSizeBytes();
                        WriterImpl.this.sentMessages.remove();
                        break;
                    }
                    if (oldestSeqNo > ack.getSeqNo()) {
                        WriterImpl.logger.info("[{}] Received an ack with seqNo {} which is older than the oldest message with seqNo {} waiting for ack", this.fullId, ack.getSeqNo(), oldest.getSeqNo());
                        break;
                    }
                    // The oldest waiting message precedes this ack, so its own ack never came.
                    WriterImpl.logger.warn("[{}] Received an ack for seqNo {}, but the oldest seqNo waiting for ack is {}", this.fullId, ack.getSeqNo(), oldest.getSeqNo());
                    oldest.getFuture().completeExceptionally(new RuntimeException("Didn't get ack from server for this message"));
                    freedMessages++;
                    freedBytes += oldest.getSizeBytes();
                    WriterImpl.this.sentMessages.remove();
                }
            }
            WriterImpl.this.free(freedMessages, freedBytes);
        }

        /**
         * Entry point for every message received from the server on this session's stream.
         *
         * <p>A non-success status closes the session through the writer's
         * {@code onSessionClosed} with a status carrying the offending message as an issue.
         * A successful message resets the reconnect counter and is dispatched to the init or
         * write response handler; other message kinds are ignored.
         *
         * @param fromServer message received from the server
         */
        private void processMessage(YdbTopic.StreamWriteMessage.FromServer fromServer) {
            WriterImpl.logger.debug("[{}] processMessage called", this.fullId);

            if (fromServer.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
                WriterImpl.this.reconnectCounter.set(0);
                if (fromServer.hasInitResponse()) {
                    onInitResponse(fromServer.getInitResponse());
                } else if (fromServer.hasWriteResponse()) {
                    onWriteResponse(fromServer.getWriteResponse());
                }
                return;
            }

            WriterImpl.logger.warn("[{}] Got non-success status in processMessage method: {}", this.fullId, fromServer);
            final Status baseStatus = Status.of(StatusCode.fromProto(fromServer.getStatus()));
            final Issue issue = Issue.of("Got a message with non-success status: " + fromServer, Issue.Severity.ERROR);
            WriterImpl.this.onSessionClosed(baseStatus.withIssues(new Issue[]{issue}), null);
        }

        /**
         * Completes the future of a sent message according to the ack the server returned for it.
         *
         * <ul>
         *   <li>{@code WRITTEN}: completes with a {@code WRITTEN} ack carrying the offset;</li>
         *   <li>{@code SKIPPED} with reason {@code REASON_ALREADY_WRITTEN}: completes with an
         *       {@code ALREADY_WRITTEN} ack without details;</li>
         *   <li>{@code SKIPPED} with any other reason: fails the future;</li>
         *   <li>any other status case: fails the future.</li>
         * </ul>
         *
         * @param enqueuedMessage the message the ack belongs to
         * @param writeAck        the ack received from the server
         */
        private void processWriteAck(EnqueuedMessage enqueuedMessage, YdbTopic.StreamWriteMessage.WriteResponse.WriteAck writeAck) {
            WriteAck result;
            switch (writeAck.getMessageWriteStatusCase()) {
                case WRITTEN:
                    result = new WriteAck(writeAck.getSeqNo(), WriteAck.State.WRITTEN, new WriteAck.Details(writeAck.getWritten().getOffset()));
                    break;
                case SKIPPED:
                    switch (writeAck.getSkipped().getReason()) {
                        case REASON_ALREADY_WRITTEN:
                            result = new WriteAck(writeAck.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null);
                            break;
                        case REASON_UNSPECIFIED:
                        default:
                            enqueuedMessage.getFuture().completeExceptionally(new RuntimeException("Unknown WriteAck skipped reason"));
                            return;
                    }
                    // This break is required: the inner break only leaves the inner switch, and
                    // without it an ALREADY_WRITTEN ack falls through into the default branch
                    // and is reported as "Unknown WriteAck state".
                    break;
                default:
                    enqueuedMessage.getFuture().completeExceptionally(new RuntimeException("Unknown WriteAck state"));
                    return;
            }
            enqueuedMessage.getFuture().complete(result);
        }

        /**
         * Completion callback of the stream started in {@code startAndInitialize}.
         *
         * <p>If the session is still marked as working it is shut down and the closure is
         * reported to the enclosing writer; otherwise only the log line is emitted.
         *
         * @param status final status of the stream, as passed by the completion stage
         * @param th     failure of the stream, as passed by the completion stage
         */
        private void onSessionClosing(Status status, Throwable th) {
            WriterImpl.logger.info("[{}] Session {} onSessionClosing called", this.fullId, this.sessionId);
            if (!this.isWorking.get()) {
                return;
            }
            shutdown();
            WriterImpl.this.onSessionClosed(status, th);
        }

        /**
         * Stop callback overridden from the session base class (per the decompiler annotation
         * below). This implementation performs no cleanup of its own; it only writes a debug
         * log line with the session identifiers.
         */
        @Override // tech.ydb.topic.impl.SessionBase
        protected void onStop() {
            WriterImpl.logger.debug("[{}] Session {} onStop called", this.fullId, this.sessionId);
        }

        /**
         * Synthetic accessor constructor that lets the enclosing class invoke the private
         * no-argument constructor. Neither parameter is read in the body: the first is the
         * enclosing instance as rendered by the decompiler, the second only makes the signature
         * distinct.
         */
        /* synthetic */ WriteSessionImpl(WriterImpl writerImpl, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /**
     * Creates a writer for the topic described by {@code writerSettings}.
     *
     * <p>The constructor only sets up state and the first (not yet started) session; no stream
     * is opened until {@code initImpl} is called.
     *
     * @param topicRpc       RPC layer; also supplies the scheduler passed to the base class
     * @param writerSettings topic path, producer / message group ids and send buffer limits
     * @param executor       executor on which messages are encoded
     */
    public WriterImpl(TopicRpc topicRpc, WriterSettings writerSettings, Executor executor) {
        super(topicRpc.getScheduler());

        // Collaborators.
        this.topicRpc = topicRpc;
        this.settings = writerSettings;
        this.compressionExecutor = executor;

        // Message pipeline: incoming -> encoding -> sending -> sent (awaiting ack).
        this.incomingQueue = new LinkedList<>();
        this.encodingMessages = new LinkedList<>();
        this.sendingQueue = new ConcurrentLinkedQueue<>();
        this.sentMessages = new ConcurrentLinkedDeque<>();

        // Bookkeeping.
        this.initResultFutureRef = new AtomicReference<>(null);
        this.writeRequestInProgress = new AtomicBoolean();
        this.seqNumberCounter = new AtomicLong(0L);
        this.isSeqNoProvided = null;
        this.currentInFlightCount = 0;

        // The session constructor reads topicRpc, settings and seqNumberCounter set above.
        this.session = new WriteSessionImpl(this, null);

        this.availableSizeBytes = writerSettings.getMaxSendBufferMemorySize();
        this.maxSendBufferMemorySize = writerSettings.getMaxSendBufferMemorySize();

        logger.info("Writer (generated id " + this.id + ") created for topic \"" + writerSettings.getTopicPath()
                + "\" with producerId \"" + writerSettings.getProducerId()
                + "\" and messageGroupId \"" + writerSettings.getMessageGroupId() + "\"");
    }

    /**
     * Supplies this class's static logger to the base class.
     *
     * @return the logger created for {@code WriterImpl}
     */
    @Override // tech.ydb.topic.impl.GrpcStreamRetrier
    protected Logger getLogger() {
        return logger;
    }

    /**
     * Supplies the stream name of this retrier to the base class.
     *
     * @return the constant {@code "Writer"}
     */
    @Override // tech.ydb.topic.impl.GrpcStreamRetrier
    protected String getStreamName() {
        return "Writer";
    }

    /**
     * Tries to put a message into the send buffer.
     *
     * <p>If neither the in-flight message limit nor the buffer size limit is hit and no other
     * message is already waiting, the message is accepted immediately and a completed future is
     * returned. Otherwise the outcome depends on {@code z}: when a limit is hit and {@code z} is
     * {@code true} the returned future is failed with {@link QueueOverflowException}; in all
     * remaining cases the message is parked in the incoming queue and the returned future
     * completes once it is accepted later.
     *
     * @param enqueuedMessage message to enqueue
     * @param z               {@code true} to fail fast instead of waiting when a limit is hit
     * @return future signalling acceptance of the message into the send buffer
     */
    public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage enqueuedMessage, boolean z) {
        synchronized (this.incomingQueue) {
            if (this.currentInFlightCount >= this.settings.getMaxSendBufferMessagesCount()) {
                if (z) {
                    logger.trace("[{}] Rejecting a message due to reaching message queue in-flight limit", this.id);
                    return rejectWithOverflow("Message queue in-flight limit reached");
                }
                logger.debug("[{}] Message queue in-flight limit reached. Putting the message into incoming waiting queue", this.id);
                return parkInIncomingQueue(enqueuedMessage);
            }

            if (this.availableSizeBytes <= enqueuedMessage.getMessage().getData().length) {
                if (z) {
                    logger.trace("[{}] Rejecting a message due to reaching message queue size limit", this.id);
                    return rejectWithOverflow("Message queue size limit reached");
                }
                logger.debug("[{}] Message queue size limit reached. Putting the message into incoming waiting queue", this.id);
                return parkInIncomingQueue(enqueuedMessage);
            }

            if (!this.incomingQueue.isEmpty()) {
                // Earlier messages are still waiting; keep arrival order by queueing behind them.
                return parkInIncomingQueue(enqueuedMessage);
            }

            logger.trace("[{}] Putting a message into the queue right now, enough space in send buffer", this.id);
            acceptMessageIntoSendingQueue(enqueuedMessage);
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Builds a future that is already failed with a {@link QueueOverflowException}. */
    private static CompletableFuture<Void> rejectWithOverflow(String reason) {
        CompletableFuture<Void> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new QueueOverflowException(reason));
        return rejected;
    }

    /** Appends the message to the incoming queue; must be called with its monitor held. */
    private CompletableFuture<Void> parkInIncomingQueue(EnqueuedMessage enqueuedMessage) {
        IncomingMessage waiting = new IncomingMessage(enqueuedMessage, null);
        this.incomingQueue.add(waiting);
        return waiting.future;
    }

    /**
     * Accepts a message into the send buffer and schedules its encoding and hand-over to the
     * sending queue.
     *
     * <p>Synchronously, the message's future is remembered as the last accepted one, the
     * in-flight counter is incremented, the message's uncompressed size is reserved from the
     * available buffer space and the message is appended to {@code encodingMessages}. Both
     * callers visible in this class invoke the method while holding the {@code incomingQueue}
     * monitor.
     *
     * <p>Asynchronously, the message is encoded on {@code compressionExecutor}; a follow-up
     * stage (run on {@code CompletableFuture}'s default asynchronous executor, since no
     * executor is passed) then moves ready messages from the head of {@code encodingMessages}
     * into {@code sendingQueue} and triggers a send on the current session.
     *
     * @param enqueuedMessage message to accept
     */
    private void acceptMessageIntoSendingQueue(EnqueuedMessage enqueuedMessage) {
        this.lastAcceptedMessageFuture = enqueuedMessage.getFuture();
        this.currentInFlightCount++;
        this.availableSizeBytes -= enqueuedMessage.getUncompressedSizeBytes();
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] Accepted 1 message of {} uncompressed bytes. Current In-flight: {}, AvailableSizeBytes: {} ({} / {} acquired)", new Object[]{this.id, Long.valueOf(enqueuedMessage.getUncompressedSizeBytes()), Integer.valueOf(this.currentInFlightCount), Long.valueOf(this.availableSizeBytes), Long.valueOf(this.maxSendBufferMemorySize - this.availableSizeBytes), Long.valueOf(this.maxSendBufferMemorySize)});
        }
        this.encodingMessages.add(enqueuedMessage);
        CompletableFuture.runAsync(() -> {
            encode(enqueuedMessage);
        }, this.compressionExecutor).thenRunAsync(() -> {
            // Set when at least one message was moved, so a send is triggered only if needed.
            boolean z = false;
            synchronized (this.incomingQueue) {
                // Drain from the head only: a message is released once it is compressed (or the
                // codec is RAW, which never compresses), so messages leave in acceptance order
                // even when encodings finish out of order.
                while (true) {
                    EnqueuedMessage peek = this.encodingMessages.peek();
                    if (peek == null || !(peek.isCompressed() || this.settings.getCodec() == Codec.RAW)) {
                        break;
                    }
                    this.encodingMessages.remove();
                    if (peek.isCompressed()) {
                        if (logger.isTraceEnabled()) {
                            logger.trace("[{}] Message compressed from {} to {} bytes", new Object[]{this.id, Long.valueOf(peek.getUncompressedSizeBytes()), Long.valueOf(peek.getCompressedSizeBytes())});
                        }
                        // Give back the difference between the reserved (uncompressed) size and
                        // the compressed size; the message itself stays in flight.
                        free(0, peek.getUncompressedSizeBytes() - peek.getCompressedSizeBytes());
                    }
                    logger.debug("[{}] Adding message to sending queue", this.id);
                    this.sendingQueue.add(peek);
                    z = true;
                }
            }
            // The send is triggered outside the monitor.
            if (z) {
                this.session.sendDataRequestIfNeeded();
            }
        }).exceptionally(th -> {
            // Reached when either the encoding stage or the hand-over stage above fails.
            // NOTE(review): the message is not removed from encodingMessages here. With a
            // non-RAW codec a message whose encoding failed never becomes "compressed", so it
            // looks like it would stay at the head and block every later message -- confirm.
            logger.error("[{}] Exception while encoding message: ", this.id, th);
            free(1, enqueuedMessage.getSizeBytes());
            enqueuedMessage.getFuture().completeExceptionally(th);
            return null;
        });
    }

    /**
     * Encodes the message payload in place with the codec from the writer settings.
     *
     * <p>With the {@code RAW} codec nothing is changed. Otherwise the payload is replaced by
     * its encoded form, the compressed size is recorded and the message is flagged as
     * compressed.
     *
     * @param enqueuedMessage message whose payload is encoded
     */
    private void encode(EnqueuedMessage enqueuedMessage) {
        logger.trace("[{}] Started encoding message", this.id);
        final Codec codec = this.settings.getCodec();
        if (codec != Codec.RAW) {
            enqueuedMessage.getMessage().setData(Encoder.encode(codec, enqueuedMessage.getMessage().getData()));
            enqueuedMessage.setCompressedSizeBytes(enqueuedMessage.getMessage().getData().length);
            enqueuedMessage.setCompressed(true);
            logger.trace("[{}] Successfully finished encoding message", this.id);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the writer's session on the first call; later calls only log a warning.
     *
     * @return the init future of this writer, which is the same instance on every call
     */
    public CompletableFuture<InitResult> initImpl() {
        logger.info("[{}] initImpl called", this.id);
        final boolean firstCall = this.initResultFutureRef.compareAndSet(null, new CompletableFuture<>());
        if (!firstCall) {
            logger.warn("[{}] Init is called on this writer more than once. Nothing is done", this.id);
        } else {
            this.session.startAndInitialize();
        }
        return this.initResultFutureRef.get();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Wraps a user message and tries to enqueue it for sending.
     *
     * <p>The first message decides whether this writer works with caller-provided sequence
     * numbers; every later message must make the same choice.
     *
     * @param message message to send
     * @param z       passed through to {@code tryToEnqueue}: {@code true} to fail fast when
     *                the send buffer is full
     * @return a future that completes when the message is accepted into the send buffer and
     *         yields the future of the message's write ack
     * @throws RuntimeException if the writer is already stopped, or if the presence of a seqNo
     *         differs from that of previously sent messages
     */
    public CompletableFuture<CompletableFuture<WriteAck>> sendImpl(Message message, boolean z) {
        if (this.isStopped.get()) {
            throw new RuntimeException("Writer is already stopped");
        }

        final boolean hasSeqNo = message.getSeqNo() != null;
        if (this.isSeqNoProvided == null) {
            this.isSeqNoProvided = Boolean.valueOf(hasSeqNo);
        } else if (hasSeqNo != this.isSeqNoProvided.booleanValue()) {
            throw new RuntimeException(hasSeqNo
                    ? "SeqNo was provided for a message after it had not been provided for another message. SeqNo should either be provided for all messages or none of them."
                    : "SeqNo was not provided for a message after it had been provided for another message. SeqNo should either be provided for all messages or none of them.");
        }

        final EnqueuedMessage queued = new EnqueuedMessage(message);
        return tryToEnqueue(queued, z).thenApply(accepted -> queued.getFuture());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Returns a future tied to the most recently accepted message.
     *
     * <p>If no message has been accepted yet, or the last accepted message's ack future is
     * already done (normally or exceptionally), an already completed future is returned.
     * Otherwise the returned future completes when that ack future completes.
     *
     * <p>{@code lastAcceptedMessageFuture} is written under the {@code incomingQueue} monitor,
     * so it is read once under the same monitor here, including the null check.
     *
     * @return future that completes once the last accepted message has been acknowledged
     */
    public CompletableFuture<Void> flushImpl() {
        synchronized (this.incomingQueue) {
            final CompletableFuture<WriteAck> lastAccepted = this.lastAcceptedMessageFuture;
            if (lastAccepted == null || lastAccepted.isDone()) {
                return CompletableFuture.completedFuture(null);
            }
            return lastAccepted.thenApply(ack -> null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns capacity to the send buffer and, if bytes were freed, admits waiting messages.
     *
     * <p>Messages are taken from the head of the incoming queue for as long as the head fits
     * into the available size and the in-flight limit is not reached. A waiting message is
     * accepted only if its future could be completed by this call; it is removed from the
     * queue either way.
     *
     * @param i number of messages that left the buffer (subtracted from the in-flight count)
     * @param j number of bytes to add back to the available buffer size
     */
    public void free(int i, long j) {
        synchronized (this.incomingQueue) {
            this.currentInFlightCount -= i;
            this.availableSizeBytes += j;
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Freed {} bytes in {} messages. Current In-flight: {}, current availableSize: {} ({} / {} acquired)",
                        this.id, j, i, this.currentInFlightCount, this.availableSizeBytes,
                        this.maxSendBufferMemorySize - this.availableSizeBytes, this.maxSendBufferMemorySize);
            }
            if (j <= 0 || this.incomingQueue.isEmpty()) {
                return;
            }
            while (true) {
                IncomingMessage waiting = this.incomingQueue.peek();
                if (waiting == null) {
                    // Queue fully drained: report only this outcome.
                    logger.trace("[{}] All messages from incomingQueue are accepted into send buffer", this.id);
                    return;
                }
                if (waiting.message.getUncompressedSizeBytes() > this.availableSizeBytes
                        || this.currentInFlightCount >= this.settings.getMaxSendBufferMessagesCount()) {
                    logger.trace("[{}] There are messages in incomingQueue still, but no space in send buffer", this.id);
                    return;
                }
                logger.trace("[{}] Putting a message into send buffer after freeing some space", this.id);
                // complete() returns false if the future was already completed elsewhere (for
                // example cancelled by the caller); such a message is dropped, not accepted.
                if (waiting.future.complete(null)) {
                    acceptMessageIntoSendingQueue(waiting.message);
                }
                this.incomingQueue.remove();
            }
        }
    }

    /**
     * Replaces the current session with a brand-new one and starts it. The previous session
     * object is simply dropped here.
     */
    @Override // tech.ydb.topic.impl.GrpcStreamRetrier
    protected void onStreamReconnect() {
        final WriteSessionImpl freshSession = new WriteSessionImpl(this, null);
        this.session = freshSession;
        freshSession.startAndInitialize();
    }

    /**
     * Shuts down the current session and, if initialization was requested but has not finished,
     * fails the init future.
     *
     * @param str shutdown reason; used as the message of the exception the init future is
     *            failed with
     */
    @Override // tech.ydb.topic.impl.GrpcStreamRetrier
    protected void onShutdown(String str) {
        this.session.shutdown();
        final CompletableFuture<InitResult> initFuture = this.initResultFutureRef.get();
        if (initFuture != null && !initFuture.isDone()) {
            initFuture.completeExceptionally(new RuntimeException(str));
        }
    }
}
