package io.pravega.connectors.flink.util;

import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.connectors.flink.FlinkPravegaReader;
import io.pravega.connectors.flink.FlinkPravegaWriter;
import io.pravega.connectors.flink.PravegaEventRouter;
import io.pravega.connectors.flink.serialization.PravegaSerialization;
import io.pravega.shaded.com.google.common.collect.Sets;
import java.io.Serializable;
import java.net.URI;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

/* loaded from: input_file:io/pravega/connectors/flink/util/FlinkPravegaParams.class */
/**
 * Convenience wrapper around a Flink {@link ParameterTool} that resolves Pravega-related
 * settings and builds {@link FlinkPravegaReader} / {@link FlinkPravegaWriter} instances
 * and streams from them.
 *
 * <p>The controller URI is read from the {@value #CONTROLLER_PARAM_NAME} parameter and
 * falls back to {@value #DEFAULT_CONTROLLER_URI} when that parameter is absent.
 */
public class FlinkPravegaParams {
    /** Controller URI used when the {@value #CONTROLLER_PARAM_NAME} parameter is not supplied. */
    public static final String DEFAULT_CONTROLLER_URI = "tcp://127.0.0.1:9090";

    /** Name of the parameter that carries the controller URI. */
    public static final String CONTROLLER_PARAM_NAME = "controller";

    /** Source of all parameter lookups performed by this class; never {@code null}. */
    private final ParameterTool params;

    /**
     * Creates a helper backed by the given parameters.
     *
     * @param parameterTool the parameters to read settings from; must not be {@code null}
     * @throws NullPointerException if {@code parameterTool} is {@code null}
     */
    public FlinkPravegaParams(ParameterTool parameterTool) {
        // Fail fast here instead of with an anonymous NPE in whichever method runs first.
        this.params = java.util.Objects.requireNonNull(parameterTool, "parameterTool");
    }

    /**
     * Returns the controller URI taken from the {@value #CONTROLLER_PARAM_NAME} parameter,
     * or {@value #DEFAULT_CONTROLLER_URI} when the parameter is not set.
     *
     * @return the parsed controller URI
     * @throws IllegalArgumentException if the configured value is not a valid URI
     *         (as thrown by {@link URI#create(String)})
     */
    public URI getControllerUri() {
        return URI.create(this.params.get(CONTROLLER_PARAM_NAME, DEFAULT_CONTROLLER_URI));
    }

    /**
     * Looks up a stream specification in the parameters and converts it to a {@link StreamId}
     * via {@link StreamId#fromSpec}.
     *
     * @param str  name of the parameter holding the stream specification
     * @param str2 specification to use when the parameter is not set
     * @return the stream identifier parsed from the resolved specification
     */
    public StreamId getStreamFromParam(String str, String str2) {
        return StreamId.fromSpec(this.params.get(str, str2));
    }

    /**
     * Builds a reader for a single stream, using the deserialization schema that
     * {@link PravegaSerialization#deserializationFor(Class)} provides for {@code cls}.
     *
     * @param streamId the stream to read from
     * @param j        value forwarded unchanged to the {@link FlinkPravegaReader} constructor
     *                 (presumably the read start time; confirm against FlinkPravegaReader)
     * @param cls      the event class
     * @param <T>      the event type
     * @return a new reader for the stream
     */
    public <T extends Serializable> FlinkPravegaReader<T> newReader(StreamId streamId, long j, Class<T> cls) {
        return newReader(streamId, j, PravegaSerialization.deserializationFor(cls));
    }

    /**
     * Builds a reader for a single stream with an explicit deserialization schema. The reader
     * is given the controller URI from {@link #getControllerUri()}, the stream's scope and a
     * one-element set containing the stream's name.
     *
     * @param streamId              the stream to read from
     * @param j                     value forwarded unchanged to the {@link FlinkPravegaReader}
     *                              constructor (presumably the read start time; confirm
     *                              against FlinkPravegaReader)
     * @param deserializationSchema the schema used to decode events
     * @param <T>                   the event type
     * @return a new reader for the stream
     */
    public <T extends Serializable> FlinkPravegaReader<T> newReader(StreamId streamId, long j, DeserializationSchema<T> deserializationSchema) {
        return new FlinkPravegaReader<>(getControllerUri(), streamId.getScope(), Sets.newHashSet(streamId.getName()), j, deserializationSchema);
    }

    /**
     * Builds a writer for a stream, using the serialization schema that
     * {@link PravegaSerialization#serializationFor(Class)} provides for {@code cls}.
     *
     * @param streamId           the stream to write to
     * @param cls                the event class
     * @param pravegaEventRouter the event router handed to the writer
     * @param <T>                the event type
     * @return a new writer for the stream
     */
    public <T extends Serializable> FlinkPravegaWriter<T> newWriter(StreamId streamId, Class<T> cls, PravegaEventRouter<T> pravegaEventRouter) {
        return newWriter(streamId, PravegaSerialization.serializationFor(cls), pravegaEventRouter);
    }

    /**
     * Builds a writer for a stream with an explicit serialization schema. The writer is given
     * the controller URI from {@link #getControllerUri()} plus the stream's scope and name.
     *
     * @param streamId            the stream to write to
     * @param serializationSchema the schema used to encode events
     * @param pravegaEventRouter  the event router handed to the writer
     * @param <T>                 the event type
     * @return a new writer for the stream
     */
    public <T extends Serializable> FlinkPravegaWriter<T> newWriter(StreamId streamId, SerializationSchema<T> serializationSchema, PravegaEventRouter<T> pravegaEventRouter) {
        return new FlinkPravegaWriter<>(getControllerUri(), streamId.getScope(), streamId.getName(), serializationSchema, pravegaEventRouter);
    }

    /**
     * Resolves a stream identifier from the parameters (see
     * {@link #getStreamFromParam(String, String)}) and then creates that stream with
     * {@link #createStream(StreamId)}.
     *
     * @param str  name of the parameter holding the stream specification
     * @param str2 specification to use when the parameter is not set
     * @return the identifier of the stream that was requested
     */
    public StreamId createStreamFromParam(String str, String str2) {
        StreamId streamFromParam = getStreamFromParam(str, str2);
        createStream(streamFromParam);
        return streamFromParam;
    }

    /**
     * Creates the given stream with a {@code ScalingPolicy.fixed(1)} scaling policy.
     *
     * @param streamId the stream to create
     */
    public void createStream(StreamId streamId) {
        createStream(streamId, ScalingPolicy.fixed(1));
    }

    /**
     * Requests creation of the stream's scope and then of the stream itself, using a
     * {@link StreamManager} connected to {@link #getControllerUri()}. The results returned by
     * the manager's create calls are not inspected.
     *
     * @param streamId      the stream (scope and name) to create
     * @param scalingPolicy the scaling policy to put into the stream configuration
     */
    public void createStream(StreamId streamId, ScalingPolicy scalingPolicy) {
        // The manager is only needed for these two calls; close it so it is not leaked
        // on every invocation (including when one of the calls throws).
        try (StreamManager streamManager = StreamManager.create(getControllerUri())) {
            streamManager.createScope(streamId.getScope());
            StreamConfiguration streamConfig = StreamConfiguration.builder().scalingPolicy(scalingPolicy).build();
            streamManager.createStream(streamId.getScope(), streamId.getName(), streamConfig);
        }
    }
}
