package io.pravega.connectors.flink;

import io.pravega.client.ClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.shaded.com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaWriter.class */
public class FlinkPravegaWriter<T> extends RichSinkFunction<T> implements CheckpointedFunction, Serializable {
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaWriter.class);
    private static final long serialVersionUID = 1;
    private final SerializationSchema<T> serializationSchema;
    private final PravegaEventRouter<T> eventRouter;
    private final URI controllerURI;
    private final String scopeName;
    private final String streamName;
    private PravegaWriterMode writerMode = PravegaWriterMode.ATLEAST_ONCE;
    private transient EventStreamWriter<T> pravegaWriter = null;
    private transient Serializer<T> eventSerializer = null;
    private transient AtomicReference<Throwable> writeError = null;
    private transient AtomicInteger pendingWritesCount = null;
    private transient ExecutorService executorService = null;

    public FlinkPravegaWriter(URI uri, String str, String str2, SerializationSchema<T> serializationSchema, PravegaEventRouter<T> pravegaEventRouter) {
        Preconditions.checkNotNull(uri);
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(str2);
        Preconditions.checkNotNull(serializationSchema);
        Preconditions.checkNotNull(pravegaEventRouter);
        this.controllerURI = uri;
        this.scopeName = str;
        this.streamName = str2;
        this.serializationSchema = serializationSchema;
        this.eventRouter = pravegaEventRouter;
    }

    public PravegaEventRouter<T> getEventRouter() {
        return this.eventRouter;
    }

    public void setPravegaWriterMode(PravegaWriterMode pravegaWriterMode) {
        Preconditions.checkNotNull(pravegaWriterMode);
        this.writerMode = pravegaWriterMode;
    }

    public void open(Configuration configuration) throws Exception {
        this.eventSerializer = new Serializer<T>() { // from class: io.pravega.connectors.flink.FlinkPravegaWriter.1
            @Override // io.pravega.client.stream.Serializer
            public ByteBuffer serialize(T t) {
                return ByteBuffer.wrap(FlinkPravegaWriter.this.serializationSchema.serialize(t));
            }

            @Override // io.pravega.client.stream.Serializer
            public T deserialize(ByteBuffer byteBuffer) {
                throw new IllegalStateException("deserialize() called for a serializer");
            }
        };
        this.writeError = new AtomicReference<>(null);
        this.pendingWritesCount = new AtomicInteger(0);
        this.pravegaWriter = ClientFactory.withScope(this.scopeName, this.controllerURI).createEventWriter(this.streamName, this.eventSerializer, EventWriterConfig.builder().build());
        this.executorService = Executors.newFixedThreadPool(5);
        log.info("Initialized pravega writer for stream: {}/{} with controller URI: {}", new Object[]{this.scopeName, this.streamName, this.controllerURI});
    }

    public void close() throws Exception {
        if (this.writerMode == PravegaWriterMode.ATLEAST_ONCE) {
            flushAndVerify();
        }
        this.pravegaWriter.close();
    }

    public void invoke(T t) throws Exception {
        if (this.writerMode == PravegaWriterMode.ATLEAST_ONCE) {
            checkWriteError();
        }
        this.pendingWritesCount.incrementAndGet();
        CompletableFuture<Void> writeEvent = this.pravegaWriter.writeEvent(this.eventRouter.getRoutingKey(t), t);
        if (this.writerMode == PravegaWriterMode.ATLEAST_ONCE) {
            writeEvent.whenCompleteAsync((r5, th) -> {
                if (th != null) {
                    log.warn("Detected a write failure: {}", th);
                    this.writeError.compareAndSet(null, th);
                } else {
                    synchronized (this) {
                        this.pendingWritesCount.decrementAndGet();
                        notify();
                    }
                }
            }, (Executor) this.executorService);
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (this.writerMode == PravegaWriterMode.ATLEAST_ONCE) {
            log.debug("Snapshot triggered, wait for all pending writes to complete");
            flushAndVerify();
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    private void flushAndVerify() throws Exception {
        this.pravegaWriter.flush();
        synchronized (this) {
            while (this.pendingWritesCount.get() > 0) {
                wait();
            }
        }
        checkWriteError();
    }

    private void checkWriteError() throws Exception {
        Throwable andSet = this.writeError.getAndSet(null);
        if (andSet != null) {
            throw new IOException("Write failure", andSet);
        }
    }
}
