/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.batch;

import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.ObserveHandle;
import cz.o2.proxima.direct.batch.Offset;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.ThroughputLimiter;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.SerializableUtils;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import lombok.Generated;

public class BatchLogReaders {
    public static BatchLogReader withLimitedThroughput(BatchLogReader delegate, @Nullable ThroughputLimiter limiter) {
        if (limiter != null) {
            return new ThroughputLimitedBatchLogReader(delegate, limiter);
        }
        return delegate;
    }

    private BatchLogReaders() {
    }

    private static class ThroughputLimitedBatchLogObserver
    extends ForwardingBatchLogObserver {
        private final Collection<Partition> assignedPartitions;
        private final ThroughputLimiter limiter;
        private long watermark = Long.MIN_VALUE;

        public ThroughputLimitedBatchLogObserver(BatchLogObserver delegate, Collection<Partition> assignedPartitions, ThroughputLimiter limiter) {
            super(delegate);
            this.assignedPartitions = new ArrayList<Partition>(assignedPartitions);
            this.limiter = (ThroughputLimiter)SerializableUtils.clone((Serializable)((Serializable)Objects.requireNonNull(limiter)));
        }

        @Override
        public void onCompleted() {
            try {
                super.onCompleted();
            }
            finally {
                this.limiter.close();
            }
        }

        @Override
        public boolean onError(Throwable error) {
            try {
                boolean bl = super.onError(error);
                return bl;
            }
            finally {
                this.limiter.close();
            }
        }

        @Override
        public void onCancelled() {
            try {
                super.onCancelled();
            }
            finally {
                this.limiter.close();
            }
        }

        @Override
        public boolean onNext(StreamElement element) {
            if (ExceptionUtils.ignoringInterrupted(this::waitIfNecessary)) {
                return false;
            }
            return super.onNext(element);
        }

        @Override
        public boolean onNext(StreamElement element, BatchLogObserver.OnNextContext context) {
            this.watermark = context.getWatermark();
            if (ExceptionUtils.ignoringInterrupted(this::waitIfNecessary)) {
                return false;
            }
            return super.onNext(element, context);
        }

        private void waitIfNecessary() throws InterruptedException {
            Duration pause = this.limiter.getPauseTime(this.getLimiterContext());
            if (!pause.equals(Duration.ZERO)) {
                TimeUnit.MILLISECONDS.sleep(pause.toMillis());
            }
        }

        private ThroughputLimiter.Context getLimiterContext() {
            return new ThroughputLimiter.Context(){

                public Collection<Partition> getConsumedPartitions() {
                    return assignedPartitions;
                }

                public long getMinWatermark() {
                    return watermark;
                }
            };
        }
    }

    private static class ThroughputLimitedBatchLogReader
    extends ForwardingLimitedBatchLogReader {
        private final ThroughputLimiter limiter;

        public ThroughputLimitedBatchLogReader(BatchLogReader delegate, ThroughputLimiter limiter) {
            super(delegate);
            this.limiter = limiter;
        }

        @Override
        public ObserveHandle observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
            return super.observe(partitions, attributes, this.throughputLimited(observer, partitions));
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("limiter", (Object)this.limiter).add("delegate", (Object)super.toString()).toString();
        }

        @Override
        public BatchLogReader.Factory<?> asFactory() {
            BatchLogReader.Factory<?> superFactory = super.asFactory();
            ThroughputLimiter limiter = this.limiter;
            return (BatchLogReader.Factory & Serializable)repo -> new ThroughputLimitedBatchLogReader((BatchLogReader)superFactory.apply(repo), limiter);
        }

        private BatchLogObserver throughputLimited(BatchLogObserver delegate, List<Partition> consumedPartitions) {
            return new ThroughputLimitedBatchLogObserver(delegate, consumedPartitions, this.limiter);
        }
    }

    private static class ForwardingLimitedBatchLogReader
    implements BatchLogReader {
        private final BatchLogReader delegate;

        private ForwardingLimitedBatchLogReader(BatchLogReader delegate) {
            this.delegate = delegate;
        }

        public String toString() {
            return this.delegate.toString();
        }

        @Override
        @Generated
        public List<Partition> getPartitions() {
            return this.delegate.getPartitions();
        }

        @Override
        @Generated
        public List<Partition> getPartitions(long startStamp) {
            return this.delegate.getPartitions(startStamp);
        }

        @Override
        @Generated
        public List<Partition> getPartitions(long startStamp, long endStamp) {
            return this.delegate.getPartitions(startStamp, endStamp);
        }

        @Override
        @Generated
        public ObserveHandle observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
            return this.delegate.observe(partitions, attributes, observer);
        }

        @Override
        @Generated
        public ObserveHandle observeOffsets(List<Offset> offsets, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
            return this.delegate.observeOffsets(offsets, attributes, observer);
        }

        @Override
        @Generated
        public BatchLogReader.Factory<?> asFactory() {
            return this.delegate.asFactory();
        }
    }

    public static class ForwardingBatchLogObserver
    implements BatchLogObserver {
        private final BatchLogObserver delegate;

        public ForwardingBatchLogObserver(BatchLogObserver delegate) {
            this.delegate = delegate;
        }

        @Override
        @Generated
        public boolean onNext(StreamElement element) {
            return this.delegate.onNext(element);
        }

        @Override
        @Generated
        public boolean onNext(StreamElement element, BatchLogObserver.OnNextContext context) {
            return this.delegate.onNext(element, context);
        }

        @Override
        @Generated
        public void onCompleted() {
            this.delegate.onCompleted();
        }

        @Override
        @Generated
        public void onCancelled() {
            this.delegate.onCancelled();
        }

        @Override
        @Generated
        public boolean onError(Throwable error) {
            return this.delegate.onError(error);
        }

        @Override
        @Generated
        public boolean onException(Exception exception) {
            return this.delegate.onException(exception);
        }

        @Override
        @Generated
        public boolean onFatalError(Error error) {
            return this.delegate.onFatalError(error);
        }
    }
}

