package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.beam.direct.io.BatchRestrictionTracker;
import cz.o2.proxima.beam.direct.io.BlockingQueueLogObserver;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.ObserveHandle;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:cz/o2/proxima/beam/direct/io/BatchLogRead.class */
/**
 * A {@link PTransform} that reads {@link StreamElement}s from a {@link BatchLogReader}.
 *
 * <p>The transform expands to an {@link Impulse} followed by a splittable {@link DoFn}
 * ({@link BatchLogReadFn}). The DoFn asks the reader for the partitions lying between the
 * configured start and end stamps, splits them into single-partition restrictions and
 * observes each claimed partition through a {@link BlockingQueueLogObserver}.
 *
 * <p>Instances are created through the {@code of(...)} factory methods.
 */
public class BatchLogRead extends PTransform<PBegin, PCollection<StreamElement>> {
    /** Timeout, in seconds, passed to each {@code takeBlocking} call on the observer. */
    private static final long POLL_TIMEOUT_SECONDS = 30L;

    /** Resume delay, in milliseconds, requested when the restriction is not finished yet. */
    private static final long RESUME_DELAY_MILLIS = 100L;

    /** Attributes to read; a private copy of the list handed to the constructor. */
    private final List<AttributeDescriptor<?>> attributes;

    /**
     * Limit forwarded to the initial restriction.
     * NOTE(review): presumably the maximal number of elements to read - confirm against
     * {@code BatchRestrictionTracker.PartitionList}.
     */
    private final long limit;

    /** Factory used to obtain the {@link Repository}. */
    private final RepositoryFactory repoFactory;

    /** Factory used to obtain the {@link BatchLogReader} from a {@link Repository}. */
    private final BatchLogReader.Factory<?> readerFactory;

    /** First argument of {@code BatchLogReader#getPartitions}. */
    private final long startStamp;

    /** Second argument of {@code BatchLogReader#getPartitions}. */
    private final long endStamp;

    /**
     * Splittable {@link DoFn} doing the actual reading. Its restriction is a
     * {@link BatchRestrictionTracker.PartitionList} and the claimed position is a single
     * {@link Partition}.
     *
     * <p>This is an inner (non-static) class on purpose: it calls
     * {@link BatchLogRead#newObserver(String, long)} and reads the start and end stamps of the
     * enclosing transform.
     */
    @DoFn.BoundedPerElement
    public class BatchLogReadFn extends DoFn<byte[], StreamElement> {
        private final List<AttributeDescriptor<?>> attributes;
        private final RepositoryFactory repositoryFactory;
        private final BatchLogReader.Factory<?> readerFactory;
        private final long limit;

        /**
         * @param attributes attributes to observe, must not be {@code null}
         * @param limit limit handed to the initial restriction
         * @param repositoryFactory factory of the {@link Repository} given to the reader factory
         * @param readerFactory factory of the {@link BatchLogReader}
         * @throws NullPointerException when {@code attributes} is {@code null}
         */
        private BatchLogReadFn(
                List<AttributeDescriptor<?>> attributes,
                long limit,
                RepositoryFactory repositoryFactory,
                BatchLogReader.Factory<?> readerFactory) {
            this.attributes = Objects.requireNonNull(attributes);
            this.repositoryFactory = repositoryFactory;
            this.readerFactory = readerFactory;
            this.limit = limit;
        }

        /**
         * Reads the partitions of the current restriction one after another and emits every
         * element with its own stamp as the output timestamp.
         *
         * @param restrictionTracker tracker of the partition list being processed
         * @param outputReceiver receiver of the elements read
         * @param manualWatermarkEstimator estimator updated with the restriction's minimal
         *     timestamp before the first partition and after each completed partition
         * @return {@code stop()} when the restriction is empty, a claim is rejected or the
         *     restriction is finished; otherwise {@code resume()} with a short delay
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(
                RestrictionTracker<BatchRestrictionTracker.PartitionList, Partition> restrictionTracker,
                DoFn.OutputReceiver<StreamElement> outputReceiver,
                ManualWatermarkEstimator<Instant> manualWatermarkEstimator) {
            if (restrictionTracker.currentRestriction().isEmpty()) {
                return DoFn.ProcessContinuation.stop();
            }
            manualWatermarkEstimator.setWatermark(restrictionTracker.currentRestriction().getMinTimestamp());
            while (!restrictionTracker.currentRestriction().isFinished()) {
                BatchRestrictionTracker.PartitionList restriction =
                        Objects.requireNonNull(restrictionTracker.currentRestriction());
                Partition partition = Objects.requireNonNull(restriction.getFirstPartition());
                // The observer is created before the claim, exactly as the original code did.
                BlockingQueueLogObserver.BatchLogObserver observer =
                        BatchLogRead.this.newObserver("observer-" + partition.getId(), restriction.getTotalLimit());
                if (!restrictionTracker.tryClaim(partition)) {
                    return DoFn.ProcessContinuation.stop();
                }
                try {
                    readPartition(partition, observer, restriction, outputReceiver);
                    // Reached only when the partition was read without interruption.
                    manualWatermarkEstimator.setWatermark(
                            restrictionTracker.currentRestriction().getMinTimestamp());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag for the caller.
                    // NOTE(review): the loop then continues with the next partition, where the
                    // blocking take will presumably be interrupted again at once, so remaining
                    // partitions get claimed without being read - confirm this is intended.
                    Thread.currentThread().interrupt();
                }
            }
            return restrictionTracker.currentRestriction().isFinished()
                    ? DoFn.ProcessContinuation.stop()
                    : DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(RESUME_DELAY_MILLIS));
        }

        /**
         * Observes a single partition and forwards its elements to {@code output} until the
         * observer's watermark reaches {@link Long#MAX_VALUE} or the restriction reports its
         * limit as consumed. An error recorded by the observer is rethrown afterwards.
         *
         * <p>The observe handle is managed by try-with-resources, so it is closed on every exit
         * path, including a failure in the loop condition or in the final error check.
         *
         * @throws InterruptedException when waiting for the next element is interrupted
         */
        private void readPartition(
                Partition partition,
                BlockingQueueLogObserver.BatchLogObserver observer,
                BatchRestrictionTracker.PartitionList restriction,
                DoFn.OutputReceiver<StreamElement> output)
                throws InterruptedException {
            try (ObserveHandle handle = startObserve(partition, observer)) {
                while (observer.getWatermark() < Long.MAX_VALUE && !restriction.isLimitConsumed()) {
                    StreamElement element = observer.takeBlocking(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    // A null result is skipped and the loop condition is evaluated again.
                    if (element != null) {
                        restriction.reportConsumed();
                        output.outputWithTimestamp(element, Instant.ofEpochMilli(element.getStamp()));
                    }
                }
                Optional.ofNullable(observer.getError()).ifPresent(ExceptionUtils::rethrowAsIllegalStateException);
            }
        }

        /**
         * Creates a reader from the factories and starts observing the given partition.
         *
         * @return handle of the running observation; closing it is the caller's responsibility
         */
        private ObserveHandle startObserve(Partition partition, BatchLogObserver batchLogObserver) {
            BatchLogReader reader = this.readerFactory.apply(this.repositoryFactory.apply());
            return reader.observe(Collections.singletonList(partition), this.attributes, batchLogObserver);
        }

        /**
         * Builds the initial restriction from the partitions the reader returns for the
         * enclosing transform's start and end stamps, together with the configured limit.
         */
        @DoFn.GetInitialRestriction
        public BatchRestrictionTracker.PartitionList initialRestriction() {
            // Uses the DoFn's own factory, the same instance expand() copies from the transform.
            BatchLogReader reader = this.readerFactory.apply(this.repositoryFactory.apply());
            return BatchRestrictionTracker.PartitionList.initialRestriction(
                    reader.getPartitions(BatchLogRead.this.startStamp, BatchLogRead.this.endStamp), this.limit);
        }

        /**
         * Splits a restriction into one restriction per partition, each carrying the total
         * limit of the input. An empty restriction is emitted unchanged.
         */
        @DoFn.SplitRestriction
        public void splitRestriction(
                @DoFn.Restriction BatchRestrictionTracker.PartitionList partitionList,
                DoFn.OutputReceiver<BatchRestrictionTracker.PartitionList> outputReceiver) {
            if (partitionList.isEmpty()) {
                outputReceiver.output(partitionList);
                return;
            }
            partitionList.getPartitions().forEach(partition ->
                    outputReceiver.output(
                            BatchRestrictionTracker.PartitionList.ofSinglePartition(
                                    partition, partitionList.getTotalLimit())));
        }

        /** Returns a {@link SerializableCoder} for the restriction type. */
        @DoFn.GetRestrictionCoder
        public Coder<BatchRestrictionTracker.PartitionList> getRestrictionCoder() {
            return SerializableCoder.of(BatchRestrictionTracker.PartitionList.class);
        }

        /** Creates the manual watermark estimator from the given state via {@code SDFUtils}. */
        @DoFn.NewWatermarkEstimator
        public WatermarkEstimators.Manual newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
            return SDFUtils.rangeCheckedManualEstimator(instant);
        }

        /** Returns {@link BoundedWindow#TIMESTAMP_MIN_VALUE} as the initial estimator state. */
        @DoFn.GetInitialWatermarkEstimatorState
        public Instant getInitialWatermarkEstimatorState() {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }

        /** Returns the coder of the watermark estimator state. */
        @DoFn.GetWatermarkEstimatorStateCoder
        public Coder<Instant> getWatermarkEstimatorStateCoder() {
            return InstantCoder.of();
        }
    }

    /**
     * Creates the transform with the stamp range {@link Long#MIN_VALUE} to
     * {@link Long#MAX_VALUE}.
     *
     * @param list attributes to read
     * @param j limit forwarded to the restriction
     * @param repository repository, converted to a factory
     * @param batchLogReader reader, converted to a factory
     * @return the new transform
     */
    public static BatchLogRead of(List<AttributeDescriptor<?>> list, long j, Repository repository, BatchLogReader batchLogReader) {
        return of(list, j, repository, batchLogReader, Long.MIN_VALUE, Long.MAX_VALUE);
    }

    /**
     * Creates the transform for an explicit stamp range.
     *
     * @param list attributes to read
     * @param j limit forwarded to the restriction
     * @param repository repository, converted to a factory
     * @param batchLogReader reader, converted to a factory
     * @param j2 start stamp passed to {@code getPartitions}
     * @param j3 end stamp passed to {@code getPartitions}
     * @return the new transform
     */
    public static BatchLogRead of(List<AttributeDescriptor<?>> list, long j, Repository repository, BatchLogReader batchLogReader, long j2, long j3) {
        return of(list, j, repository.asFactory(), batchLogReader, j2, j3);
    }

    /**
     * Creates the transform for an explicit stamp range from a repository factory.
     *
     * @param list attributes to read
     * @param j limit forwarded to the restriction
     * @param repositoryFactory factory of the repository
     * @param batchLogReader reader, converted to a factory
     * @param j2 start stamp passed to {@code getPartitions}
     * @param j3 end stamp passed to {@code getPartitions}
     * @return the new transform
     */
    public static BatchLogRead of(List<AttributeDescriptor<?>> list, long j, RepositoryFactory repositoryFactory, BatchLogReader batchLogReader, long j2, long j3) {
        return new BatchLogRead(list, j, repositoryFactory, batchLogReader.asFactory(), j2, j3);
    }

    /**
     * @param list attributes to read, copied into a new list; must not be {@code null}
     * @param j limit forwarded to the restriction
     * @param repositoryFactory factory of the repository
     * @param factory factory of the reader
     * @param j2 start stamp passed to {@code getPartitions}
     * @param j3 end stamp passed to {@code getPartitions}
     * @throws NullPointerException when {@code list} is {@code null}
     */
    @VisibleForTesting
    BatchLogRead(List<AttributeDescriptor<?>> list, long j, RepositoryFactory repositoryFactory, BatchLogReader.Factory<?> factory, long j2, long j3) {
        this.attributes = Lists.newArrayList(Objects.requireNonNull(list));
        this.limit = j;
        this.repoFactory = repositoryFactory;
        this.readerFactory = factory;
        this.startStamp = j2;
        this.endStamp = j3;
    }

    /** Applies {@link Impulse} and then the splittable {@link BatchLogReadFn}. */
    @Override
    public PCollection<StreamElement> expand(PBegin pBegin) {
        BatchLogReadFn readFn =
                new BatchLogReadFn(this.attributes, this.limit, this.repoFactory, this.readerFactory);
        return pBegin.apply(Impulse.create()).apply(ParDo.of(readFn));
    }

    /**
     * Creates the observer used for one partition; package-private so tests can replace it.
     *
     * @param str name of the observer
     * @param j limit passed to the observer
     * @return a new batch log observer created with {@link Long#MIN_VALUE} as its third argument
     */
    @VisibleForTesting
    BlockingQueueLogObserver.BatchLogObserver newObserver(String str, long j) {
        return BlockingQueueLogObserver.createBatchLogObserver(str, j, Long.MIN_VALUE);
    }
}
