package io.pravega.client.stream.impl;

import io.pravega.client.ClientFactory;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.netty.impl.ConnectionFactoryImpl;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentInputStreamFactory;
import io.pravega.client.segment.impl.SegmentInputStreamFactoryImpl;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.segment.impl.SegmentOutputStreamFactory;
import io.pravega.client.segment.impl.SegmentOutputStreamFactoryImpl;
import io.pravega.client.state.InitialUpdate;
import io.pravega.client.state.Revisioned;
import io.pravega.client.state.RevisionedStreamClient;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.state.SynchronizerConfig;
import io.pravega.client.state.Update;
import io.pravega.client.state.impl.RevisionedStreamClientImpl;
import io.pravega.client.state.impl.StateSynchronizerImpl;
import io.pravega.client.state.impl.UpdateOrInitSerializer;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.concurrent.FutureHelpers;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shared.NameUtils;
import java.util.function.Supplier;

/* loaded from: input_file:io/pravega/client/stream/impl/ClientFactoryImpl.class */
public class ClientFactoryImpl implements ClientFactory {
    private final String scope;
    private final Controller controller;
    private final SegmentInputStreamFactory inFactory;
    private final SegmentOutputStreamFactory outFactory;
    private final SegmentMetadataClientFactory metaFactory;
    private final ConnectionFactory connectionFactory;

    public ClientFactoryImpl(String str, Controller controller) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(controller);
        this.scope = str;
        this.controller = controller;
        this.connectionFactory = new ConnectionFactoryImpl(false);
        this.inFactory = new SegmentInputStreamFactoryImpl(controller, this.connectionFactory);
        this.outFactory = new SegmentOutputStreamFactoryImpl(controller, this.connectionFactory);
        this.metaFactory = new SegmentMetadataClientFactoryImpl(controller, this.connectionFactory);
    }

    @VisibleForTesting
    public ClientFactoryImpl(String str, Controller controller, ConnectionFactory connectionFactory) {
        this(str, controller, connectionFactory, new SegmentInputStreamFactoryImpl(controller, connectionFactory), new SegmentOutputStreamFactoryImpl(controller, connectionFactory), new SegmentMetadataClientFactoryImpl(controller, connectionFactory));
    }

    @VisibleForTesting
    public ClientFactoryImpl(String str, Controller controller, ConnectionFactory connectionFactory, SegmentInputStreamFactory segmentInputStreamFactory, SegmentOutputStreamFactory segmentOutputStreamFactory, SegmentMetadataClientFactory segmentMetadataClientFactory) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(controller);
        Preconditions.checkNotNull(segmentInputStreamFactory);
        Preconditions.checkNotNull(segmentOutputStreamFactory);
        Preconditions.checkNotNull(segmentMetadataClientFactory);
        this.scope = str;
        this.controller = controller;
        this.connectionFactory = connectionFactory;
        this.inFactory = segmentInputStreamFactory;
        this.outFactory = segmentOutputStreamFactory;
        this.metaFactory = segmentMetadataClientFactory;
    }

    @Override // io.pravega.client.ClientFactory
    public <T> EventStreamWriter<T> createEventWriter(String str, Serializer<T> serializer, EventWriterConfig eventWriterConfig) {
        StreamImpl streamImpl = new StreamImpl(this.scope, str);
        return new EventStreamWriterImpl(streamImpl, this.controller, this.outFactory, serializer, eventWriterConfig, ExecutorServiceHelpers.getShrinkingExecutor(1, 100, "ScalingRetransmition-" + streamImpl.getScopedName()));
    }

    @Override // io.pravega.client.ClientFactory
    public <T> EventStreamReader<T> createReader(String str, String str2, Serializer<T> serializer, ReaderConfig readerConfig) {
        return createReader(str, str2, serializer, readerConfig, System::nanoTime, System::currentTimeMillis);
    }

    @VisibleForTesting
    public <T> EventStreamReader<T> createReader(String str, String str2, Serializer<T> serializer, ReaderConfig readerConfig, Supplier<Long> supplier, Supplier<Long> supplier2) {
        ReaderGroupStateManager readerGroupStateManager = new ReaderGroupStateManager(str, createStateSynchronizer(NameUtils.getStreamForReaderGroup(str2), new JavaSerializer(), new JavaSerializer(), SynchronizerConfig.builder().build()), this.controller, supplier);
        readerGroupStateManager.initializeReader(readerConfig.getInitialAllocationDelay());
        return new EventStreamReaderImpl(this.inFactory, serializer, readerGroupStateManager, new Orderer(), supplier2, readerConfig);
    }

    @Override // io.pravega.client.ClientFactory
    public <T> RevisionedStreamClient<T> createRevisionedStreamClient(String str, Serializer<T> serializer, SynchronizerConfig synchronizerConfig) {
        Segment segment = new Segment(this.scope, str, 0);
        return new RevisionedStreamClientImpl(segment, this.inFactory.createInputStreamForSegment(segment), this.outFactory.createOutputStreamForSegment(segment, segment2 -> {
            throw new IllegalStateException("RevisionedClient: Segmentsealed exception observed for segment:" + segment2);
        }, synchronizerConfig.getEventWriterConfig()), this.metaFactory.createSegmentMetadataClient(segment), serializer);
    }

    @Override // io.pravega.client.ClientFactory
    public <StateT extends Revisioned, UpdateT extends Update<StateT>, InitT extends InitialUpdate<StateT>> StateSynchronizer<StateT> createStateSynchronizer(String str, Serializer<UpdateT> serializer, Serializer<InitT> serializer2, SynchronizerConfig synchronizerConfig) {
        Segment segment = new Segment(this.scope, str, 0);
        if (((Boolean) FutureHelpers.getAndHandleExceptions(this.controller.isSegmentOpen(segment), InvalidStreamException::new)).booleanValue()) {
            return new StateSynchronizerImpl(segment, createRevisionedStreamClient(str, new UpdateOrInitSerializer(serializer, serializer2), synchronizerConfig));
        }
        throw new InvalidStreamException("Segment does not exist: " + segment);
    }

    @Override // io.pravega.client.ClientFactory, java.lang.AutoCloseable
    public void close() {
        this.connectionFactory.close();
    }
}
