package io.pravega.connectors.flink;

import io.pravega.client.ClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Transaction;
import io.pravega.common.Exceptions;
import io.pravega.shaded.com.google.common.base.Preconditions;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkExactlyOncePravegaWriter.class */
public class FlinkExactlyOncePravegaWriter<T> extends RichSinkFunction<T> implements ListCheckpointed<UUID>, CheckpointListener {
    private static final Logger log = LoggerFactory.getLogger(FlinkExactlyOncePravegaWriter.class);
    private static final long serialVersionUID = 1;
    private static final long DEFAULT_TXN_TIMEOUT_MILLIS = 7200000;
    private static final long DEFAULT_TX_SCALE_GRACE_MILLIS = 600000;
    private final SerializationSchema<T> serializationSchema;
    private final PravegaEventRouter<T> eventRouter;
    private final URI controllerURI;
    private final String scopeName;
    private final String streamName;
    private final long txnTimeoutMillis;
    private final long txnMaxTimeMillis;
    private final long txnGracePeriodMillis;
    private transient EventStreamWriter<T> pravegaWriter;
    private transient Transaction<T> currentTxn;
    private transient ArrayDeque<TransactionAndCheckpoint<T>> txnsPendingCommit;

    /* loaded from: input_file:io/pravega/connectors/flink/FlinkExactlyOncePravegaWriter$FlinkSerializer.class */
    private static final class FlinkSerializer<T> implements Serializer<T> {
        private final SerializationSchema<T> serializationSchema;

        FlinkSerializer(SerializationSchema<T> serializationSchema) {
            this.serializationSchema = serializationSchema;
        }

        @Override // io.pravega.client.stream.Serializer
        public ByteBuffer serialize(T t) {
            return ByteBuffer.wrap(this.serializationSchema.serialize(t));
        }

        @Override // io.pravega.client.stream.Serializer
        public T deserialize(ByteBuffer byteBuffer) {
            throw new IllegalStateException("deserialize() called within a serializer");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkExactlyOncePravegaWriter$TransactionAndCheckpoint.class */
    public static final class TransactionAndCheckpoint<T> {
        private final Transaction<T> transaction;
        private final long checkpointId;

        TransactionAndCheckpoint(Transaction<T> transaction, long j) {
            this.transaction = transaction;
            this.checkpointId = j;
        }

        Transaction<T> transaction() {
            return this.transaction;
        }

        long checkpointId() {
            return this.checkpointId;
        }

        public String toString() {
            return "(checkpoint: " + this.checkpointId + ", transaction: " + this.transaction.getTxnId() + ')';
        }
    }

    public FlinkExactlyOncePravegaWriter(URI uri, String str, String str2, SerializationSchema<T> serializationSchema, PravegaEventRouter<T> pravegaEventRouter) {
        this(uri, str, str2, serializationSchema, pravegaEventRouter, DEFAULT_TXN_TIMEOUT_MILLIS, DEFAULT_TXN_TIMEOUT_MILLIS, DEFAULT_TX_SCALE_GRACE_MILLIS);
    }

    public FlinkExactlyOncePravegaWriter(URI uri, String str, String str2, SerializationSchema<T> serializationSchema, PravegaEventRouter<T> pravegaEventRouter, long j, long j2, long j3) {
        Preconditions.checkNotNull(uri, "controllerURI");
        Preconditions.checkNotNull(str, "scope");
        Preconditions.checkNotNull(str2, "streamName");
        Preconditions.checkNotNull(serializationSchema, "serializationSchema");
        Preconditions.checkNotNull(pravegaEventRouter, "router");
        Preconditions.checkArgument(j > 0, "txnTimeoutMillis must be > 0");
        Preconditions.checkArgument(j2 > 0, "txnMaxTimeMillis must be > 0");
        Preconditions.checkArgument(j3 > 0, "txnGracePeriodMillis must be > 0");
        this.controllerURI = uri;
        this.scopeName = str;
        this.streamName = str2;
        this.serializationSchema = serializationSchema;
        this.eventRouter = pravegaEventRouter;
        this.txnTimeoutMillis = j;
        this.txnMaxTimeMillis = j2;
        this.txnGracePeriodMillis = j3;
    }

    public void open(Configuration configuration) throws Exception {
        this.pravegaWriter = ClientFactory.withScope(this.scopeName, this.controllerURI).createEventWriter(this.streamName, new FlinkSerializer(this.serializationSchema), EventWriterConfig.builder().build());
        log.info("Initialized pravega writer for stream: {}/{} with controller URI: {}", new Object[]{this.scopeName, this.streamName, this.controllerURI});
        this.currentTxn = this.pravegaWriter.beginTxn(this.txnTimeoutMillis, this.txnMaxTimeMillis, this.txnGracePeriodMillis);
        log.debug("{} - started first transaction '{}'", name(), this.currentTxn.getTxnId());
        this.txnsPendingCommit = new ArrayDeque<>();
    }

    public void invoke(T t) throws Exception {
        this.currentTxn.writeEvent(this.eventRouter.getRoutingKey(t), t);
    }

    public void close() throws Exception {
        Exception exc = null;
        Transaction<T> transaction = this.currentTxn;
        if (transaction != null) {
            try {
                transaction.getClass();
                Exceptions.handleInterrupted(transaction::abort);
            } catch (Exception e) {
                exc = e;
            }
        }
        EventStreamWriter<T> eventStreamWriter = this.pravegaWriter;
        if (eventStreamWriter != null) {
            try {
                eventStreamWriter.close();
            } catch (Exception e2) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
            }
        }
        if (exc != null) {
            throw exc;
        }
    }

    public List<UUID> snapshotState(long j, long j2) throws Exception {
        Transaction<T> transaction = this.currentTxn;
        Preconditions.checkState(transaction != null, "bug: no transaction object when performing state snapshot");
        log.debug("{} - checkpoint {} triggered, flushing transaction '{}'", new Object[]{name(), Long.valueOf(j), transaction.getTxnId()});
        transaction.flush();
        this.txnsPendingCommit.addLast(new TransactionAndCheckpoint<>(transaction, j));
        this.currentTxn = this.pravegaWriter.beginTxn(this.txnTimeoutMillis, this.txnMaxTimeMillis, this.txnGracePeriodMillis);
        log.debug("{} - started new transaction '{}'", name(), this.currentTxn.getTxnId());
        log.debug("{} - storing pending transactions {}", name(), this.txnsPendingCommit);
        return (List) this.txnsPendingCommit.stream().map(transactionAndCheckpoint -> {
            return transactionAndCheckpoint.transaction().getTxnId();
        }).collect(Collectors.toList());
    }

    public void restoreState(List<UUID> list) throws Exception {
        if (list == null || list.size() <= 0) {
            return;
        }
        EventStreamWriter<T> createEventWriter = ClientFactory.withScope(this.scopeName, this.controllerURI).createEventWriter(this.streamName, new FlinkSerializer(null), EventWriterConfig.builder().build());
        for (UUID uuid : list) {
            if (uuid != null) {
                Transaction<T> txn = createEventWriter.getTxn(uuid);
                Transaction.Status checkStatus = txn.checkStatus();
                if (checkStatus == Transaction.Status.OPEN) {
                    log.info("{} - committing completed checkpoint transaction {} after task restore", name(), uuid);
                    txn.commit();
                    log.debug("{} - committed checkpoint transaction {}", name(), uuid);
                } else if (checkStatus == Transaction.Status.COMMITTED || checkStatus == Transaction.Status.COMMITTING) {
                    log.debug("{} - at restore, transaction {} was already committed", name(), uuid);
                } else {
                    log.warn("{} - found unexpected transaction status {} for transaction {} on task restore. Transaction probably timed out between failure and restore. ", new Object[]{name(), checkStatus, uuid});
                }
            }
        }
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        Preconditions.checkState(!this.txnsPendingCommit.isEmpty(), "checkpoint completed, but no transaction pending");
        while (true) {
            TransactionAndCheckpoint<T> peekFirst = this.txnsPendingCommit.peekFirst();
            if (peekFirst == null || peekFirst.checkpointId() > j) {
                return;
            }
            this.txnsPendingCommit.removeFirst();
            log.info("{} - checkpoint {} complete, committing completed checkpoint transaction {}", new Object[]{name(), Long.valueOf(j), peekFirst.transaction().getTxnId()});
            peekFirst.transaction().commit();
            log.debug("{} - committed checkpoint transaction {}", name(), peekFirst.transaction().getTxnId());
        }
    }

    private String name() {
        return getRuntimeContext().getTaskNameWithSubtasks();
    }
}
