package io.pravega.shared.protocol.netty;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.io.netty.buffer.ByteBuf;
import io.pravega.shaded.io.netty.buffer.ByteBufOutputStream;
import io.pravega.shaded.io.netty.channel.Channel;
import io.pravega.shaded.io.netty.channel.ChannelHandlerContext;
import io.pravega.shaded.io.netty.handler.codec.MessageToByteEncoder;
import io.pravega.shared.metrics.ClientMetricKeys;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.WireCommands;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:io/pravega/shared/protocol/netty/CommandEncoder.class */
public class CommandEncoder extends MessageToByteEncoder<Object> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(CommandEncoder.class);
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    private final Function<Long, AppendBatchSizeTracker> appendTracker;
    private final MetricNotifier metricNotifier;
    private final Map<Map.Entry<String, UUID>, Session> setupSegments;
    private final AtomicLong tokenCounter;
    private String segmentBeingAppendedTo;
    private UUID writerIdPerformingAppends;
    private int currentBlockSize;
    private int bytesLeftInBlock;
    private final Map<UUID, Session> pendingWrites;

    /* loaded from: input_file:io/pravega/shared/protocol/netty/CommandEncoder$BlockTimeout.class */
    private static final class BlockTimeout {
        private final long token;

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"token"})
        public BlockTimeout(long j) {
            this.token = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/shared/protocol/netty/CommandEncoder$BlockTimeouter.class */
    public final class BlockTimeouter implements Runnable {
        private final Channel channel;
        private final long token;

        @Override // java.lang.Runnable
        public void run() {
            if (CommandEncoder.this.tokenCounter.get() == this.token) {
                this.channel.writeAndFlush(new BlockTimeout(this.token));
            }
        }

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"channel", "token"})
        public BlockTimeouter(Channel channel, long j) {
            this.channel = channel;
            this.token = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/shared/protocol/netty/CommandEncoder$Session.class */
    public final class Session {
        private static final int MAX_EVENTS = 500;
        private static final int MAX_DATA_SIZE = 1048576;
        private final UUID id;
        private final long requestId;
        private final List<ByteBuf> pendingList = new ArrayList();
        private int pendingBytes = 0;
        private long lastEventNumber = -1;
        private int eventCount = 0;

        /* JADX INFO: Access modifiers changed from: private */
        public void record(Append append) {
            this.lastEventNumber = append.getEventNumber();
            this.eventCount += append.getEventCount();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isFree() {
            return this.eventCount == 0;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void write(ByteBuf byteBuf, ByteBuf byteBuf2) {
            CommandEncoder.this.pendingWrites.putIfAbsent(this.id, this);
            if (byteBuf.readableBytes() > 0) {
                this.pendingBytes += byteBuf.readableBytes();
                this.pendingList.add(byteBuf);
            }
            conditionalFlush(byteBuf2);
        }

        private void conditionalFlush(ByteBuf byteBuf) {
            if (this.pendingBytes > 1048576 || this.eventCount > 500) {
                CommandEncoder.this.breakCurrentAppend(byteBuf);
                flush(byteBuf);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flush(ByteBuf byteBuf) {
            if (isFree()) {
                return;
            }
            CommandEncoder.this.pendingWrites.remove(this.id);
            CommandEncoder.this.writeMessage(new WireCommands.AppendBlock(this.id), this.pendingBytes, byteBuf);
            if (this.pendingBytes > 0) {
                List<ByteBuf> list = this.pendingList;
                byteBuf.getClass();
                list.forEach(byteBuf::writeBytes);
                this.pendingList.clear();
            }
            flush(this.pendingBytes, null, byteBuf);
            this.pendingBytes = 0;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flush(int i, ByteBuf byteBuf, ByteBuf byteBuf2) {
            CommandEncoder.writeMessage(new WireCommands.AppendBlockEnd(this.id, i, byteBuf, this.eventCount, this.lastEventNumber, this.requestId), byteBuf2);
            this.eventCount = 0;
        }

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"id", "requestId"})
        public Session(UUID uuid, long j) {
            this.id = uuid;
            this.requestId = j;
        }
    }

    public CommandEncoder(Function<Long, AppendBatchSizeTracker> function) {
        this(function, MetricNotifier.NO_OP_METRIC_NOTIFIER);
    }

    private void flushAll(ByteBuf byteBuf) {
        if (this.pendingWrites.isEmpty()) {
            return;
        }
        new ArrayList(this.pendingWrites.values()).forEach(session -> {
            session.flush(byteBuf);
        });
    }

    @Override // io.pravega.shaded.io.netty.handler.codec.MessageToByteEncoder
    protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {
        log.trace("Encoding message to send over the wire {}", obj);
        if (!(obj instanceof Append)) {
            if (obj instanceof WireCommands.SetupAppend) {
                breakCurrentAppend(byteBuf);
                flushAll(byteBuf);
                writeMessage((WireCommands.SetupAppend) obj, byteBuf);
                WireCommands.SetupAppend setupAppend = (WireCommands.SetupAppend) obj;
                this.setupSegments.put(new AbstractMap.SimpleImmutableEntry(setupAppend.getSegment(), setupAppend.getWriterId()), new Session(setupAppend.getWriterId(), setupAppend.getRequestId()));
                return;
            }
            if (obj instanceof BlockTimeout) {
                if (this.tokenCounter.get() == ((BlockTimeout) obj).token) {
                    breakCurrentAppend(byteBuf);
                    flushAll(byteBuf);
                    return;
                }
                return;
            }
            if (!(obj instanceof WireCommand)) {
                throw new IllegalArgumentException("Expected a wire command and found: " + obj);
            }
            breakCurrentAppend(byteBuf);
            flushAll(byteBuf);
            writeMessage((WireCommand) obj, byteBuf);
            return;
        }
        Append append = (Append) obj;
        Session session = this.setupSegments.get(new AbstractMap.SimpleImmutableEntry(append.segment, append.getWriterId()));
        validateAppend(append, session);
        ByteBuf slice = append.getData().slice();
        AppendBatchSizeTracker apply = this.appendTracker == null ? null : this.appendTracker.apply(Long.valueOf(append.getFlowId()));
        if (apply != null) {
            apply.recordAppend(append.getEventNumber(), slice.readableBytes());
        }
        if (!isChannelFree()) {
            session.record(append);
            if (!isChannelOwner(append.getWriterId(), append.getSegment())) {
                session.write(slice, byteBuf);
                return;
            } else if (this.bytesLeftInBlock > slice.readableBytes()) {
                continueAppend(slice, byteBuf);
                return;
            } else {
                completeAppend(slice.readSlice(this.bytesLeftInBlock), slice, byteBuf);
                flushAll(byteBuf);
                return;
            }
        }
        if (!session.isFree()) {
            session.record(append);
            session.write(slice, byteBuf);
            session.flush(byteBuf);
        } else {
            session.record(append);
            startAppend(channelHandlerContext, apply, append, byteBuf);
            continueAppend(slice, byteBuf);
            if (this.bytesLeftInBlock == 0) {
                completeAppend(null, byteBuf);
            }
        }
    }

    private void validateAppend(Append append, Session session) {
        if (append.getEventCount() <= 0) {
            throw new InvalidMessageException("Invalid eventCount : " + append.getEventCount() + " in the append for Writer id: " + append.getWriterId());
        }
        if (session == null || !session.id.equals(append.getWriterId())) {
            throw new InvalidMessageException("Sending appends without setting up the append. Append Writer id: " + append.getWriterId());
        }
        if (append.getEventNumber() <= session.lastEventNumber) {
            throw new InvalidMessageException("Events written out of order. Received: " + append.getEventNumber() + " following: " + session.lastEventNumber + " for Writer id: " + append.getWriterId());
        }
        if (append.isConditional()) {
            throw new IllegalArgumentException("Conditional appends should be written via a ConditionalAppend object.");
        }
    }

    private boolean isChannelFree() {
        return this.writerIdPerformingAppends == null;
    }

    private boolean isChannelOwner(UUID uuid, String str) {
        return uuid.equals(this.writerIdPerformingAppends) && str.equals(this.segmentBeingAppendedTo);
    }

    private void startAppend(ChannelHandlerContext channelHandlerContext, AppendBatchSizeTracker appendBatchSizeTracker, Append append, ByteBuf byteBuf) {
        int readableBytes = append.getData().readableBytes();
        int i = 0;
        if (appendBatchSizeTracker != null) {
            i = appendBatchSizeTracker.getAppendBlockSize();
            this.metricNotifier.updateSuccessMetric(ClientMetricKeys.CLIENT_APPEND_BLOCK_SIZE, StreamSegmentNameUtils.segmentTags(append.getSegment(), append.getWriterId().toString()), i);
        }
        this.segmentBeingAppendedTo = append.segment;
        this.writerIdPerformingAppends = append.writerId;
        if (channelHandlerContext == null || i <= readableBytes) {
            this.currentBlockSize = readableBytes;
            writeMessage(new WireCommands.AppendBlock(this.writerIdPerformingAppends), this.currentBlockSize, byteBuf);
        } else {
            this.currentBlockSize = i;
            writeMessage(new WireCommands.AppendBlock(this.writerIdPerformingAppends), this.currentBlockSize + 8, byteBuf);
            channelHandlerContext.executor().schedule((Runnable) new BlockTimeouter(channelHandlerContext.channel(), this.tokenCounter.incrementAndGet()), appendBatchSizeTracker.getBatchTimeout(), TimeUnit.MILLISECONDS);
        }
        this.bytesLeftInBlock = this.currentBlockSize;
    }

    private void continueAppend(ByteBuf byteBuf, ByteBuf byteBuf2) {
        this.bytesLeftInBlock -= byteBuf.readableBytes();
        byteBuf2.writeBytes(byteBuf);
    }

    private void completeAppend(ByteBuf byteBuf, ByteBuf byteBuf2) {
        this.setupSegments.get(new AbstractMap.SimpleImmutableEntry(this.segmentBeingAppendedTo, this.writerIdPerformingAppends)).flush(this.currentBlockSize - this.bytesLeftInBlock, byteBuf, byteBuf2);
        this.tokenCounter.incrementAndGet();
        this.bytesLeftInBlock = 0;
        this.currentBlockSize = 0;
        this.segmentBeingAppendedTo = null;
        this.writerIdPerformingAppends = null;
    }

    private void completeAppend(ByteBuf byteBuf, ByteBuf byteBuf2, ByteBuf byteBuf3) {
        writeMessage(new WireCommands.PartialEvent(byteBuf), byteBuf3);
        completeAppend(byteBuf2, byteBuf3);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void breakCurrentAppend(ByteBuf byteBuf) {
        if (isChannelFree()) {
            return;
        }
        writeMessage(new WireCommands.Padding(this.bytesLeftInBlock), byteBuf);
        completeAppend(null, byteBuf);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writeMessage(WireCommands.AppendBlock appendBlock, int i, ByteBuf byteBuf) {
        try {
            int writerIndex = byteBuf.writerIndex();
            ByteBufOutputStream byteBufOutputStream = new ByteBufOutputStream(byteBuf);
            byteBufOutputStream.writeInt(appendBlock.getType().getCode());
            byteBufOutputStream.write(LENGTH_PLACEHOLDER);
            appendBlock.writeFields(byteBufOutputStream);
            byteBufOutputStream.flush();
            byteBufOutputStream.close();
            byteBuf.setInt(writerIndex + 4, ((byteBuf.writerIndex() - writerIndex) - 8) + i);
        } catch (IOException e) {
            throw e;
        }
    }

    @VisibleForTesting
    static int writeMessage(WireCommand wireCommand, ByteBuf byteBuf) {
        try {
            int writerIndex = byteBuf.writerIndex();
            ByteBufOutputStream byteBufOutputStream = new ByteBufOutputStream(byteBuf);
            byteBufOutputStream.writeInt(wireCommand.getType().getCode());
            byteBufOutputStream.write(LENGTH_PLACEHOLDER);
            wireCommand.writeFields(byteBufOutputStream);
            byteBufOutputStream.flush();
            byteBufOutputStream.close();
            int writerIndex2 = byteBuf.writerIndex();
            byteBuf.setInt(writerIndex + 4, (writerIndex2 - writerIndex) - 8);
            return writerIndex2 - writerIndex;
        } catch (IOException e) {
            throw e;
        }
    }

    @SuppressFBWarnings(justification = "generated code")
    @ConstructorProperties({"appendTracker", "metricNotifier"})
    public CommandEncoder(Function<Long, AppendBatchSizeTracker> function, MetricNotifier metricNotifier) {
        this.setupSegments = new HashMap();
        this.tokenCounter = new AtomicLong(0L);
        this.pendingWrites = new HashMap();
        this.appendTracker = function;
        this.metricNotifier = metricNotifier;
    }
}
