package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.pravega.auth.AuthenticationException;
import io.pravega.auth.TokenExpiredException;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.connection.impl.RawClient;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.segment.impl.NoSuchSegmentException;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentSealedException;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.Reply;
import io.pravega.shared.protocol.netty.WireCommandType;
import io.pravega.shared.protocol.netty.WireCommands;
import java.beans.ConstructorProperties;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/LargeEventWriter.class */
public class LargeEventWriter {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(LargeEventWriter.class);
    private static final int WRITE_SIZE = 8388608;

    @Nonnull
    private final UUID writerId;

    @Nonnull
    private final Controller controller;

    @Nonnull
    private final ConnectionPool connectionPool;

    public void writeLargeEvent(Segment segment, List<ByteBuffer> list, DelegationTokenProvider delegationTokenProvider, EventWriterConfig eventWriterConfig) throws NoSuchSegmentException, AuthenticationException, SegmentSealedException {
        List<ByteBuf> createBufs = createBufs(list);
        Retry.withExpBackoff(eventWriterConfig.getInitialBackoffMillis(), eventWriterConfig.getBackoffMultiple(), 1 + Math.max(0, eventWriterConfig.getRetryAttempts()), eventWriterConfig.getMaxBackoffMillis()).retryWhen(th -> {
            Throwable unwrap = Exceptions.unwrap(th);
            if (unwrap instanceof ConnectionFailedException) {
                log.info("Connection failure while sending large event: {}. Retrying", unwrap.getMessage());
                return true;
            }
            if (!(unwrap instanceof TokenExpiredException)) {
                return false;
            }
            delegationTokenProvider.signalTokenExpired();
            log.info("Authentication token expired while writing large event to segment {}. Retrying", segment);
            return true;
        }).run(() -> {
            RawClient rawClient = new RawClient(this.controller, this.connectionPool, segment);
            try {
                write(segment, createBufs, rawClient, delegationTokenProvider);
                if (Collections.singletonList(rawClient).get(0) != null) {
                    rawClient.close();
                }
                return null;
            } catch (Throwable th2) {
                if (Collections.singletonList(rawClient).get(0) != null) {
                    rawClient.close();
                }
                throw th2;
            }
        });
    }

    private List<ByteBuf> createBufs(List<ByteBuffer> list) {
        ByteBuffer[] byteBufferArr = new ByteBuffer[2 * list.size()];
        for (int i = 0; i < list.size(); i++) {
            ByteBuffer byteBuffer = list.get(i);
            ByteBuffer wrap = ByteBuffer.wrap(new byte[8]);
            wrap.putInt(WireCommandType.EVENT.getCode());
            wrap.putInt(byteBuffer.remaining());
            wrap.flip();
            byteBufferArr[2 * i] = wrap;
            byteBufferArr[(2 * i) + 1] = byteBuffer;
        }
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(byteBufferArr);
        ArrayList arrayList = new ArrayList();
        while (wrappedBuffer.isReadable()) {
            arrayList.add(wrappedBuffer.readSlice(Math.min(wrappedBuffer.readableBytes(), 8388608)));
        }
        return arrayList;
    }

    private void write(Segment segment, List<ByteBuf> list, RawClient rawClient, DelegationTokenProvider delegationTokenProvider) throws TokenExpiredException, NoSuchSegmentException, AuthenticationException, SegmentSealedException, ConnectionFailedException {
        long nextSequenceNumber = rawClient.getFlow().getNextSequenceNumber();
        log.debug("Writing large event to segment {} with writer id {}", segment, this.writerId);
        String str = (String) Futures.getThrowingException(delegationTokenProvider.retrieveToken());
        WireCommands.SegmentCreated transformSegmentCreated = transformSegmentCreated((Reply) Futures.getThrowingException(rawClient.sendRequest(nextSequenceNumber, new WireCommands.CreateTransientSegment(nextSequenceNumber, this.writerId, segment.getScopedName(), str))), segment.getScopedName());
        long nextSequenceNumber2 = rawClient.getFlow().getNextSequenceNumber();
        if (transformAppendSetup((Reply) Futures.getThrowingException(rawClient.sendRequest(nextSequenceNumber2, new WireCommands.SetupAppend(nextSequenceNumber2, this.writerId, transformSegmentCreated.getSegment(), str))), transformSegmentCreated.getSegment()).getLastEventNumber() != Long.MIN_VALUE) {
            throw new IllegalStateException("Server indicates that transient segment was already written to: " + transformSegmentCreated.getSegment());
        }
        long j = 0;
        ArrayList<CompletableFuture<Reply>> arrayList = new ArrayList<>();
        for (int i = 0; i < list.size(); i++) {
            long nextSequenceNumber3 = rawClient.getFlow().getNextSequenceNumber();
            WireCommands.ConditionalBlockEnd conditionalBlockEnd = new WireCommands.ConditionalBlockEnd(this.writerId, i, j, Unpooled.wrappedBuffer(list.get(i)), nextSequenceNumber3);
            j += r0.readableBytes();
            CompletableFuture<Reply> sendRequest = rawClient.sendRequest(nextSequenceNumber3, conditionalBlockEnd);
            failFast(arrayList, transformSegmentCreated.getSegment());
            arrayList.add(sendRequest);
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            transformDataAppended((Reply) Futures.getThrowingException(arrayList.get(i2)), transformSegmentCreated.getSegment());
        }
        long nextSequenceNumber4 = rawClient.getFlow().getNextSequenceNumber();
        transformSegmentMerged((Reply) Futures.getThrowingException(rawClient.sendRequest(nextSequenceNumber4, new WireCommands.MergeSegments(nextSequenceNumber4, segment.getScopedName(), transformSegmentCreated.getSegment(), str))), transformSegmentCreated.getSegment());
    }

    private void failFast(ArrayList<CompletableFuture<Reply>> arrayList, String str) throws TokenExpiredException, NoSuchSegmentException, AuthenticationException, SegmentSealedException, ConnectionFailedException {
        Iterator<CompletableFuture<Reply>> it = arrayList.iterator();
        while (it.hasNext()) {
            CompletableFuture<Reply> next = it.next();
            if (!next.isDone()) {
                return;
            } else {
                transformDataAppended((Reply) Futures.getThrowingException(next), str);
            }
        }
    }

    private WireCommands.SegmentCreated transformSegmentCreated(Reply reply, String str) throws TokenExpiredException, NoSuchSegmentException, AuthenticationException, SegmentSealedException, ConnectionFailedException {
        if (reply instanceof WireCommands.SegmentCreated) {
            return (WireCommands.SegmentCreated) reply;
        }
        throw handleUnexpectedReply(reply, "SegmentCreated", str);
    }

    private WireCommands.AppendSetup transformAppendSetup(Reply reply, String str) throws TokenExpiredException, NoSuchSegmentException, AuthenticationException, SegmentSealedException, ConnectionFailedException {
        if (reply instanceof WireCommands.AppendSetup) {
            return (WireCommands.AppendSetup) reply;
        }
        throw handleUnexpectedReply(reply, "AppendSetup", str);
    }

    private Void transformDataAppended(Reply reply, String str) throws TokenExpiredException, NoSuchSegmentException, AuthenticationException, SegmentSealedException, ConnectionFailedException {
        if (reply instanceof WireCommands.DataAppended) {
            return null;
        }
        throw handleUnexpectedReply(reply, "DataAppended", str);
    }

    private WireCommands.SegmentsMerged transformSegmentMerged(Reply reply, String str) throws TokenExpiredException, NoSuchSegmentException, AuthenticationException, SegmentSealedException, ConnectionFailedException {
        if (reply instanceof WireCommands.SegmentsMerged) {
            return (WireCommands.SegmentsMerged) reply;
        }
        throw handleUnexpectedReply(reply, "MergeSegments", str);
    }

    @VisibleForTesting
    RuntimeException handleUnexpectedReply(Reply reply, String str, String str2) throws NoSuchSegmentException, SegmentSealedException, TokenExpiredException, AuthenticationException, ConnectionFailedException {
        log.warn("Unexpected reply {} observed instead of {} for conditional writer {} for {}", new Object[]{reply, str, this.writerId, str2});
        if (reply instanceof WireCommands.NoSuchSegment) {
            throw new NoSuchSegmentException(reply.toString());
        }
        if (reply instanceof WireCommands.SegmentIsSealed) {
            throw new SegmentSealedException(reply.toString());
        }
        if (reply instanceof WireCommands.WrongHost) {
            throw new ConnectionFailedException(reply.toString());
        }
        if (reply instanceof WireCommands.AuthTokenCheckFailed) {
            WireCommands.AuthTokenCheckFailed authTokenCheckFailed = (WireCommands.AuthTokenCheckFailed) reply;
            if (authTokenCheckFailed.isTokenExpired()) {
                throw new TokenExpiredException(authTokenCheckFailed.getServerStackTrace());
            }
            throw new AuthenticationException(authTokenCheckFailed.toString());
        }
        if (reply instanceof WireCommands.OperationUnsupported) {
            throw new UnsupportedOperationException("Attempted to write a large append on segment " + str2 + " to a server version which only supports appends < 8 MB.");
        }
        if (!((reply instanceof WireCommands.ConditionalCheckFailed) | (reply instanceof WireCommands.InvalidEventNumber) | (reply instanceof WireCommands.SegmentIsTruncated)) && !(reply instanceof WireCommands.SegmentAlreadyExists)) {
            throw new ConnectionFailedException("Unexpected reply of " + reply + " when expecting an " + str);
        }
        log.error("Failure: " + reply + " while appending to transient segment: " + str2 + ". This indicates a bug, but the request will be retried.");
        throw new ConnectionFailedException("Server state error");
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    @ConstructorProperties({"writerId", "controller", "connectionPool"})
    public LargeEventWriter(@Nonnull UUID uuid, @Nonnull Controller controller, @Nonnull ConnectionPool connectionPool) {
        if (uuid == null) {
            throw new NullPointerException("writerId is marked non-null but is null");
        }
        if (controller == null) {
            throw new NullPointerException("controller is marked non-null but is null");
        }
        if (connectionPool == null) {
            throw new NullPointerException("connectionPool is marked non-null but is null");
        }
        this.writerId = uuid;
        this.controller = controller;
        this.connectionPool = connectionPool;
    }
}
