package io.pravega.segmentstore.server.host.handler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.LinkedListMultimap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.pravega.auth.AuthHandler;
import io.pravega.auth.TokenException;
import io.pravega.auth.TokenExpiredException;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.Timer;
import io.pravega.segmentstore.contracts.AttributeUpdate;
import io.pravega.segmentstore.contracts.AttributeUpdateType;
import io.pravega.segmentstore.contracts.Attributes;
import io.pravega.segmentstore.contracts.BadAttributeUpdateException;
import io.pravega.segmentstore.contracts.ContainerNotFoundException;
import io.pravega.segmentstore.contracts.StreamSegmentExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.contracts.StreamSegmentStore;
import io.pravega.segmentstore.server.IllegalContainerStateException;
import io.pravega.segmentstore.server.host.delegationtoken.DelegationTokenVerifier;
import io.pravega.segmentstore.server.host.stat.SegmentStatsRecorder;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.ByteBufWrapper;
import io.pravega.shared.protocol.netty.DelegatingRequestProcessor;
import io.pravega.shared.protocol.netty.RequestProcessor;
import io.pravega.shared.protocol.netty.WireCommands;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/server/host/handler/AppendProcessor.class */
/**
 * Request processor that handles the {@code hello}, {@code setupAppend} and {@code append}
 * commands of a single {@link ServerConnection} and writes the appended data to a
 * {@link StreamSegmentStore}.
 *
 * <p>Incoming appends are queued per writer id, at most one append (possibly a batch of several
 * queued appends) is handed to the store at a time, and the number of queued bytes is tracked so
 * that reading from the connection can be paused and resumed.
 *
 * <p>NOTE(review): all other requests are presumably forwarded by {@link DelegatingRequestProcessor}
 * to {@link #getNextRequestProcessor()} - confirm against that class.
 */
public class AppendProcessor extends DelegatingRequestProcessor {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(AppendProcessor.class);
    // Timeout passed to every store call made by this class (getAttributes and append).
    static final Duration TIMEOUT = Duration.ofMinutes(1);
    // 1 MiB (1024 * 1024): reading from the connection is paused while more bytes than this are pending.
    private static final int HIGH_WATER_MARK = 1048576;
    // 640 KiB (640 * 1024): reading from the connection is resumed once fewer bytes than this are pending.
    private static final int LOW_WATER_MARK = 655360;
    // Stack trace text sent in error replies when replyWithStackTraceOnError is false.
    private static final String EMPTY_STACK_TRACE = "";
    // Store that attributes are read from and appends are written to.
    private final StreamSegmentStore store;
    // Connection that replies are sent on and that is paused, resumed or closed by this processor.
    private final ServerConnection connection;
    // Readable bytes of appends that were queued by append() and not yet picked up by getNextAppend().
    private final AtomicLong pendingBytes;
    // Processor returned by getNextRequestProcessor().
    private final RequestProcessor nextRequestProcessor;
    // Monitor guarding waitingAppends, latestEventNumbers and outstandingAppend.
    private final Object lock;
    // NOTE(review): not referenced by any decompiled method in this file; presumably used by
    // handleAppendResult, whose body could not be decompiled - confirm against the original source.
    private final SegmentStatsRecorder statsRecorder;
    // Optional verifier for delegation tokens; when null, setupAppend skips the token check.
    private final DelegationTokenVerifier tokenVerifier;
    // When true, error replies carry the stack trace of the failure instead of an empty string.
    private final boolean replyWithStackTraceOnError;

    // Appends received but not yet handed to the store, keyed by writer id in arrival order.
    @GuardedBy("lock")
    private final LinkedListMultimap<UUID, Append> waitingAppends;

    // Last known event number per (segment, writer id); seeded by setupAppend from the segment attributes.
    @GuardedBy("lock")
    private final HashMap<Pair<String, UUID>, Long> latestEventNumbers;

    // The append currently handed to the store; while it is non-null getNextAppend() returns null.
    @GuardedBy("lock")
    private Append outstandingAppend;

    /**
     * Test-only convenience constructor: uses a no-op {@link SegmentStatsRecorder} and never
     * includes stack traces in error replies.
     *
     * @param streamSegmentStore      store that appends are written to; must not be null
     * @param serverConnection        connection that replies are sent on; must not be null
     * @param requestProcessor        processor returned by {@link #getNextRequestProcessor()}; must not be null
     * @param delegationTokenVerifier verifier for delegation tokens, or null to skip token checks
     */
    @VisibleForTesting
    public AppendProcessor(StreamSegmentStore streamSegmentStore, ServerConnection serverConnection, RequestProcessor requestProcessor, DelegationTokenVerifier delegationTokenVerifier) {
        this(
                streamSegmentStore,
                serverConnection,
                requestProcessor,
                SegmentStatsRecorder.noOp(),
                delegationTokenVerifier,
                false);
    }

    /**
     * Creates a processor bound to one connection.
     *
     * <p>The decompiler reports that this constructor was originally package-private; it is kept
     * public here so existing callers of the decompiled class keep compiling.
     *
     * @param streamSegmentStore      store that appends are written to; must not be null
     * @param serverConnection        connection that replies are sent on; must not be null
     * @param requestProcessor        processor returned by {@link #getNextRequestProcessor()}; must not be null
     * @param segmentStatsRecorder    statistics recorder; must not be null
     * @param delegationTokenVerifier verifier for delegation tokens, or null to skip token checks
     * @param z                       whether error replies should carry the failure's stack trace
     * @throws NullPointerException if any of the non-nullable arguments is null
     */
    public AppendProcessor(StreamSegmentStore streamSegmentStore, ServerConnection serverConnection, RequestProcessor requestProcessor, SegmentStatsRecorder segmentStatsRecorder, DelegationTokenVerifier delegationTokenVerifier, boolean z) {
        this.pendingBytes = new AtomicLong(0L);
        this.lock = new Object();
        this.waitingAppends = LinkedListMultimap.create(2);
        this.latestEventNumbers = new HashMap<>();
        this.outstandingAppend = null;
        this.store = Preconditions.checkNotNull(streamSegmentStore, "store");
        this.connection = Preconditions.checkNotNull(serverConnection, "connection");
        this.nextRequestProcessor = Preconditions.checkNotNull(requestProcessor, "next");
        // The second argument is the error message; passing the (null) reference itself would
        // produce a NullPointerException whose message is just "null".
        this.statsRecorder = Preconditions.checkNotNull(segmentStatsRecorder, "statsRecorder");
        this.tokenVerifier = delegationTokenVerifier;
        this.replyWithStackTraceOnError = z;
    }

    /**
     * Answers a client {@code Hello} with this server's own {@code Hello(9, 5)} and closes the
     * connection when the client's advertised version range does not overlap with it.
     *
     * <p>NOTE(review): 9 and 5 are presumably the current and the oldest compatible wire protocol
     * versions - confirm against {@code WireCommands}.
     *
     * @param hello the handshake received from the client
     */
    public void hello(WireCommands.Hello hello) {
        log.info("Received hello from connection: {}", this.connection);
        // The reply is always sent first, even if the versions turn out to be incompatible.
        this.connection.send(new WireCommands.Hello(9, 5));
        boolean compatible = hello.getLowVersion() <= 9 && hello.getHighVersion() >= 5;
        if (compatible) {
            return;
        }
        log.warn("Incompatible wire protocol versions {} from connection {}", hello, this.connection);
        this.connection.close();
    }

    /**
     * Prepares this connection to accept appends from one writer on one segment.
     *
     * <p>If a token verifier is configured, the delegation token is checked first; a failed check
     * is reported through {@code handleException} and nothing else happens. Otherwise the writer's
     * attribute is read from the store, remembered as the writer's last event number (unless one
     * is already recorded for this segment/writer pair), and returned to the client in an
     * {@code AppendSetup} reply. A missing attribute is reported as {@link Long#MIN_VALUE}.
     *
     * @param setupAppend the request naming the segment, writer id, request id and delegation token
     */
    public void setupAppend(WireCommands.SetupAppend setupAppend) {
        String segment = setupAppend.getSegment();
        UUID writerId = setupAppend.getWriterId();
        long requestId = setupAppend.getRequestId();
        log.info("Setting up appends for writer: {} on segment: {}", writerId, segment);
        if (this.tokenVerifier != null) {
            try {
                this.tokenVerifier.verifyToken(segment, setupAppend.getDelegationToken(), AuthHandler.Permissions.READ_UPDATE);
            } catch (TokenException e) {
                // Use the same operation label as the store-failure path below.
                handleException(writerId, requestId, segment, "setting up append", e);
                return;
            }
        }
        this.store.getAttributes(segment, Collections.singleton(writerId), true, TIMEOUT).whenComplete((attributes, failure) -> {
            try {
                if (failure != null) {
                    handleException(writerId, requestId, segment, "setting up append", failure);
                } else {
                    long lastEventNumber = ((Long) attributes.getOrDefault(writerId, Long.MIN_VALUE)).longValue();
                    synchronized (this.lock) {
                        // Keep an already recorded value: a repeated setup must not rewind the writer.
                        this.latestEventNumbers.putIfAbsent(Pair.of(segment, writerId), Long.valueOf(lastEventNumber));
                    }
                    this.connection.send(new WireCommands.AppendSetup(requestId, segment, writerId, lastEventNumber));
                }
            } catch (Throwable e) {
                // The catch parameter must not reuse a lambda parameter name; that does not compile.
                handleException(writerId, requestId, segment, "handling setupAppend result", e);
            }
        });
    }

    /**
     * Hands the next eligible append, if any, to the store.
     *
     * <p>When the store call completes, the result is passed to {@code handleAppendResult} and the
     * trace is closed; in a following completion stage the append's data buffer is released,
     * regardless of how the previous stage ended.
     */
    private void performNextWrite() {
        final Append toWrite = getNextAppend();
        if (toWrite != null) {
            final long traceId = LoggerHelpers.traceEnter(log, "storeAppend", new Object[]{toWrite});
            final Timer elapsed = new Timer();
            CompletableFuture<Long> handled = storeAppend(toWrite).whenComplete((offset, failure) -> {
                handleAppendResult(toWrite, offset, failure, elapsed);
                LoggerHelpers.traceLeave(log, "storeAppend", traceId, new Object[]{toWrite, failure});
            });
            // Separate stage so the buffer is released even if result handling threw.
            handled.whenComplete((ignoredOffset, ignoredFailure) -> {
                toWrite.getData().release();
            });
        }
    }

    /**
     * Selects the next append to send to the store and marks it as outstanding.
     *
     * <p>Nothing is selected while another append is outstanding or when no appends are waiting.
     * Otherwise the first writer with waiting appends is chosen. If that writer's first append is
     * conditional it is sent on its own; if not, all leading non-conditional appends of that
     * writer are merged into one append that carries the segment, event number and request id of
     * the last merged append and the sum of the event counts.
     *
     * <p>The readable bytes of the selected append are subtracted from the pending byte count.
     *
     * @return the append that is now outstanding, or null if nothing can be sent right now
     */
    private Append getNextAppend() {
        synchronized (this.lock) {
            if (this.outstandingAppend != null || this.waitingAppends.isEmpty()) {
                return null;
            }
            UUID writerId = this.waitingAppends.keys().iterator().next();
            List<Append> queued = this.waitingAppends.get(writerId);
            if (queued.get(0).isConditional()) {
                this.outstandingAppend = queued.remove(0);
            } else {
                ByteBuf[] payloads = new ByteBuf[queued.size()];
                Append last = queued.get(0);
                int eventCount = 0;
                int batched = 0;
                Iterator<Append> it = queued.iterator();
                while (it.hasNext()) {
                    Append candidate = it.next();
                    if (candidate.isConditional()) {
                        // A conditional append must be sent on its own; stop batching here.
                        break;
                    }
                    payloads[batched++] = candidate.getData();
                    last = candidate;
                    eventCount += candidate.getEventCount();
                    it.remove();
                }
                if (batched < payloads.length) {
                    // Batching stopped early: drop the unused trailing slots so that no null
                    // entries are handed to Unpooled.wrappedBuffer.
                    payloads = Arrays.copyOf(payloads, batched);
                }
                this.outstandingAppend = new Append(last.getSegment(), writerId, last.getEventNumber(), eventCount, Unpooled.wrappedBuffer(payloads), (Long) null, last.getRequestId());
            }
            setPendingBytes(getPendingBytes() - this.outstandingAppend.getData().readableBytes());
            return this.outstandingAppend;
        }
    }

    /**
     * Writes one append to the store together with its attribute updates.
     *
     * <p>The writer's attribute is replaced with the append's event number only if it still equals
     * the last event number recorded for this segment/writer pair, and the segment's event count
     * attribute is increased by the append's event count. Conditional appends additionally pass
     * their expected length to the store.
     *
     * @param append the append to write; its writer must already be known for its segment
     * @return the future returned by the store's append call
     */
    private CompletableFuture<Long> storeAppend(Append append) {
        final long lastEventNumber;
        synchronized (this.lock) {
            lastEventNumber = this.latestEventNumbers.get(Pair.of(append.getSegment(), append.getWriterId())).longValue();
        }
        AttributeUpdate writerUpdate = new AttributeUpdate(append.getWriterId(), AttributeUpdateType.ReplaceIfEquals, append.getEventNumber(), lastEventNumber);
        AttributeUpdate eventCountUpdate = new AttributeUpdate(Attributes.EVENT_COUNT, AttributeUpdateType.Accumulate, append.getEventCount());
        List<AttributeUpdate> updates = Arrays.asList(writerUpdate, eventCountUpdate);
        ByteBufWrapper payload = new ByteBufWrapper(append.getData());
        if (append.isConditional()) {
            return this.store.append(append.getSegment(), append.getExpectedLength().longValue(), payload, updates, TIMEOUT);
        }
        return this.store.append(append.getSegment(), payload, updates, TIMEOUT);
    }

    /* JADX WARN: Removed duplicated region for block: B:10:0x0029 A[EXC_TOP_SPLITTER, SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the store-append completion handler, which the decompiler could not
     * reconstruct. As written, this method always throws {@link UnsupportedOperationException}.
     *
     * <p>It is invoked by {@code performNextWrite} from the completion callback of the store
     * append, with the append that was written, the store's result, the failure (if any) and a
     * timer started just before the store call.
     *
     * <p>NOTE(review): nothing else in this file clears {@code outstandingAppend}, so the real
     * implementation presumably does that here, along with replying to the client and starting the
     * next write - restore the body from the original source before using this class.
     */
    private void handleAppendResult(io.pravega.shared.protocol.netty.Append r13, java.lang.Long r14, java.lang.Throwable r15, io.pravega.common.Timer r16) {
        /*
            Method dump skipped, instructions count: 463
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.pravega.segmentstore.server.host.handler.AppendProcessor.handleAppendResult(io.pravega.shared.protocol.netty.Append, java.lang.Long, java.lang.Throwable, io.pravega.common.Timer):void");
    }

    /**
     * Reports a failure to the client when no offset is available, by delegating to the
     * six-argument overload with an offset of {@code -1}.
     *
     * @param uuid writer id the failure relates to
     * @param j    request id to echo in the reply
     * @param str  segment name
     * @param str2 description of the operation that failed
     * @param th   the failure to report
     */
    private void handleException(UUID uuid, long j, String str, String str2, Throwable th) {
        final long unknownOffset = -1L;
        handleException(uuid, j, str, unknownOffset, str2, th);
    }

    /**
     * Translates a failure into the matching wire reply and/or closes the connection.
     *
     * <p>The failure is unwrapped first. Known segment, container and token failures are answered
     * with their specific reply; a bad attribute update is answered with
     * {@code InvalidEventNumber} and the connection is closed; cancellations and all unrecognised
     * failures close the connection without a reply. The stack trace is included in replies only
     * when {@code replyWithStackTraceOnError} is set.
     *
     * @param uuid writer id the failure relates to
     * @param j    request id to echo in the reply
     * @param str  segment name
     * @param j2   offset sent with the {@code SegmentIsSealed} reply
     * @param str2 description of the operation that failed
     * @param th   the failure to report; must not be null
     * @throws IllegalStateException if {@code th} is null
     */
    private void handleException(UUID uuid, long j, String str, long j2, String str2, Throwable th) {
        if (th == null) {
            IllegalStateException illegalStateException = new IllegalStateException("No exception to handle.");
            log.error("Append processor: Error {} on segment = '{}'", str2, str, illegalStateException);
            throw illegalStateException;
        }
        // Must be typed as Throwable: the branches below test it against unrelated exception types.
        Throwable cause = Exceptions.unwrap(th);
        String stackTraceAsString = this.replyWithStackTraceOnError ? Throwables.getStackTraceAsString(cause) : EMPTY_STACK_TRACE;
        if (cause instanceof StreamSegmentExistsException) {
            log.warn("Segment '{}' already exists and {} cannot perform operation '{}'.", str, uuid, str2);
            this.connection.send(new WireCommands.SegmentAlreadyExists(j, str, stackTraceAsString));
        } else if (cause instanceof StreamSegmentNotExistsException) {
            log.warn("Segment '{}' does not exist and {} cannot perform operation '{}'.", str, uuid, str2);
            this.connection.send(new WireCommands.NoSuchSegment(j, str, stackTraceAsString, -1L));
        } else if (cause instanceof StreamSegmentSealedException) {
            log.info("Segment '{}' is sealed and {} cannot perform operation '{}'.", str, uuid, str2);
            this.connection.send(new WireCommands.SegmentIsSealed(j, str, stackTraceAsString, j2));
        } else if (cause instanceof ContainerNotFoundException) {
            int containerId = ((ContainerNotFoundException) cause).getContainerId();
            log.warn("Wrong host. Segment '{}' (Container {}) is not owned and {} cannot perform operation '{}'.", str, containerId, uuid, str2);
            // The third argument is not a stack trace (presumably the correct-host hint - TODO
            // confirm); an empty string is sent, exactly as before.
            this.connection.send(new WireCommands.WrongHost(j, str, "", stackTraceAsString));
        } else if (cause instanceof BadAttributeUpdateException) {
            log.warn("Bad attribute update by {} on segment {}.", uuid, str, cause);
            this.connection.send(new WireCommands.InvalidEventNumber(uuid, j, stackTraceAsString));
            this.connection.close();
        } else if (cause instanceof TokenExpiredException) {
            // Checked before TokenException so that expiry gets its own error code.
            log.warn("Token expired for writer {} on segment {}.", uuid, str, cause);
            this.connection.send(new WireCommands.AuthTokenCheckFailed(j, stackTraceAsString, WireCommands.AuthTokenCheckFailed.ErrorCode.TOKEN_EXPIRED));
        } else if (cause instanceof TokenException) {
            log.warn("Token check failed for writer {} on segment {}.", uuid, str, cause);
            this.connection.send(new WireCommands.AuthTokenCheckFailed(j, stackTraceAsString, WireCommands.AuthTokenCheckFailed.ErrorCode.TOKEN_CHECK_FAILED));
        } else if (cause instanceof UnsupportedOperationException) {
            log.warn("Unsupported Operation '{}'.", str2, cause);
            this.connection.send(new WireCommands.OperationUnsupported(j, str2, stackTraceAsString));
        } else if (cause instanceof CancellationException) {
            log.info("Closing connection '{}' while performing append on Segment '{}' due to {}.", this.connection, str, cause.toString());
            this.connection.close();
        } else {
            logError(str, cause);
            this.connection.close();
        }
    }

    /**
     * Logs an append failure that has no dedicated wire reply.
     *
     * <p>An {@link IllegalContainerStateException} is logged at warn level with only its string
     * form; anything else is logged at error level with the throwable attached.
     *
     * @param str segment name
     * @param th  the failure to log
     */
    private void logError(String str, Throwable th) {
        if (!(th instanceof IllegalContainerStateException)) {
            log.error("Error (Segment = '{}', Operation = 'append')", str, th);
            return;
        }
        log.warn("Error (Segment = '{}', Operation = 'append'): {}.", str, th.toString());
    }

    /**
     * Applies flow control to the connection based on the number of pending bytes.
     *
     * <p>Reading is paused while more than {@code HIGH_WATER_MARK} bytes are pending and resumed
     * once fewer than {@code LOW_WATER_MARK} bytes are pending; between the two marks the current
     * state is left unchanged.
     */
    private void pauseOrResumeReading() {
        long pendingBytes = getPendingBytes();
        // Use the declared thresholds rather than repeating their values as literals.
        if (pendingBytes > HIGH_WATER_MARK) {
            log.debug("Pausing writing from connection {}", this.connection);
            this.connection.pauseReading();
        }
        if (pendingBytes < LOW_WATER_MARK) {
            log.trace("Resuming writing from connection {}", this.connection);
            this.connection.resumeReading();
        }
    }

    /**
     * Returns the current pending byte count.
     *
     * @return the value currently held by the pending-bytes counter
     */
    protected long getPendingBytes() {
        final long current = this.pendingBytes.get();
        return current;
    }

    /**
     * Stores a new pending byte count, clamping negative values to zero.
     *
     * @param j the new count; values below zero are stored as zero
     */
    protected void setPendingBytes(long j) {
        final long clamped = j < 0L ? 0L : j;
        this.pendingBytes.set(clamped);
    }

    /**
     * Queues an append received from the client and tries to start the next store write.
     *
     * <p>The writer must have been set up for the segment beforehand, and the append's event
     * number must not be lower than the last event number recorded for that writer. After
     * queueing, flow control is re-evaluated and the next write is attempted.
     *
     * @param append the append to queue
     * @throws IllegalStateException if the writer was not set up for the segment, or if the event
     *                               number is lower than the last recorded one
     */
    public void append(Append append) {
        log.trace("Processing append received from client {}", append);
        UUID writerId = append.getWriterId();
        synchronized (this.lock) {
            Long lastEventNumber = this.latestEventNumbers.get(Pair.of(append.getSegment(), writerId));
            Preconditions.checkState(lastEventNumber != null, "Data from unexpected connection: %s.", writerId);
            Preconditions.checkState(append.getEventNumber() >= lastEventNumber.longValue(), "Event was already appended.");
            this.waitingAppends.put(writerId, append);
            // Update the counter under the same lock as the matching decrement in getNextAppend();
            // the get-then-set pair is not atomic, so doing it unlocked could lose an update.
            setPendingBytes(getPendingBytes() + append.getData().readableBytes());
        }
        pauseOrResumeReading();
        performNextWrite();
    }

    /**
     * Returns the processor supplied at construction time as the next one in the chain.
     *
     * @return the next request processor; never null, since the constructor rejects null
     */
    @SuppressFBWarnings(justification = "generated code")
    public RequestProcessor getNextRequestProcessor() {
        return nextRequestProcessor;
    }
}
