package cz.o2.proxima.direct.commitlog;

import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Pair;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/commitlog/LogObservers.class */
public class LogObservers {
    private static final Logger log = LoggerFactory.getLogger(LogObservers.class);

    /* loaded from: input_file:cz/o2/proxima/direct/commitlog/LogObservers$AbstractSortedLogObserver.class */
    /**
     * Base for observers that buffer incoming elements in {@link PriorityQueue}s and hand them
     * to an upstream {@link LogObserver} once their stamp falls below a drain limit.
     *
     * <p>Subclasses supply the queue(s), the watermark used to classify an element as late, and
     * the drain limits used on the individual callbacks.
     */
    private abstract static class AbstractSortedLogObserver implements LogObserver {
        /** Observer receiving the elements that leave the buffer. */
        final LogObserver upstream;
        /** Allowed lateness in milliseconds, obtained from the constructor's {@link Duration}. */
        final long allowedLatenessMs;
        /** Receives every element that {@link #onNext} classifies as late instead of buffering it. */
        final BiConsumer<StreamElement, LogObserver.OnNextContext> latecomerConsumer;

        /**
         * @param logObserver observer to forward buffered elements and lifecycle callbacks to
         * @param duration allowed lateness; converted to milliseconds
         * @param biConsumer handler invoked for elements that are not buffered because they are late
         */
        public AbstractSortedLogObserver(LogObserver logObserver, Duration duration, BiConsumer<StreamElement, LogObserver.OnNextContext> biConsumer) {
            this.upstream = logObserver;
            this.latecomerConsumer = biConsumer;
            this.allowedLatenessMs = duration.toMillis();
        }

        /** Flushes the buffer via {@link #onCompletedDrainQueue()} and then completes the upstream. */
        @Override
        public void onCompleted() {
            onCompletedDrainQueue();
            upstream.onCompleted();
        }

        /** Resets the buffer to "no partitions assigned" and then cancels the upstream. */
        @Override
        public void onCancelled() {
            reassignPartitions(Collections.emptyList());
            upstream.onCancelled();
        }

        /** Delegates error handling to the upstream and returns its answer unchanged. */
        @Override
        public boolean onError(Throwable th) {
            return upstream.onError(th);
        }

        /**
         * Buffers the element, or passes it to the latecomer consumer when its stamp is lower than
         * {@code getWatermark(context) - allowedLatenessMs}; afterwards drains the buffer.
         *
         * @return the result of {@link #onNextDrainQueue(LogObserver.OnNextContext)}
         */
        @Override
        public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
            final long oldestAcceptedStamp = getWatermark(onNextContext) - allowedLatenessMs;
            if (streamElement.getStamp() < oldestAcceptedStamp) {
                latecomerConsumer.accept(streamElement, onNextContext);
            } else {
                enqueue(streamElement, onNextContext);
            }
            return onNextDrainQueue(onNextContext);
        }

        /** Rebuilds the buffer for the newly assigned partitions, then notifies the upstream. */
        @Override
        public void onRepartition(LogObserver.OnRepartitionContext onRepartitionContext) {
            reassignPartitions(onRepartitionContext.partitions());
            upstream.onRepartition(onRepartitionContext);
        }

        /** Drains the buffer via {@link #onIdleDrainQueue} and then forwards the idle signal. */
        @Override
        public void onIdle(LogObserver.OnIdleContext onIdleContext) {
            onIdleDrainQueue(onIdleContext);
            upstream.onIdle(onIdleContext);
        }

        /** Stores an element that {@link #onNext} did not classify as late. */
        abstract void enqueue(StreamElement streamElement, LogObserver.OnNextContext onNextContext);

        /** Drains the buffer when the stream completes. */
        abstract void onCompletedDrainQueue();

        /**
         * Drains the buffer after an element has been processed by {@link #onNext}.
         *
         * @return value that {@link #onNext} returns to its caller
         */
        abstract boolean onNextDrainQueue(LogObserver.OnNextContext onNextContext);

        /** Drains the buffer when the source reports being idle. */
        abstract void onIdleDrainQueue(LogObserver.OnIdleContext onIdleContext);

        /** Resets the buffer for the given partition assignment. */
        protected abstract void reassignPartitions(Collection<Partition> collection);

        /** Returns the watermark that {@link #onNext} compares the element stamp against. */
        abstract long getWatermark(LogObserver.OnNextContext onNextContext);

        /**
         * Forwards queue heads to the upstream for as long as the head's stamp is strictly lower
         * than {@code j}.
         *
         * @param priorityQueue queue to drain
         * @param j exclusive upper bound on the stamps that are emitted
         * @return {@code false} when the upstream's {@code onNext} returned {@code false} (the
         *     queue is cleared in that case), {@code true} otherwise
         */
        boolean drainQueue(PriorityQueue<Pair<StreamElement, LogObserver.OnNextContext>> priorityQueue, long j) {
            while (true) {
                final Pair<StreamElement, LogObserver.OnNextContext> head = priorityQueue.peek();
                if (head == null || head.getFirst().getStamp() >= j) {
                    // Queue exhausted, or the head is not yet below the limit.
                    return true;
                }
                priorityQueue.poll();
                if (!upstream.onNext(head.getFirst(), head.getSecond())) {
                    // Upstream asked to stop: discard everything still buffered in this queue.
                    priorityQueue.clear();
                    return false;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/direct/commitlog/LogObservers$SinglePartitionSortedLogObserver.class */
    public static class SinglePartitionSortedLogObserver extends AbstractSortedLogObserver {
        private final Map<Integer, AtomicLong> watermarkMap;
        private final Map<Integer, PriorityQueue<Pair<StreamElement, LogObserver.OnNextContext>>> queueMap;

        public SinglePartitionSortedLogObserver(LogObserver logObserver, Duration duration, BiConsumer<StreamElement, LogObserver.OnNextContext> biConsumer) {
            super(logObserver, duration, biConsumer);
            this.watermarkMap = new HashMap();
            this.queueMap = new HashMap();
        }

        @Override // cz.o2.proxima.direct.commitlog.LogObservers.AbstractSortedLogObserver
        void enqueue(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
            this.watermarkMap.values().forEach(atomicLong -> {
                atomicLong.accumulateAndGet(onNextContext.getWatermark(), Math::max);
            });
            this.watermarkMap.get(Integer.valueOf(onNextContext.getPartition().getId())).accumulateAndGet(streamElement.getStamp(), Math::max);
            this.queueMap.get(Integer.valueOf(onNextContext.getPartition().getId())).add(Pair.of(streamElement, onNextContext));
        }

        @Override // cz.o2.proxima.direct.commitlog.LogObservers.AbstractSortedLogObserver
        boolean onNextDrainQueue(LogObserver.OnNextContext onNextContext) {
            return ((Boolean) this.queueMap.entrySet().stream().map(entry -> {
                return Boolean.valueOf(drainQueue((PriorityQueue) entry.getValue(), this.watermarkMap.get(entry.getKey()).get() - this.allowedLatenessMs));
            }).reduce((v0, v1) -> {
                return Boolean.logicalAnd(v0, v1);
            }).orElse(true)).booleanValue();
        }

        @Override // cz.o2.proxima.direct.commitlog.LogObservers.AbstractSortedLogObserver
        void onIdleDrainQueue(LogObserver.OnIdleContext onIdleContext) {
            this.queueMap.values().stream().forEach(priorityQueue -> {
                drainQueue(priorityQueue, onIdleContext.getWatermark() - this.allowedLatenessMs);
            });
        }

        @Override // cz.o2.proxima.direct.commitlog.LogObservers.AbstractSortedLogObserver
        protected void reassignPartitions(Collection<Partition> collection) {
            this.queueMap.clear();
            this.watermarkMap.clear();
            collection.forEach(partition -> {
                this.queueMap.put(Integer.valueOf(partition.getId()), new PriorityQueue<>(Comparator.comparing(pair -> {
                    return Long.valueOf(((StreamElement) pair.getFirst()).getStamp());
                })));
                this.watermarkMap.put(Integer.valueOf(partition.getId()), new AtomicLong(Long.MIN_VALUE + this.allowedLatenessMs));
            });
        }

        @Override // cz.o2.proxima.direct.commitlog.LogObservers.AbstractSortedLogObserver
        void onCompletedDrainQueue() {
            this.queueMap.values().stream().forEach(priorityQueue -> {
                drainQueue(priorityQueue, Long.MAX_VALUE);
            });
        }

        @Override // cz.o2.proxima.direct.commitlog.LogObservers.AbstractSortedLogObserver
        long getWatermark(LogObserver.OnNextContext onNextContext) {
            return this.watermarkMap.get(Integer.valueOf(onNextContext.getPartition().getId())).get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/direct/commitlog/LogObservers$SortedLogObserver.class */
    /**
     * Sort buffer that keeps a single stamp-ordered queue for all partitions and uses the
     * watermark reported by the callback context.
     */
    public static class SortedLogObserver extends AbstractSortedLogObserver {
        /** Buffered elements ordered by ascending element stamp. */
        private final PriorityQueue<Pair<StreamElement, LogObserver.OnNextContext>> queue;

        /**
         * @param logObserver observer receiving the buffered elements
         * @param duration allowed lateness
         * @param biConsumer handler for elements classified as late
         */
        public SortedLogObserver(LogObserver logObserver, Duration duration, BiConsumer<StreamElement, LogObserver.OnNextContext> biConsumer) {
            super(logObserver, duration, biConsumer);
            this.queue = new PriorityQueue<>(
                    Comparator.comparingLong(
                            (Pair<StreamElement, LogObserver.OnNextContext> buffered) -> buffered.getFirst().getStamp()));
        }

        /** Adds the element together with its context to the queue. */
        @Override
        void enqueue(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
            this.queue.add(Pair.of(streamElement, onNextContext));
        }

        /**
         * Drains elements whose stamp is below the context watermark minus the allowed lateness.
         *
         * <p>The limit must include the allowed lateness: the superclass {@code onNext} still
         * accepts any element with {@code stamp >= watermark - allowedLatenessMs}, so emitting
         * elements up to the raw watermark could be followed by an older, still accepted element
         * and break the ordering. This also matches the limit used by {@link #onIdleDrainQueue}.
         *
         * @return {@code false} when the upstream asked to stop, {@code true} otherwise
         */
        @Override
        boolean onNextDrainQueue(LogObserver.OnNextContext onNextContext) {
            return drainQueue(this.queue, onNextContext.getWatermark() - this.allowedLatenessMs);
        }

        /** Drains elements whose stamp is below the idle watermark minus the allowed lateness. */
        @Override
        void onIdleDrainQueue(LogObserver.OnIdleContext onIdleContext) {
            drainQueue(this.queue, onIdleContext.getWatermark() - this.allowedLatenessMs);
        }

        /** Discards everything buffered; the partition list itself is not used. */
        @Override
        protected void reassignPartitions(Collection<Partition> collection) {
            this.queue.clear();
        }

        /** Drains the queue with {@link Long#MAX_VALUE} as the limit. */
        @Override
        void onCompletedDrainQueue() {
            drainQueue(this.queue, Long.MAX_VALUE);
        }

        /** Returns the watermark reported by the context. */
        @Override
        long getWatermark(LogObserver.OnNextContext onNextContext) {
            return onNextContext.getWatermark();
        }
    }

    /**
     * Wraps {@code logObserver} in a sort buffer whose late elements are handled by the consumer
     * obtained from {@code lateDataLoggingConsumer(duration)}.
     *
     * @param logObserver observer receiving the buffered elements
     * @param duration allowed lateness
     * @return the result of {@link #withSortBuffer(LogObserver, Duration, Consumer)}
     */
    public static LogObserver withSortBuffer(LogObserver logObserver, Duration duration) {
        final Consumer<StreamElement> lateLogger = lateDataLoggingConsumer(duration);
        return withSortBuffer(logObserver, duration, lateLogger);
    }

    /**
     * Wraps {@code logObserver} in a sort buffer whose late elements are passed to
     * {@code consumer} and then confirmed on their context.
     *
     * @param logObserver observer receiving the buffered elements
     * @param duration allowed lateness
     * @param consumer receives each late element
     * @return the result of {@link #withSortBuffer(LogObserver, Duration, BiConsumer)}
     */
    public static LogObserver withSortBuffer(LogObserver logObserver, Duration duration, Consumer<StreamElement> consumer) {
        // NOTE(review): $deserializeLambda$ in this class recognises this lambda by its synthetic
        // implementation-method name; presumably restructuring it would change that name and
        // break already serialized instances - confirm before refactoring.
        return withSortBuffer(logObserver, duration, (BiConsumer<StreamElement, LogObserver.OnNextContext>) (streamElement, onNextContext) -> {
            consumer.accept(streamElement);
            onNextContext.confirm();
        });
    }

    /**
     * Wraps {@code logObserver} in a new {@link SortedLogObserver}.
     *
     * @param logObserver observer receiving the buffered elements
     * @param duration allowed lateness
     * @param biConsumer receives each late element together with its context
     * @return the newly created sorting observer
     */
    public static LogObserver withSortBuffer(LogObserver logObserver, Duration duration, BiConsumer<StreamElement, LogObserver.OnNextContext> biConsumer) {
        final SortedLogObserver sortingObserver = new SortedLogObserver(logObserver, duration, biConsumer);
        return sortingObserver;
    }

    /**
     * Wraps {@code logObserver} in a per-partition sort buffer whose late elements are handled by
     * the consumer obtained from {@code lateDataLoggingConsumer(duration)}.
     *
     * @param logObserver observer receiving the buffered elements
     * @param duration allowed lateness
     * @return the result of {@link #withSortBufferWithinPartition(LogObserver, Duration, Consumer)}
     */
    public static LogObserver withSortBufferWithinPartition(LogObserver logObserver, Duration duration) {
        final Consumer<StreamElement> lateLogger = lateDataLoggingConsumer(duration);
        return withSortBufferWithinPartition(logObserver, duration, lateLogger);
    }

    /**
     * Wraps {@code logObserver} in a per-partition sort buffer whose late elements are passed to
     * {@code consumer} and then confirmed on their context.
     *
     * @param logObserver observer receiving the buffered elements
     * @param duration allowed lateness
     * @param consumer receives each late element
     * @return the result of {@link #withSortBufferWithinPartition(LogObserver, Duration, BiConsumer)}
     */
    public static LogObserver withSortBufferWithinPartition(LogObserver logObserver, Duration duration, Consumer<StreamElement> consumer) {
        // NOTE(review): $deserializeLambda$ in this class recognises this lambda by its synthetic
        // implementation-method name; presumably restructuring it would change that name and
        // break already serialized instances - confirm before refactoring.
        return withSortBufferWithinPartition(logObserver, duration, (BiConsumer<StreamElement, LogObserver.OnNextContext>) (streamElement, onNextContext) -> {
            consumer.accept(streamElement);
            onNextContext.confirm();
        });
    }

    /**
     * Wraps {@code logObserver} in a new {@link SinglePartitionSortedLogObserver}.
     *
     * @param logObserver observer receiving the buffered elements
     * @param duration allowed lateness
     * @param biConsumer receives each late element together with its context
     * @return the newly created per-partition sorting observer
     */
    public static LogObserver withSortBufferWithinPartition(LogObserver logObserver, Duration duration, BiConsumer<StreamElement, LogObserver.OnNextContext> biConsumer) {
        final SinglePartitionSortedLogObserver sortingObserver =
                new SinglePartitionSortedLogObserver(logObserver, duration, biConsumer);
        return sortingObserver;
    }

    /** Private constructor: the class is used through its static factory methods only. */
    private LogObservers() {
    }

    /**
     * Creates a consumer that logs a warning for each element it receives.
     *
     * <p>The message contains the element's {@code dump()} and the allowed lateness in
     * milliseconds. The consumer itself only logs; it does not touch any context.
     *
     * @param duration allowed lateness reported in the log message
     * @return the logging consumer
     */
    private static Consumer<StreamElement> lateDataLoggingConsumer(Duration duration) {
        // Converted once here so the lambda captures a primitive long rather than the Duration.
        long millis = duration.toMillis();
        return streamElement -> {
            log.warn("Element {} dropped due to allowed lateness {}", streamElement.dump(), Long.valueOf(millis));
        };
    }

    /**
     * Recreates one of this class's three serializable lambdas from its serialized form.
     *
     * <p>The lambda is selected by implementation-method name and then verified against the
     * recorded functional interface, method signatures and implementing class before an
     * equivalent lambda is returned.
     *
     * <p>NOTE(review): the method is marked synthetic, i.e. it appears to be compiler-generated
     * code recovered by the decompiler rather than hand-written source. As rendered here it does
     * not compile: {@code z} is declared {@code boolean} but is assigned {@code -1} and
     * {@code 2}, and {@code case true} occurs twice. It looks like an {@code int} index
     * (0, 1, 2) was mis-typed by the decompiler - confirm, and consider dropping the method
     * from hand-maintained source.
     *
     * @param serializedLambda serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Index of the matched lambda; stays at the initial value when nothing matches.
        boolean z = -1;
        // First switch: string switch on the implementation-method name (hash, then equals).
        switch (implMethodName.hashCode()) {
            case -2021190603:
                if (implMethodName.equals("lambda$withSortBufferWithinPartition$289b9a7b$1")) {
                    z = false;
                    break;
                }
                break;
            case -625228270:
                if (implMethodName.equals("lambda$withSortBuffer$289b9a7b$1")) {
                    z = true;
                    break;
                }
                break;
            case -223577934:
                if (implMethodName.equals("lambda$lateDataLoggingConsumer$feb8bb2f$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        // Second switch: validate the remaining metadata and rebuild the matching lambda.
        switch (z) {
            case false:
                // Adapter lambda of withSortBufferWithinPartition(LogObserver, Duration, Consumer).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/commitlog/LogObservers") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/functional/Consumer;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/direct/commitlog/LogObserver$OnNextContext;)V")) {
                    Consumer consumer = (Consumer) serializedLambda.getCapturedArg(0);
                    return (streamElement, onNextContext) -> {
                        consumer.accept(streamElement);
                        onNextContext.confirm();
                    };
                }
                break;
            case true:
                // Adapter lambda of withSortBuffer(LogObserver, Duration, Consumer).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/commitlog/LogObservers") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/functional/Consumer;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/direct/commitlog/LogObserver$OnNextContext;)V")) {
                    Consumer consumer2 = (Consumer) serializedLambda.getCapturedArg(0);
                    return (streamElement2, onNextContext2) -> {
                        consumer2.accept(streamElement2);
                        onNextContext2.confirm();
                    };
                }
                break;
            case true:
                // Logging lambda of lateDataLoggingConsumer(Duration); captures the lateness in ms.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/commitlog/LogObservers") && serializedLambda.getImplMethodSignature().equals("(JLcz/o2/proxima/storage/StreamElement;)V")) {
                    long longValue = ((Long) serializedLambda.getCapturedArg(0)).longValue();
                    return streamElement3 -> {
                        log.warn("Element {} dropped due to allowed lateness {}", streamElement3.dump(), Long.valueOf(longValue));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
