/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.commitlog;

import cz.o2.proxima.direct.LogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.RetryableLogObserver;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogObservers {
    /** Class-wide SLF4J logger; the default latecomer consumer uses it to warn about dropped elements. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CommitLogObservers.class);

    /**
     * Wraps {@code delegate} in a retrying observer whose exhausted-retries strategy is always
     * {@link TerminationStrategy#RETHROW}.
     *
     * <p>Convenience overload of
     * {@link #withNumRetriedExceptions(String, int, UnaryFunction, CommitLogObserver)}.
     *
     * @param name name handed to the retrying observer
     * @param numRetries retry count handed to the retrying observer
     * @param delegate observer being wrapped
     * @return the wrapping observer
     */
    public static CommitLogObserver withNumRetriedExceptions(String name, int numRetries, CommitLogObserver delegate) {
        // Serializable lambda, same as the original intersection cast produced.
        UnaryFunction<Throwable, TerminationStrategy> alwaysRethrow =
            (UnaryFunction<Throwable, TerminationStrategy> & Serializable) failure -> TerminationStrategy.RETHROW;
        return withNumRetriedExceptions(name, numRetries, alwaysRethrow, delegate);
    }

    /**
     * Wraps {@code delegate} in a {@link RetryableLogObserver} constructed with its boolean flag
     * set to {@code false}.
     *
     * @param name name handed to the retrying observer
     * @param numRetries retry count handed to the retrying observer
     * @param onRetriesExhausted function choosing a {@link TerminationStrategy} for a given throwable
     * @param delegate observer being wrapped
     * @return the wrapping observer
     */
    public static CommitLogObserver withNumRetriedExceptions(String name, int numRetries, UnaryFunction<Throwable, TerminationStrategy> onRetriesExhausted, CommitLogObserver delegate) {
        // NOTE(review): judging by the sibling factory name, the flag presumably selects
        // "retry any Throwable" (true) vs. "retry Exceptions only" (false) - confirm in RetryableLogObserver.
        final boolean retryThrowables = false;
        CommitLogObserver retrying = new RetryableLogObserver(name, numRetries, onRetriesExhausted, retryThrowables, delegate);
        return retrying;
    }

    /**
     * Wraps {@code delegate} in a {@link RetryableLogObserver} constructed with its boolean flag
     * set to {@code true}.
     *
     * @param name name handed to the retrying observer
     * @param numRetries retry count handed to the retrying observer
     * @param onRetriesExhausted function choosing a {@link TerminationStrategy} for a given throwable
     * @param delegate observer being wrapped
     * @return the wrapping observer
     */
    public static CommitLogObserver withNumRetriedThrowables(String name, int numRetries, UnaryFunction<Throwable, TerminationStrategy> onRetriesExhausted, CommitLogObserver delegate) {
        // NOTE(review): judging by the method name, the flag presumably selects
        // "retry any Throwable" (true) vs. "retry Exceptions only" (false) - confirm in RetryableLogObserver.
        final boolean retryThrowables = true;
        CommitLogObserver retrying = new RetryableLogObserver(name, numRetries, onRetriesExhausted, retryThrowables, delegate);
        return retrying;
    }

    /**
     * Creates a sorting observer in front of {@code upstream} that logs a warning for every
     * element rejected as too late.
     *
     * @param upstream observer receiving the elements released from the sort buffer
     * @param allowedLateness lateness tolerated before an element is handed to the latecomer consumer
     * @return the sorting observer
     */
    public static CommitLogObserver withSortBuffer(CommitLogObserver upstream, Duration allowedLateness) {
        Consumer<StreamElement> warnOnLateElement = lateDataLoggingConsumer(allowedLateness);
        return withSortBuffer(upstream, allowedLateness, warnOnLateElement);
    }

    /**
     * Creates a sorting observer in front of {@code upstream}; every element rejected as too late
     * is passed to {@code latecomerConsumer} and its context is then confirmed.
     *
     * @param upstream observer receiving the elements released from the sort buffer
     * @param allowedLateness lateness tolerated before an element is treated as a latecomer
     * @param latecomerConsumer callback invoked with each latecomer element
     * @return the sorting observer
     */
    public static CommitLogObserver withSortBuffer(CommitLogObserver upstream, Duration allowedLateness, Consumer<StreamElement> latecomerConsumer) {
        // The lambda is typed through a parameterized intersection so that `el` and `ctx`
        // carry their real types (a raw BiConsumer would make both plain Object) while the
        // lambda stays Serializable.
        BiConsumer<StreamElement, CommitLogObserver.OnNextContext> consumeAndConfirm =
            (BiConsumer<StreamElement, CommitLogObserver.OnNextContext> & Serializable) (el, ctx) -> {
                latecomerConsumer.accept(el);
                // The latecomer is not forwarded upstream, so it is confirmed here.
                ctx.confirm();
            };
        return withSortBuffer(upstream, allowedLateness, consumeAndConfirm);
    }

    /**
     * Creates a sorting observer that keeps one buffer for all partitions.
     *
     * @param upstream observer receiving the elements released from the sort buffer
     * @param allowedLateness lateness tolerated before an element is treated as a latecomer
     * @param latecomerConsumer callback invoked with each latecomer element and its context
     * @return the sorting observer
     */
    public static CommitLogObserver withSortBuffer(CommitLogObserver upstream, Duration allowedLateness, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> latecomerConsumer) {
        CommitLogObserver sorted = new SortedLogObserver(upstream, allowedLateness, latecomerConsumer);
        return sorted;
    }

    /**
     * Creates a per-partition sorting observer in front of {@code upstream} that logs a warning
     * for every element rejected as too late.
     *
     * @param upstream observer receiving the elements released from the sort buffers
     * @param allowedLateness lateness tolerated before an element is handed to the latecomer consumer
     * @return the sorting observer
     */
    public static CommitLogObserver withSortBufferWithinPartition(CommitLogObserver upstream, Duration allowedLateness) {
        Consumer<StreamElement> warnOnLateElement = lateDataLoggingConsumer(allowedLateness);
        return withSortBufferWithinPartition(upstream, allowedLateness, warnOnLateElement);
    }

    /**
     * Creates a per-partition sorting observer in front of {@code upstream}; every element
     * rejected as too late is passed to {@code latecomerConsumer} and its context is then
     * confirmed.
     *
     * @param upstream observer receiving the elements released from the sort buffers
     * @param allowedLateness lateness tolerated before an element is treated as a latecomer
     * @param latecomerConsumer callback invoked with each latecomer element
     * @return the sorting observer
     */
    public static CommitLogObserver withSortBufferWithinPartition(CommitLogObserver upstream, Duration allowedLateness, Consumer<StreamElement> latecomerConsumer) {
        // The lambda is typed through a parameterized intersection so that `el` and `ctx`
        // carry their real types (a raw BiConsumer would make both plain Object) while the
        // lambda stays Serializable.
        BiConsumer<StreamElement, CommitLogObserver.OnNextContext> consumeAndConfirm =
            (BiConsumer<StreamElement, CommitLogObserver.OnNextContext> & Serializable) (el, ctx) -> {
                latecomerConsumer.accept(el);
                // The latecomer is not forwarded upstream, so it is confirmed here.
                ctx.confirm();
            };
        return withSortBufferWithinPartition(upstream, allowedLateness, consumeAndConfirm);
    }

    /**
     * Creates a sorting observer that keeps a separate buffer and watermark per partition.
     *
     * @param upstream observer receiving the elements released from the sort buffers
     * @param allowedLateness lateness tolerated before an element is treated as a latecomer
     * @param latecomerConsumer callback invoked with each latecomer element and its context
     * @return the sorting observer
     */
    public static CommitLogObserver withSortBufferWithinPartition(CommitLogObserver upstream, Duration allowedLateness, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> latecomerConsumer) {
        CommitLogObserver sorted = new SinglePartitionSortedLogObserver(upstream, allowedLateness, latecomerConsumer);
        return sorted;
    }

    /**
     * Returns an observer that forwards every callback to {@code delegate} while holding the
     * monitor of the returned wrapper, so calls made through the wrapper never run concurrently.
     *
     * @param delegate observer receiving the serialized callbacks
     * @return the synchronizing wrapper
     */
    public static CommitLogObserver synchronizedObserver(final CommitLogObserver delegate) {
        return new CommitLogObserver() {

            @Override
            public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
                synchronized (this) {
                    return delegate.onNext(ingest, context);
                }
            }

            @Override
            public void onCompleted() {
                synchronized (this) {
                    delegate.onCompleted();
                }
            }

            @Override
            public void onCancelled() {
                synchronized (this) {
                    delegate.onCancelled();
                }
            }

            @Override
            public boolean onError(Throwable error) {
                synchronized (this) {
                    return delegate.onError(error);
                }
            }

            @Override
            public boolean onException(Exception exception) {
                synchronized (this) {
                    return delegate.onException(exception);
                }
            }

            @Override
            public boolean onFatalError(Error error) {
                synchronized (this) {
                    return delegate.onFatalError(error);
                }
            }

            @Override
            public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
                synchronized (this) {
                    delegate.onRepartition(context);
                }
            }

            @Override
            public void onIdle(CommitLogObserver.OnIdleContext context) {
                synchronized (this) {
                    delegate.onIdle(context);
                }
            }
        };
    }

    /** Private constructor: prevents instantiation from outside this class. */
    private CommitLogObservers() {
    }

    /**
     * Builds the default latecomer consumer: it logs a warning containing the element dump and
     * the allowed lateness in milliseconds.
     *
     * @param allowedLateness lateness reported in the log message
     * @return serializable consumer that only logs
     */
    private static Consumer<StreamElement> lateDataLoggingConsumer(Duration allowedLateness) {
        // Converted up front so the lambda captures a primitive rather than the Duration.
        long allowedLatenessMs = allowedLateness.toMillis();
        // Parameterized intersection cast: `el` is a StreamElement (a raw Consumer would make
        // it Object) and the lambda stays Serializable.
        return (Consumer<StreamElement> & Serializable) el ->
            log.warn("Element {} dropped due to allowed lateness {}", el.dump(), allowedLatenessMs);
    }

    private static class SinglePartitionSortedLogObserver
    extends AbstractSortedLogObserver {
        private final Map<Integer, AtomicLong> watermarkMap = new HashMap<Integer, AtomicLong>();
        private final Map<Integer, PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>>> queueMap = new HashMap<Integer, PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>>>();

        public SinglePartitionSortedLogObserver(CommitLogObserver upstream, Duration allowedLateness, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> latecomerConsumer) {
            super(upstream, allowedLateness, latecomerConsumer);
        }

        @Override
        void enqueue(StreamElement ingest, CommitLogObserver.OnNextContext context) {
            this.watermarkMap.values().forEach(w -> w.accumulateAndGet(context.getWatermark(), Math::max));
            this.watermarkMap.get(context.getPartition().getId()).accumulateAndGet(ingest.getStamp(), Math::max);
            this.queueMap.get(context.getPartition().getId()).add((Pair<StreamElement, CommitLogObserver.OnNextContext>)Pair.of((Object)ingest, (Object)context));
        }

        @Override
        boolean onNextDrainQueue(CommitLogObserver.OnNextContext context) {
            return this.queueMap.entrySet().stream().map(entry -> this.drainQueue((PriorityQueue)entry.getValue(), this.watermarkMap.get(entry.getKey()).get() - this.allowedLatenessMs)).reduce(Boolean::logicalAnd).orElse(true);
        }

        @Override
        void onIdleDrainQueue(CommitLogObserver.OnIdleContext context) {
            this.queueMap.values().stream().forEach(queue -> this.drainQueue((PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>>)queue, context.getWatermark() - this.allowedLatenessMs));
        }

        @Override
        protected void reassignPartitions(Collection<Partition> partitions) {
            this.queueMap.clear();
            this.watermarkMap.clear();
            partitions.forEach(part -> {
                this.queueMap.put(part.getId(), new PriorityQueue<Pair>(Comparator.comparing(p -> ((StreamElement)p.getFirst()).getStamp())));
                this.watermarkMap.put(part.getId(), new AtomicLong(Long.MIN_VALUE + this.allowedLatenessMs));
            });
        }

        @Override
        void onCompletedDrainQueue() {
            this.queueMap.values().stream().forEach(queue -> this.drainQueue((PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>>)queue, Long.MAX_VALUE));
        }

        @Override
        long getWatermark(CommitLogObserver.OnNextContext context) {
            return this.watermarkMap.get(context.getPartition().getId()).get();
        }
    }

    /**
     * Sorting observer that keeps a single timestamp-ordered buffer shared by all partitions and
     * relies on the watermark reported by the callback context.
     */
    private static class SortedLogObserver extends AbstractSortedLogObserver {
        /** Buffered elements ordered by ascending element stamp. */
        private final PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>> queue = new PriorityQueue<>(byStamp());

        public SortedLogObserver(CommitLogObserver upstream, Duration allowedLateness, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> latecomerConsumer) {
            super(upstream, allowedLateness, latecomerConsumer);
        }

        /** Buffers the element together with its context. */
        @Override
        void enqueue(StreamElement ingest, CommitLogObserver.OnNextContext context) {
            queue.add(Pair.of(ingest, context));
        }

        /**
         * Releases buffered elements older than the context watermark minus the allowed lateness.
         *
         * <p>The lateness must be subtracted here: {@code onNext} still accepts elements with
         * stamps down to {@code watermark - allowedLatenessMs}, so releasing everything below the
         * bare watermark could emit an element ahead of an older one that arrives later but is
         * still within the allowed lateness.
         */
        @Override
        boolean onNextDrainQueue(CommitLogObserver.OnNextContext context) {
            return drainQueue(queue, context.getWatermark() - allowedLatenessMs);
        }

        /** Releases buffered elements older than the idle watermark minus the allowed lateness. */
        @Override
        void onIdleDrainQueue(CommitLogObserver.OnIdleContext context) {
            drainQueue(queue, context.getWatermark() - allowedLatenessMs);
        }

        /** Discards everything buffered; the partition list itself is not used. */
        @Override
        protected void reassignPartitions(Collection<Partition> partitions) {
            queue.clear();
        }

        /** Flushes the whole buffer. */
        @Override
        void onCompletedDrainQueue() {
            drainQueue(queue, Long.MAX_VALUE);
        }

        /** Returns the watermark reported by {@code context}. */
        @Override
        long getWatermark(CommitLogObserver.OnNextContext context) {
            return context.getWatermark();
        }

        /** Comparator ordering buffered pairs by ascending element stamp. */
        private static Comparator<Pair<StreamElement, CommitLogObserver.OnNextContext>> byStamp() {
            return Comparator.comparing(p -> p.getFirst().getStamp());
        }
    }

    /**
     * Base class of the sorting observers: buffers incoming elements, hands latecomers to a
     * dedicated consumer, and releases buffered elements to {@code upstream} in stamp order.
     *
     * <p>Subclasses decide how elements are buffered and which watermark applies.
     */
    private static abstract class AbstractSortedLogObserver implements CommitLogObserver {
        /** Observer receiving the elements released from the buffer. */
        final CommitLogObserver upstream;
        /** Allowed lateness in milliseconds. */
        final long allowedLatenessMs;
        /** Callback for elements older than the watermark minus the allowed lateness. */
        final BiConsumer<StreamElement, CommitLogObserver.OnNextContext> latecomerConsumer;

        public AbstractSortedLogObserver(CommitLogObserver upstream, Duration allowedLateness, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> latecomerConsumer) {
            this.upstream = upstream;
            this.allowedLatenessMs = allowedLateness.toMillis();
            this.latecomerConsumer = latecomerConsumer;
        }

        /** Flushes everything still buffered, then completes upstream. */
        @Override
        public void onCompleted() {
            onCompletedDrainQueue();
            upstream.onCompleted();
        }

        /** Drops buffered state by reassigning to an empty partition list, then cancels upstream. */
        @Override
        public void onCancelled() {
            reassignPartitions(Collections.emptyList());
            upstream.onCancelled();
        }

        @Override
        public boolean onError(Throwable error) {
            return upstream.onError(error);
        }

        /**
         * Buffers the element, or hands it to the latecomer consumer when its stamp is older than
         * the watermark minus the allowed lateness, and then drains whatever is ready.
         *
         * @return the result of {@link #onNextDrainQueue(CommitLogObserver.OnNextContext)}
         */
        @Override
        public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
            // NOTE(review): this subtraction would wrap around if the watermark were within
            // allowedLatenessMs of Long.MIN_VALUE - confirm whether contexts can report such values.
            long lowestAcceptedStamp = getWatermark(context) - allowedLatenessMs;
            if (ingest.getStamp() >= lowestAcceptedStamp) {
                enqueue(ingest, context);
            } else {
                latecomerConsumer.accept(ingest, context);
            }
            return onNextDrainQueue(context);
        }

        /** Resets buffered state for the new partitions, then notifies upstream. */
        @Override
        public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
            reassignPartitions(context.partitions());
            upstream.onRepartition(context);
        }

        /** Drains what the idle watermark allows, then notifies upstream. */
        @Override
        public void onIdle(CommitLogObserver.OnIdleContext context) {
            onIdleDrainQueue(context);
            upstream.onIdle(context);
        }

        /** Buffers an element that passed the lateness check. */
        abstract void enqueue(StreamElement var1, CommitLogObserver.OnNextContext var2);

        /** Flushes all buffered elements on completion. */
        abstract void onCompletedDrainQueue();

        /** Drains ready elements after an element has been processed; the result is returned from {@code onNext}. */
        abstract boolean onNextDrainQueue(CommitLogObserver.OnNextContext var1);

        /** Drains ready elements when the source is idle. */
        abstract void onIdleDrainQueue(CommitLogObserver.OnIdleContext var1);

        /** Resets buffered state for the given partitions. */
        protected abstract void reassignPartitions(Collection<Partition> var1);

        /** Returns the watermark used for the lateness check of an element with the given context. */
        abstract long getWatermark(CommitLogObserver.OnNextContext var1);

        /**
         * Forwards buffered elements with stamp strictly below {@code maxTimestamp} to upstream,
         * in queue order.
         *
         * @param queue buffer to drain
         * @param maxTimestamp exclusive upper bound on the stamps released
         * @return {@code false} if upstream returned {@code false} from {@code onNext} (the rest of
         *     the queue is discarded in that case), {@code true} otherwise
         */
        boolean drainQueue(PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>> queue, long maxTimestamp) {
            while (!queue.isEmpty() && queue.peek().getFirst().getStamp() < maxTimestamp) {
                Pair<StreamElement, CommitLogObserver.OnNextContext> head = queue.poll();
                if (!upstream.onNext(head.getFirst(), head.getSecond())) {
                    // Upstream returned false: drop what is left and report it to the caller.
                    queue.clear();
                    return false;
                }
            }
            return true;
        }
    }

    /**
     * Observer that passes every callback unchanged to a wrapped delegate; subclasses can
     * override individual callbacks.
     */
    public static class ForwardingObserver implements CommitLogObserver {
        /** Target of all forwarded callbacks. */
        private final CommitLogObserver delegate;

        /**
         * @param delegate observer that receives every forwarded callback
         */
        protected ForwardingObserver(CommitLogObserver delegate) {
            this.delegate = delegate;
        }

        // --- element delivery ---

        @Override
        @Generated
        public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
            return delegate.onNext(ingest, context);
        }

        // --- lifecycle ---

        @Override
        @Generated
        public void onCompleted() {
            delegate.onCompleted();
        }

        @Override
        @Generated
        public void onCancelled() {
            delegate.onCancelled();
        }

        @Override
        @Generated
        public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
            delegate.onRepartition(context);
        }

        @Override
        @Generated
        public void onIdle(CommitLogObserver.OnIdleContext context) {
            delegate.onIdle(context);
        }

        // --- failure handling ---

        @Override
        @Generated
        public boolean onError(Throwable error) {
            return delegate.onError(error);
        }

        @Override
        @Generated
        public boolean onException(Exception exception) {
            return delegate.onException(exception);
        }

        @Override
        @Generated
        public boolean onFatalError(Error error) {
            return delegate.onFatalError(error);
        }
    }

    /**
     * Outcome chosen by the {@code onRetriesExhausted} function passed to the retrying observer
     * factories.
     *
     * <p>NOTE(review): the per-constant behavior is implemented outside this file (presumably in
     * {@code RetryableLogObserver}); the constant names are the only description available here.
     */
    public enum TerminationStrategy {
        STOP_PROCESSING,
        /** Used by the three-argument {@code withNumRetriedExceptions} factory for every throwable. */
        RETHROW,
        EXIT
    }
}

