package io.pravega.connectors.flink;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.client.stream.TxnFailedException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter.class */
public class FlinkPravegaWriter<T> extends TwoPhaseCommitSinkFunction<T, PravegaTransactionState, Void> {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaWriter.class);
    private static final long serialVersionUID = 1;
    // Name of the metric group under which registerMetrics() publishes its gauge.
    private static final String PRAVEGA_WRITER_METRICS_GROUP = "PravegaWriter";
    // Name of the gauge that exposes the scoped stream name.
    private static final String SCOPED_STREAM_METRICS_GAUGE = "stream";

    // First asynchronous write failure observed by the completion callback in invoke();
    // read and cleared by checkWriteError().
    @VisibleForTesting
    volatile AtomicReference<Throwable> writeError;

    // Number of non-transactional writes that were issued but whose completion callback
    // has not run yet; flushAndVerify() waits for it to reach zero.
    @VisibleForTesting
    AtomicLong pendingWritesCount;
    // Executor on which the write-completion callbacks run (created in createInternalWriter()
    // for the non-transactional modes only).
    private transient ExecutorService executorService;
    // Last watermark passed to writer.noteTime() in the non-transactional modes.
    private long currentWatermark;
    // Whether open() registers the stream-name gauge.
    private final boolean enableMetrics;
    // Pravega client configuration used to create the client factory.
    private final ClientConfig clientConfig;
    // Converts events to bytes; wrapped in a FlinkSerializer when the writer is created.
    private final SerializationSchema<T> serializationSchema;
    // Optional (may be null) provider of the routing key for each event.
    private final PravegaEventRouter<T> eventRouter;

    // Target stream (scope + stream name).
    @SuppressFBWarnings({"SE_BAD_FIELD"})
    private final Stream stream;
    // Used both as the Pravega transaction timeout and as the Flink transaction timeout.
    private final long txnLeaseRenewalPeriod;
    // Selects between the transactional (EXACTLY_ONCE) and the plain event writer.
    private final PravegaWriterMode writerMode;
    // Whether watermarks are propagated to Pravega.
    private final boolean enableWatermark;
    // Random UUID chosen at construction time; writerId() appends the subtask index.
    private final String writerIdPrefix;
    // Client-side handles, created lazily by initializeInternalWriter() and released in close().
    private transient EventStreamClientFactory clientFactory;
    private transient EventStreamWriter<T> writer;
    private transient TransactionalEventStreamWriter<T> transactionalWriter;

    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$Builder.class */
    /**
     * Fluent builder for {@link FlinkPravegaWriter}.
     *
     * <p>A serialization schema is mandatory; the event router is optional. The actual sink
     * instance is produced by {@code createSinkFunction}, which this class does not define
     * itself (presumably inherited from {@code AbstractStreamingWriterBuilder}).
     *
     * @param <T> the type of the events written by the resulting sink
     */
    public static class Builder<T> extends AbstractStreamingWriterBuilder<T, Builder<T>> {
        /** Schema used to turn events into bytes; must be set before {@link #build()}. */
        private SerializationSchema<T> serializationSchema;

        /** Optional routing-key provider; stays {@code null} when not configured. */
        private PravegaEventRouter<T> eventRouter;

        /**
         * Returns this builder so that inherited fluent setters can keep the concrete type.
         *
         * @return this instance
         */
        @Override // io.pravega.connectors.flink.AbstractWriterBuilder
        public Builder<T> builder() {
            return this;
        }

        /**
         * Sets the serialization schema.
         *
         * @param serializationSchema the schema used to serialize events
         * @return the builder, for chaining
         */
        public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSchema) {
            this.serializationSchema = serializationSchema;
            return builder();
        }

        /**
         * Sets the event router.
         *
         * @param pravegaEventRouter the router supplying the routing key for each event
         * @return the builder, for chaining
         */
        public Builder<T> withEventRouter(PravegaEventRouter<T> pravegaEventRouter) {
            this.eventRouter = pravegaEventRouter;
            return builder();
        }

        /**
         * Creates the sink from the collected settings.
         *
         * @return a new writer
         * @throws IllegalStateException if no serialization schema was supplied
         */
        public FlinkPravegaWriter<T> build() {
            final boolean schemaSupplied = this.serializationSchema != null;
            Preconditions.checkState(schemaSupplied, "Serialization schema must be supplied.");
            final FlinkPravegaWriter<T> sink = createSinkFunction(this.serializationSchema, this.eventRouter);
            return sink;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adapts a Flink {@link SerializationSchema} to the Pravega {@link Serializer} interface.
     * Only the serializing direction is supported.
     *
     * @param <T> the event type
     */
    @VisibleForTesting
    public static final class FlinkSerializer<T> implements Serializer<T> {
        /** The Flink schema that performs the actual conversion to bytes. */
        private final SerializationSchema<T> serializationSchema;

        /**
         * @param serializationSchema the schema to delegate to
         */
        public FlinkSerializer(SerializationSchema<T> serializationSchema) {
            this.serializationSchema = serializationSchema;
        }

        /**
         * Serializes the event with the wrapped schema and wraps the resulting array.
         *
         * @param t the event to serialize
         * @return a buffer backed by the bytes produced by the schema
         */
        @Override // io.pravega.client.stream.Serializer
        public ByteBuffer serialize(T t) {
            final byte[] payload = this.serializationSchema.serialize(t);
            return ByteBuffer.wrap(payload);
        }

        /**
         * Not supported: this adapter is write-only.
         *
         * @throws IllegalStateException always
         */
        @Override // io.pravega.client.stream.Serializer
        public T deserialize(ByteBuffer byteBuffer) {
            throw new IllegalStateException("deserialize() called within a serializer");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$PravegaTransactionState.class */
    /**
     * Per-checkpoint transaction handle kept by the two-phase-commit sink.
     *
     * <p>Holds the live Pravega transaction (when one exists in this JVM), its id as a string
     * and an optional watermark. Equality, hashing and the string form are based on the id and
     * the watermark only; the live transaction object does not take part.
     */
    public static class PravegaTransactionState {
        /** Live transaction; {@code null} when the state was built from an id only. */
        private transient Transaction transaction;

        /** String form of the transaction id, or {@code null} when there is no transaction. */
        private String transactionId;

        /** Watermark associated with the transaction, or {@code null} when none was recorded. */
        private Long watermark;

        /** Creates a state without transaction and without watermark. */
        PravegaTransactionState() {
            this((Transaction) null);
        }

        /**
         * Creates a state for a live transaction without watermark.
         *
         * @param transaction the transaction, may be {@code null}
         */
        PravegaTransactionState(Transaction transaction) {
            this(transaction, (Long) null);
        }

        /**
         * Creates a state for a live transaction; the id is derived from the transaction.
         *
         * @param transaction the transaction, may be {@code null}
         * @param l           the watermark, may be {@code null}
         */
        PravegaTransactionState(Transaction transaction, Long l) {
            this.transaction = transaction;
            this.watermark = l;
            if (transaction == null) {
                // No transaction: the id stays null.
                return;
            }
            this.transactionId = transaction.getTxnId().toString();
        }

        /**
         * Creates a state from an id only (no live transaction object).
         *
         * @param str the transaction id, may be {@code null}
         * @param l   the watermark, may be {@code null}
         */
        PravegaTransactionState(String str, Long l) {
            this.watermark = l;
            this.transactionId = str;
        }

        /**
         * @return the live transaction, or {@code null} if this state only carries an id
         */
        Transaction getTransaction() {
            return this.transaction;
        }

        @Override
        public String toString() {
            final String typeName = getClass().getSimpleName();
            return typeName + " [transactionId=" + this.transactionId + ", watermark=" + this.watermark + "]";
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            final PravegaTransactionState other = (PravegaTransactionState) obj;
            if (!Objects.equals(this.transactionId, other.transactionId)) {
                return false;
            }
            return Objects.equals(this.watermark, other.watermark);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.transactionId, this.watermark);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$StreamNameGauge.class */
    /**
     * Gauge that reports a fixed stream name.
     *
     * <p>The decompiler renamed the interface method to {@code m129getValue()}, which left
     * {@code Gauge#getValue()} unimplemented. The properly named method is restored here and
     * the renamed one is kept as an alias so existing references keep working.
     */
    public static class StreamNameGauge implements Gauge<String> {
        /** The value reported by this gauge. */
        final String stream;

        /**
         * @param str the stream name to report
         */
        public StreamNameGauge(String str) {
            this.stream = str;
        }

        /**
         * Implements {@code Gauge#getValue()}.
         *
         * @return the stream name passed to the constructor
         */
        public String getValue() {
            return this.stream;
        }

        /**
         * Decompiler-generated alias of {@link #getValue()}.
         *
         * @return the stream name passed to the constructor
         */
        public String m129getValue() {
            return getValue();
        }
    }

    /**
     * Flink state serializer for {@link PravegaTransactionState}.
     *
     * <p>Wire format: a presence flag followed by the transaction id as UTF (if present), then
     * a presence flag followed by the watermark as a long (if present). The live transaction
     * object is not serialized.
     *
     * <p>The decompiler renamed {@code createInstance()} and {@code deserialize(DataInputView)}
     * to {@code m131createInstance()} / {@code m130deserialize(...)}, which left the abstract
     * serializer methods unimplemented. The properly named methods are restored here and the
     * renamed ones are kept as aliases.
     */
    @VisibleForTesting
    @Internal
    public static class TransactionStateSerializer extends TypeSerializerSingleton<PravegaTransactionState> {
        private static final long serialVersionUID = 1;

        /** Snapshot of this (configuration-free) serializer. */
        public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<PravegaTransactionState> {
            public TransactionStateSerializerSnapshot() {
                super(TransactionStateSerializer::new);
            }
        }

        /** @return {@code true}; instances are handed around without copying */
        public boolean isImmutableType() {
            return true;
        }

        /**
         * Implements the serializer's {@code createInstance()} contract.
         *
         * @return always {@code null}
         */
        public PravegaTransactionState createInstance() {
            return null;
        }

        /**
         * Decompiler-generated alias of {@link #createInstance()}.
         *
         * @return always {@code null}
         */
        public PravegaTransactionState m131createInstance() {
            return createInstance();
        }

        /** @return the given instance itself (no copy is made) */
        public PravegaTransactionState copy(PravegaTransactionState pravegaTransactionState) {
            return pravegaTransactionState;
        }

        /** @return the first argument itself; the reuse instance is ignored */
        public PravegaTransactionState copy(PravegaTransactionState pravegaTransactionState, PravegaTransactionState pravegaTransactionState2) {
            return pravegaTransactionState;
        }

        /**
         * Copies one serialized record from the input to the output without materializing it.
         *
         * @throws IOException if reading or writing fails
         */
        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            // Transaction id: presence flag + UTF string.
            final boolean hasTransactionId = dataInputView.readBoolean();
            dataOutputView.writeBoolean(hasTransactionId);
            if (hasTransactionId) {
                dataOutputView.writeUTF(dataInputView.readUTF());
            }
            // Watermark: presence flag + long.
            final boolean hasWatermark = dataInputView.readBoolean();
            dataOutputView.writeBoolean(hasWatermark);
            if (hasWatermark) {
                dataOutputView.writeLong(dataInputView.readLong());
            }
        }

        /** @return {@code -1}, i.e. records are of variable length */
        public int getLength() {
            return -1;
        }

        /**
         * Writes the transaction id and the watermark, each preceded by a presence flag.
         *
         * @throws IOException if writing fails
         */
        public void serialize(PravegaTransactionState pravegaTransactionState, DataOutputView dataOutputView) throws IOException {
            if (pravegaTransactionState.transactionId == null) {
                dataOutputView.writeBoolean(false);
            } else {
                dataOutputView.writeBoolean(true);
                dataOutputView.writeUTF(pravegaTransactionState.transactionId);
            }
            if (pravegaTransactionState.watermark == null) {
                dataOutputView.writeBoolean(false);
            } else {
                dataOutputView.writeBoolean(true);
                dataOutputView.writeLong(pravegaTransactionState.watermark.longValue());
            }
        }

        /**
         * Implements the serializer's {@code deserialize(DataInputView)} contract: reads back
         * what {@link #serialize} wrote.
         *
         * @return a state carrying the id and watermark (either may be {@code null}) and no
         *         live transaction object
         * @throws IOException if reading fails
         */
        public PravegaTransactionState deserialize(DataInputView dataInputView) throws IOException {
            String transactionId = null;
            if (dataInputView.readBoolean()) {
                transactionId = dataInputView.readUTF();
            }
            Long watermark = null;
            if (dataInputView.readBoolean()) {
                watermark = Long.valueOf(dataInputView.readLong());
            }
            return new PravegaTransactionState(transactionId, watermark);
        }

        /**
         * Decompiler-generated alias of {@link #deserialize(DataInputView)}.
         *
         * @throws IOException if reading fails
         */
        public PravegaTransactionState m130deserialize(DataInputView dataInputView) throws IOException {
            return deserialize(dataInputView);
        }

        /**
         * Deserializes a record; the reuse instance is ignored.
         *
         * @throws IOException if reading fails
         */
        public PravegaTransactionState deserialize(PravegaTransactionState pravegaTransactionState, DataInputView dataInputView) throws IOException {
            return deserialize(dataInputView);
        }

        /** @return a fresh {@link TransactionStateSerializerSnapshot} */
        public TypeSerializerSnapshot<PravegaTransactionState> snapshotConfiguration() {
            return new TransactionStateSerializerSnapshot();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the sink. No connection is opened here; the Pravega client objects are transient
     * and therefore start out as {@code null}.
     *
     * @param clientConfig        the Pravega client configuration (required)
     * @param stream              the destination stream (required)
     * @param serializationSchema the schema used to serialize events (required)
     * @param pravegaEventRouter  the routing-key provider, may be {@code null}
     * @param pravegaWriterMode   the delivery guarantee to implement (required)
     * @param j                   the transaction lease renewal period; must be positive
     * @param z                   whether watermarks are enabled
     * @param z2                  whether metrics are enabled
     * @throws NullPointerException     if a required argument is {@code null}
     * @throws IllegalArgumentException if {@code j} is not positive
     */
    public FlinkPravegaWriter(ClientConfig clientConfig, Stream stream, SerializationSchema<T> serializationSchema, PravegaEventRouter<T> pravegaEventRouter, PravegaWriterMode pravegaWriterMode, long j, boolean z, boolean z2) {
        super(new TransactionStateSerializer(), VoidSerializer.INSTANCE);

        // Mutable bookkeeping for the asynchronous (non-transactional) write path.
        this.writeError = new AtomicReference<>(null);
        this.pendingWritesCount = new AtomicLong();
        this.currentWatermark = Long.MIN_VALUE;

        // Validated configuration, checked in the same order as the parameters are declared.
        this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
        this.stream = Preconditions.checkNotNull(stream, "stream");
        this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
        this.eventRouter = pravegaEventRouter;
        this.writerMode = Preconditions.checkNotNull(pravegaWriterMode, "writerMode");
        Preconditions.checkArgument(j > 0, "txnLeaseRenewalPeriod must be > 0");
        this.txnLeaseRenewalPeriod = j;
        this.enableWatermark = z;
        this.enableMetrics = z2;
        this.writerIdPrefix = UUID.randomUUID().toString();

        // Only the transactional mode needs the two-phase-commit timeout handling.
        if (this.writerMode == PravegaWriterMode.EXACTLY_ONCE) {
            super.setTransactionTimeout(j);
            super.enableTransactionTimeoutWarnings(0.8d);
        }
    }

    /**
     * @return the event router supplied at construction time; may be {@code null}
     */
    public PravegaEventRouter<T> getEventRouter() {
        return this.eventRouter;
    }

    /**
     * @return the writer mode this sink was constructed with
     */
    PravegaWriterMode getPravegaWriterMode() {
        return this.writerMode;
    }

    /**
     * @return whether watermark propagation was enabled at construction time
     */
    boolean getEnableWatermark() {
        return this.enableWatermark;
    }

    /**
     * Prepares the sink for use: makes sure the internal Pravega writer exists, logs the
     * writer identity and, when metrics are enabled, registers them.
     *
     * @param configuration not used by this method
     * @throws Exception if the writer cannot be initialized
     */
    public void open(Configuration configuration) throws Exception {
        initializeInternalWriter();
        log.info(
                "Initialized Pravega writer {} for stream: {} with controller URI: {}",
                writerId(),
                this.stream,
                this.clientConfig.getControllerURI());
        if (!this.enableMetrics) {
            return;
        }
        registerMetrics();
    }

    /**
     * Writes one event to Pravega.
     *
     * <p>In {@code EXACTLY_ONCE} mode the event is written into the transaction carried by the
     * given state and, if watermarks are enabled, the current watermark is remembered in that
     * state. In the other modes the event is written asynchronously: the pending-write counter
     * is raised, and a completion callback records the first failure and lowers the counter
     * again.
     *
     * <p>If issuing the asynchronous write fails synchronously (for example because the event
     * router throws), the counter is lowered again before the exception is propagated.
     * Otherwise no callback would ever lower it and {@link #flushAndVerify()} would wait
     * forever.
     *
     * @param pravegaTransactionState the current transaction state
     * @param t                       the event to write
     * @param context                 the sink context, used to read the current watermark
     * @throws Exception if a previous asynchronous write failed or the write itself fails
     */
    protected void invoke(PravegaTransactionState pravegaTransactionState, T t, SinkFunction.Context context) throws Exception {
        checkWriteError();
        switch (this.writerMode) {
            case EXACTLY_ONCE:
                if (this.eventRouter != null) {
                    pravegaTransactionState.getTransaction().writeEvent(this.eventRouter.getRoutingKey(t), t);
                } else {
                    pravegaTransactionState.getTransaction().writeEvent(t);
                }
                if (this.enableWatermark) {
                    pravegaTransactionState.watermark = Long.valueOf(context.currentWatermark());
                }
                return;
            case ATLEAST_ONCE:
            case BEST_EFFORT:
                this.pendingWritesCount.incrementAndGet();
                final CompletableFuture<Void> writeEvent;
                try {
                    writeEvent = this.eventRouter != null
                            ? this.writer.writeEvent(this.eventRouter.getRoutingKey(t), t)
                            : this.writer.writeEvent(t);
                } catch (RuntimeException | Error e) {
                    // No callback will be registered for this write, so undo the increment here.
                    releasePendingWrite();
                    throw e;
                }
                // Register the callback right away so the counter is always balanced, even if
                // the watermark handling below throws.
                writeEvent.whenCompleteAsync((ignored, th) -> {
                    if (th != null) {
                        log.warn("Detected a write failure", th);
                        // Keep only the first failure; it is reported by checkWriteError().
                        this.writeError.compareAndSet(null, th);
                    }
                    releasePendingWrite();
                }, this.executorService);
                if (this.enableWatermark && shouldEmitWatermark(this.currentWatermark, context)) {
                    this.writer.noteTime(context.currentWatermark());
                    this.currentWatermark = context.currentWatermark();
                }
                return;
            default:
                throw new UnsupportedOperationException("Not implemented writer mode");
        }
    }

    /** Marks one pending write as finished and wakes up threads waiting in flushAndVerify(). */
    private synchronized void releasePendingWrite() {
        this.pendingWritesCount.decrementAndGet();
        notifyAll();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: beginTransaction, reason: merged with bridge method [inline-methods] */
    /**
     * Starts a new transaction for the two-phase-commit protocol.
     *
     * <p>The decompiler renamed this method to {@code m127beginTransaction()}, which left the
     * superclass hook {@code beginTransaction()} without an implementation in this class. The
     * properly named method is restored here.
     *
     * @return a state wrapping a freshly begun Pravega transaction in {@code EXACTLY_ONCE}
     *         mode, or an empty state in the other modes
     * @throws Exception if the internal writer cannot be initialized or the transaction cannot
     *                   be started
     */
    public PravegaTransactionState beginTransaction() throws Exception {
        initializeInternalWriter();
        switch (this.writerMode) {
            case EXACTLY_ONCE:
                return new PravegaTransactionState(this.transactionalWriter.beginTxn());
            case ATLEAST_ONCE:
            case BEST_EFFORT:
                return new PravegaTransactionState();
            default:
                throw new UnsupportedOperationException("Not implemented writer mode");
        }
    }

    /**
     * Decompiler-generated alias of {@link #beginTransaction()}.
     *
     * @return see {@link #beginTransaction()}
     * @throws Exception see {@link #beginTransaction()}
     */
    public PravegaTransactionState m127beginTransaction() throws Exception {
        return beginTransaction();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * First phase of the two-phase commit: makes sure everything written so far is durable.
     *
     * <p>{@code EXACTLY_ONCE} flushes the transaction, {@code ATLEAST_ONCE} flushes the writer
     * and waits for all pending writes, {@code BEST_EFFORT} does nothing.
     *
     * @param pravegaTransactionState the transaction state about to be committed
     * @throws Exception if flushing fails or an earlier asynchronous write failed
     */
    public void preCommit(PravegaTransactionState pravegaTransactionState) throws Exception {
        switch (this.writerMode) {
            case EXACTLY_ONCE: {
                final Transaction pendingTxn = pravegaTransactionState.getTransaction();
                pendingTxn.flush();
                break;
            }
            case ATLEAST_ONCE: {
                flushAndVerify();
                break;
            }
            case BEST_EFFORT:
                // Best effort gives no durability guarantee at checkpoint time.
                break;
            default:
                throw new UnsupportedOperationException("Not implemented writer mode");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Second phase of the two-phase commit.
     *
     * <p>In {@code EXACTLY_ONCE} mode the Pravega transaction is looked up (from the state, or
     * by id when the state was restored) and committed if it is still open; the watermark is
     * passed along when watermarks are enabled and one was recorded. Failures are logged and
     * not propagated, as before. The other modes have nothing to commit.
     *
     * <p>Changes compared to the previous version:
     * <ul>
     *   <li>a state that carries neither a transaction nor an id is skipped instead of failing
     *       with a {@link NullPointerException} in {@code UUID.fromString};</li>
     *   <li>a runtime exception without a message no longer triggers a
     *       {@link NullPointerException} inside the catch block;</li>
     *   <li>unexpected runtime failures are logged instead of being dropped silently, and the
     *       causing exception is attached to the failure log entries.</li>
     * </ul>
     *
     * @param pravegaTransactionState the state of the transaction to commit
     */
    public void commit(PravegaTransactionState pravegaTransactionState) {
        switch (this.writerMode) {
            case EXACTLY_ONCE:
                if (pravegaTransactionState.getTransaction() == null && pravegaTransactionState.transactionId == null) {
                    // Nothing identifies a Pravega transaction, so there is nothing to commit.
                    log.warn("{} - Skipping commit of {}: no transaction id available", writerId(), pravegaTransactionState);
                    return;
                }
                Transaction<T> transaction = pravegaTransactionState.getTransaction() != null ? pravegaTransactionState.getTransaction() : this.transactionalWriter.getTxn(UUID.fromString(pravegaTransactionState.transactionId));
                try {
                    Transaction.Status checkStatus = transaction.checkStatus();
                    if (checkStatus != Transaction.Status.OPEN) {
                        log.warn("{} - Transaction {} has unexpected transaction status {} while committing", new Object[]{writerId(), transaction.getTxnId(), checkStatus});
                    } else if (!this.enableWatermark || pravegaTransactionState.watermark == null) {
                        transaction.commit();
                    } else {
                        transaction.commit(pravegaTransactionState.watermark.longValue());
                    }
                } catch (TxnFailedException e) {
                    log.error("{} - Transaction {} commit failed.", writerId(), transaction.getTxnId(), e);
                } catch (RuntimeException e2) {
                    final String message = e2.getMessage();
                    if (message != null && message.contains("Unknown transaction")) {
                        log.error("{} - Transaction {} not found.", writerId(), transaction.getTxnId());
                    } else {
                        // Still best effort (not rethrown), but no longer invisible.
                        log.error("{} - Transaction {} commit failed with an unexpected error.", writerId(), transaction.getTxnId(), e2);
                    }
                }
                return;
            case ATLEAST_ONCE:
            case BEST_EFFORT:
                return;
            default:
                throw new UnsupportedOperationException("Not implemented writer mode");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Commits a transaction state after making sure the internal writer exists, which
     * {@link #commit} needs in order to look the transaction up by id.
     *
     * @param pravegaTransactionState the state to commit
     */
    public void recoverAndCommit(PravegaTransactionState pravegaTransactionState) {
        initializeInternalWriter();
        commit(pravegaTransactionState);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aborts the transaction described by the given state.
     *
     * <p>In {@code EXACTLY_ONCE} mode the Pravega transaction is taken from the state or, if
     * the state only carries an id, looked up by that id, and then aborted. A state that
     * carries neither is skipped; previously this failed with a {@link NullPointerException}
     * in {@code UUID.fromString}. The other modes have nothing to abort.
     *
     * @param pravegaTransactionState the state of the transaction to abort
     */
    public void abort(PravegaTransactionState pravegaTransactionState) {
        switch (this.writerMode) {
            case EXACTLY_ONCE:
                Transaction<?> transaction = pravegaTransactionState.getTransaction();
                if (transaction == null) {
                    if (pravegaTransactionState.transactionId == null) {
                        // Nothing identifies a Pravega transaction, so there is nothing to abort.
                        log.warn("{} - Skipping abort of {}: no transaction id available", writerId(), pravegaTransactionState);
                        return;
                    }
                    transaction = this.transactionalWriter.getTxn(UUID.fromString(pravegaTransactionState.transactionId));
                }
                transaction.abort();
                return;
            case ATLEAST_ONCE:
            case BEST_EFFORT:
                return;
            default:
                throw new UnsupportedOperationException("Not implemented writer mode");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aborts a transaction state after making sure the internal writer exists, which
     * {@link #abort} needs in order to look the transaction up by id.
     *
     * @param pravegaTransactionState the state to abort
     */
    public void recoverAndAbort(PravegaTransactionState pravegaTransactionState) {
        initializeInternalWriter();
        abort(pravegaTransactionState);
    }

    /**
     * Releases all resources held by this sink.
     *
     * <p>Every cleanup step is attempted even if an earlier one fails. The failures are
     * combined with {@code ExceptionUtils.firstOrSuppressed} and the result is thrown at the
     * end.
     *
     * @throws Exception the combined failure of the cleanup steps, if any step failed
     */
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }

        if (this.writer != null) {
            // Non-transactional path: drain pending writes before tearing things down.
            failure = runCleanupStep(this::flushAndVerify, failure);
            failure = runCleanupStep(() -> this.writer.close(), failure);
            failure = runCleanupStep(() -> this.executorService.shutdown(), failure);
        }
        if (this.transactionalWriter != null) {
            failure = runCleanupStep(() -> this.transactionalWriter.close(), failure);
        }
        if (this.clientFactory != null) {
            failure = runCleanupStep(() -> this.clientFactory.close(), failure);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** A cleanup action that is allowed to throw a checked exception. */
    private interface CleanupStep {
        void run() throws Exception;
    }

    /** Runs one cleanup step and folds a failure, if any, into the failures seen so far. */
    private static Exception runCleanupStep(CleanupStep step, Exception previous) {
        try {
            step.run();
            return previous;
        } catch (Exception e) {
            return (Exception) ExceptionUtils.firstOrSuppressed(e, previous);
        }
    }

    /**
     * Publishes a gauge with the scoped stream name under the writer's metric group.
     */
    private void registerMetrics() {
        final StreamNameGauge streamNameGauge = new StreamNameGauge(this.stream.getScopedName());
        getRuntimeContext()
                .getMetricGroup()
                .addGroup(PRAVEGA_WRITER_METRICS_GROUP)
                .gauge(SCOPED_STREAM_METRICS_GAUGE, streamNameGauge);
    }

    /**
     * Reports a failure recorded by an asynchronous write, if there is one. The recorded
     * failure is cleared in the process, so it is reported only once.
     *
     * @throws Exception an {@link IOException} wrapping the recorded failure
     */
    private void checkWriteError() throws Exception {
        final Throwable recordedFailure = this.writeError.getAndSet(null);
        if (recordedFailure == null) {
            return;
        }
        throw new IOException("Write failure", recordedFailure);
    }

    /**
     * Flushes the non-transactional writer, waits until every pending write has completed and
     * then reports a recorded write failure, if any.
     *
     * @throws Exception if the wait is interrupted or an asynchronous write failed
     */
    @VisibleForTesting
    void flushAndVerify() throws Exception {
        this.writer.flush();
        // Same monitor as the completion callback in invoke(), which decrements the counter
        // and notifies while holding it.
        synchronized (this) {
            // Loop to re-check the condition after every wake-up.
            while (this.pendingWritesCount.get() > 0) {
                wait();
            }
        }
        checkWriteError();
    }

    /**
     * Creates the Pravega client factory for the given scope. Kept as a separate overridable
     * method and marked {@code @VisibleForTesting}.
     *
     * @param str          the scope name
     * @param clientConfig the client configuration
     * @return a new client factory
     */
    @VisibleForTesting
    protected EventStreamClientFactory createClientFactory(String str, ClientConfig clientConfig) {
        return EventStreamClientFactory.withScope(str, clientConfig);
    }

    /**
     * Creates the Pravega writer that matches the configured writer mode.
     *
     * <p>{@code EXACTLY_ONCE} gets a transactional writer; every other mode gets a plain event
     * writer plus the executor on which write-completion callbacks run. The serializer is now
     * declared with its type argument instead of as a raw type.
     *
     * @throws IllegalStateException if the client factory has not been created yet
     */
    @VisibleForTesting
    protected void createInternalWriter() {
        Preconditions.checkState(this.clientFactory != null, "clientFactory not initialized");
        final Serializer<T> eventSerializer = new FlinkSerializer<>(this.serializationSchema);
        // The lease renewal period doubles as the Pravega transaction timeout.
        final EventWriterConfig writerConfig = EventWriterConfig.builder().transactionTimeoutTime(this.txnLeaseRenewalPeriod).build();
        if (this.writerMode == PravegaWriterMode.EXACTLY_ONCE) {
            this.transactionalWriter = this.clientFactory.createTransactionalEventWriter(writerId(), this.stream.getStreamName(), eventSerializer, writerConfig);
        } else {
            this.executorService = createExecutorService();
            this.writer = this.clientFactory.createEventWriter(writerId(), this.stream.getStreamName(), eventSerializer, writerConfig);
        }
    }

    /**
     * Decides whether the context's current watermark should be passed on to Pravega.
     *
     * <p>A watermark is emitted only if it is a real value (neither {@code Long.MIN_VALUE} nor
     * {@code Long.MAX_VALUE}), it is newer than the last emitted one, and the current record's
     * timestamp is not behind it. A record without a timestamp never triggers an emission;
     * previously it caused a {@link NullPointerException} when the timestamp was unboxed.
     *
     * @param j       the last watermark that was emitted
     * @param context the sink context providing the current watermark and record timestamp
     * @return {@code true} if the current watermark should be emitted
     */
    boolean shouldEmitWatermark(long j, SinkFunction.Context context) {
        final long contextWatermark = context.currentWatermark();
        if (contextWatermark <= Long.MIN_VALUE || contextWatermark >= Long.MAX_VALUE || j >= contextWatermark) {
            return false;
        }
        // Only consulted once the watermark itself qualifies, as before.
        final Long recordTimestamp = context.timestamp();
        return recordTimestamp != null && recordTimestamp.longValue() >= contextWatermark;
    }

    /**
     * Creates the executor used for write-completion callbacks. Kept as a separate
     * overridable method and marked {@code @VisibleForTesting}.
     *
     * @return a new single-threaded executor
     */
    @VisibleForTesting
    protected ExecutorService createExecutorService() {
        return Executors.newSingleThreadExecutor();
    }

    /**
     * Creates the client factory and the internal writer unless the writer for the configured
     * mode already exists.
     *
     * @throws UnsupportedOperationException if {@code EXACTLY_ONCE} is requested while
     *                                       checkpointing is disabled
     */
    private void initializeInternalWriter() {
        final boolean exactlyOnce = this.writerMode == PravegaWriterMode.EXACTLY_ONCE;

        // Each mode uses its own writer field; check the one that applies.
        final boolean alreadyInitialized = exactlyOnce ? this.transactionalWriter != null : this.writer != null;
        if (alreadyInitialized) {
            return;
        }

        if (exactlyOnce && !isCheckpointEnabled()) {
            throw new UnsupportedOperationException("Enable checkpointing to use the exactly-once writer mode.");
        }

        this.clientFactory = createClientFactory(this.stream.getScope(), this.clientConfig);
        createInternalWriter();
    }

    /**
     * @return whether the runtime context reports checkpointing as enabled
     */
    private boolean isCheckpointEnabled() {
        return getRuntimeContext().isCheckpointingEnabled();
    }

    /**
     * Builds the id of this writer instance: the random prefix chosen at construction time,
     * a dash, and the index of the current subtask.
     *
     * @return the writer id
     */
    protected String writerId() {
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        final StringBuilder id = new StringBuilder();
        id.append(this.writerIdPrefix).append("-").append(subtaskIndex);
        return id.toString();
    }

    /**
     * Entry point of the fluent API.
     *
     * @param <T> the type of the events to write
     * @return a new, empty {@link Builder}
     */
    public static <T> Builder<T> builder() {
        return new Builder<>();
    }

    /* renamed from: ignoreFailuresAfterTransactionTimeout, reason: merged with bridge method [inline-methods] */
    /**
     * Delegates to the superclass' {@code ignoreFailuresAfterTransactionTimeout()} and returns
     * this sink so the call can be chained with the concrete type.
     *
     * <p>The decompiler renamed this covariant override to
     * {@code m126ignoreFailuresAfterTransactionTimeout()}; the properly named method is
     * restored here.
     *
     * @return this instance
     */
    public FlinkPravegaWriter<T> ignoreFailuresAfterTransactionTimeout() {
        super.ignoreFailuresAfterTransactionTimeout();
        return this;
    }

    /**
     * Decompiler-generated alias of {@link #ignoreFailuresAfterTransactionTimeout()}.
     *
     * @return this instance
     */
    public FlinkPravegaWriter<T> m126ignoreFailuresAfterTransactionTimeout() {
        return ignoreFailuresAfterTransactionTimeout();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Decompiler rendering of the compiler-generated bridge for
     * {@code invoke(PravegaTransactionState, T, SinkFunction.Context)}.
     *
     * <p>The event argument is cast to {@code T}. The previous cast to
     * {@code PravegaTransactionState} could not match the typed overload, so the call either
     * resolved back to this method or failed with a {@link ClassCastException}.
     *
     * <p>NOTE(review): a hand-maintained source file should not declare this method at all,
     * because the compiler generates the bridge itself. Consider deleting it once the file is
     * built from source.
     */
    /* JADX WARN: Multi-variable type inference failed */
    @SuppressWarnings("unchecked")
    protected /* bridge */ /* synthetic */ void invoke(Object obj, Object obj2, SinkFunction.Context context) throws Exception {
        invoke((PravegaTransactionState) obj, (T) obj2, context);
    }
}
