package cz.o2.proxima.beam.core.direct.io;

import cz.o2.proxima.beam.core.direct.io.BlockingQueueLogObserver;
import cz.o2.proxima.beam.core.direct.io.OffsetRestrictionTracker;
import cz.o2.proxima.core.repository.Repository;
import cz.o2.proxima.core.repository.RepositoryFactory;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.storage.commitlog.Position;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.commitlog.ObserveHandle;
import cz.o2.proxima.direct.core.commitlog.Offset;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.com.google.common.cache.Cache;
import cz.o2.proxima.internal.com.google.common.cache.CacheBuilder;
import cz.o2.proxima.internal.com.google.common.collect.Lists;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/beam/core/direct/io/CommitLogRead.class */
public class CommitLogRead extends PTransform<PBegin, PCollection<StreamElement>> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(CommitLogRead.class);
    private final String observeName;
    private final Position position;
    private final long limit;
    private final boolean bounded;
    private final RepositoryFactory repoFactory;
    private final CommitLogReader.Factory<?> readerFactory;

    /* loaded from: input_file:cz/o2/proxima/beam/core/direct/io/CommitLogRead$AbstractCommitLogReadFn.class */
    private abstract class AbstractCommitLogReadFn extends DoFn<byte[], StreamElement> {

        @Nullable
        protected final String name;
        protected final Position position;
        protected final RepositoryFactory repositoryFactory;
        protected final CommitLogReader.Factory<?> readerFactory;
        protected final long limit;
        private transient CommitLogReader reader;
        protected transient Cache<Integer, ObserveHandle> runningObserves;
        protected transient Map<Integer, BlockingQueueLogObserver.CommitLogObserver> observers;
        private Map<Partition, Offset> lastSeekedOffsets = new HashMap();
        private transient boolean externalizableOffsets = false;

        public AbstractCommitLogReadFn(@Nullable String str, Position position, long j, RepositoryFactory repositoryFactory, CommitLogReader.Factory<?> factory) {
            this.name = str;
            this.position = position;
            this.limit = j;
            this.repositoryFactory = repositoryFactory;
            this.readerFactory = factory;
        }

        public DoFn.ProcessContinuation process(RestrictionTracker<OffsetRestrictionTracker.OffsetRange, Offset> restrictionTracker, DoFn.OutputReceiver<StreamElement> outputReceiver, ManualWatermarkEstimator<?> manualWatermarkEstimator, DoFn.BundleFinalizer bundleFinalizer) {
            AtomicReference atomicReference = new AtomicReference();
            bundleFinalizer.afterBundleCommit(BoundedWindow.TIMESTAMP_MAX_VALUE, () -> {
                Optional.ofNullable((BlockingQueueLogObserver.UnifiedContext) atomicReference.getAndSet(null)).ifPresent((v0) -> {
                    v0.confirm();
                });
            });
            Partition partition = ((OffsetRestrictionTracker.OffsetRange) restrictionTracker.currentRestriction()).getPartition();
            boolean z = false;
            if (this.externalizableOffsets) {
                z = seekToRequestedOffsetsIfNeeded(restrictionTracker, partition);
            }
            BlockingQueueLogObserver.CommitLogObserver commitLogObserver = this.observers.get(Integer.valueOf(partition.getId()));
            if (commitLogObserver == null) {
                startObserve(this.name, partition, (OffsetRestrictionTracker.OffsetRange) restrictionTracker.currentRestriction());
                return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(50L));
            }
            CommitLogRead.updateWatermark(manualWatermarkEstimator, commitLogObserver);
            while (!Thread.currentThread().isInterrupted() && commitLogObserver.getWatermark() < Long.MAX_VALUE && commitLogObserver.peekElement()) {
                BlockingQueueLogObserver.UnifiedContext unifiedContext = (BlockingQueueLogObserver.UnifiedContext) Objects.requireNonNull(commitLogObserver.getPeekContext());
                Offset offset = (Offset) Objects.requireNonNull(unifiedContext.getOffset());
                if (z) {
                    z = false;
                    if (offset.equals(((OffsetRestrictionTracker.OffsetRange) restrictionTracker.currentRestriction()).getStartOffset())) {
                        commitLogObserver.take();
                    }
                }
                if (!restrictionTracker.tryClaim(offset)) {
                    return DoFn.ProcessContinuation.stop();
                }
                StreamElement streamElement = (StreamElement) Objects.requireNonNull(commitLogObserver.take());
                outputReceiver.outputWithTimestamp(streamElement, Instant.ofEpochMilli(streamElement.getStamp()));
                atomicReference.set(unifiedContext);
                CommitLogRead.updateWatermark(manualWatermarkEstimator, commitLogObserver);
            }
            Optional.ofNullable(commitLogObserver.getError()).ifPresent(ExceptionUtils::rethrowAsIllegalStateException);
            return ((OffsetRestrictionTracker.OffsetRange) restrictionTracker.currentRestriction()).isLimitConsumed() || (commitLogObserver.getWatermark() > Long.MAX_VALUE ? 1 : (commitLogObserver.getWatermark() == Long.MAX_VALUE ? 0 : -1)) >= 0 ? DoFn.ProcessContinuation.stop() : DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L));
        }

        private boolean seekToRequestedOffsetsIfNeeded(RestrictionTracker<OffsetRestrictionTracker.OffsetRange, Offset> restrictionTracker, Partition partition) {
            BlockingQueueLogObserver.CommitLogObserver commitLogObserver = this.observers.get(Integer.valueOf(partition.getId()));
            if (commitLogObserver != null) {
                Offset offset = commitLogObserver.getLastReadContext() == null ? this.lastSeekedOffsets.get(partition) : (Offset) Objects.requireNonNull(commitLogObserver.getLastReadContext().getOffset());
                if (!Objects.equals(offset, ((OffsetRestrictionTracker.OffsetRange) restrictionTracker.currentRestriction()).getStartOffset())) {
                    CommitLogRead.log.info("Closing reader due to non-matching offsets {} and {}", offset, ((OffsetRestrictionTracker.OffsetRange) restrictionTracker.currentRestriction()).getStartOffset());
                    closeHandle(partition.getId());
                }
            }
            return !((OffsetRestrictionTracker.OffsetRange) restrictionTracker.currentRestriction()).isStartInclusive();
        }

        protected void closeHandle(int i) {
            this.runningObserves.invalidate(Integer.valueOf(i));
            Optional.ofNullable(this.observers.remove(Integer.valueOf(i))).ifPresent(commitLogObserver -> {
                commitLogObserver.stop(true);
            });
        }

        private void startObserve(@Nullable String str, Partition partition, OffsetRestrictionTracker.OffsetRange offsetRange) {
            ObserveHandle observeBulkPartitions;
            if (this.reader == null) {
                this.reader = (CommitLogReader) this.readerFactory.apply(this.repositoryFactory.apply());
            }
            this.externalizableOffsets = this.reader.hasExternalizableOffsets();
            BlockingQueueLogObserver.CommitLogObserver newObserver = CommitLogRead.this.newObserver(str, offsetRange);
            Preconditions.checkState(this.observers.put(Integer.valueOf(partition.getId()), newObserver) == null);
            if (offsetRange.getStartOffset() != null) {
                observeBulkPartitions = observeBulkOffsets(offsetRange, this.reader, newObserver);
                this.lastSeekedOffsets.put(partition, offsetRange.getStartOffset());
            } else {
                observeBulkPartitions = observeBulkPartitions(str, offsetRange, this.reader, newObserver);
            }
            this.runningObserves.put(Integer.valueOf(partition.getId()), observeBulkPartitions);
        }

        abstract ObserveHandle observeBulkOffsets(OffsetRestrictionTracker.OffsetRange offsetRange, CommitLogReader commitLogReader, BlockingQueueLogObserver.CommitLogObserver commitLogObserver);

        abstract ObserveHandle observeBulkPartitions(@Nullable String str, OffsetRestrictionTracker.OffsetRange offsetRange, CommitLogReader commitLogReader, BlockingQueueLogObserver.CommitLogObserver commitLogObserver);

        public void setup() {
            this.observers = new HashMap();
            this.runningObserves = CacheBuilder.newBuilder().concurrencyLevel(1).expireAfterAccess(5L, TimeUnit.SECONDS).removalListener(removalNotification -> {
                if (removalNotification.wasEvicted()) {
                    CommitLogRead.log.info("Closing observer {} due to expiry", removalNotification.getKey());
                    Optional.ofNullable(this.observers.remove(removalNotification.getKey())).ifPresent(commitLogObserver -> {
                        commitLogObserver.stop(true);
                    });
                    ((ObserveHandle) removalNotification.getValue()).close();
                }
            }).build();
        }

        public void tearDown() {
            Lists.newArrayList(this.observers.keySet()).forEach((v1) -> {
                closeHandle(v1);
            });
            this.reader = null;
        }

        void splitRestriction(OffsetRestrictionTracker.OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRestrictionTracker.OffsetRange> outputReceiver) {
            if (!offsetRange.isInitial()) {
                outputReceiver.output(offsetRange);
                return;
            }
            List partitions = ((CommitLogReader) this.readerFactory.apply(this.repositoryFactory.apply())).getPartitions();
            List list = (List) partitions.stream().map(partition -> {
                return OffsetRestrictionTracker.OffsetRange.readingPartition(partition, this.position, offsetRange.getTotalLimit() / partitions.size(), offsetRange.isBounded());
            }).collect(Collectors.toList());
            CommitLogRead.log.info("Split initial restriction {} to {} splits", offsetRange, Integer.valueOf(list.size()));
            Objects.requireNonNull(outputReceiver);
            list.forEach((v1) -> {
                r1.output(v1);
            });
        }

        public Coder<OffsetRestrictionTracker.OffsetRange> getRestrictionCoder() {
            return SerializableCoder.of(OffsetRestrictionTracker.OffsetRange.class);
        }

        public WatermarkEstimators.Manual newWatermarkEstimator(Instant instant) {
            return SDFUtils.rangeCheckedManualEstimator(instant);
        }

        public Instant getInitialWatermarkEstimatorState() {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }

        public Coder<Instant> getWatermarkEstimatorStateCoder() {
            return InstantCoder.of();
        }

        public TypeDescriptor<StreamElement> getOutputTypeDescriptor() {
            return TypeDescriptor.of(StreamElement.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @DoFn.BoundedPerElement
    /* loaded from: input_file:cz/o2/proxima/beam/core/direct/io/CommitLogRead$BoundedCommitLogReadFn.class */
    public class BoundedCommitLogReadFn extends AbstractCommitLogReadFn {
        private BoundedCommitLogReadFn(@Nullable String str, Position position, long j, RepositoryFactory repositoryFactory, CommitLogReader.Factory<?> factory) {
            super(str, position, j, repositoryFactory, factory);
        }

        @DoFn.ProcessElement
        public void processBounded(RestrictionTracker<OffsetRestrictionTracker.OffsetRange, Offset> restrictionTracker, DoFn.OutputReceiver<StreamElement> outputReceiver, ManualWatermarkEstimator<?> manualWatermarkEstimator, DoFn.BundleFinalizer bundleFinalizer) {
            DoFn.ProcessContinuation process;
            do {
                process = process(restrictionTracker, outputReceiver, manualWatermarkEstimator, bundleFinalizer);
            } while (process.shouldResume());
            Preconditions.checkState(!process.shouldResume(), "Should have terminated processing of the whole restriction, got %s", process);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.Setup
        public void setup() {
            super.setup();
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.Teardown
        public void tearDown() {
            super.tearDown();
        }

        @DoFn.GetInitialRestriction
        public OffsetRestrictionTracker.OffsetRange initialRestriction() {
            return OffsetRestrictionTracker.OffsetRange.initialRestriction(this.limit, true);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.SplitRestriction
        public void splitRestriction(@DoFn.Restriction OffsetRestrictionTracker.OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRestrictionTracker.OffsetRange> outputReceiver) {
            super.splitRestriction(offsetRange, outputReceiver);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.GetRestrictionCoder
        public Coder<OffsetRestrictionTracker.OffsetRange> getRestrictionCoder() {
            return super.getRestrictionCoder();
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.NewWatermarkEstimator
        public WatermarkEstimators.Manual newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
            return super.newWatermarkEstimator(instant);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.GetInitialWatermarkEstimatorState
        public Instant getInitialWatermarkEstimatorState() {
            return super.getInitialWatermarkEstimatorState();
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.GetWatermarkEstimatorStateCoder
        public Coder<Instant> getWatermarkEstimatorStateCoder() {
            return super.getWatermarkEstimatorStateCoder();
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        ObserveHandle observeBulkOffsets(OffsetRestrictionTracker.OffsetRange offsetRange, CommitLogReader commitLogReader, BlockingQueueLogObserver.CommitLogObserver commitLogObserver) {
            return commitLogReader.observeBulkOffsets(Collections.singletonList(offsetRange.getStartOffset()), true, commitLogObserver);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        ObserveHandle observeBulkPartitions(String str, OffsetRestrictionTracker.OffsetRange offsetRange, CommitLogReader commitLogReader, BlockingQueueLogObserver.CommitLogObserver commitLogObserver) {
            return commitLogReader.observeBulkPartitions(str, Collections.singletonList(offsetRange.getPartition()), offsetRange.getPosition(), true, commitLogObserver);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @DoFn.UnboundedPerElement
    /* loaded from: input_file:cz/o2/proxima/beam/core/direct/io/CommitLogRead$UnboundedCommitLogReadFn.class */
    public class UnboundedCommitLogReadFn extends AbstractCommitLogReadFn {
        private UnboundedCommitLogReadFn(@Nullable String str, Position position, long j, RepositoryFactory repositoryFactory, CommitLogReader.Factory<?> factory) {
            super(str, position, j, repositoryFactory, factory);
        }

        @DoFn.ProcessElement
        public DoFn.ProcessContinuation processUnbounded(RestrictionTracker<OffsetRestrictionTracker.OffsetRange, Offset> restrictionTracker, DoFn.OutputReceiver<StreamElement> outputReceiver, ManualWatermarkEstimator<?> manualWatermarkEstimator, DoFn.BundleFinalizer bundleFinalizer) {
            return process(restrictionTracker, outputReceiver, manualWatermarkEstimator, bundleFinalizer);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.Setup
        public void setup() {
            super.setup();
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.Teardown
        public void tearDown() {
            super.tearDown();
        }

        @DoFn.GetInitialRestriction
        public OffsetRestrictionTracker.OffsetRange initialRestriction() {
            return OffsetRestrictionTracker.OffsetRange.initialRestriction(this.limit, false);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.SplitRestriction
        public void splitRestriction(@DoFn.Restriction OffsetRestrictionTracker.OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRestrictionTracker.OffsetRange> outputReceiver) {
            super.splitRestriction(offsetRange, outputReceiver);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.GetRestrictionCoder
        public Coder<OffsetRestrictionTracker.OffsetRange> getRestrictionCoder() {
            return super.getRestrictionCoder();
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.NewWatermarkEstimator
        public WatermarkEstimators.Manual newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
            return super.newWatermarkEstimator(instant);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.GetInitialWatermarkEstimatorState
        public Instant getInitialWatermarkEstimatorState() {
            return super.getInitialWatermarkEstimatorState();
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        @DoFn.GetWatermarkEstimatorStateCoder
        public Coder<Instant> getWatermarkEstimatorStateCoder() {
            return super.getWatermarkEstimatorStateCoder();
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        ObserveHandle observeBulkOffsets(OffsetRestrictionTracker.OffsetRange offsetRange, CommitLogReader commitLogReader, BlockingQueueLogObserver.CommitLogObserver commitLogObserver) {
            return commitLogReader.observeBulkOffsets(Collections.singletonList(offsetRange.getStartOffset()), commitLogObserver);
        }

        @Override // cz.o2.proxima.beam.core.direct.io.CommitLogRead.AbstractCommitLogReadFn
        ObserveHandle observeBulkPartitions(String str, OffsetRestrictionTracker.OffsetRange offsetRange, CommitLogReader commitLogReader, BlockingQueueLogObserver.CommitLogObserver commitLogObserver) {
            return commitLogReader.observeBulkPartitions(str, Collections.singletonList(offsetRange.getPartition()), offsetRange.getPosition(), commitLogObserver);
        }
    }

    public static CommitLogRead of(String str, Position position, long j, Repository repository, CommitLogReader commitLogReader) {
        return of(str, position, j, repository.asFactory(), commitLogReader);
    }

    public static CommitLogRead of(String str, Position position, long j, RepositoryFactory repositoryFactory, CommitLogReader commitLogReader) {
        return new CommitLogRead(str, position, j, false, repositoryFactory, commitLogReader);
    }

    public static CommitLogRead ofBounded(String str, long j, RepositoryFactory repositoryFactory, CommitLogReader commitLogReader) {
        return new CommitLogRead(str, Position.OLDEST, j, true, repositoryFactory, commitLogReader);
    }

    private static void updateWatermark(ManualWatermarkEstimator<?> manualWatermarkEstimator, BlockingQueueLogObserver.CommitLogObserver commitLogObserver) {
        long watermark = commitLogObserver.getWatermark();
        if (watermark > BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
            manualWatermarkEstimator.setWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        } else if (watermark > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
            manualWatermarkEstimator.setWatermark(Instant.ofEpochMilli(watermark));
        }
    }

    @VisibleForTesting
    CommitLogRead(String str, Position position, long j, boolean z, RepositoryFactory repositoryFactory, CommitLogReader commitLogReader) {
        this.observeName = str;
        this.position = position;
        this.limit = j;
        this.bounded = z;
        this.repoFactory = repositoryFactory;
        this.readerFactory = commitLogReader.asFactory();
    }

    public PCollection<StreamElement> expand(PBegin pBegin) {
        return pBegin.apply(Impulse.create()).apply(ParDo.of(this.bounded ? new BoundedCommitLogReadFn(this.observeName, this.position, this.limit, this.repoFactory, this.readerFactory) : new UnboundedCommitLogReadFn(this.observeName, this.position, this.limit, this.repoFactory, this.readerFactory)));
    }

    @VisibleForTesting
    BlockingQueueLogObserver.CommitLogObserver newObserver(@Nullable String str, OffsetRestrictionTracker.OffsetRange offsetRange) {
        return BlockingQueueLogObserver.createCommitLogObserver(str != null ? str : UUID.randomUUID().toString(), offsetRange.getTotalLimit(), Long.MIN_VALUE);
    }
}
