package io.pravega.client.stream.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.ClientFactory;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.SynchronizerClientFactory;
import io.pravega.client.admin.impl.ReaderGroupManagerImpl;
import io.pravega.client.batch.impl.BatchClientFactoryImpl;
import io.pravega.client.byteStream.impl.ByteStreamClientImpl;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.netty.impl.ConnectionFactoryImpl;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.ConditionalOutputStreamFactory;
import io.pravega.client.segment.impl.ConditionalOutputStreamFactoryImpl;
import io.pravega.client.segment.impl.EventSegmentReader;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentInputStreamFactory;
import io.pravega.client.segment.impl.SegmentInputStreamFactoryImpl;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.segment.impl.SegmentOutputStreamFactory;
import io.pravega.client.segment.impl.SegmentOutputStreamFactoryImpl;
import io.pravega.client.state.InitialUpdate;
import io.pravega.client.state.Revisioned;
import io.pravega.client.state.RevisionedStreamClient;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.state.SynchronizerConfig;
import io.pravega.client.state.Update;
import io.pravega.client.state.impl.RevisionedStreamClientImpl;
import io.pravega.client.state.impl.StateSynchronizerImpl;
import io.pravega.client.state.impl.UpdateOrInitSerializer;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.client.watermark.WatermarkSerializer;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.com.google.common.collect.ImmutableMap;
import io.pravega.shared.NameUtils;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/ClientFactoryImpl.class */
/**
 * Factory for the clients (event writers, transactional writers, readers, revisioned stream clients,
 * state synchronizers, batch and byte-stream clients) that operate inside a single scope.
 *
 * <p>Every instance owns a scheduled executor (thread-name prefix {@code "WatermarkReader"}) that is handed
 * to the watermark readers created by {@link #createReader}. That executor, the controller and the
 * connection factory are all released by {@link #close()}.
 */
public class ClientFactoryImpl implements ClientFactory, EventStreamClientFactory, SynchronizerClientFactory {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ClientFactoryImpl.class);
    private final String scope;
    private final Controller controller;
    private final SegmentInputStreamFactory inFactory;
    private final SegmentOutputStreamFactory outFactory;
    private final ConditionalOutputStreamFactory condFactory;
    private final SegmentMetadataClientFactory metaFactory;
    private final ConnectionFactory connectionFactory;
    private final ScheduledExecutorService watermarkReaderThreads;

    /**
     * Creates a factory that builds its own {@link ConnectionFactoryImpl} from a default {@link ClientConfig}.
     *
     * @param str        the scope all created clients operate in; must not be null
     * @param controller the controller used by all created clients; must not be null
     */
    public ClientFactoryImpl(String str, Controller controller) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(controller);
        this.scope = str;
        this.controller = controller;
        this.connectionFactory = new ConnectionFactoryImpl(ClientConfig.builder().build());
        this.inFactory = new SegmentInputStreamFactoryImpl(controller, this.connectionFactory);
        this.outFactory = new SegmentOutputStreamFactoryImpl(controller, this.connectionFactory);
        this.condFactory = new ConditionalOutputStreamFactoryImpl(controller, this.connectionFactory);
        this.metaFactory = new SegmentMetadataClientFactoryImpl(controller, this.connectionFactory);
        // Allocate the pool only after the arguments were validated so a rejected call leaves nothing behind.
        this.watermarkReaderThreads = ExecutorServiceHelpers.newScheduledThreadPool(getThreadPoolSize(), "WatermarkReader");
    }

    /**
     * Creates a factory on top of the given connection factory, using the default segment-level factories.
     *
     * @param str               the scope all created clients operate in; must not be null
     * @param controller        the controller used by all created clients; must not be null
     * @param connectionFactory the connection factory shared by all created clients
     */
    @VisibleForTesting
    public ClientFactoryImpl(String str, Controller controller, ConnectionFactory connectionFactory) {
        this(str, controller, connectionFactory,
                new SegmentInputStreamFactoryImpl(controller, connectionFactory),
                new SegmentOutputStreamFactoryImpl(controller, connectionFactory),
                new ConditionalOutputStreamFactoryImpl(controller, connectionFactory),
                new SegmentMetadataClientFactoryImpl(controller, connectionFactory));
    }

    /**
     * Creates a factory with every collaborator supplied by the caller.
     *
     * @param str                            the scope all created clients operate in; must not be null
     * @param controller                     the controller used by all created clients; must not be null
     * @param connectionFactory              the connection factory shared by all created clients
     *                                       (not validated here, matching the historical behaviour)
     * @param segmentInputStreamFactory      factory for segment readers; must not be null
     * @param segmentOutputStreamFactory     factory for segment writers; must not be null
     * @param conditionalOutputStreamFactory factory for conditional segment writers; must not be null
     * @param segmentMetadataClientFactory   factory for segment metadata clients; must not be null
     */
    @VisibleForTesting
    public ClientFactoryImpl(String str, Controller controller, ConnectionFactory connectionFactory,
                             SegmentInputStreamFactory segmentInputStreamFactory,
                             SegmentOutputStreamFactory segmentOutputStreamFactory,
                             ConditionalOutputStreamFactory conditionalOutputStreamFactory,
                             SegmentMetadataClientFactory segmentMetadataClientFactory) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(controller);
        Preconditions.checkNotNull(segmentInputStreamFactory);
        Preconditions.checkNotNull(segmentOutputStreamFactory);
        Preconditions.checkNotNull(conditionalOutputStreamFactory);
        Preconditions.checkNotNull(segmentMetadataClientFactory);
        this.scope = str;
        this.controller = controller;
        this.connectionFactory = connectionFactory;
        this.inFactory = segmentInputStreamFactory;
        this.outFactory = segmentOutputStreamFactory;
        this.condFactory = conditionalOutputStreamFactory;
        this.metaFactory = segmentMetadataClientFactory;
        // Allocate the pool only after the arguments were validated so a rejected call leaves nothing behind.
        this.watermarkReaderThreads = ExecutorServiceHelpers.newScheduledThreadPool(getThreadPoolSize(), "WatermarkReader");
    }

    /**
     * Creates an event writer with a randomly generated (UUID) writer id.
     *
     * @param str               name of the stream to write to
     * @param serializer        serializer for the events
     * @param eventWriterConfig writer configuration
     * @return a new event writer
     */
    @Override // io.pravega.client.ClientFactory, io.pravega.client.EventStreamClientFactory
    public <T> EventStreamWriter<T> createEventWriter(String str, Serializer<T> serializer, EventWriterConfig eventWriterConfig) {
        return createEventWriter(UUID.randomUUID().toString(), str, serializer, eventWriterConfig);
    }

    /**
     * Creates an event writer with the given writer id.
     *
     * @param str               id of the writer
     * @param str2              name of the stream to write to (resolved inside this factory's scope)
     * @param serializer        serializer for the events
     * @param eventWriterConfig writer configuration
     * @return a new event writer
     */
    @Override // io.pravega.client.EventStreamClientFactory
    public <T> EventStreamWriter<T> createEventWriter(String str, String str2, Serializer<T> serializer, EventWriterConfig eventWriterConfig) {
        log.info("Creating writer: {} for stream: {} with configuration: {}", str, str2, eventWriterConfig);
        StreamImpl stream = new StreamImpl(this.scope, str2);
        // Each writer gets its own executor, named after the scoped stream it writes to.
        return new EventStreamWriterImpl<>(stream, str, this.controller, this.outFactory, serializer, eventWriterConfig,
                ExecutorServiceHelpers.getShrinkingExecutor(1, 100, "ScalingRetransmition-" + stream.getScopedName()),
                this.connectionFactory.getInternalExecutor());
    }

    /**
     * Creates a transactional event writer with the given writer id.
     *
     * @param str               id of the writer
     * @param str2              name of the stream to write to (resolved inside this factory's scope)
     * @param serializer        serializer for the events
     * @param eventWriterConfig writer configuration
     * @return a new transactional event writer
     */
    @Override // io.pravega.client.EventStreamClientFactory
    public <T> TransactionalEventStreamWriter<T> createTransactionalEventWriter(String str, String str2, Serializer<T> serializer, EventWriterConfig eventWriterConfig) {
        log.info("Creating transactional writer:{} for stream: {} with configuration: {}", str, str2, eventWriterConfig);
        return new TransactionalEventStreamWriterImpl<>(new StreamImpl(this.scope, str2), str, this.controller,
                this.outFactory, serializer, eventWriterConfig, this.connectionFactory.getInternalExecutor());
    }

    /**
     * Creates a transactional event writer with a randomly generated (UUID) writer id.
     *
     * @param str               name of the stream to write to
     * @param serializer        serializer for the events
     * @param eventWriterConfig writer configuration
     * @return a new transactional event writer
     */
    @Override // io.pravega.client.EventStreamClientFactory
    public <T> TransactionalEventStreamWriter<T> createTransactionalEventWriter(String str, Serializer<T> serializer, EventWriterConfig eventWriterConfig) {
        return createTransactionalEventWriter(UUID.randomUUID().toString(), str, serializer, eventWriterConfig);
    }

    /**
     * Creates a reader that joins the given reader group, using {@link System#nanoTime()} and
     * {@link System#currentTimeMillis()} as clocks.
     *
     * @param str          id of the reader
     * @param str2         name of the reader group
     * @param serializer   serializer for the events
     * @param readerConfig reader configuration
     * @return a new event reader
     */
    @Override // io.pravega.client.ClientFactory, io.pravega.client.EventStreamClientFactory
    public <T> EventStreamReader<T> createReader(String str, String str2, Serializer<T> serializer, ReaderConfig readerConfig) {
        // The overload below logs the creation; logging here as well would emit the same line twice.
        return createReader(str, str2, serializer, readerConfig, System::nanoTime, System::currentTimeMillis);
    }

    /**
     * Creates a reader that joins the given reader group, with caller-supplied clocks.
     *
     * <p>Unless time windows are disabled in {@code readerConfig}, one watermark reader is created for each
     * stream reported by the reader group state; these run on this factory's watermark executor.
     *
     * @param str          id of the reader
     * @param str2         name of the reader group
     * @param serializer   serializer for the events
     * @param readerConfig reader configuration
     * @param supplier     clock handed to the reader group state manager ({@code System::nanoTime} in production)
     * @param supplier2    clock handed to the reader itself ({@code System::currentTimeMillis} in production)
     * @return a new event reader
     */
    @VisibleForTesting
    public <T> EventStreamReader<T> createReader(String str, String str2, Serializer<T> serializer, ReaderConfig readerConfig, Supplier<Long> supplier, Supplier<Long> supplier2) {
        log.info("Creating reader: {} under readerGroup: {} with configuration: {}", str, str2, readerConfig);
        ReaderGroupStateManager groupState = new ReaderGroupStateManager(str,
                createStateSynchronizer(NameUtils.getStreamForReaderGroup(str2),
                        new ReaderGroupManagerImpl.ReaderGroupStateUpdatesSerializer(),
                        new ReaderGroupManagerImpl.ReaderGroupStateInitSerializer(),
                        SynchronizerConfig.builder().build()),
                this.controller, supplier);
        groupState.initializeReader(readerConfig.getInitialAllocationDelay());

        ImmutableMap.Builder<Stream, WatermarkReaderImpl> watermarkReaders = ImmutableMap.builder();
        if (!readerConfig.isDisableTimeWindows()) {
            for (Stream stream : groupState.getStreams()) {
                Segment markSegment = getSegmentForRevisionedClient(stream.getScope(),
                        StreamSegmentNameUtils.getMarkForStream(stream.getStreamName()));
                watermarkReaders.put(stream, new WatermarkReaderImpl(stream,
                        createRevisionedStreamClient(markSegment, new WatermarkSerializer(),
                                SynchronizerConfig.builder().readBufferSize(4096).build()),
                        this.watermarkReaderThreads));
            }
        }
        return new EventStreamReaderImpl<>(this.inFactory, this.metaFactory, serializer, groupState, new Orderer(),
                supplier2, readerConfig, watermarkReaders.build(), this.controller);
    }

    /**
     * Creates a revisioned stream client for a stream in this factory's scope.
     *
     * @param str                name of the stream
     * @param serializer         serializer for the stored values
     * @param synchronizerConfig synchronizer configuration
     * @return a new revisioned stream client
     * @throws InvalidStreamException if the stream's segments cannot be resolved
     */
    @Override // io.pravega.client.ClientFactory, io.pravega.client.SynchronizerClientFactory
    public <T> RevisionedStreamClient<T> createRevisionedStreamClient(String str, Serializer<T> serializer, SynchronizerConfig synchronizerConfig) {
        log.info("Creating revisioned stream client for stream: {} with synchronizer configuration: {}", str, synchronizerConfig);
        return createRevisionedStreamClient(getSegmentForRevisionedClient(this.scope, str), serializer, synchronizerConfig);
    }

    /**
     * Wires up a revisioned stream client on top of a concrete segment: a segment reader, a conditional
     * writer and a metadata client, all sharing one delegation token provider.
     */
    private <T> RevisionedStreamClient<T> createRevisionedStreamClient(Segment segment, Serializer<T> serializer, SynchronizerConfig synchronizerConfig) {
        EventSegmentReader segmentReader = this.inFactory.createEventReaderForSegment(segment, synchronizerConfig.getReadBufferSize());
        // Blocks until the controller returns the delegation token; failures surface as RuntimeException.
        String delegationToken = Futures.getAndHandleExceptions(
                this.controller.getOrRefreshDelegationTokenFor(segment.getScope(), segment.getStreamName()),
                RuntimeException::new);
        DelegationTokenProvider tokenProvider = DelegationTokenProviderFactory.create(delegationToken, this.controller, segment);
        return new RevisionedStreamClientImpl<>(segment, segmentReader, this.outFactory,
                this.condFactory.createConditionalOutputStream(segment, tokenProvider, synchronizerConfig.getEventWriterConfig()),
                this.metaFactory.createSegmentMetadataClient(segment, tokenProvider),
                serializer, synchronizerConfig.getEventWriterConfig(), tokenProvider);
    }

    /**
     * Creates a state synchronizer backed by a stream in this factory's scope.
     *
     * @param str                name of the backing stream
     * @param serializer         serializer for incremental updates
     * @param serializer2        serializer for initial updates
     * @param synchronizerConfig synchronizer configuration
     * @return a new state synchronizer
     * @throws InvalidStreamException if the stream's segments cannot be resolved
     */
    @Override // io.pravega.client.ClientFactory, io.pravega.client.SynchronizerClientFactory
    public <StateT extends Revisioned, UpdateT extends Update<StateT>, InitT extends InitialUpdate<StateT>> StateSynchronizer<StateT> createStateSynchronizer(String str, Serializer<UpdateT> serializer, Serializer<InitT> serializer2, SynchronizerConfig synchronizerConfig) {
        log.info("Creating state synchronizer with stream: {} and configuration: {}", str, synchronizerConfig);
        UpdateOrInitSerializer<StateT, UpdateT, InitT> combinedSerializer = new UpdateOrInitSerializer<>(serializer, serializer2);
        Segment segment = getSegmentForRevisionedClient(this.scope, str);
        return new StateSynchronizerImpl<StateT>(segment, createRevisionedStreamClient(segment, combinedSerializer, synchronizerConfig));
    }

    /**
     * Looks up the current segments of the given stream and returns the one owning key {@code 0.0}.
     *
     * @throws InvalidStreamException if the lookup fails or yields no segments
     */
    private Segment getSegmentForRevisionedClient(String str, String str2) {
        StreamSegments currentSegments = Futures.getAndHandleExceptions(
                this.controller.getCurrentSegments(str, str2), InvalidStreamException::new);
        if (currentSegments == null || currentSegments.getSegments().isEmpty()) {
            throw new InvalidStreamException("Stream does not exist: " + str2);
        }
        return currentSegments.getSegmentForKey(0.0d);
    }

    /**
     * Creates a batch client sharing this factory's controller and connection factory.
     *
     * @return a new batch client factory
     */
    @Override // io.pravega.client.ClientFactory
    @Deprecated
    public BatchClientFactoryImpl createBatchClient() {
        return new BatchClientFactoryImpl(this.controller, this.connectionFactory);
    }

    /**
     * Creates a byte-stream client for this factory's scope, sharing its controller and connection factory.
     *
     * @return a new byte-stream client
     */
    @Override // io.pravega.client.ClientFactory
    @Deprecated
    public ByteStreamClientImpl createByteStreamClient() {
        return new ByteStreamClientImpl(this.scope, this.controller, this.connectionFactory);
    }

    /**
     * Closes the controller and the connection factory and stops the watermark executor.
     *
     * <p>The steps are chained with {@code finally} so that a failure in one does not prevent the
     * remaining resources from being released.
     */
    @Override // io.pravega.client.ClientFactory, java.lang.AutoCloseable
    public void close() {
        try {
            this.controller.close();
        } finally {
            try {
                this.connectionFactory.close();
            } finally {
                // Previously never stopped; without this the pool's threads outlive the factory.
                this.watermarkReaderThreads.shutdownNow();
            }
        }
    }

    /**
     * Returns the size of the watermark-reader pool: the value of the system property
     * {@code pravega.client.internal.threadpool.size} when set, otherwise the number of available processors.
     *
     * @throws NumberFormatException if the property is set but is not a valid integer
     */
    private int getThreadPoolSize() {
        String configuredSize = System.getProperty("pravega.client.internal.threadpool.size", null);
        if (configuredSize != null) {
            return Integer.parseInt(configuredSize);
        }
        return Runtime.getRuntime().availableProcessors();
    }
}
