package io.pravega.segmentstore.server.host.handler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.auth.AuthHandler;
import io.pravega.auth.TokenException;
import io.pravega.auth.TokenExpiredException;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.segmentstore.contracts.AttributeId;
import io.pravega.segmentstore.contracts.AttributeUpdate;
import io.pravega.segmentstore.contracts.AttributeUpdateCollection;
import io.pravega.segmentstore.contracts.AttributeUpdateType;
import io.pravega.segmentstore.contracts.Attributes;
import io.pravega.segmentstore.contracts.BadAttributeUpdateException;
import io.pravega.segmentstore.contracts.ContainerNotFoundException;
import io.pravega.segmentstore.contracts.StreamSegmentExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.contracts.StreamSegmentStore;
import io.pravega.segmentstore.server.IllegalContainerStateException;
import io.pravega.segmentstore.server.host.delegationtoken.DelegationTokenVerifier;
import io.pravega.segmentstore.server.host.handler.WriterState;
import io.pravega.segmentstore.server.host.stat.SegmentStatsRecorder;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.ByteBufWrapper;
import io.pravega.shared.protocol.netty.DelegatingRequestProcessor;
import io.pravega.shared.protocol.netty.FailingRequestProcessor;
import io.pravega.shared.protocol.netty.RequestProcessor;
import io.pravega.shared.protocol.netty.WireCommands;
import io.pravega.shared.security.token.JsonWebToken;
import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Generated;
import lombok.NonNull;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/server/host/handler/AppendProcessor.class */
public class AppendProcessor extends DelegatingRequestProcessor {
    private static final String EMPTY_STACK_TRACE = "";
    private final StreamSegmentStore store;
    private final TrackedConnection connection;
    private final RequestProcessor nextRequestProcessor;
    private final SegmentStatsRecorder statsRecorder;
    private final DelegationTokenVerifier tokenVerifier;
    private final boolean replyWithStackTraceOnError;
    private final ConcurrentHashMap<Pair<String, UUID>, WriterState> writerStates = new ConcurrentHashMap<>();
    private final ScheduledExecutorService tokenExpiryHandlerExecutor;
    static final Duration TIMEOUT = Duration.ofMinutes(1);
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(AppendProcessor.class));

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    /* loaded from: input_file:io/pravega/segmentstore/server/host/handler/AppendProcessor$AppendProcessorBuilder.class */
    public static class AppendProcessorBuilder {

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private StreamSegmentStore store;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private TrackedConnection connection;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private RequestProcessor nextRequestProcessor;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private SegmentStatsRecorder statsRecorder;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private DelegationTokenVerifier tokenVerifier;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private boolean replyWithStackTraceOnError;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private ScheduledExecutorService tokenExpiryHandlerExecutor;

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        AppendProcessorBuilder() {
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public AppendProcessorBuilder store(@NonNull StreamSegmentStore streamSegmentStore) {
            if (streamSegmentStore == null) {
                throw new NullPointerException("store is marked non-null but is null");
            }
            this.store = streamSegmentStore;
            return this;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public AppendProcessorBuilder connection(@NonNull TrackedConnection trackedConnection) {
            if (trackedConnection == null) {
                throw new NullPointerException("connection is marked non-null but is null");
            }
            this.connection = trackedConnection;
            return this;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public AppendProcessorBuilder nextRequestProcessor(@NonNull RequestProcessor requestProcessor) {
            if (requestProcessor == null) {
                throw new NullPointerException("nextRequestProcessor is marked non-null but is null");
            }
            this.nextRequestProcessor = requestProcessor;
            return this;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public AppendProcessorBuilder statsRecorder(@NonNull SegmentStatsRecorder segmentStatsRecorder) {
            if (segmentStatsRecorder == null) {
                throw new NullPointerException("statsRecorder is marked non-null but is null");
            }
            this.statsRecorder = segmentStatsRecorder;
            return this;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public AppendProcessorBuilder tokenVerifier(DelegationTokenVerifier delegationTokenVerifier) {
            this.tokenVerifier = delegationTokenVerifier;
            return this;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public AppendProcessorBuilder replyWithStackTraceOnError(boolean z) {
            this.replyWithStackTraceOnError = z;
            return this;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public AppendProcessorBuilder tokenExpiryHandlerExecutor(ScheduledExecutorService scheduledExecutorService) {
            this.tokenExpiryHandlerExecutor = scheduledExecutorService;
            return this;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public AppendProcessor build() {
            return new AppendProcessor(this.store, this.connection, this.nextRequestProcessor, this.statsRecorder, this.tokenVerifier, this.replyWithStackTraceOnError, this.tokenExpiryHandlerExecutor);
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "AppendProcessor.AppendProcessorBuilder(store=" + this.store + ", connection=" + this.connection + ", nextRequestProcessor=" + this.nextRequestProcessor + ", statsRecorder=" + this.statsRecorder + ", tokenVerifier=" + this.tokenVerifier + ", replyWithStackTraceOnError=" + this.replyWithStackTraceOnError + ", tokenExpiryHandlerExecutor=" + this.tokenExpiryHandlerExecutor + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AppendProcessor(@NonNull StreamSegmentStore streamSegmentStore, @NonNull TrackedConnection trackedConnection, @NonNull RequestProcessor requestProcessor, @NonNull SegmentStatsRecorder segmentStatsRecorder, DelegationTokenVerifier delegationTokenVerifier, boolean z, ScheduledExecutorService scheduledExecutorService) {
        if (streamSegmentStore == null) {
            throw new NullPointerException("store is marked non-null but is null");
        }
        if (trackedConnection == null) {
            throw new NullPointerException("connection is marked non-null but is null");
        }
        if (requestProcessor == null) {
            throw new NullPointerException("nextRequestProcessor is marked non-null but is null");
        }
        if (segmentStatsRecorder == null) {
            throw new NullPointerException("statsRecorder is marked non-null but is null");
        }
        this.store = streamSegmentStore;
        this.connection = trackedConnection;
        this.nextRequestProcessor = requestProcessor;
        this.statsRecorder = segmentStatsRecorder;
        this.tokenVerifier = delegationTokenVerifier;
        this.replyWithStackTraceOnError = z;
        this.tokenExpiryHandlerExecutor = scheduledExecutorService;
    }

    @VisibleForTesting
    public static AppendProcessorBuilder defaultBuilder() {
        return builder().nextRequestProcessor(new FailingRequestProcessor()).statsRecorder(SegmentStatsRecorder.noOp()).replyWithStackTraceOnError(false);
    }

    public void hello(WireCommands.Hello hello) {
        log.info("Received hello from connection: {}", this.connection);
        this.connection.send(new WireCommands.Hello(14, 5));
        if (hello.getLowVersion() > 14 || hello.getHighVersion() < 5) {
            log.warn(hello.getRequestId(), "Incompatible wire protocol versions {} from connection {}", new Object[]{hello, this.connection});
            this.connection.close();
        }
    }

    public void keepAlive(WireCommands.KeepAlive keepAlive) {
        log.debug("Received a keepAlive from connection: {}", this.connection);
        this.connection.send(keepAlive);
    }

    public void setupAppend(WireCommands.SetupAppend setupAppend) {
        String segment = setupAppend.getSegment();
        UUID writerId = setupAppend.getWriterId();
        log.info("Setting up appends for writer: {} on segment: {}", writerId, segment);
        if (this.tokenVerifier != null) {
            try {
                setupTokenExpiryTask(setupAppend, this.tokenVerifier.verifyToken(segment, setupAppend.getDelegationToken(), AuthHandler.Permissions.READ_UPDATE));
            } catch (TokenException e) {
                handleException(setupAppend.getWriterId(), setupAppend.getRequestId(), segment, "Update Segment Attribute", e);
                return;
            }
        }
        AttributeId fromUUID = AttributeId.fromUUID(writerId);
        Futures.exceptionallyComposeExpecting(this.store.getAttributes(segment, Collections.singleton(fromUUID), true, TIMEOUT), th -> {
            return th instanceof StreamSegmentSealedException;
        }, () -> {
            return this.store.getAttributes(segment, Collections.singleton(fromUUID), false, TIMEOUT);
        }).whenComplete((map, th2) -> {
            try {
                if (th2 != null) {
                    handleException(writerId, setupAppend.getRequestId(), segment, "setting up append", th2);
                } else {
                    long longValue = ((Long) map.getOrDefault(fromUUID, Long.MIN_VALUE)).longValue();
                    WriterState put = this.writerStates.put(Pair.of(segment, writerId), new WriterState(longValue));
                    if (put != null) {
                        log.info("SetupAppend invoked again for writer {}. Last event number from store is {}. Prev writer state {}", new Object[]{writerId, Long.valueOf(longValue), put});
                    }
                    this.connection.send(new WireCommands.AppendSetup(setupAppend.getRequestId(), segment, writerId, longValue));
                }
            } catch (Throwable th2) {
                handleException(writerId, setupAppend.getRequestId(), segment, "handling setupAppend result", th2);
            }
        });
    }

    @VisibleForTesting
    CompletableFuture<Void> setupTokenExpiryTask(@NonNull WireCommands.SetupAppend setupAppend, @NonNull JsonWebToken jsonWebToken) {
        if (setupAppend == null) {
            throw new NullPointerException("setupAppend is marked non-null but is null");
        }
        if (jsonWebToken == null) {
            throw new NullPointerException("token is marked non-null but is null");
        }
        String segment = setupAppend.getSegment();
        UUID writerId = setupAppend.getWriterId();
        long requestId = setupAppend.getRequestId();
        return jsonWebToken.getExpirationTime() == null ? CompletableFuture.completedFuture(null) : Futures.delayedTask(() -> {
            if (!isSetupAppendCompleted(segment, writerId)) {
                return null;
            }
            log.debug("Closing client connection for writer {} due to token expiry, when processing request {} for segment {}", new Object[]{writerId, Long.valueOf(requestId), segment});
            this.connection.close();
            return null;
        }, jsonWebToken.durationToExpiry(), this.tokenExpiryHandlerExecutor);
    }

    @VisibleForTesting
    boolean isSetupAppendCompleted(String str, UUID uuid) {
        return this.writerStates.containsKey(Pair.of(str, uuid));
    }

    public void append(Append append) {
        long traceEnter = LoggerHelpers.traceEnter(log, "append", new Object[]{append});
        UUID writerId = append.getWriterId();
        WriterState writerState = this.writerStates.get(Pair.of(append.getSegment(), writerId));
        Preconditions.checkState(writerState != null, "Data from unexpected connection: Segment=%s, WriterId=%s.", append.getSegment(), writerId);
        long beginAppend = writerState.beginAppend(append.getEventNumber());
        int readableBytes = append.getData().readableBytes();
        this.connection.adjustOutstandingBytes(readableBytes);
        Timer timer = new Timer();
        storeAppend(append, beginAppend).whenComplete((l, th) -> {
            handleAppendResult(append, l, th, writerState, timer);
            LoggerHelpers.traceLeave(log, "storeAppend", traceEnter, new Object[]{append, th});
        }).whenComplete((l2, th2) -> {
            this.connection.adjustOutstandingBytes(-readableBytes);
            append.getData().release();
        });
    }

    private CompletableFuture<Long> storeAppend(Append append, long j) {
        AttributeUpdateCollection from = AttributeUpdateCollection.from(new AttributeUpdate[]{new AttributeUpdate(AttributeId.fromUUID(append.getWriterId()), AttributeUpdateType.ReplaceIfEquals, append.getEventNumber(), j), new AttributeUpdate(Attributes.EVENT_COUNT, AttributeUpdateType.Accumulate, append.getEventCount())});
        ByteBufWrapper byteBufWrapper = new ByteBufWrapper(append.getData());
        return append.isConditional() ? this.store.append(append.getSegment(), append.getExpectedLength().longValue(), byteBufWrapper, from, TIMEOUT) : this.store.append(append.getSegment(), byteBufWrapper, from, TIMEOUT);
    }

    /* JADX WARN: Removed duplicated region for block: B:23:0x0150  */
    /* JADX WARN: Removed duplicated region for block: B:26:? A[RETURN, SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void handleAppendResult(io.pravega.shared.protocol.netty.Append r13, java.lang.Long r14, java.lang.Throwable r15, io.pravega.segmentstore.server.host.handler.WriterState r16, io.pravega.common.Timer r17) {
        /*
            Method dump skipped, instructions count: 364
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.pravega.segmentstore.server.host.handler.AppendProcessor.handleAppendResult(io.pravega.shared.protocol.netty.Append, java.lang.Long, java.lang.Throwable, io.pravega.segmentstore.server.host.handler.WriterState, io.pravega.common.Timer):void");
    }

    private void executeDelayedErrorHandler(WriterState writerState, String str, UUID uuid) {
        WriterState.DelayedErrorHandler fetchEligibleDelayedErrorHandler = writerState.fetchEligibleDelayedErrorHandler();
        if (fetchEligibleDelayedErrorHandler == null) {
            return;
        }
        synchronized (writerState.getAckLock()) {
            try {
                fetchEligibleDelayedErrorHandler.getHandlersToExecute().forEach((v0) -> {
                    v0.run();
                });
                if (fetchEligibleDelayedErrorHandler.getHandlersRemaining() == 0) {
                    this.writerStates.remove(Pair.of(str, uuid));
                }
            } catch (Throwable th) {
                if (fetchEligibleDelayedErrorHandler.getHandlersRemaining() == 0) {
                    this.writerStates.remove(Pair.of(str, uuid));
                }
                throw th;
            }
        }
    }

    private void handleException(UUID uuid, long j, String str, String str2, Throwable th) {
        handleException(uuid, j, str, -1L, str2, th);
    }

    private void handleException(UUID uuid, long j, String str, long j2, String str2, Throwable th) {
        if (th == null) {
            IllegalStateException illegalStateException = new IllegalStateException("No exception to handle.");
            log.error(j, "Append processor: Error {} on segment = '{}'", new Object[]{str2, str, illegalStateException});
            throw illegalStateException;
        }
        ContainerNotFoundException unwrap = Exceptions.unwrap(th);
        String stackTraceAsString = this.replyWithStackTraceOnError ? Throwables.getStackTraceAsString(unwrap) : EMPTY_STACK_TRACE;
        if (unwrap instanceof StreamSegmentExistsException) {
            log.warn(j, "Segment '{}' already exists and {} cannot perform operation '{}'.", new Object[]{str, uuid, str2});
            this.connection.send(new WireCommands.SegmentAlreadyExists(j, str, stackTraceAsString));
            return;
        }
        if (unwrap instanceof StreamSegmentNotExistsException) {
            log.warn(j, "Segment '{}' does not exist and {} cannot perform operation '{}'.", new Object[]{str, uuid, str2});
            this.connection.send(new WireCommands.NoSuchSegment(j, str, stackTraceAsString, -1L));
            return;
        }
        if (unwrap instanceof StreamSegmentSealedException) {
            log.info("Segment '{}' is sealed and {} cannot perform operation '{}'.", new Object[]{str, uuid, str2});
            this.connection.send(new WireCommands.SegmentIsSealed(j, str, stackTraceAsString, j2));
            return;
        }
        if (unwrap instanceof ContainerNotFoundException) {
            log.warn(j, "Wrong host. Segment '{}' (Container {}) is not owned and {} cannot perform operation '{}'.", new Object[]{str, Integer.valueOf(unwrap.getContainerId()), uuid, str2});
            this.connection.send(new WireCommands.WrongHost(j, str, EMPTY_STACK_TRACE, stackTraceAsString));
            return;
        }
        if (unwrap instanceof BadAttributeUpdateException) {
            log.warn(j, "Bad attribute update by {} on segment {}.", new Object[]{uuid, str, unwrap});
            this.connection.send(new WireCommands.InvalidEventNumber(uuid, j, stackTraceAsString));
            this.connection.close();
            return;
        }
        if (unwrap instanceof TokenExpiredException) {
            log.warn(j, "Token expired for writer {} on segment {}.", new Object[]{uuid, str, unwrap});
            this.connection.close();
            return;
        }
        if (unwrap instanceof TokenException) {
            log.warn(j, "Token check failed or writer {} on segment {}.", new Object[]{uuid, str, unwrap});
            this.connection.send(new WireCommands.AuthTokenCheckFailed(j, stackTraceAsString, WireCommands.AuthTokenCheckFailed.ErrorCode.TOKEN_CHECK_FAILED));
        } else if (unwrap instanceof UnsupportedOperationException) {
            log.warn(j, "Unsupported Operation '{}'.", new Object[]{str2, unwrap});
            this.connection.send(new WireCommands.OperationUnsupported(j, str2, stackTraceAsString));
        } else if (unwrap instanceof CancellationException) {
            log.info("Closing connection '{}' while performing append on Segment '{}' due to {}.", new Object[]{this.connection, str, unwrap.toString()});
            this.connection.close();
        } else {
            logError(str, unwrap);
            this.connection.close();
        }
    }

    private void logError(String str, Throwable th) {
        if (th instanceof IllegalContainerStateException) {
            log.warn("Error (Segment = '{}', Operation = 'append'): {}.", str, th.toString());
        } else {
            log.error("Error (Segment = '{}', Operation = 'append')", str, th);
        }
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public static AppendProcessorBuilder builder() {
        return new AppendProcessorBuilder();
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public RequestProcessor getNextRequestProcessor() {
        return this.nextRequestProcessor;
    }
}
