package io.pravega.connectors.flink;

import io.pravega.client.ClientConfig;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.connectors.flink.util.FlinkPravegaUtils;
import io.pravega.shaded.com.google.common.base.Preconditions;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader.class */
public class FlinkPravegaReader<T> extends RichParallelSourceFunction<T> implements ResultTypeQueryable<T>, StoppableFunction, ExternallyInducedSource<T, Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaReader.class);
    private static final long serialVersionUID = 1;
    final String hookUid;
    final ClientConfig clientConfig;
    final ReaderGroupConfig readerGroupConfig;
    final String readerGroupScope;
    final String readerGroupName;
    final DeserializationSchema<T> deserializationSchema;
    final Time eventReadTimeout;
    final Time checkpointInitiateTimeout;
    volatile boolean running = true;
    private transient ExternallyInducedSource.CheckpointTrigger checkpointTrigger;

    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader$Builder.class */
    public static class Builder<T> extends AbstractStreamingReaderBuilder<T, Builder<T>> {
        private DeserializationSchema<T> deserializationSchema;

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.pravega.connectors.flink.AbstractReaderBuilder
        public Builder<T> builder() {
            return this;
        }

        public Builder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
            return builder();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.pravega.connectors.flink.AbstractStreamingReaderBuilder
        public DeserializationSchema<T> getDeserializationSchema() {
            Preconditions.checkState(this.deserializationSchema != null, "Deserialization schema must not be null.");
            return this.deserializationSchema;
        }

        public FlinkPravegaReader<T> build() {
            FlinkPravegaReader<T> buildSourceFunction = buildSourceFunction();
            buildSourceFunction.initialize();
            return buildSourceFunction;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkPravegaReader(String str, ClientConfig clientConfig, ReaderGroupConfig readerGroupConfig, String str2, String str3, DeserializationSchema<T> deserializationSchema, Time time, Time time2) {
        this.hookUid = (String) Preconditions.checkNotNull(str, "hookUid");
        this.clientConfig = (ClientConfig) Preconditions.checkNotNull(clientConfig, "clientConfig");
        this.readerGroupConfig = (ReaderGroupConfig) Preconditions.checkNotNull(readerGroupConfig, "readerGroupConfig");
        this.readerGroupScope = (String) Preconditions.checkNotNull(str2, "readerGroupScope");
        this.readerGroupName = (String) Preconditions.checkNotNull(str3, "readerGroupName");
        this.deserializationSchema = (DeserializationSchema) Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
        this.eventReadTimeout = (Time) Preconditions.checkNotNull(time, "eventReadTimeout");
        this.checkpointInitiateTimeout = (Time) Preconditions.checkNotNull(time2, "checkpointInitiateTimeout");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initialize() {
        log.info("Creating reader group: {}/{} for the Flink job", this.readerGroupScope, this.readerGroupName);
        createReaderGroup();
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        String taskNameWithSubtasks = getRuntimeContext().getTaskNameWithSubtasks();
        log.info("{} : Creating Pravega reader with ID '{}' for controller URI: {}", new Object[]{getRuntimeContext().getTaskNameWithSubtasks(), taskNameWithSubtasks, this.clientConfig.getControllerURI()});
        EventStreamReader<T> createEventStreamReader = createEventStreamReader(taskNameWithSubtasks);
        Throwable th = null;
        try {
            log.info("Starting Pravega reader '{}' for controller URI {}", taskNameWithSubtasks, this.clientConfig.getControllerURI());
            while (this.running) {
                EventRead<T> readNextEvent = createEventStreamReader.readNextEvent(this.eventReadTimeout.toMilliseconds());
                T event = readNextEvent.getEvent();
                if (event != null) {
                    if (this.deserializationSchema.isEndOfStream(event)) {
                        log.info("Reached end of stream for reader: {}", taskNameWithSubtasks);
                        if (createEventStreamReader != null) {
                            if (0 == 0) {
                                createEventStreamReader.close();
                                return;
                            }
                            try {
                                createEventStreamReader.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    }
                    synchronized (sourceContext.getCheckpointLock()) {
                        sourceContext.collect(event);
                    }
                }
                if (readNextEvent.isCheckpoint()) {
                    triggerCheckpoint(readNextEvent.getCheckpointName());
                }
            }
            if (createEventStreamReader != null) {
                if (0 == 0) {
                    createEventStreamReader.close();
                    return;
                }
                try {
                    createEventStreamReader.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        } catch (Throwable th4) {
            if (createEventStreamReader != null) {
                if (0 != 0) {
                    try {
                        createEventStreamReader.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createEventStreamReader.close();
                }
            }
            throw th4;
        }
    }

    public void cancel() {
        this.running = false;
    }

    public void stop() {
        this.running = false;
    }

    public TypeInformation<T> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    public MasterTriggerRestoreHook<Checkpoint> createMasterTriggerRestoreHook() {
        return new ReaderCheckpointHook(this.hookUid, createReaderGroup(), this.checkpointInitiateTimeout);
    }

    public void setCheckpointTrigger(ExternallyInducedSource.CheckpointTrigger checkpointTrigger) {
        this.checkpointTrigger = checkpointTrigger;
    }

    private void triggerCheckpoint(String str) throws FlinkException {
        Preconditions.checkState(this.checkpointTrigger != null, "checkpoint trigger not set");
        log.debug("{} received checkpoint event for {}", getRuntimeContext().getTaskNameWithSubtasks(), str);
        try {
            this.checkpointTrigger.triggerCheckpoint(ReaderCheckpointHook.parseCheckpointId(str));
        } catch (IllegalArgumentException e) {
            throw new FlinkException("Cannot trigger checkpoint due to invalid Pravega checkpoint name", e.getCause());
        }
    }

    protected ReaderGroup createReaderGroup() {
        ReaderGroupManager createReaderGroupManager = createReaderGroupManager();
        createReaderGroupManager.createReaderGroup(this.readerGroupName, this.readerGroupConfig);
        return createReaderGroupManager.getReaderGroup(this.readerGroupName);
    }

    protected ReaderGroupManager createReaderGroupManager() {
        return ReaderGroupManager.withScope(this.readerGroupScope, this.clientConfig);
    }

    protected EventStreamReader<T> createEventStreamReader(String str) {
        return FlinkPravegaUtils.createPravegaReader(this.clientConfig, str, this.readerGroupScope, this.readerGroupName, this.deserializationSchema, ReaderConfig.builder().build());
    }

    public static <T> Builder<T> builder() {
        return new Builder<>();
    }
}
