/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.commitlog;

import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.shaded.com.google.common.base.Suppliers;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.ThroughputLimiter;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.SerializableUtils;
import java.io.Serializable;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import lombok.Generated;

public class CommitLogReaders {
    public static CommitLogReader withThroughputLimit(CommitLogReader delegate, @Nullable ThroughputLimiter limiter) {
        if (limiter != null) {
            return new LimitedCommitLogReader(delegate, limiter);
        }
        return delegate;
    }

    private CommitLogReaders() {
    }

    private static class ForwardingLogObserver
    implements CommitLogObserver {
        private final CommitLogObserver delegate;

        public ForwardingLogObserver(CommitLogObserver delegate) {
            this.delegate = delegate;
        }

        public String toString() {
            return "ForwardingLogObserver{delegate=" + this.delegate + '}';
        }

        @Override
        @Generated
        public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
            this.delegate.onRepartition(context);
        }

        @Override
        @Generated
        public void onIdle(CommitLogObserver.OnIdleContext context) {
            this.delegate.onIdle(context);
        }

        @Override
        @Generated
        public void onCompleted() {
            this.delegate.onCompleted();
        }

        @Override
        @Generated
        public void onCancelled() {
            this.delegate.onCancelled();
        }

        @Override
        @Generated
        public boolean onError(Throwable error) {
            return this.delegate.onError(error);
        }

        @Override
        @Generated
        public boolean onException(Exception exception) {
            return this.delegate.onException(exception);
        }

        @Override
        @Generated
        public boolean onFatalError(Error error) {
            return this.delegate.onFatalError(error);
        }

        @Override
        @Generated
        public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
            return this.delegate.onNext(ingest, context);
        }
    }

    @VisibleForTesting
    public static class LimitedCommitLogReader
    extends ForwardingCommitLogReader {
        private final ThroughputLimiter limiter;
        private final Supplier<List<Partition>> availablePartitions;

        public LimitedCommitLogReader(CommitLogReader delegate, ThroughputLimiter limiter) {
            super(delegate);
            this.limiter = (ThroughputLimiter)SerializableUtils.clone((Serializable)((Serializable)Objects.requireNonNull(limiter)));
            this.availablePartitions = Suppliers.memoize(delegate::getPartitions);
        }

        @Override
        public ObserveHandle observe(String name, Position position, CommitLogObserver observer) {
            return super.observe(name, position, LimitedCommitLogReader.throughputLimited(this.limiter, (Collection<Partition>)this.availablePartitions.get(), observer));
        }

        @Override
        public ObserveHandle observePartitions(String name, Collection<Partition> partitions, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
            return super.observePartitions(name, partitions, position, stopAtCurrent, LimitedCommitLogReader.throughputLimited(this.limiter, partitions, observer));
        }

        @Override
        public ObserveHandle observeBulk(String name, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
            return super.observeBulk(name, position, stopAtCurrent, LimitedCommitLogReader.throughputLimited(this.limiter, (Collection<Partition>)this.availablePartitions.get(), observer));
        }

        @Override
        public ObserveHandle observeBulkPartitions(String name, Collection<Partition> partitions, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
            return super.observeBulkPartitions(name, partitions, position, stopAtCurrent, LimitedCommitLogReader.throughputLimited(this.limiter, partitions, observer));
        }

        @Override
        public ObserveHandle observeBulkOffsets(Collection<Offset> offsets, boolean stopAtCurrent, CommitLogObserver observer) {
            return super.observeBulkOffsets(offsets, stopAtCurrent, LimitedCommitLogReader.throughputLimited(this.limiter, (Collection<Partition>)this.availablePartitions.get(), observer));
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("limiter", (Object)this.limiter).add("delegate", (Object)this.getDelegate().toString()).toString();
        }

        @Override
        public CommitLogReader.Factory<?> asFactory() {
            CommitLogReader.Factory<?> delegateFactory = super.asFactory();
            ThroughputLimiter limiter = this.limiter;
            return (CommitLogReader.Factory & Serializable)repo -> CommitLogReaders.withThroughputLimit((CommitLogReader)delegateFactory.apply(repo), limiter);
        }

        private static CommitLogObserver throughputLimited(ThroughputLimiter readerLimiter, Collection<Partition> readerPartitions, CommitLogObserver delegate) {
            final ThroughputLimiter limiter = (ThroughputLimiter)SerializableUtils.clone((Serializable)readerLimiter);
            final ArrayList<Partition> partitions = new ArrayList<Partition>(readerPartitions);
            return new ForwardingLogObserver(delegate){
                long watermark;
                {
                    super(delegate);
                    this.watermark = Long.MIN_VALUE;
                }

                @Override
                public void onCompleted() {
                    try {
                        super.onCompleted();
                    }
                    finally {
                        limiter.close();
                    }
                }

                @Override
                public void onCancelled() {
                    try {
                        super.onCancelled();
                    }
                    finally {
                        limiter.close();
                    }
                }

                @Override
                public boolean onError(Throwable error) {
                    try {
                        boolean bl = super.onError(error);
                        return bl;
                    }
                    finally {
                        limiter.close();
                    }
                }

                @Override
                public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
                    if (ExceptionUtils.ignoringInterrupted(this::waitIfNecessary)) {
                        return false;
                    }
                    this.watermark = context.getWatermark();
                    return super.onNext(ingest, context);
                }

                private void waitIfNecessary() throws InterruptedException {
                    Duration pauseTime = limiter.getPauseTime(this.getLimiterContext());
                    if (!pauseTime.equals(Duration.ZERO)) {
                        TimeUnit.MILLISECONDS.sleep(pauseTime.toMillis());
                    }
                }

                @Override
                public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
                    super.onRepartition(context);
                    partitions.clear();
                    partitions.addAll(context.partitions());
                }

                @Override
                public void onIdle(CommitLogObserver.OnIdleContext context) {
                    if (limiter.getPauseTime(this.getLimiterContext()).isZero()) {
                        super.onIdle(context);
                    }
                }

                private ThroughputLimiter.Context getLimiterContext() {
                    return new ThroughputLimiter.Context(){

                        public Collection<Partition> getConsumedPartitions() {
                            return partitions;
                        }

                        public long getMinWatermark() {
                            return watermark;
                        }
                    };
                }
            };
        }

        @Generated
        public ThroughputLimiter getLimiter() {
            return this.limiter;
        }
    }

    private static class ForwardingCommitLogReader
    implements CommitLogReader {
        private final CommitLogReader delegate;

        private ForwardingCommitLogReader(CommitLogReader delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        public String toString() {
            return this.delegate.toString();
        }

        @Override
        public URI getUri() {
            return this.delegate.getUri();
        }

        @Override
        public List<Partition> getPartitions() {
            return this.delegate.getPartitions();
        }

        @Override
        public ObserveHandle observe(String name, Position position, CommitLogObserver observer) {
            return this.delegate.observe(name, position, observer);
        }

        @Override
        public ObserveHandle observePartitions(String name, Collection<Partition> partitions, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
            return this.delegate.observePartitions(name, partitions, position, stopAtCurrent, observer);
        }

        @Override
        public ObserveHandle observeBulk(String name, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
            return this.delegate.observeBulk(name, position, stopAtCurrent, observer);
        }

        @Override
        public ObserveHandle observeBulkPartitions(String name, Collection<Partition> partitions, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
            return this.delegate.observeBulkPartitions(name, partitions, position, stopAtCurrent, observer);
        }

        @Override
        public ObserveHandle observeBulkOffsets(Collection<Offset> offsets, boolean stopAtCurrent, CommitLogObserver observer) {
            return this.delegate.observeBulkOffsets(offsets, stopAtCurrent, observer);
        }

        @Override
        public CommitLogReader.Factory<?> asFactory() {
            return this.delegate.asFactory();
        }

        @Override
        public boolean hasExternalizableOffsets() {
            return this.delegate.hasExternalizableOffsets();
        }

        @Generated
        public CommitLogReader getDelegate() {
            return this.delegate;
        }
    }
}

