package io.pravega.connectors.flink;

import io.pravega.client.ClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.serialization.WrappingSerializer;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.org.apache.commons.lang3.RandomStringUtils;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink parallel source that reads events from one or more Pravega streams.
 *
 * <p>The constructor creates a Pravega reader group with a randomly generated name; each
 * parallel subtask then joins that group with a reader named after the subtask and emits the
 * events it reads. Checkpoints are externally induced: when the Pravega reader returns a
 * checkpoint event, this source forwards it to the Flink {@code CheckpointTrigger}.
 *
 * @param <T> type of the events produced by this source
 */
public class FlinkPravegaReader<T> extends RichParallelSourceFunction<T> implements ResultTypeQueryable<T>, StoppableFunction, ExternallyInducedSource<T, Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaReader.class);
    private static final long serialVersionUID = 1;

    /** Default value passed to {@code readNextEvent}; presumably milliseconds (confirm against the Pravega API). */
    private static final long DEFAULT_EVENT_READ_TIMEOUT = 1000;

    /** Default value handed to the {@link ReaderCheckpointHook}; presumably milliseconds (confirm). */
    private static final long DEFAULT_CHECKPOINT_INITIATE_TIMEOUT = 5000;

    private final DeserializationSchema<T> deserializationSchema;
    private final URI controllerURI;
    private final String scopeName;
    private final String readerGroupName;
    private final String readerName;
    private long eventReadTimeout = DEFAULT_EVENT_READ_TIMEOUT;
    private long checkpointInitiateTimeout = DEFAULT_CHECKPOINT_INITIATE_TIMEOUT;

    /** Cleared by {@link #cancel()} / {@link #stop()} to make the read loop in {@link #run} exit. */
    private volatile boolean running = true;

    /** Set by Flink through {@link #setCheckpointTrigger}; transient, so it is not serialized with the function. */
    private transient ExternallyInducedSource.CheckpointTrigger checkpointTrigger;

    /**
     * Adapts a Flink {@link DeserializationSchema} to the Pravega {@link Serializer} interface.
     * Only deserialization is supported.
     */
    @VisibleForTesting
    static final class FlinkDeserializer<T> implements Serializer<T> {
        private final DeserializationSchema<T> deserializationSchema;

        FlinkDeserializer(DeserializationSchema<T> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
        }

        /**
         * Not supported.
         *
         * @throws IllegalStateException always
         */
        @Override // io.pravega.client.stream.Serializer
        public ByteBuffer serialize(T value) {
            throw new IllegalStateException("serialize() called within a deserializer");
        }

        /**
         * Extracts the buffer's bytes and hands them to the wrapped schema.
         *
         * @param buffer the serialized event
         * @return the deserialized event
         */
        @Override // io.pravega.client.stream.Serializer
        public T deserialize(ByteBuffer buffer) {
            final byte[] bytes;
            final boolean coversWholeBackingArray = buffer.hasArray()
                    && buffer.arrayOffset() == 0
                    && buffer.position() == 0
                    && buffer.limit() == buffer.capacity();
            if (coversWholeBackingArray) {
                // The backing array is exactly the payload, so it can be used without copying.
                bytes = buffer.array();
            } else {
                bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
            }
            return this.deserializationSchema.deserialize(bytes);
        }
    }

    /**
     * Creates a reader whose reader name is derived from the scope and stream names.
     *
     * @param controllerURI         URI of the Pravega controller
     * @param scope                 Pravega scope containing the streams
     * @param streamNames           names of the streams to read
     * @param startTime             starting time passed to the reader group configuration; must be &gt;= 0
     * @param deserializationSchema schema used to deserialize events
     */
    public FlinkPravegaReader(URI controllerURI, String scope, Set<String> streamNames, long startTime, DeserializationSchema<T> deserializationSchema) {
        this(controllerURI, scope, streamNames, startTime, deserializationSchema, null);
    }

    /**
     * Creates a reader and, as a side effect, creates the Pravega reader group it will use.
     *
     * @param controllerURI         URI of the Pravega controller
     * @param scope                 Pravega scope containing the streams
     * @param streamNames           names of the streams to read
     * @param startTime             starting time passed to the reader group configuration; must be &gt;= 0
     * @param deserializationSchema schema used to deserialize events
     * @param readerName            name handed to the checkpoint hook, or {@code null} to derive one
     *                              from the scope and stream names
     * @throws NullPointerException     if any required argument is {@code null}
     * @throws IllegalArgumentException if {@code startTime} is negative
     */
    public FlinkPravegaReader(URI controllerURI, String scope, Set<String> streamNames, long startTime, DeserializationSchema<T> deserializationSchema, String readerName) {
        Preconditions.checkNotNull(controllerURI, "controllerURI");
        Preconditions.checkNotNull(scope, "scope");
        Preconditions.checkNotNull(streamNames, "streamNames");
        Preconditions.checkArgument(startTime >= 0, "start time must be >= 0");
        Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");

        this.readerName = readerName != null ? readerName : getDefaultReaderName(scope, streamNames);
        this.controllerURI = controllerURI;
        this.scopeName = scope;
        this.deserializationSchema = deserializationSchema;
        this.readerGroupName = "flink" + RandomStringUtils.randomAlphanumeric(20).toLowerCase();

        log.info("Creating reader group: {} for the Flink job", this.readerGroupName);
        // NOTE(review): the ReaderGroupManager obtained here is never closed - confirm whether it holds resources.
        ReaderGroupManager.withScope(scope, controllerURI)
                .createReaderGroup(this.readerGroupName, ReaderGroupConfig.builder().startingTime(startTime).build(), streamNames);
    }

    /**
     * Sets the timeout handed to the checkpoint hook created by {@link #createMasterTriggerRestoreHook()}.
     *
     * @param checkpointInitiateTimeout the timeout; must be &gt; 0
     * @throws IllegalArgumentException if the timeout is not positive
     */
    public void setCheckpointInitiateTimeout(long checkpointInitiateTimeout) {
        Preconditions.checkArgument(checkpointInitiateTimeout > 0, "timeout must be > 0");
        this.checkpointInitiateTimeout = checkpointInitiateTimeout;
    }

    /**
     * @return the timeout handed to the checkpoint hook
     */
    public long getCheckpointInitiateTimeout() {
        return this.checkpointInitiateTimeout;
    }

    /**
     * Sets the timeout passed to each {@code readNextEvent} call in {@link #run}.
     *
     * @param eventReadTimeout the timeout; must be &gt; 0
     * @throws IllegalArgumentException if the timeout is not positive
     */
    public void setEventReadTimeout(long eventReadTimeout) {
        // Validate the argument itself (previously the unrelated checkpoint timeout field was checked).
        Preconditions.checkArgument(eventReadTimeout > 0, "timeout must be > 0");
        this.eventReadTimeout = eventReadTimeout;
    }

    /**
     * @return the timeout passed to each {@code readNextEvent} call
     */
    public long getEventReadTimeout() {
        return this.eventReadTimeout;
    }

    /**
     * Reads events from Pravega and emits them until the source is cancelled/stopped or the
     * deserialization schema reports end of stream. Checkpoint events returned by the reader
     * are forwarded to the checkpoint trigger. The reader is closed on every exit path.
     *
     * @param sourceContext context used to emit events
     * @throws Exception if reading, emitting or triggering a checkpoint fails
     */
    @Override
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        final String readerId = getRuntimeContext().getTaskNameWithSubtasks();
        log.info("{} : Creating Pravega reader with ID '{}' for controller URI: {}",
                readerId, readerId, this.controllerURI);

        final Serializer<T> deserializer = createDeserializer();

        // NOTE(review): the ClientFactory created here is never closed - confirm whether it should be.
        try (EventStreamReader<T> pravegaReader = ClientFactory.withScope(this.scopeName, this.controllerURI)
                .createReader(readerId, this.readerGroupName, deserializer, ReaderConfig.builder().build())) {
            log.info("Starting Pravega reader '{}' for controller URI {}", readerId, this.controllerURI);

            while (this.running) {
                final EventRead<T> eventRead = pravegaReader.readNextEvent(this.eventReadTimeout);
                final T event = eventRead.getEvent();

                if (event != null) {
                    if (this.deserializationSchema.isEndOfStream(event)) {
                        log.info("Reached end of stream for reader: {}", readerId);
                        return;
                    }
                    sourceContext.collect(event);
                }

                if (eventRead.isCheckpoint()) {
                    triggerCheckpoint(eventRead.getCheckpointName());
                }
            }
        }
    }

    /** Requests the read loop in {@link #run} to exit. */
    @Override
    public void cancel() {
        this.running = false;
    }

    /** Requests the read loop in {@link #run} to exit. */
    @Override
    public void stop() {
        this.running = false;
    }

    /**
     * @return the type information reported by the deserialization schema
     */
    @Override
    public TypeInformation<T> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    /**
     * Creates the master hook for this source's reader group.
     *
     * <p>The thread's context class loader is temporarily switched to this class's loader while
     * the hook is constructed and is restored afterwards, whether or not construction succeeds.
     *
     * @return a new {@link ReaderCheckpointHook}
     */
    @Override
    public MasterTriggerRestoreHook<Checkpoint> createMasterTriggerRestoreHook() {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader originalClassLoader = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(getClass().getClassLoader());
        try {
            return new ReaderCheckpointHook(this.readerName, this.readerGroupName, this.scopeName, this.controllerURI, this.checkpointInitiateTimeout);
        } finally {
            currentThread.setContextClassLoader(originalClassLoader);
        }
    }

    /**
     * Stores the trigger used to start Flink checkpoints from within {@link #run}.
     *
     * @param checkpointTrigger the trigger supplied by Flink
     */
    @Override
    public void setCheckpointTrigger(ExternallyInducedSource.CheckpointTrigger checkpointTrigger) {
        this.checkpointTrigger = checkpointTrigger;
    }

    /**
     * Picks the Pravega serializer for the reader: the schema's own wrapped serializer when it
     * is a {@link WrappingSerializer}, otherwise a {@link FlinkDeserializer} around the schema.
     */
    @SuppressWarnings("unchecked")
    private Serializer<T> createDeserializer() {
        if (this.deserializationSchema instanceof WrappingSerializer) {
            return ((WrappingSerializer<T>) this.deserializationSchema).getWrappedSerializer();
        }
        return new FlinkDeserializer<>(this.deserializationSchema);
    }

    /**
     * Translates a Pravega checkpoint name into a Flink checkpoint id and triggers that checkpoint.
     *
     * @param checkpointName name of the Pravega checkpoint
     * @throws IllegalStateException if no checkpoint trigger has been set
     * @throws FlinkException        if the checkpoint name cannot be parsed
     */
    private void triggerCheckpoint(String checkpointName) throws FlinkException {
        Preconditions.checkState(this.checkpointTrigger != null, "checkpoint trigger not set");
        log.debug("{} received checkpoint event for {}", getRuntimeContext().getTaskNameWithSubtasks(), checkpointName);
        try {
            this.checkpointTrigger.triggerCheckpoint(ReaderCheckpointHook.parseCheckpointId(checkpointName));
        } catch (IllegalArgumentException e) {
            // Chain the exception itself; its cause may be null and would lose the parse error.
            throw new FlinkException("Cannot trigger checkpoint due to invalid Pravega checkpoint name", e);
        }
    }

    /**
     * Derives a reader name from the stream names and scope: the stream names joined with "-",
     * followed by "-" and the scope, hashed with {@link String#hashCode()} and rendered as a
     * decimal string.
     */
    private static String getDefaultReaderName(String scope, Set<String> streamNames) {
        final String joined = streamNames.stream().collect(Collectors.joining("-")) + "-" + scope;
        return Integer.toString(joined.hashCode());
    }
}
