package io.pravega.client.connection.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.pravega.common.io.StreamHelpers;
import io.pravega.shared.NameUtils;
import io.pravega.shared.metrics.ClientMetricKeys;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.AppendBatchSizeTracker;
import io.pravega.shared.protocol.netty.InvalidMessageException;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommandType;
import io.pravega.shared.protocol.netty.WireCommands;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.io.OutputStream;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/connection/impl/CommandEncoder.class */
/**
 * Serializes {@link WireCommand}s and {@link Append}s into an internal {@link ByteBuf} and copies that
 * buffer to the supplied {@link OutputStream}.
 *
 * <p>Every message is framed as a 4-byte type code, a 4-byte length and the command's fields. Appends are
 * grouped: the first writer to append while no block is open becomes the "channel owner" and its data is
 * written directly inside an {@code AppendBlock}; appends for other writers arriving while a block is open
 * are queued in their {@link Session} and emitted later as their own block.
 *
 * <p>The public {@code write} methods and {@link #batchTimeout(long)} synchronize on a private lock; the
 * remaining methods expect the caller to already hold it.
 */
public class CommandEncoder {

    /** A session whose queued event count exceeds this value forces a flush of everything queued. */
    @VisibleForTesting
    static final int MAX_QUEUED_EVENTS = 500;

    /** A session whose queued bytes exceed this value forces a flush; also the buffer's initial capacity. */
    @VisibleForTesting
    static final int MAX_QUEUED_SIZE = 1024 * 1024;

    /** Size in bytes of a message header: a 4-byte type code followed by a 4-byte length. */
    private static final int TYPE_PLUS_LENGTH_SIZE = 2 * Integer.BYTES;

    private final Function<Long, AppendBatchSizeTracker> appendTracker;
    private final MetricNotifier metricNotifier;

    /** Segment of the currently open append block, or {@code null} when no block is open. */
    @GuardedBy("$lock")
    private String segmentBeingAppendedTo;

    /** Writer owning the currently open append block, or {@code null} when no block is open. */
    @GuardedBy("$lock")
    private UUID writerIdPerformingAppends;

    /** Payload size announced for the currently open block (0 when no block is open). */
    @GuardedBy("$lock")
    private int currentBlockSize;

    /** Payload bytes of the currently open block that have not been written yet. */
    @GuardedBy("$lock")
    private int bytesLeftInBlock;
    private final OutputStream output;

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CommandEncoder.class);

    /** Four zero bytes written where the length goes; the real length is patched in afterwards. */
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private final Object $lock = new Object[0];

    /** Sessions registered by a SetupAppend, keyed by (segment, writerId). */
    @GuardedBy("$lock")
    private final Map<Map.Entry<String, UUID>, Session> setupSegments = new HashMap<>();

    /** Bumped when a block larger than its first message is opened and whenever a block is completed. */
    private final AtomicLong tokenCounter = new AtomicLong(0);

    /** Sessions that currently hold queued data, keyed by writer id. */
    @GuardedBy("$lock")
    private final Map<UUID, Session> pendingWrites = new HashMap<>();

    /** Staging buffer; its readable bytes are copied to {@link #output} by {@link #flushBuffer()}. */
    private final ByteBuf buffer = Unpooled.buffer(MAX_QUEUED_SIZE);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/client/connection/impl/CommandEncoder$Session.class */
    /**
     * Per-writer bookkeeping: the events recorded since the last AppendBlockEnd and any data queued
     * while another writer owns the open block.
     */
    @VisibleForTesting
    public final class Session {
        private final UUID id;
        private final long requestId;
        /** Queued, non-empty data buffers in arrival order. */
        private final List<ByteBuf> pendingList = new ArrayList<>();
        /** Total readable bytes held in {@link #pendingList}. */
        private int pendingBytes = 0;
        /** Event number of the most recently recorded append; -1 until the first one. */
        private long lastEventNumber = -1;
        /** Events recorded since the last AppendBlockEnd was written for this session. */
        private int eventCount = 0;

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Accounts for an append: remembers its event number and adds its event count.
         *
         * @param append the append being accepted for this session
         */
        public void record(Append append) {
            this.lastEventNumber = append.getEventNumber();
            this.eventCount += append.getEventCount();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * @return {@code true} when no events have been recorded since the last AppendBlockEnd
         */
        public boolean isFree() {
            return this.eventCount == 0;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Queues data for this session and flushes everything if a queue limit has been exceeded.
         *
         * @param data the data to queue; an empty buffer is not stored
         * @throws IOException if a triggered flush fails to write to the output
         */
        public void write(ByteBuf data) throws IOException {
            CommandEncoder.this.pendingWrites.putIfAbsent(this.id, this);
            int size = data.readableBytes();
            if (size > 0) {
                this.pendingBytes += size;
                this.pendingList.add(data);
            }
            conditionalFlush();
        }

        /**
         * Ends the open block, drains every queued session and writes the buffer out once this
         * session exceeds {@link CommandEncoder#MAX_QUEUED_SIZE} or {@link CommandEncoder#MAX_QUEUED_EVENTS}.
         */
        private void conditionalFlush() throws IOException {
            boolean tooManyBytes = this.pendingBytes > CommandEncoder.MAX_QUEUED_SIZE;
            boolean tooManyEvents = this.eventCount > CommandEncoder.MAX_QUEUED_EVENTS;
            if (tooManyBytes || tooManyEvents) {
                CommandEncoder.this.breakCurrentAppend();
                CommandEncoder.this.flushAllToBuffer();
                CommandEncoder.this.flushBuffer();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Emits this session's queued data into the staging buffer as an AppendBlock, the queued bytes
         * and an AppendBlockEnd, then removes the session from the pending set. Does nothing when the
         * session has no recorded events.
         */
        public void flushToBuffer() {
            if (isFree()) {
                return;
            }
            CommandEncoder.this.pendingWrites.remove(this.id);
            CommandEncoder.this.writeMessage(new WireCommands.AppendBlock(this.id), this.pendingBytes);
            if (this.pendingBytes > 0) {
                for (ByteBuf pending : this.pendingList) {
                    CommandEncoder.this.buffer.writeBytes(pending);
                }
                this.pendingList.clear();
            }
            flushToBuffer(this.pendingBytes, null);
            this.pendingBytes = 0;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Writes the AppendBlockEnd for this session and resets its event count.
         *
         * @param blockBytes byte count reported in the AppendBlockEnd; callers pass the number of
         *                   bytes written for the block being closed
         * @param data       extra data handed to the AppendBlockEnd, or {@code null} for none
         */
        public void flushToBuffer(int blockBytes, ByteBuf data) {
            WireCommand blockEnd = new WireCommands.AppendBlockEnd(this.id, blockBytes, data, this.eventCount,
                    this.lastEventNumber, this.requestId);
            CommandEncoder.writeMessage(blockEnd, CommandEncoder.this.buffer);
            this.eventCount = 0;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"id", "requestId"})
        public Session(UUID id, long requestId) {
            this.id = id;
            this.requestId = requestId;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public UUID getId() {
            return this.id;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Moves the queued data of every pending session into the staging buffer.
     */
    public void flushAllToBuffer() {
        if (this.pendingWrites.isEmpty()) {
            return;
        }
        // Iterate over a copy: Session.flushToBuffer() removes the session from pendingWrites.
        List<Session> sessions = new ArrayList<>(this.pendingWrites.values());
        for (Session session : sessions) {
            session.flushToBuffer();
        }
    }

    /**
     * Encodes a non-append command and writes it, together with anything already staged, to the output.
     *
     * <p>A SetupAppend additionally registers a new {@link Session} for its (segment, writerId) pair.
     * A Hello requires that no block is open and nothing is queued. Every other command first closes the
     * open block and drains all queued sessions so that it is written after them.
     *
     * @param wireCommand the command to send
     * @throws IOException           if writing to the output fails
     * @throws IllegalStateException if a Hello is written while a block is open or data is queued
     */
    public void write(WireCommand wireCommand) throws IOException {
        synchronized (this.$lock) {
            if (wireCommand instanceof WireCommands.SetupAppend) {
                breakCurrentAppend();
                flushAllToBuffer();
                writeMessage(wireCommand, this.buffer);
                WireCommands.SetupAppend setup = (WireCommands.SetupAppend) wireCommand;
                this.setupSegments.put(sessionKey(setup.getSegment(), setup.getWriterId()),
                        new Session(setup.getWriterId(), setup.getRequestId()));
                flushBuffer();
            } else if (wireCommand instanceof WireCommands.Hello) {
                Preconditions.checkState(isChannelFree());
                Preconditions.checkState(this.pendingWrites.isEmpty());
                writeMessage(wireCommand, this.buffer);
                flushBuffer();
            } else {
                breakCurrentAppend();
                flushAllToBuffer();
                writeMessage(wireCommand, this.buffer);
                flushBuffer();
            }
        }
    }

    /**
     * Encodes an append.
     *
     * <ul>
     * <li>No block open and the session has nothing outstanding: a new block is opened and the data is
     *     written into it; if that fills the block it is closed and the buffer is written out.</li>
     * <li>No block open but the session has outstanding events: the data is queued and the session is
     *     immediately emitted to the staging buffer.</li>
     * <li>Block open and owned by this writer/segment: the data continues the block, or, if it does not
     *     fit strictly inside the remaining space, completes it and everything is written out.</li>
     * <li>Block open and owned by someone else: the data is queued in the session.</li>
     * </ul>
     *
     * @param append the append to send
     * @throws IOException              if writing to the output fails
     * @throws InvalidMessageException  if the append fails the checks in
     *                                  {@link #validateAppend(Append, Session)}
     * @throws IllegalArgumentException if the append is conditional
     */
    public void write(Append append) throws IOException {
        synchronized (this.$lock) {
            Session session = this.setupSegments.get(sessionKey(append.getSegment(), append.getWriterId()));
            validateAppend(append, session);
            ByteBuf data = append.getData().slice();
            AppendBatchSizeTracker blockSizeSupplier =
                    this.appendTracker == null ? null : this.appendTracker.apply(append.getFlowId());
            if (blockSizeSupplier != null) {
                blockSizeSupplier.recordAppend(append.getEventNumber(), data.readableBytes());
            }
            if (isChannelFree()) {
                // isFree() must be evaluated before record(), which makes the session non-free.
                if (session.isFree()) {
                    session.record(append);
                    startAppend(blockSizeSupplier, append);
                    continueAppend(data);
                    if (this.bytesLeftInBlock == 0) {
                        completeAppend(null);
                        flushBuffer();
                    }
                } else {
                    session.record(append);
                    session.write(data);
                    session.flushToBuffer();
                }
            } else {
                session.record(append);
                if (isChannelOwner(append.getWriterId(), append.getSegment())) {
                    if (this.bytesLeftInBlock > data.readableBytes()) {
                        continueAppend(data);
                    } else {
                        // readSlice advances 'data', so afterwards it holds only what did not fit.
                        ByteBuf dataInsideBlock = data.readSlice(this.bytesLeftInBlock);
                        completeAppend(dataInsideBlock, data);
                        flushAllToBuffer();
                        flushBuffer();
                    }
                } else {
                    session.write(data);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Copies the staging buffer's readable bytes to the output and then empties the buffer.
     *
     * @throws IOException if the output rejects the write; the buffer is left untouched in that case
     */
    @GuardedBy("$lock")
    public void flushBuffer() throws IOException {
        this.buffer.getBytes(this.buffer.readerIndex(), this.output, this.buffer.readableBytes());
        this.buffer.clear();
    }

    /**
     * Checks that an append may be sent on the given session.
     *
     * @param append  the append to check
     * @param session the session looked up for the append's segment and writer; may be {@code null}
     * @throws InvalidMessageException  if the event count is not positive, no matching session was set
     *                                  up, or the event number does not exceed the last recorded one
     * @throws IllegalArgumentException if the append is conditional
     */
    @VisibleForTesting
    static void validateAppend(Append append, Session session) {
        if (append.getEventCount() <= 0) {
            throw new InvalidMessageException("Invalid eventCount : " + append.getEventCount() + " in the append for Writer id: " + append.getWriterId());
        }
        if (session == null || !session.getId().equals(append.getWriterId())) {
            throw new InvalidMessageException("Sending appends without setting up the append. Append Writer id: " + append.getWriterId());
        }
        if (append.getEventNumber() <= session.lastEventNumber) {
            throw new InvalidMessageException("Events written out of order. Received: " + append.getEventNumber() + " following: " + session.lastEventNumber + " for Writer id: " + append.getWriterId());
        }
        if (append.isConditional()) {
            throw new IllegalArgumentException("Conditional appends should be written via a ConditionalAppend object.");
        }
    }

    /** Builds the key under which a session is stored in {@link #setupSegments}. */
    private static Map.Entry<String, UUID> sessionKey(String segment, UUID writerId) {
        return new AbstractMap.SimpleImmutableEntry<>(segment, writerId);
    }

    /** @return {@code true} when no append block is currently open */
    private boolean isChannelFree() {
        return this.writerIdPerformingAppends == null;
    }

    /** @return {@code true} when the open block belongs to exactly this writer and segment */
    private boolean isChannelOwner(UUID writerId, String segment) {
        return writerId.equals(this.writerIdPerformingAppends) && segment.equals(this.segmentBeingAppendedTo);
    }

    /**
     * Opens an append block for the append's writer and segment and writes its AppendBlock header.
     *
     * <p>If the tracker suggests a block larger than the message, that size is used and the announced
     * length also covers the header of the message that will close the block (padding or partial event).
     * Otherwise the block is sized to hold exactly this message.
     *
     * @param blockSizeSupplier source of the suggested block size; may be {@code null}, in which case
     *                          the block is sized to the message and no block-size metric is reported
     * @param append            the append that starts the block
     */
    private void startAppend(AppendBatchSizeTracker blockSizeSupplier, Append append) {
        int msgSize = append.getData().readableBytes();
        int blockSize = 0;
        if (blockSizeSupplier != null) {
            blockSize = blockSizeSupplier.getAppendBlockSize();
            if (this.metricNotifier != null && !this.metricNotifier.equals(MetricNotifier.NO_OP_METRIC_NOTIFIER)) {
                this.metricNotifier.updateSuccessMetric(ClientMetricKeys.CLIENT_APPEND_BLOCK_SIZE, NameUtils.segmentTags(append.getSegment(), append.getWriterId().toString()), blockSize);
            }
        }
        this.segmentBeingAppendedTo = append.getSegment();
        this.writerIdPerformingAppends = append.getWriterId();
        if (blockSize > msgSize) {
            this.currentBlockSize = blockSize;
            writeMessage(new WireCommands.AppendBlock(this.writerIdPerformingAppends), this.currentBlockSize + TYPE_PLUS_LENGTH_SIZE);
            this.tokenCounter.incrementAndGet();
        } else {
            this.currentBlockSize = msgSize;
            writeMessage(new WireCommands.AppendBlock(this.writerIdPerformingAppends), this.currentBlockSize);
        }
        this.bytesLeftInBlock = this.currentBlockSize;
    }

    /** Writes data into the open block and reduces the remaining space accordingly. */
    private void continueAppend(ByteBuf data) {
        this.bytesLeftInBlock -= data.readableBytes();
        this.buffer.writeBytes(data);
    }

    /**
     * Closes the open block: writes the owner's AppendBlockEnd, bumps the token and clears block state.
     *
     * @param data extra data handed to the AppendBlockEnd, or {@code null} for none
     */
    private void completeAppend(ByteBuf data) {
        Session owner = this.setupSegments.get(sessionKey(this.segmentBeingAppendedTo, this.writerIdPerformingAppends));
        owner.flushToBuffer(this.currentBlockSize - this.bytesLeftInBlock, data);
        this.tokenCounter.incrementAndGet();
        this.bytesLeftInBlock = 0;
        this.currentBlockSize = 0;
        this.segmentBeingAppendedTo = null;
        this.writerIdPerformingAppends = null;
    }

    /**
     * Closes the open block with a PartialEvent carrying the part that fits in the block, handing the
     * rest to the AppendBlockEnd.
     *
     * @param dataInsideBlock the portion that fills the remaining block space
     * @param remaining       the portion that did not fit
     */
    private void completeAppend(ByteBuf dataInsideBlock, ByteBuf remaining) {
        writeMessage((WireCommand) new WireCommands.PartialEvent(dataInsideBlock), this.buffer);
        completeAppend(remaining);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the open block early, filling its unused space with padding. Does nothing when no block
     * is open.
     */
    public void breakCurrentAppend() {
        if (isChannelFree()) {
            return;
        }
        writePadding();
        completeAppend(null);
    }

    /** Writes a PADDING message whose body is {@link #bytesLeftInBlock} zero bytes. */
    private void writePadding() {
        this.buffer.writeInt(WireCommandType.PADDING.getCode());
        this.buffer.writeInt(this.bytesLeftInBlock);
        this.buffer.writeZero(this.bytesLeftInBlock);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes an AppendBlock header to the staging buffer, announcing a length that also covers the
     * payload that will follow it.
     *
     * @param appendBlock the block header to write
     * @param blockSize   number of payload bytes that will follow, added to the announced length
     */
    public void writeMessage(WireCommands.AppendBlock appendBlock, int blockSize) {
        writeFramed(appendBlock, this.buffer, blockSize);
    }

    /**
     * Writes a command (type code, length, fields) to the given buffer.
     *
     * @param wireCommand the command to encode
     * @param byteBuf     the buffer to append to
     * @return the total number of bytes written, header included
     */
    @VisibleForTesting
    static int writeMessage(WireCommand wireCommand, ByteBuf byteBuf) {
        return writeFramed(wireCommand, byteBuf, 0);
    }

    /**
     * Shared framing routine: writes the type code, a length placeholder and the command's fields, then
     * patches the length to be the size of the fields plus {@code additionalLength}.
     *
     * <p>An {@link IOException} from serialization is rethrown as-is without being declared, so callers
     * that catch {@code IOException} further up still see it.
     *
     * @return the number of bytes actually written to {@code destination}
     */
    private static int writeFramed(WireCommand command, ByteBuf destination, int additionalLength) {
        int startIdx = destination.writerIndex();
        try (ByteBufOutputStream out = new ByteBufOutputStream(destination)) {
            out.writeInt(command.getType().getCode());
            out.write(LENGTH_PLACEHOLDER);
            command.writeFields(out);
            out.flush();
        } catch (IOException e) {
            throw CommandEncoder.<RuntimeException>sneakyThrow(e);
        }
        int written = destination.writerIndex() - startIdx;
        // The length field sits right after the 4-byte type code and excludes the header itself.
        destination.setInt(startIdx + Integer.BYTES, (written - TYPE_PLUS_LENGTH_SIZE) + additionalLength);
        return written;
    }

    /**
     * Rethrows {@code failure} unchanged while hiding its checked type from the compiler. The declared
     * return type only exists so call sites can write {@code throw sneakyThrow(e)}.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException sneakyThrow(Throwable failure) throws E {
        throw (E) failure;
    }

    /**
     * Closes the open block and writes out everything staged or queued, but only if no block has been
     * started or completed since the caller obtained {@code token}.
     *
     * <p>If writing fails, the error is logged and the output is closed; no exception is propagated for
     * that failure.
     *
     * @param token the token value the caller last observed
     * @return the current token value
     */
    public long batchTimeout(long token) {
        long current;
        synchronized (this.$lock) {
            current = this.tokenCounter.get();
            if (current == token) {
                try {
                    breakCurrentAppend();
                    flushAllToBuffer();
                    flushBuffer();
                } catch (IOException e) {
                    log.error("Failed to time out block. Closeing connection.", e);
                    StreamHelpers.closeQuietly(this.output, log, "Closing output failed", new Object[0]);
                }
            }
        }
        return current;
    }

    /**
     * @param appendTracker  maps an append's flow id to the tracker that suggests block sizes; may be
     *                       {@code null}, and may return {@code null}
     * @param metricNotifier receives the append block size metric; may be {@code null}
     * @param output         the stream encoded bytes are written to
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    @ConstructorProperties({"appendTracker", "metricNotifier", "output"})
    public CommandEncoder(Function<Long, AppendBatchSizeTracker> appendTracker, MetricNotifier metricNotifier, OutputStream output) {
        this.appendTracker = appendTracker;
        this.metricNotifier = metricNotifier;
        this.output = output;
    }
}
