package io.pravega.connectors.flink;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.common.Exceptions;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter.class */
public class FlinkPravegaWriter<T> extends RichSinkFunction<T> implements ListCheckpointed<PendingTransaction>, CheckpointListener {
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaWriter.class);
    private static final long serialVersionUID = 1;
    private static final String PRAVEGA_WRITER_METRICS_GROUP = "PravegaWriter";
    private static final String SCOPED_STREAM_METRICS_GAUGE = "stream";
    final boolean enableMetrics;
    final ClientConfig clientConfig;
    final SerializationSchema<T> serializationSchema;
    final PravegaEventRouter<T> eventRouter;

    @SuppressFBWarnings({"SE_BAD_FIELD"})
    final Stream stream;
    private final long txnLeaseRenewalPeriod;
    private PravegaWriterMode writerMode;

    @VisibleForTesting
    transient FlinkPravegaWriter<T>.AbstractInternalWriter writer = null;
    private transient EventStreamClientFactory clientFactory = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$AbstractInternalWriter.class */
    public abstract class AbstractInternalWriter {
        private EventStreamWriter<T> pravegaWriter;
        private TransactionalEventStreamWriter<T> pravegaTxnWriter;

        AbstractInternalWriter(EventStreamClientFactory eventStreamClientFactory, boolean z) {
            FlinkSerializer flinkSerializer = new FlinkSerializer(FlinkPravegaWriter.this.serializationSchema);
            EventWriterConfig build = EventWriterConfig.builder().transactionTimeoutTime(FlinkPravegaWriter.this.txnLeaseRenewalPeriod).build();
            if (z) {
                this.pravegaTxnWriter = eventStreamClientFactory.createTransactionalEventWriter(FlinkPravegaWriter.this.stream.getStreamName(), flinkSerializer, build);
            } else {
                this.pravegaWriter = eventStreamClientFactory.createEventWriter(FlinkPravegaWriter.this.stream.getStreamName(), flinkSerializer, build);
            }
        }

        abstract void open() throws Exception;

        abstract void write(T t) throws Exception;

        void close() throws Exception {
            if (this.pravegaWriter != null) {
                this.pravegaWriter.close();
            }
            if (this.pravegaTxnWriter != null) {
                this.pravegaTxnWriter.close();
            }
        }

        abstract List<PendingTransaction> snapshotState(long j, long j2) throws Exception;

        abstract void restoreState(List<PendingTransaction> list) throws Exception;

        abstract void notifyCheckpointComplete(long j) throws Exception;

        public EventStreamWriter<T> getPravegaWriter() {
            return this.pravegaWriter;
        }

        public TransactionalEventStreamWriter<T> getPravegaTxnWriter() {
            return this.pravegaTxnWriter;
        }
    }

    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$Builder.class */
    public static class Builder<T> extends AbstractStreamingWriterBuilder<T, Builder<T>> {
        private SerializationSchema<T> serializationSchema;
        private PravegaEventRouter<T> eventRouter;

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.pravega.connectors.flink.AbstractWriterBuilder
        public Builder<T> builder() {
            return this;
        }

        public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSchema) {
            this.serializationSchema = serializationSchema;
            return builder();
        }

        public Builder<T> withEventRouter(PravegaEventRouter<T> pravegaEventRouter) {
            this.eventRouter = pravegaEventRouter;
            return builder();
        }

        public FlinkPravegaWriter<T> build() {
            Preconditions.checkState(this.eventRouter != null, "Event router must be supplied.");
            Preconditions.checkState(this.serializationSchema != null, "Serialization schema must be supplied.");
            return createSinkFunction(this.serializationSchema, this.eventRouter);
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$FlinkSerializer.class */
    static final class FlinkSerializer<T> implements Serializer<T> {
        private final SerializationSchema<T> serializationSchema;

        /* JADX INFO: Access modifiers changed from: package-private */
        public FlinkSerializer(SerializationSchema<T> serializationSchema) {
            this.serializationSchema = serializationSchema;
        }

        @Override // io.pravega.client.stream.Serializer
        public ByteBuffer serialize(T t) {
            return ByteBuffer.wrap(this.serializationSchema.serialize(t));
        }

        @Override // io.pravega.client.stream.Serializer
        public T deserialize(ByteBuffer byteBuffer) {
            throw new IllegalStateException("deserialize() called within a serializer");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$NonTransactionalWriter.class */
    public class NonTransactionalWriter extends FlinkPravegaWriter<T>.AbstractInternalWriter {

        @VisibleForTesting
        final AtomicReference<Throwable> writeError;

        @VisibleForTesting
        final AtomicInteger pendingWritesCount;
        private final ExecutorService executorService;

        NonTransactionalWriter(EventStreamClientFactory eventStreamClientFactory, ExecutorService executorService) {
            super(eventStreamClientFactory, false);
            this.writeError = new AtomicReference<>(null);
            this.pendingWritesCount = new AtomicInteger(0);
            this.executorService = executorService;
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void open() throws Exception {
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void write(T t) throws Exception {
            checkWriteError();
            this.pendingWritesCount.incrementAndGet();
            getPravegaWriter().writeEvent(FlinkPravegaWriter.this.eventRouter.getRoutingKey(t), t).whenCompleteAsync((r5, th) -> {
                if (th != null) {
                    FlinkPravegaWriter.log.warn("Detected a write failure: {}", th);
                    this.writeError.compareAndSet(null, th);
                }
                synchronized (this) {
                    this.pendingWritesCount.decrementAndGet();
                    notify();
                }
            }, (Executor) this.executorService);
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void close() throws Exception {
            Exception exc = null;
            try {
                flushAndVerify();
            } catch (Exception e) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e, (Throwable) null);
            }
            try {
                super.close();
            } catch (Exception e2) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
            }
            try {
                this.executorService.shutdown();
            } catch (Exception e3) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e3, exc);
            }
            if (exc != null) {
                throw exc;
            }
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public List<PendingTransaction> snapshotState(long j, long j2) throws Exception {
            FlinkPravegaWriter.log.debug("Snapshot triggered, wait for all pending writes to complete");
            flushAndVerify();
            return new ArrayList();
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void notifyCheckpointComplete(long j) throws Exception {
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void restoreState(List<PendingTransaction> list) throws Exception {
        }

        @VisibleForTesting
        void flushAndVerify() throws Exception {
            getPravegaWriter().flush();
            synchronized (this) {
                while (this.pendingWritesCount.get() > 0) {
                    wait();
                }
            }
            checkWriteError();
        }

        private void checkWriteError() throws Exception {
            Throwable andSet = this.writeError.getAndSet(null);
            if (andSet != null) {
                throw new IOException("Write failure", andSet);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$PendingTransaction.class */
    public static class PendingTransaction implements Serializable {
        private final UUID uuid;
        private final String scope;
        private final String stream;

        public PendingTransaction(UUID uuid, String str, String str2) {
            Preconditions.checkNotNull(uuid, "UUID");
            Preconditions.checkNotNull(str, Pravega.CONNECTOR_READER_STREAM_INFO_SCOPE);
            Preconditions.checkNotNull(str2, "stream");
            this.uuid = uuid;
            this.scope = str;
            this.stream = str2;
        }

        public UUID getUuid() {
            return this.uuid;
        }

        public String getScope() {
            return this.scope;
        }

        public String getStream() {
            return this.stream;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$StreamNameGauge.class */
    public static class StreamNameGauge implements Gauge<String> {
        final String stream;

        public StreamNameGauge(String str) {
            this.stream = str;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public String m93getValue() {
            return this.stream;
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$TransactionAndCheckpoint.class */
    static final class TransactionAndCheckpoint<T> {
        private final Transaction<T> transaction;
        private final long checkpointId;

        TransactionAndCheckpoint(Transaction<T> transaction, long j) {
            this.transaction = transaction;
            this.checkpointId = j;
        }

        Transaction<T> transaction() {
            return this.transaction;
        }

        long checkpointId() {
            return this.checkpointId;
        }

        public String toString() {
            return "(checkpoint: " + this.checkpointId + ", transaction: " + this.transaction.getTxnId() + ')';
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter$TransactionalWriter.class */
    public class TransactionalWriter extends FlinkPravegaWriter<T>.AbstractInternalWriter {

        @VisibleForTesting
        Transaction<T> currentTxn;

        @VisibleForTesting
        final ArrayDeque<TransactionAndCheckpoint<T>> txnsPendingCommit;

        TransactionalWriter(EventStreamClientFactory eventStreamClientFactory) {
            super(eventStreamClientFactory, true);
            this.txnsPendingCommit = new ArrayDeque<>();
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void open() throws Exception {
            this.currentTxn = getPravegaTxnWriter().beginTxn();
            FlinkPravegaWriter.log.debug("{} - started first transaction '{}'", FlinkPravegaWriter.this.name(), this.currentTxn.getTxnId());
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void write(T t) throws Exception {
            this.currentTxn.writeEvent(FlinkPravegaWriter.this.eventRouter.getRoutingKey(t), t);
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void close() throws Exception {
            Exception exc = null;
            Transaction<T> transaction = this.currentTxn;
            if (transaction != null) {
                try {
                    transaction.getClass();
                    Exceptions.handleInterrupted(transaction::abort);
                } catch (Exception e) {
                    exc = e;
                }
            }
            try {
                super.close();
            } catch (Exception e2) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
            }
            if (exc != null) {
                throw exc;
            }
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public List<PendingTransaction> snapshotState(long j, long j2) throws Exception {
            Transaction<T> transaction = this.currentTxn;
            Preconditions.checkState(transaction != null, "bug: no transaction object when performing state snapshot");
            FlinkPravegaWriter.log.debug("{} - checkpoint {} triggered, flushing transaction '{}'", new Object[]{FlinkPravegaWriter.this.name(), Long.valueOf(j), transaction.getTxnId()});
            transaction.flush();
            this.txnsPendingCommit.addLast(new TransactionAndCheckpoint<>(transaction, j));
            this.currentTxn = getPravegaTxnWriter().beginTxn();
            FlinkPravegaWriter.log.debug("{} - started new transaction '{}'", FlinkPravegaWriter.this.name(), this.currentTxn.getTxnId());
            FlinkPravegaWriter.log.debug("{} - storing pending transactions {}", FlinkPravegaWriter.this.name(), this.txnsPendingCommit);
            return (List) this.txnsPendingCommit.stream().map(transactionAndCheckpoint -> {
                return new PendingTransaction(transactionAndCheckpoint.transaction().getTxnId(), FlinkPravegaWriter.this.stream.getScope(), FlinkPravegaWriter.this.stream.getStreamName());
            }).collect(Collectors.toList());
        }

        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void notifyCheckpointComplete(long j) throws Exception {
            Preconditions.checkState(!this.txnsPendingCommit.isEmpty(), "checkpoint completed, but no transaction pending");
            while (true) {
                TransactionAndCheckpoint<T> peekFirst = this.txnsPendingCommit.peekFirst();
                if (peekFirst == null || peekFirst.checkpointId() > j) {
                    return;
                }
                this.txnsPendingCommit.removeFirst();
                FlinkPravegaWriter.log.info("{} - checkpoint {} complete, committing completed checkpoint transaction {}", new Object[]{FlinkPravegaWriter.this.name(), Long.valueOf(j), peekFirst.transaction().getTxnId()});
                peekFirst.transaction().commit();
                FlinkPravegaWriter.log.debug("{} - committed checkpoint transaction {}", FlinkPravegaWriter.this.name(), peekFirst.transaction().getTxnId());
            }
        }

        /* JADX WARN: Finally extract failed */
        @Override // io.pravega.connectors.flink.FlinkPravegaWriter.AbstractInternalWriter
        public void restoreState(List<PendingTransaction> list) throws Exception {
            EventStreamClientFactory createClientFactory;
            Throwable th;
            if (list == null || list.size() == 0) {
                return;
            }
            Map map = (Map) list.stream().collect(Collectors.groupingBy(pendingTransaction -> {
                return Stream.of(pendingTransaction.getScope(), pendingTransaction.getStream());
            }));
            FlinkPravegaWriter.log.debug("pendingTransactionsMap:: " + map);
            for (Map.Entry entry : map.entrySet()) {
                Stream stream = (Stream) entry.getKey();
                String scope = stream.getScope();
                String streamName = stream.getStreamName();
                FlinkSerializer flinkSerializer = new FlinkSerializer(FlinkPravegaWriter.this.serializationSchema);
                EventWriterConfig build = EventWriterConfig.builder().transactionTimeoutTime(FlinkPravegaWriter.this.txnLeaseRenewalPeriod).build();
                try {
                    createClientFactory = FlinkPravegaWriter.this.createClientFactory(scope, FlinkPravegaWriter.this.clientConfig);
                    th = null;
                } catch (Exception e) {
                    FlinkPravegaWriter.log.error("Exception occurred while restoring the state for scope: {} and stream: {}", new Object[]{scope, streamName, e});
                }
                try {
                    TransactionalEventStreamWriter<T> createTransactionalEventWriter = createClientFactory.createTransactionalEventWriter(streamName, flinkSerializer, build);
                    Throwable th2 = null;
                    try {
                        try {
                            FlinkPravegaWriter.log.info("restore state for the scope: {} and stream: {}", scope, streamName);
                            Iterator it = ((List) entry.getValue()).iterator();
                            while (it.hasNext()) {
                                UUID uuid = ((PendingTransaction) it.next()).getUuid();
                                Transaction<T> txn = createTransactionalEventWriter.getTxn(uuid);
                                Transaction.Status checkStatus = txn.checkStatus();
                                if (checkStatus == Transaction.Status.OPEN) {
                                    FlinkPravegaWriter.log.info("{} - committing completed checkpoint transaction {} after task restore", FlinkPravegaWriter.this.name(), uuid);
                                    txn.commit();
                                    FlinkPravegaWriter.log.debug("{} - committed checkpoint transaction {}", FlinkPravegaWriter.this.name(), uuid);
                                } else if (checkStatus == Transaction.Status.COMMITTED || checkStatus == Transaction.Status.COMMITTING) {
                                    FlinkPravegaWriter.log.debug("{} - at restore, transaction {} was already committed", FlinkPravegaWriter.this.name(), uuid);
                                } else {
                                    FlinkPravegaWriter.log.warn("{} - found unexpected transaction status {} for transaction {} on task restore. Transaction probably timed out between failure and restore. ", new Object[]{FlinkPravegaWriter.this.name(), checkStatus, uuid});
                                }
                            }
                            if (createTransactionalEventWriter != null) {
                                if (0 != 0) {
                                    try {
                                        createTransactionalEventWriter.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    createTransactionalEventWriter.close();
                                }
                            }
                            if (createClientFactory != null) {
                                if (0 != 0) {
                                    try {
                                        createClientFactory.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    createClientFactory.close();
                                }
                            }
                        } catch (Throwable th5) {
                            th2 = th5;
                            throw th5;
                        }
                    } catch (Throwable th6) {
                        if (createTransactionalEventWriter != null) {
                            if (th2 != null) {
                                try {
                                    createTransactionalEventWriter.close();
                                } catch (Throwable th7) {
                                    th2.addSuppressed(th7);
                                }
                            } else {
                                createTransactionalEventWriter.close();
                            }
                        }
                        throw th6;
                    }
                } catch (Throwable th8) {
                    if (createClientFactory != null) {
                        if (0 != 0) {
                            try {
                                createClientFactory.close();
                            } catch (Throwable th9) {
                                th.addSuppressed(th9);
                            }
                        } else {
                            createClientFactory.close();
                        }
                    }
                    throw th8;
                }
            }
            if (0 != 0) {
                throw null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkPravegaWriter(ClientConfig clientConfig, Stream stream, SerializationSchema<T> serializationSchema, PravegaEventRouter<T> pravegaEventRouter, PravegaWriterMode pravegaWriterMode, long j, boolean z) {
        this.clientConfig = (ClientConfig) Preconditions.checkNotNull(clientConfig, "clientConfig");
        this.stream = (Stream) Preconditions.checkNotNull(stream, "stream");
        this.serializationSchema = (SerializationSchema) Preconditions.checkNotNull(serializationSchema, "serializationSchema");
        this.eventRouter = (PravegaEventRouter) Preconditions.checkNotNull(pravegaEventRouter, "eventRouter");
        this.writerMode = (PravegaWriterMode) Preconditions.checkNotNull(pravegaWriterMode, "writerMode");
        Preconditions.checkArgument(j > 0, "txnLeaseRenewalPeriod must be > 0");
        this.txnLeaseRenewalPeriod = j;
        this.enableMetrics = z;
    }

    public PravegaEventRouter<T> getEventRouter() {
        return this.eventRouter;
    }

    public PravegaWriterMode getPravegaWriterMode() {
        return this.writerMode;
    }

    public void open(Configuration configuration) throws Exception {
        initializeInternalWriter();
        this.writer.open();
        log.info("Initialized Pravega writer for stream: {} with controller URI: {}", this.stream, this.clientConfig.getControllerURI());
        if (this.enableMetrics) {
            registerMetrics();
        }
    }

    public void invoke(T t, SinkFunction.Context context) throws Exception {
        this.writer.write(t);
    }

    public void close() throws Exception {
        Exception exc = null;
        if (this.writer != null) {
            try {
                this.writer.close();
            } catch (Exception e) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e, (Throwable) null);
            }
        }
        if (this.clientFactory != null) {
            try {
                this.clientFactory.close();
            } catch (Exception e2) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
            }
        }
        if (exc != null) {
            throw exc;
        }
    }

    private void registerMetrics() {
        getRuntimeContext().getMetricGroup().addGroup(PRAVEGA_WRITER_METRICS_GROUP).gauge("stream", new StreamNameGauge(this.stream.getScopedName()));
    }

    public List<PendingTransaction> snapshotState(long j, long j2) throws Exception {
        return this.writer.snapshotState(j, j2);
    }

    public void restoreState(List<PendingTransaction> list) throws Exception {
        initializeInternalWriter();
        this.writer.restoreState(list);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.writer.notifyCheckpointComplete(j);
    }

    @VisibleForTesting
    protected EventStreamClientFactory createClientFactory(String str, ClientConfig clientConfig) {
        return EventStreamClientFactory.withScope(str, clientConfig);
    }

    @VisibleForTesting
    protected FlinkPravegaWriter<T>.AbstractInternalWriter createInternalWriter() {
        Preconditions.checkState(this.clientFactory != null, "clientFactory not initialized");
        if (this.writerMode == PravegaWriterMode.EXACTLY_ONCE) {
            return new TransactionalWriter(this.clientFactory);
        }
        return new NonTransactionalWriter(this.clientFactory, createExecutorService());
    }

    @VisibleForTesting
    protected ExecutorService createExecutorService() {
        return Executors.newSingleThreadExecutor();
    }

    private void initializeInternalWriter() {
        if (this.writer != null) {
            return;
        }
        if (this.writerMode == PravegaWriterMode.EXACTLY_ONCE && !isCheckpointEnabled()) {
            throw new UnsupportedOperationException("Enable checkpointing to use the exactly-once writer mode.");
        }
        this.clientFactory = createClientFactory(this.stream.getScope(), this.clientConfig);
        this.writer = createInternalWriter();
    }

    private boolean isCheckpointEnabled() {
        return getRuntimeContext().isCheckpointingEnabled();
    }

    protected String name() {
        return getRuntimeContext().getTaskNameWithSubtasks();
    }

    public static <T> Builder<T> builder() {
        return new Builder<>();
    }
}
