/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.connection.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.pravega.common.io.StreamHelpers;
import io.pravega.shared.NameUtils;
import io.pravega.shared.metrics.ClientMetricKeys;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.AppendBatchSizeTracker;
import io.pravega.shared.protocol.netty.InvalidMessageException;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommandType;
import io.pravega.shared.protocol.netty.WireCommands;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.io.OutputStream;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandEncoder {
    // ---- constants -------------------------------------------------------------------------

    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CommandEncoder.class);

    /** Event-count threshold; equal to the limit checked by {@code Session.conditionalFlush()}. */
    @VisibleForTesting
    static final int MAX_QUEUED_EVENTS = 500;

    /** Byte threshold (1 MiB); equal to the limit checked by {@code Session.conditionalFlush()}. */
    @VisibleForTesting
    static final int MAX_QUEUED_SIZE = 1024 * 1024;

    /** Four zero bytes written where a command's length goes; patched afterwards via {@code setInt}. */
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];

    // ---- collaborators supplied through the constructor ------------------------------------

    /** Maps a flow id to its batch-size tracker; {@code write(Append)} tolerates this being null. */
    private final Function<Long, AppendBatchSizeTracker> appendTracker;

    /** Receives the append-block-size metric from {@code startAppend}; may be null. */
    private final MetricNotifier metricNotifier;

    /** Destination that {@code flushBuffer()} copies the encoded bytes to. */
    private final OutputStream output;

    // ---- encoder state ---------------------------------------------------------------------

    /** Monitor that the public entry points synchronize on. */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private final Object $lock = new Object[0];

    /** Staging area for encoded commands; its initial capacity equals {@link #MAX_QUEUED_SIZE}. */
    private final ByteBuf buffer = Unpooled.buffer(MAX_QUEUED_SIZE);

    /** Value handed back by {@code batchTimeout}; bumped when a batched block is opened and when a block completes. */
    private final AtomicLong tokenCounter = new AtomicLong(0L);

    /** Sessions registered by SetupAppend commands, keyed by (segment, writer id). */
    @GuardedBy("$lock")
    private final Map<Map.Entry<String, UUID>, Session> setupSegments = new HashMap<>();

    /** Sessions that currently hold queued data, keyed by writer id. */
    @GuardedBy("$lock")
    private final Map<UUID, Session> pendingWrites = new HashMap<>();

    /** Segment of the append block currently open on the channel, or null when none is open. */
    @GuardedBy("$lock")
    private String segmentBeingAppendedTo;

    /** Writer owning the append block currently open on the channel; null means the channel is free. */
    @GuardedBy("$lock")
    private UUID writerIdPerformingAppends;

    /** Payload size of the append block currently open (0 when none). */
    @GuardedBy("$lock")
    private int currentBlockSize;

    /** Payload bytes still to be written before the open block is full. */
    @GuardedBy("$lock")
    private int bytesLeftInBlock;

    /**
     * Moves the queued data of every session in {@code pendingWrites} into the shared buffer.
     * Nothing is sent to the output stream here; that is {@code flushBuffer()}'s job.
     */
    private void flushAllToBuffer() {
        if (this.pendingWrites.isEmpty()) {
            return;
        }
        // Work on a snapshot: Session.flushToBuffer() removes the session from pendingWrites,
        // which would otherwise invalidate a live iterator over the map's values.
        List<Session> snapshot = new ArrayList<Session>(this.pendingWrites.values());
        for (Session queued : snapshot) {
            queued.flushToBuffer();
        }
    }

    /**
     * Encodes a non-append wire command and pushes it to the output stream.
     *
     * <p>A {@code Hello} is written only when no append block is open and no session has queued
     * data (both are enforced with {@code checkState}). Every other command first closes the
     * open append block, if any, and drains all queued session data so the command is emitted
     * after them. A {@code SetupAppend} additionally registers a new {@link Session} under its
     * (segment, writer id) pair once it has been encoded.
     *
     * @param msg the command to encode
     * @throws IOException if copying the buffer to the output stream fails
     */
    public void write(WireCommand msg) throws IOException {
        synchronized (this.$lock) {
            final boolean isSetup = msg instanceof WireCommands.SetupAppend;
            if (!isSetup && msg instanceof WireCommands.Hello) {
                Preconditions.checkState(this.isChannelFree());
                Preconditions.checkState(this.pendingWrites.isEmpty());
                CommandEncoder.writeMessage(msg, this.buffer);
            } else {
                this.breakCurrentAppend();
                this.flushAllToBuffer();
                CommandEncoder.writeMessage(msg, this.buffer);
                if (isSetup) {
                    // Register the session only after the command was encoded successfully.
                    WireCommands.SetupAppend setup = (WireCommands.SetupAppend) msg;
                    Map.Entry<String, UUID> key =
                            new AbstractMap.SimpleImmutableEntry<String, UUID>(setup.getSegment(), setup.getWriterId());
                    this.setupSegments.put(key, new Session(setup.getWriterId(), setup.getRequestId()));
                }
            }
            this.flushBuffer();
        }
    }

    /**
     * Encodes an append, either directly into the open append block or into the session's
     * queue when another writer currently owns the channel.
     *
     * <p>The append is validated against the session registered for its (segment, writer id)
     * pair; see {@code validateAppend} for the conditions that are rejected.
     *
     * @param append the append to encode
     * @throws IOException if copying the buffer to the output stream fails
     */
    public void write(Append append) throws IOException {
        synchronized (this.$lock) {
            Map.Entry<String, UUID> key =
                    new AbstractMap.SimpleImmutableEntry<String, UUID>(append.getSegment(), append.getWriterId());
            Session session = this.setupSegments.get(key);
            CommandEncoder.validateAppend(append, session);

            ByteBuf payload = append.getData().slice();
            AppendBatchSizeTracker tracker = null;
            if (this.appendTracker != null) {
                tracker = this.appendTracker.apply(append.getFlowId());
            }
            if (tracker != null) {
                tracker.recordAppend(append.getEventNumber(), payload.readableBytes());
            }

            if (!this.isChannelFree()) {
                session.record(append);
                if (!this.isChannelOwner(append.getWriterId(), append.getSegment())) {
                    // Another writer's block is open: queue the data on this session.
                    session.write(payload);
                    return;
                }
                if (this.bytesLeftInBlock > payload.readableBytes()) {
                    // Fits with room to spare: keep the block open.
                    this.continueAppend(payload);
                    return;
                }
                // The payload fills (or overflows) the block: the part that fits closes the
                // block, whatever remains in payload travels with the block end.
                ByteBuf fitsInBlock = payload.readSlice(this.bytesLeftInBlock);
                this.completeAppend(fitsInBlock, payload);
                this.flushAllToBuffer();
                this.flushBuffer();
                return;
            }

            // Channel is free. The session's idle state must be sampled before record(),
            // because record() increases the session's event count.
            final boolean sessionIdle = session.isFree();
            session.record(append);
            if (!sessionIdle) {
                session.write(payload);
                session.flushToBuffer();
                return;
            }
            this.startAppend(tracker, append);
            this.continueAppend(payload);
            if (this.bytesLeftInBlock == 0) {
                this.completeAppend(null);
                this.flushBuffer();
            }
        }
    }

    /**
     * Copies all readable bytes of the staging buffer to the output stream, then resets the
     * buffer's indices so it can be reused.
     *
     * @throws IOException if the output stream rejects the bytes
     */
    @GuardedBy("$lock")
    private void flushBuffer() throws IOException {
        final int from = this.buffer.readerIndex();
        final OutputStream sink = this.output;
        final int length = this.buffer.readableBytes();
        // getBytes does not move the reader index; clear() below resets both indices instead.
        this.buffer.getBytes(from, sink, length);
        this.buffer.clear();
    }

    /**
     * Rejects appends that cannot be encoded for the given session. The checks run in this
     * order: positive event count, a session that exists and belongs to the append's writer,
     * an event number strictly greater than the session's last one, and a non-conditional append.
     *
     * @param append  the append about to be written
     * @param session the session registered for the append's (segment, writer id), or null
     * @throws InvalidMessageException  if one of the first three checks fails
     * @throws IllegalArgumentException if the append is conditional
     */
    @VisibleForTesting
    static void validateAppend(Append append, Session session) {
        if (append.getEventCount() <= 0) {
            throw new InvalidMessageException("Invalid eventCount : " + append.getEventCount() + " in the append for Writer id: " + append.getWriterId());
        }

        final boolean sessionMatches = session != null && session.getId().equals(append.getWriterId());
        if (!sessionMatches) {
            throw new InvalidMessageException("Sending appends without setting up the append. Append Writer id: " + append.getWriterId());
        }

        final long previous = session.lastEventNumber;
        if (append.getEventNumber() <= previous) {
            throw new InvalidMessageException("Events written out of order. Received: " + append.getEventNumber() + " following: " + previous + " for Writer id: " + append.getWriterId());
        }

        if (append.isConditional()) {
            throw new IllegalArgumentException("Conditional appends should be written via a ConditionalAppend object.");
        }
    }

    /** @return true when no writer currently has an append block open on the channel */
    private boolean isChannelFree() {
        final UUID owner = this.writerIdPerformingAppends;
        return owner == null;
    }

    /**
     * Tells whether the open append block belongs to the given writer and segment.
     *
     * @param writerID writer to compare with the current block owner
     * @param segment  segment to compare with the segment being appended to
     * @return true only if both match the currently open block
     */
    private boolean isChannelOwner(UUID writerID, String segment) {
        if (!writerID.equals(this.writerIdPerformingAppends)) {
            return false;
        }
        return segment.equals(this.segmentBeingAppendedTo);
    }

    /**
     * Opens a new append block for the append's writer and segment and writes the block header
     * into the staging buffer.
     *
     * <p>When the tracker suggests a block larger than this message, the block is opened with
     * that size so later appends can be batched into it; otherwise the block is sized exactly
     * to the message.
     *
     * @param blockSizeSupplier tracker suggesting the block size; may be null (the caller,
     *                          {@code write(Append)}, passes null when no tracker is configured),
     *                          in which case the block is sized exactly to the message
     * @param append            the append that starts the block
     */
    private void startAppend(AppendBatchSizeTracker blockSizeSupplier, Append append) {
        final int msgSize = append.getData().readableBytes();

        // Without a tracker there is no batching hint: a block size of 0 can never exceed the
        // message size, so the exact-size branch below is taken. Previously a null tracker
        // caused a NullPointerException here.
        int blockSize = 0;
        if (blockSizeSupplier != null) {
            blockSize = blockSizeSupplier.getAppendBlockSize();
            // Publish the metric only when a real notifier is configured.
            if (this.metricNotifier != null && !this.metricNotifier.equals(MetricNotifier.NO_OP_METRIC_NOTIFIER)) {
                this.metricNotifier.updateSuccessMetric(ClientMetricKeys.CLIENT_APPEND_BLOCK_SIZE, NameUtils.segmentTags(append.getSegment(), append.getWriterId().toString()), blockSize);
            }
        }

        this.segmentBeingAppendedTo = append.getSegment();
        this.writerIdPerformingAppends = append.getWriterId();

        if (blockSize > msgSize) {
            this.currentBlockSize = blockSize;
            // The extra 8 bytes cover the type+length header of the command that later closes
            // the block (writePadding, for instance, emits two ints before its zero fill).
            this.writeMessage(new WireCommands.AppendBlock(this.writerIdPerformingAppends), this.currentBlockSize + 8);
            // A new token: batchTimeout only flushes when the caller's token equals the counter.
            this.tokenCounter.incrementAndGet();
        } else {
            this.currentBlockSize = msgSize;
            this.writeMessage(new WireCommands.AppendBlock(this.writerIdPerformingAppends), this.currentBlockSize);
        }
        this.bytesLeftInBlock = this.currentBlockSize;
    }

    /**
     * Appends {@code data} to the open block and reduces the remaining block capacity by the
     * number of bytes written.
     *
     * @param data bytes to copy into the staging buffer
     */
    private void continueAppend(ByteBuf data) {
        // Sample the size first: writeBytes(ByteBuf) consumes the source's readable bytes.
        final int consumed = data.readableBytes();
        this.bytesLeftInBlock = this.bytesLeftInBlock - consumed;
        this.buffer.writeBytes(data);
    }

    /**
     * Closes the open append block: writes the block end for the owning session, advances the
     * batch token and marks the channel as free.
     *
     * @param pendingData data handed to the block end (may be null)
     */
    private void completeAppend(ByteBuf pendingData) {
        Map.Entry<String, UUID> ownerKey =
                new AbstractMap.SimpleImmutableEntry<String, UUID>(this.segmentBeingAppendedTo, this.writerIdPerformingAppends);
        Session owner = this.setupSegments.get(ownerKey);

        // Bytes actually written into the block so far.
        final int writtenInBlock = this.currentBlockSize - this.bytesLeftInBlock;
        owner.flushToBuffer(writtenInBlock, pendingData);
        this.tokenCounter.incrementAndGet();

        // Reset the per-block state; a null writer id is what isChannelFree() tests for.
        this.writerIdPerformingAppends = null;
        this.segmentBeingAppendedTo = null;
        this.currentBlockSize = 0;
        this.bytesLeftInBlock = 0;
    }

    /**
     * Writes {@code data} as a partial event that fills the rest of the open block, then closes
     * the block.
     *
     * @param data        the part of the event written inside the block
     * @param pendingData the remainder, handed on to the block end
     */
    private void completeAppend(ByteBuf data, ByteBuf pendingData) {
        WireCommands.PartialEvent partial = new WireCommands.PartialEvent(data);
        CommandEncoder.writeMessage(partial, this.buffer);
        this.completeAppend(pendingData);
    }

    /**
     * Closes the open append block early, if there is one, by filling its unused remainder
     * with padding. Does nothing when the channel is free.
     */
    private void breakCurrentAppend() {
        if (!this.isChannelFree()) {
            this.writePadding();
            this.completeAppend(null);
        }
    }

    /**
     * Emits a padding command covering the unused remainder of the open block: the PADDING
     * type code, the number of padding bytes, then that many zero bytes.
     */
    private void writePadding() {
        final int typeCode = WireCommandType.PADDING.getCode();
        final int remaining = this.bytesLeftInBlock;
        this.buffer.writeInt(typeCode);
        this.buffer.writeInt(remaining);
        this.buffer.writeZero(remaining);
    }

    /**
     * Writes the header and fields of an append block into the staging buffer. The length field
     * is patched to cover the block's own fields plus {@code blockSize} bytes that the caller
     * writes after this header.
     *
     * <p>An {@link IOException} raised while encoding is rethrown unchanged, although this
     * method does not declare it; callers that catch {@code IOException} keep working.
     *
     * @param block     the append block command to encode
     * @param blockSize number of bytes following the header that belong to the block
     */
    private void writeMessage(WireCommands.AppendBlock block, int blockSize) {
        final int startIdx = this.buffer.writerIndex();
        // try-with-resources closes the stream on the failure path as well.
        try (ByteBufOutputStream bout = new ByteBufOutputStream(this.buffer)) {
            bout.writeInt(block.getType().getCode());
            bout.write(LENGTH_PLACEHOLDER);
            block.writeFields(bout);
            bout.flush();
        } catch (IOException e) {
            throw CommandEncoder.<RuntimeException>rethrowUnchecked(e);
        }
        final int endIdx = this.buffer.writerIndex();
        // 8 = 4-byte type code + 4-byte length field, neither counted in the length.
        final int fieldsSize = endIdx - startIdx - 8;
        this.buffer.setInt(startIdx + 4, fieldsSize + blockSize);
    }

    /**
     * Encodes a wire command as type code, length and fields at the destination's writer index.
     * The length is first written as a placeholder and patched once the fields' size is known.
     *
     * <p>An {@link IOException} raised while encoding is rethrown unchanged, although this
     * method does not declare it; callers that catch {@code IOException} keep working.
     *
     * @param msg         the command to encode
     * @param destination the buffer to append the encoded command to
     * @return total number of bytes written, header included
     */
    @VisibleForTesting
    static int writeMessage(WireCommand msg, ByteBuf destination) {
        final int startIdx = destination.writerIndex();
        // try-with-resources closes the stream on the failure path as well.
        try (ByteBufOutputStream bout = new ByteBufOutputStream(destination)) {
            bout.writeInt(msg.getType().getCode());
            bout.write(LENGTH_PLACEHOLDER);
            msg.writeFields(bout);
            bout.flush();
        } catch (IOException e) {
            throw CommandEncoder.<RuntimeException>rethrowUnchecked(e);
        }
        final int endIdx = destination.writerIndex();
        // 8 = 4-byte type code + 4-byte length field, neither counted in the length.
        final int fieldsSize = endIdx - startIdx - 8;
        destination.setInt(startIdx + 4, fieldsSize);
        return endIdx - startIdx;
    }

    /**
     * Rethrows {@code t} exactly as it is, without wrapping, from a method that does not declare
     * it. This keeps the exception type seen by callers identical to the pre-decompilation
     * behaviour (NOTE(review): presumably Lombok's {@code @SneakyThrows} — confirm against the
     * original sources). The {@code RuntimeException} return type exists only so call sites can
     * write {@code throw rethrowUnchecked(e)}; the method never returns normally.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException rethrowUnchecked(Throwable t) throws E {
        throw (E) t;
    }

    /**
     * Flushes the open append block and all queued data if no block has been started or
     * completed since {@code token} was obtained, i.e. if the token still equals the counter.
     *
     * <p>An {@link IOException} during the flush is logged and the output stream is closed
     * quietly; it is not propagated.
     *
     * @param token the token value returned by an earlier call
     * @return the token counter's value as read at the start of this call
     */
    public long batchTimeout(long token) {
        synchronized (this.$lock) {
            final long current = this.tokenCounter.get();
            if (current != token) {
                // Something was started or completed in the meantime; nothing to time out.
                return current;
            }
            try {
                this.breakCurrentAppend();
                this.flushAllToBuffer();
                this.flushBuffer();
            } catch (IOException e) {
                log.error("Failed to time out block. Closeing connection.", e);
                StreamHelpers.closeQuietly(this.output, log, "Closing output failed", new Object[0]);
            }
            return current;
        }
    }

    /**
     * Creates an encoder that writes encoded commands to {@code output}.
     *
     * @param appendTracker  maps a flow id to its batch-size tracker; {@code write(Append)}
     *                       checks this for null before using it
     * @param metricNotifier receives the append-block-size metric; {@code startAppend} checks
     *                       this for null before using it
     * @param output         stream that receives the encoded bytes
     */
    @ConstructorProperties(value={"appendTracker", "metricNotifier", "output"})
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public CommandEncoder(Function<Long, AppendBatchSizeTracker> appendTracker, MetricNotifier metricNotifier, OutputStream output) {
        this.output = output;
        this.metricNotifier = metricNotifier;
        this.appendTracker = appendTracker;
    }

    /**
     * Per-writer bookkeeping: the events recorded since the last block end, plus any data
     * queued while another writer's append block is open on the channel.
     */
    @VisibleForTesting
    final class Session {
        /** Writer this session belongs to. */
        private final UUID id;
        /** Request id passed on in every block end written for this session. */
        private final long requestId;
        /** Queued payloads, in arrival order. */
        private final List<ByteBuf> pendingList = new ArrayList<ByteBuf>();
        /** Total readable bytes held in {@link #pendingList}. */
        private int pendingBytes = 0;
        /** Event number of the most recently recorded append; -1 before the first one. */
        private long lastEventNumber = -1L;
        /** Events recorded since the last block end was written. */
        private int eventCount = 0;

        @ConstructorProperties(value={"id", "requestId"})
        @SuppressFBWarnings(justification="generated code")
        @Generated
        public Session(UUID id, long requestId) {
            this.id = id;
            this.requestId = requestId;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public UUID getId() {
            return this.id;
        }

        /** Notes the append's event number and adds its events to the running count. */
        private void record(Append append) {
            this.eventCount += append.getEventCount();
            this.lastEventNumber = append.getEventNumber();
        }

        /** @return true when no events have been recorded since the last block end */
        private boolean isFree() {
            return 0 == this.eventCount;
        }

        /**
         * Queues {@code data} on this session and marks the session as having pending writes;
         * forces a flush when the queue grows beyond the configured limits.
         *
         * @throws IOException if a forced flush fails to reach the output stream
         */
        private void write(ByteBuf data) throws IOException {
            CommandEncoder.this.pendingWrites.putIfAbsent(this.id, this);
            final int size = data.readableBytes();
            if (size > 0) {
                this.pendingBytes += size;
                this.pendingList.add(data);
            }
            this.conditionalFlush();
        }

        /**
         * Flushes everything (open block, all sessions, the buffer) once this session holds
         * more than {@code MAX_QUEUED_SIZE} bytes or more than {@code MAX_QUEUED_EVENTS} events.
         *
         * @throws IOException if copying the buffer to the output stream fails
         */
        private void conditionalFlush() throws IOException {
            final boolean withinByteLimit = this.pendingBytes <= CommandEncoder.MAX_QUEUED_SIZE;
            final boolean withinEventLimit = this.eventCount <= CommandEncoder.MAX_QUEUED_EVENTS;
            if (withinByteLimit && withinEventLimit) {
                return;
            }
            CommandEncoder.this.breakCurrentAppend();
            CommandEncoder.this.flushAllToBuffer();
            CommandEncoder.this.flushBuffer();
        }

        /**
         * Writes this session's queued data into the shared buffer as one complete append
         * block (header, payloads, block end). Does nothing when the session is free.
         */
        private void flushToBuffer() {
            if (this.isFree()) {
                return;
            }
            CommandEncoder.this.pendingWrites.remove(this.id);
            CommandEncoder.this.writeMessage(new WireCommands.AppendBlock(this.id), this.pendingBytes);
            if (this.pendingBytes > 0) {
                for (ByteBuf chunk : this.pendingList) {
                    CommandEncoder.this.buffer.writeBytes(chunk);
                }
                this.pendingList.clear();
            }
            // pendingBytes must still hold its value here: it is reported in the block end.
            this.flushToBuffer(this.pendingBytes, null);
            this.pendingBytes = 0;
        }

        /**
         * Writes the block end for this session and resets the event count, which makes the
         * session free again.
         *
         * @param sizeOfWholeEvents byte count reported in the block end
         * @param data              data handed to the block end (may be null)
         */
        private void flushToBuffer(int sizeOfWholeEvents, ByteBuf data) {
            WireCommands.AppendBlockEnd blockEnd = new WireCommands.AppendBlockEnd(this.id, sizeOfWholeEvents, data, this.eventCount, this.lastEventNumber, this.requestId);
            CommandEncoder.writeMessage(blockEnd, CommandEncoder.this.buffer);
            this.eventCount = 0;
        }
    }
}

