package cz.o2.proxima.direct.commitlog;

import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Pair;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/commitlog/CommitLogObservers.class */
public class CommitLogObservers {

    /** Logger of this class; the default late-data consumer writes its warnings through it. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CommitLogObservers.class);

    /* loaded from: input_file:cz/o2/proxima/direct/commitlog/CommitLogObservers$AbstractSortedLogObserver.class */
    /**
     * Base class of observers that hold incoming elements in timestamp-ordered queues and hand
     * them to an upstream observer once a watermark-derived bound has moved past them.
     *
     * <p>This class only wires the {@link CommitLogObserver} callbacks to a set of hooks; how
     * elements are queued, which watermark applies and how queues are drained is decided by the
     * subclass.
     */
    private abstract static class AbstractSortedLogObserver implements CommitLogObserver {
        /** Observer that finally receives the buffered elements. */
        final CommitLogObserver upstream;
        /** Allowed lateness converted to milliseconds. */
        final long allowedLatenessMs;
        /** Receives elements whose stamp is below {@code watermark - allowedLatenessMs}. */
        final BiConsumer<StreamElement, CommitLogObserver.OnNextContext> latecomerConsumer;

        /**
         * @param upstream observer to forward buffered elements and lifecycle callbacks to
         * @param allowedLateness lateness bound, stored in milliseconds
         * @param latecomerConsumer handler invoked for elements rejected as too late
         */
        public AbstractSortedLogObserver(CommitLogObserver upstream, Duration allowedLateness, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> latecomerConsumer) {
            this.upstream = upstream;
            this.allowedLatenessMs = allowedLateness.toMillis();
            this.latecomerConsumer = latecomerConsumer;
        }

        /** Flushes the buffered elements via {@link #onCompletedDrainQueue()}, then completes upstream. */
        @Override
        public void onCompleted() {
            onCompletedDrainQueue();
            upstream.onCompleted();
        }

        /** Reassigns to an empty partition collection, then signals cancellation upstream. */
        @Override
        public void onCancelled() {
            reassignPartitions(Collections.emptyList());
            upstream.onCancelled();
        }

        /** Passes the error to upstream and returns upstream's answer. */
        @Override
        public boolean onError(Throwable error) {
            return upstream.onError(error);
        }

        /**
         * Buffers the element, or hands it to the latecomer consumer when its stamp is lower than
         * {@code getWatermark(context) - allowedLatenessMs}; afterwards drains whatever the
         * subclass considers ready.
         *
         * @return result of {@link #onNextDrainQueue}
         */
        @Override
        public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) {
            final long oldestAcceptedStamp = getWatermark(context) - allowedLatenessMs;
            if (element.getStamp() < oldestAcceptedStamp) {
                latecomerConsumer.accept(element, context);
            } else {
                enqueue(element, context);
            }
            return onNextDrainQueue(context);
        }

        /** Reassigns to the partitions reported by the context, then informs upstream. */
        @Override
        public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
            reassignPartitions(context.partitions());
            upstream.onRepartition(context);
        }

        /** Drains via {@link #onIdleDrainQueue}, then forwards the idle notification. */
        @Override
        public void onIdle(CommitLogObserver.OnIdleContext context) {
            onIdleDrainQueue(context);
            upstream.onIdle(context);
        }

        /** Stores an on-time element together with its context. */
        abstract void enqueue(StreamElement element, CommitLogObserver.OnNextContext context);

        /** Called from {@link #onCompleted()} before upstream is completed. */
        abstract void onCompletedDrainQueue();

        /** Called at the end of every {@link #onNext}; its result becomes the result of onNext. */
        abstract boolean onNextDrainQueue(CommitLogObserver.OnNextContext context);

        /** Called from {@link #onIdle} before upstream is notified. */
        abstract void onIdleDrainQueue(CommitLogObserver.OnIdleContext context);

        /** Called on repartition with the new partitions and on cancel with an empty collection. */
        protected abstract void reassignPartitions(Collection<Partition> partitions);

        /** Watermark used by {@link #onNext} for the lateness decision. */
        abstract long getWatermark(CommitLogObserver.OnNextContext context);

        /**
         * Polls elements off the head of the queue while their stamp is strictly lower than
         * {@code maxStamp} and passes each one to upstream.
         *
         * @param queue queue to drain, in its own ordering
         * @param maxStamp exclusive upper bound of stamps to release
         * @return {@code false} when upstream's {@code onNext} returned {@code false} (the rest
         *     of the queue is discarded in that case), {@code true} otherwise
         */
        boolean drainQueue(PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>> queue, long maxStamp) {
            while (!queue.isEmpty()) {
                if (queue.peek().getFirst().getStamp() >= maxStamp) {
                    // head is not ready yet, and everything behind it sorts after it
                    break;
                }
                Pair<StreamElement, CommitLogObserver.OnNextContext> head = queue.poll();
                if (!upstream.onNext(head.getFirst(), head.getSecond())) {
                    queue.clear();
                    return false;
                }
            }
            return true;
        }
    }

    /* loaded from: input_file:cz/o2/proxima/direct/commitlog/CommitLogObservers$ForwardingObserver.class */
    /**
     * {@link CommitLogObserver} that passes every callback, unchanged, to a wrapped delegate and
     * returns whatever the delegate returns.
     */
    public static class ForwardingObserver implements CommitLogObserver {
        private final CommitLogObserver delegate;

        /**
         * Decompiler note: the access modifier of this constructor was changed from
         * {@code protected}.
         *
         * @param delegate observer receiving all callbacks
         */
        public ForwardingObserver(CommitLogObserver delegate) {
            this.delegate = delegate;
        }

        // ---- element and lifecycle callbacks ----

        @Override
        @Generated
        public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) {
            return delegate.onNext(element, context);
        }

        @Override
        @Generated
        public void onCompleted() {
            delegate.onCompleted();
        }

        @Override
        @Generated
        public void onCancelled() {
            delegate.onCancelled();
        }

        @Override
        @Generated
        public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
            delegate.onRepartition(context);
        }

        @Override
        @Generated
        public void onIdle(CommitLogObserver.OnIdleContext context) {
            delegate.onIdle(context);
        }

        // ---- failure callbacks ----

        @Override
        @Generated
        public boolean onError(Throwable error) {
            return delegate.onError(error);
        }

        @Override
        @Generated
        public boolean onException(Exception exception) {
            return delegate.onException(exception);
        }

        @Override
        @Generated
        public boolean onFatalError(Error fatal) {
            return delegate.onFatalError(fatal);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/direct/commitlog/CommitLogObservers$SinglePartitionSortedLogObserver.class */
    /**
     * Sorting observer that keeps one timestamp-ordered queue and one watermark per partition id,
     * so elements are ordered within a partition only.
     */
    public static class SinglePartitionSortedLogObserver extends AbstractSortedLogObserver {
        /** Tracked watermark per partition id. */
        private final Map<Integer, AtomicLong> watermarkMap = new HashMap<>();
        /** Buffered elements per partition id, ordered by stamp. */
        private final Map<Integer, PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>>> queueMap = new HashMap<>();

        /**
         * @param upstream observer receiving the sorted elements
         * @param allowedLateness lateness bound
         * @param latecomerConsumer handler for elements rejected as too late
         */
        public SinglePartitionSortedLogObserver(CommitLogObserver upstream, Duration allowedLateness, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> latecomerConsumer) {
            super(upstream, allowedLateness, latecomerConsumer);
        }

        /**
         * Raises every tracked watermark to at least the context watermark, raises the watermark
         * of the element's partition to at least the element stamp, and buffers the element in
         * that partition's queue. The partition must have been registered through
         * {@link #reassignPartitions}; otherwise the map lookups yield {@code null}.
         */
        @Override
        void enqueue(StreamElement element, CommitLogObserver.OnNextContext context) {
            for (AtomicLong tracked : watermarkMap.values()) {
                tracked.accumulateAndGet(context.getWatermark(), Math::max);
            }
            watermarkMap.get(partitionKey(context)).accumulateAndGet(element.getStamp(), Math::max);
            queueMap.get(partitionKey(context)).add(Pair.of(element, context));
        }

        /**
         * Drains every partition queue up to that partition's watermark minus allowed lateness.
         * All queues are visited even after one of them reports {@code false}.
         *
         * @return {@code true} only if every drain returned {@code true}
         */
        @Override
        boolean onNextDrainQueue(CommitLogObserver.OnNextContext context) {
            boolean allAccepted = true;
            for (Map.Entry<Integer, PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>>> entry : queueMap.entrySet()) {
                PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>> partitionQueue = entry.getValue();
                long bound = watermarkMap.get(entry.getKey()).get() - allowedLatenessMs;
                // non-short-circuit on purpose: each queue is drained regardless of earlier results
                allAccepted &= drainQueue(partitionQueue, bound);
            }
            return allAccepted;
        }

        /** Drains every partition queue up to the idle context's watermark minus allowed lateness. */
        @Override
        void onIdleDrainQueue(CommitLogObserver.OnIdleContext context) {
            for (PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>> partitionQueue : queueMap.values()) {
                drainQueue(partitionQueue, context.getWatermark() - allowedLatenessMs);
            }
        }

        /**
         * Forgets all queues and watermarks and creates fresh ones for the given partitions. Each
         * watermark starts at {@code Long.MIN_VALUE + allowedLatenessMs}, so the lateness bound
         * computed from it is exactly {@code Long.MIN_VALUE}.
         */
        @Override
        protected void reassignPartitions(Collection<Partition> partitions) {
            queueMap.clear();
            watermarkMap.clear();
            for (Partition partition : partitions) {
                queueMap.put(Integer.valueOf(partition.getId()), newStampOrderedQueue());
                watermarkMap.put(Integer.valueOf(partition.getId()), new AtomicLong(Long.MIN_VALUE + allowedLatenessMs));
            }
        }

        /** Releases every buffered element whose stamp is lower than {@code Long.MAX_VALUE}. */
        @Override
        void onCompletedDrainQueue() {
            for (PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>> partitionQueue : queueMap.values()) {
                drainQueue(partitionQueue, Long.MAX_VALUE);
            }
        }

        /** Returns the tracked watermark of the partition the context belongs to. */
        @Override
        long getWatermark(CommitLogObserver.OnNextContext context) {
            return watermarkMap.get(partitionKey(context)).get();
        }

        /** Map key of the partition referenced by the context. */
        private static Integer partitionKey(CommitLogObserver.OnNextContext context) {
            return Integer.valueOf(context.getPartition().getId());
        }

        /** Creates an empty queue ordered by ascending element stamp. */
        private static PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>> newStampOrderedQueue() {
            Comparator<Pair<StreamElement, CommitLogObserver.OnNextContext>> byStamp =
                    Comparator.comparingLong(buffered -> buffered.getFirst().getStamp());
            return new PriorityQueue<>(byStamp);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/direct/commitlog/CommitLogObservers$SortedLogObserver.class */
    /**
     * Sorting observer that buffers the elements of all partitions in a single queue ordered by
     * stamp and uses the watermark carried by the callback context.
     */
    public static class SortedLogObserver extends AbstractSortedLogObserver {
        /** Buffered elements with their contexts, ordered by ascending stamp. */
        private final PriorityQueue<Pair<StreamElement, CommitLogObserver.OnNextContext>> queue;

        /**
         * @param commitLogObserver upstream observer receiving the sorted elements
         * @param duration allowed lateness
         * @param biConsumer handler for elements rejected as too late
         */
        public SortedLogObserver(CommitLogObserver commitLogObserver, Duration duration, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> biConsumer) {
            super(commitLogObserver, duration, biConsumer);
            Comparator<Pair<StreamElement, CommitLogObserver.OnNextContext>> byStamp =
                    Comparator.comparing(pair -> pair.getFirst().getStamp());
            this.queue = new PriorityQueue<>(byStamp);
        }

        /** Adds the element and its context to the queue. */
        @Override
        void enqueue(StreamElement streamElement, CommitLogObserver.OnNextContext onNextContext) {
            this.queue.add(Pair.of(streamElement, onNextContext));
        }

        /**
         * Releases elements whose stamp is lower than the context watermark minus allowed
         * lateness.
         *
         * @return result of the drain, {@code false} when upstream refused an element
         */
        @Override
        boolean onNextDrainQueue(CommitLogObserver.OnNextContext onNextContext) {
            // onNext still accepts elements stamped down to (watermark - allowedLatenessMs);
            // releasing anything newer than that bound could emit it ahead of such an element
            // and break the ordering. Same bound as onIdleDrainQueue.
            return drainQueue(this.queue, onNextContext.getWatermark() - this.allowedLatenessMs);
        }

        /** Releases elements whose stamp is lower than the idle watermark minus allowed lateness. */
        @Override
        void onIdleDrainQueue(CommitLogObserver.OnIdleContext onIdleContext) {
            drainQueue(this.queue, onIdleContext.getWatermark() - this.allowedLatenessMs);
        }

        /** Discards all buffered elements; the partition collection itself is not used. */
        @Override
        protected void reassignPartitions(Collection<Partition> collection) {
            this.queue.clear();
        }

        /** Releases every buffered element whose stamp is lower than {@code Long.MAX_VALUE}. */
        @Override
        void onCompletedDrainQueue() {
            drainQueue(this.queue, Long.MAX_VALUE);
        }

        /** Returns the watermark reported by the context. */
        @Override
        long getWatermark(CommitLogObserver.OnNextContext onNextContext) {
            return onNextContext.getWatermark();
        }
    }

    /* loaded from: input_file:cz/o2/proxima/direct/commitlog/CommitLogObservers$TerminationStrategy.class */
    /**
     * Result type of the {@code UnaryFunction<Throwable, TerminationStrategy>} callbacks accepted
     * by the {@code withNumRetried*} factories; the three-argument
     * {@code withNumRetriedExceptions} overload always supplies {@link #RETHROW}.
     *
     * <p>NOTE(review): the constants are interpreted by {@code RetryableLogObserver}, which is
     * not part of this file. Going by the names, they presumably mean: stop consuming, rethrow
     * the failure, or exit the process. Confirm against that class.
     */
    public enum TerminationStrategy {
        STOP_PROCESSING,
        RETHROW,
        EXIT
    }

    /**
     * Same as the four-argument overload, with a strategy function that answers
     * {@link TerminationStrategy#RETHROW} for every throwable.
     *
     * @param str passed through unchanged (presumably the observer name; confirm against
     *     {@code RetryableLogObserver})
     * @param i passed through unchanged (presumably the number of retries; confirm)
     * @param commitLogObserver observer to wrap
     * @return the wrapping observer
     */
    public static CommitLogObserver withNumRetriedExceptions(String str, int i, CommitLogObserver commitLogObserver) {
        UnaryFunction<Throwable, TerminationStrategy> alwaysRethrow = ignored -> TerminationStrategy.RETHROW;
        return withNumRetriedExceptions(str, i, alwaysRethrow, commitLogObserver);
    }

    /**
     * Wraps the observer in a {@code RetryableLogObserver} created with its boolean flag set to
     * {@code false}.
     *
     * @param str passed to the wrapper (presumably the observer name; confirm)
     * @param i passed to the wrapper (presumably the number of retries; confirm)
     * @param unaryFunction maps a throwable to the {@link TerminationStrategy} to apply
     * @param commitLogObserver observer to wrap
     * @return the wrapping observer
     */
    public static CommitLogObserver withNumRetriedExceptions(String str, int i, UnaryFunction<Throwable, TerminationStrategy> unaryFunction, CommitLogObserver commitLogObserver) {
        // NOTE(review): the flag is the only difference from withNumRetriedThrowables, so it
        // presumably selects whether all Throwables or only Exceptions are retried; confirm.
        final boolean retryThrowables = false;
        return new RetryableLogObserver(str, i, unaryFunction, retryThrowables, commitLogObserver);
    }

    /**
     * Wraps the observer in a {@code RetryableLogObserver} created with its boolean flag set to
     * {@code true}.
     *
     * @param str passed to the wrapper (presumably the observer name; confirm)
     * @param i passed to the wrapper (presumably the number of retries; confirm)
     * @param unaryFunction maps a throwable to the {@link TerminationStrategy} to apply
     * @param commitLogObserver observer to wrap
     * @return the wrapping observer
     */
    public static CommitLogObserver withNumRetriedThrowables(String str, int i, UnaryFunction<Throwable, TerminationStrategy> unaryFunction, CommitLogObserver commitLogObserver) {
        // NOTE(review): the flag is the only difference from withNumRetriedExceptions, so it
        // presumably selects whether all Throwables or only Exceptions are retried; confirm.
        final boolean retryThrowables = true;
        return new RetryableLogObserver(str, i, unaryFunction, retryThrowables, commitLogObserver);
    }

    /**
     * Sort-buffering observer whose late elements are reported with a warning log entry.
     *
     * @param commitLogObserver observer receiving the sorted elements
     * @param duration allowed lateness
     * @return observer created by the {@code Consumer}-based overload
     */
    public static CommitLogObserver withSortBuffer(CommitLogObserver commitLogObserver, Duration duration) {
        Consumer<StreamElement> warnOnLateElement = lateDataLoggingConsumer(duration);
        return withSortBuffer(commitLogObserver, duration, warnOnLateElement);
    }

    /**
     * Sort-buffering observer whose late elements are first given to {@code consumer} and then
     * confirmed on their context.
     *
     * @param commitLogObserver observer receiving the sorted elements
     * @param duration allowed lateness
     * @param consumer receives each late element
     * @return observer created by the {@code BiConsumer}-based overload
     */
    public static CommitLogObserver withSortBuffer(CommitLogObserver commitLogObserver, Duration duration, Consumer<StreamElement> consumer) {
        BiConsumer<StreamElement, CommitLogObserver.OnNextContext> consumeThenConfirm = (lateElement, context) -> {
            consumer.accept(lateElement);
            context.confirm();
        };
        return withSortBuffer(commitLogObserver, duration, consumeThenConfirm);
    }

    /**
     * Creates a {@code SortedLogObserver}, which buffers elements of all partitions in one
     * stamp-ordered queue before passing them on.
     *
     * @param commitLogObserver observer receiving the sorted elements
     * @param duration allowed lateness
     * @param biConsumer receives each late element together with its context
     * @return the sorting observer
     */
    public static CommitLogObserver withSortBuffer(CommitLogObserver commitLogObserver, Duration duration, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> biConsumer) {
        final CommitLogObserver globallySorted = new SortedLogObserver(commitLogObserver, duration, biConsumer);
        return globallySorted;
    }

    /**
     * Per-partition sort-buffering observer whose late elements are reported with a warning log
     * entry.
     *
     * @param commitLogObserver observer receiving the sorted elements
     * @param duration allowed lateness
     * @return observer created by the {@code Consumer}-based overload
     */
    public static CommitLogObserver withSortBufferWithinPartition(CommitLogObserver commitLogObserver, Duration duration) {
        Consumer<StreamElement> warnOnLateElement = lateDataLoggingConsumer(duration);
        return withSortBufferWithinPartition(commitLogObserver, duration, warnOnLateElement);
    }

    /**
     * Per-partition sort-buffering observer whose late elements are first given to
     * {@code consumer} and then confirmed on their context.
     *
     * @param commitLogObserver observer receiving the sorted elements
     * @param duration allowed lateness
     * @param consumer receives each late element
     * @return observer created by the {@code BiConsumer}-based overload
     */
    public static CommitLogObserver withSortBufferWithinPartition(CommitLogObserver commitLogObserver, Duration duration, Consumer<StreamElement> consumer) {
        BiConsumer<StreamElement, CommitLogObserver.OnNextContext> consumeThenConfirm = (lateElement, context) -> {
            consumer.accept(lateElement);
            context.confirm();
        };
        return withSortBufferWithinPartition(commitLogObserver, duration, consumeThenConfirm);
    }

    /**
     * Creates a {@code SinglePartitionSortedLogObserver}, which keeps a separate stamp-ordered
     * queue for every partition id.
     *
     * @param commitLogObserver observer receiving the sorted elements
     * @param duration allowed lateness
     * @param biConsumer receives each late element together with its context
     * @return the sorting observer
     */
    public static CommitLogObserver withSortBufferWithinPartition(CommitLogObserver commitLogObserver, Duration duration, BiConsumer<StreamElement, CommitLogObserver.OnNextContext> biConsumer) {
        final CommitLogObserver sortedPerPartition = new SinglePartitionSortedLogObserver(commitLogObserver, duration, biConsumer);
        return sortedPerPartition;
    }

    /**
     * Wraps an observer so that all of its callbacks are {@code synchronized} methods of the
     * returned wrapper, i.e. they exclude each other on the wrapper's monitor.
     *
     * @param commitLogObserver observer that receives every callback
     * @return wrapper delegating each callback to {@code commitLogObserver}
     */
    public static CommitLogObserver synchronizedObserver(final CommitLogObserver commitLogObserver) {
        // Each method delegates to the captured parameter. The decompiled form
        // "CommitLogObserver.this" is not valid here: CommitLogObserver is not an enclosing class.
        return new CommitLogObserver() {
            @Override
            public synchronized void onCompleted() {
                commitLogObserver.onCompleted();
            }

            @Override
            public synchronized void onCancelled() {
                commitLogObserver.onCancelled();
            }

            @Override
            public synchronized boolean onError(Throwable th) {
                return commitLogObserver.onError(th);
            }

            @Override
            public synchronized boolean onException(Exception exc) {
                return commitLogObserver.onException(exc);
            }

            @Override
            public synchronized boolean onFatalError(Error error) {
                return commitLogObserver.onFatalError(error);
            }

            @Override
            public synchronized boolean onNext(StreamElement streamElement, CommitLogObserver.OnNextContext onNextContext) {
                return commitLogObserver.onNext(streamElement, onNextContext);
            }

            @Override
            public synchronized void onRepartition(CommitLogObserver.OnRepartitionContext onRepartitionContext) {
                commitLogObserver.onRepartition(onRepartitionContext);
            }

            @Override
            public synchronized void onIdle(CommitLogObserver.OnIdleContext onIdleContext) {
                commitLogObserver.onIdle(onIdleContext);
            }
        };
    }

    /** Private so the class cannot be instantiated from outside; it only exposes static factories and nested types. */
    private CommitLogObservers() {
    }

    /**
     * Builds the default handler for late elements: it logs a warning containing the element
     * dump and the allowed lateness in milliseconds.
     *
     * @param duration allowed lateness; converted to milliseconds when the consumer is created
     * @return consumer that only logs
     */
    private static Consumer<StreamElement> lateDataLoggingConsumer(Duration duration) {
        // resolved up front, so the lambda captures the primitive value instead of the Duration
        final long allowedLatenessMs = duration.toMillis();
        return lateElement ->
                log.warn("Element {} dropped due to allowed lateness {}", lateElement.dump(), allowedLatenessMs);
    }

    /**
     * Decompiled form of the synthetic hook used to deserialize the lambdas declared in this
     * class.
     *
     * <p>It selects a branch by the implementation method name of the serialized lambda, checks
     * the method kind, functional interface, interface method, implementing class and
     * implementation signature, and returns a lambda with the same body and captured arguments.
     *
     * <p>NOTE(review): this is decompiler residue and does not compile as written. The selector
     * {@code z} is declared {@code boolean} but is assigned {@code -1}, {@code 2} and {@code 3},
     * and the second switch repeats the label {@code true}. The method is normally generated by
     * the compiler, so it presumably should not be kept in hand-maintained source; confirm
     * before removing.
     *
     * @param serializedLambda serialized form of the lambda to re-create
     * @return the re-created lambda
     * @throws IllegalArgumentException when no branch matches the serialized lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: translate the implementation method name into a branch selector.
        switch (implMethodName.hashCode()) {
            case -1154294250:
                if (implMethodName.equals("lambda$withSortBufferWithinPartition$bb76729b$1")) {
                    z = true;
                    break;
                }
                break;
            case -223577934:
                if (implMethodName.equals("lambda$lateDataLoggingConsumer$feb8bb2f$1")) {
                    z = 3;
                    break;
                }
                break;
            case 241668083:
                if (implMethodName.equals("lambda$withSortBuffer$bb76729b$1")) {
                    z = false;
                    break;
                }
                break;
            case 1887699436:
                if (implMethodName.equals("lambda$withNumRetriedExceptions$928fc1ec$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        // Step 2: verify the remaining metadata and rebuild the matching lambda.
        switch (z) {
            case false:
                // late-element adapter created in withSortBuffer(observer, duration, Consumer)
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/commitlog/CommitLogObservers") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/functional/Consumer;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/direct/commitlog/CommitLogObserver$OnNextContext;)V")) {
                    Consumer consumer = (Consumer) serializedLambda.getCapturedArg(0);
                    return (streamElement, onNextContext) -> {
                        consumer.accept(streamElement);
                        onNextContext.confirm();
                    };
                }
                break;
            case true:
                // late-element adapter created in withSortBufferWithinPartition(observer, duration, Consumer)
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/commitlog/CommitLogObservers") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/functional/Consumer;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/direct/commitlog/CommitLogObserver$OnNextContext;)V")) {
                    Consumer consumer2 = (Consumer) serializedLambda.getCapturedArg(0);
                    return (streamElement2, onNextContext2) -> {
                        consumer2.accept(streamElement2);
                        onNextContext2.confirm();
                    };
                }
                break;
            case true:
                // constant RETHROW strategy created in withNumRetriedExceptions(str, i, observer)
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/UnaryFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/commitlog/CommitLogObservers") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Throwable;)Lcz/o2/proxima/direct/commitlog/CommitLogObservers$TerminationStrategy;")) {
                    return th -> {
                        return TerminationStrategy.RETHROW;
                    };
                }
                break;
            case true:
                // warning logger created in lateDataLoggingConsumer; captures the lateness in ms
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/commitlog/CommitLogObservers") && serializedLambda.getImplMethodSignature().equals("(JLcz/o2/proxima/storage/StreamElement;)V")) {
                    long longValue = ((Long) serializedLambda.getCapturedArg(0)).longValue();
                    return streamElement3 -> {
                        log.warn("Element {} dropped due to allowed lateness {}", streamElement3.dump(), Long.valueOf(longValue));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
