/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.view;

import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.randomaccess.KeyValue;
import cz.o2.proxima.direct.randomaccess.RandomAccessReader;
import cz.o2.proxima.direct.randomaccess.RandomOffset;
import cz.o2.proxima.direct.randomaccess.RawOffset;
import cz.o2.proxima.direct.view.CachedView;
import cz.o2.proxima.direct.view.TimeBoundedVersionedCache;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.BiFunction;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalCachedPartitionedView
implements CachedView {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(LocalCachedPartitionedView.class);
    private final CommitLogReader reader;
    private final EntityDescriptor entity;
    private final OnlineAttributeWriter writer;
    private final TimeBoundedVersionedCache cache;
    private final AtomicReference<ObserveHandle> handle = new AtomicReference();
    private BiConsumer<StreamElement, Pair<Long, Object>> updateCallback = (BiConsumer & Serializable)(e, old) -> {};

    public LocalCachedPartitionedView(EntityDescriptor entity, CommitLogReader reader, OnlineAttributeWriter writer) {
        this(entity, reader, writer, 60000L);
    }

    public LocalCachedPartitionedView(EntityDescriptor entity, CommitLogReader reader, OnlineAttributeWriter writer, long keepCachedDuration) {
        this.cache = new TimeBoundedVersionedCache(entity, keepCachedDuration);
        this.reader = Objects.requireNonNull(reader);
        this.entity = Objects.requireNonNull(entity);
        this.writer = Objects.requireNonNull(writer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onCache(StreamElement ingest, boolean overwrite) {
        Optional parsed;
        Optional optional = parsed = ingest.isDelete() ? Optional.empty() : ingest.getParsed();
        if (ingest.isDelete() || parsed.isPresent()) {
            boolean updated;
            Pair<Long, TimeBoundedVersionedCache.Payload> oldVal;
            String attrName = ingest.isDeleteWildcard() ? ingest.getAttributeDescriptor().toAttributePrefix() : ingest.getAttribute();
            TimeBoundedVersionedCache timeBoundedVersionedCache = this.cache;
            synchronized (timeBoundedVersionedCache) {
                oldVal = this.cache.get(ingest.getKey(), attrName, Long.MAX_VALUE);
                long sequenceId = ingest.hasSequentialId() ? ingest.getSequentialId() : 0L;
                updated = this.cache.put(ingest.getKey(), attrName, ingest.getStamp(), sequenceId, overwrite, parsed.orElse(null));
            }
            if (updated) {
                this.updateCallback.accept((Object)ingest, oldVal != null ? Pair.of((Object)oldVal.getFirst(), (Object)((TimeBoundedVersionedCache.Payload)oldVal.getSecond()).getData()) : null);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void assign(final Collection<Partition> partitions, BiConsumer<StreamElement, Pair<Long, Object>> updateCallback, final @Nullable Duration ttl) {
        this.close();
        this.updateCallback = Objects.requireNonNull(updateCallback);
        final SynchronousQueue errorDuringPrefetch = new SynchronousQueue();
        final AtomicLong prefetchedCount = new AtomicLong();
        final long prefetchStartTime = this.getCurrentTimeMillis();
        final long ttlMs = ttl == null ? Long.MAX_VALUE : ttl.toMillis();
        CommitLogObserver prefetchObserver = new CommitLogObserver(){

            @Override
            public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
                log.debug("Prefetched element {} with ttlMs {}", (Object)ingest, (Object)ttlMs);
                long prefetched = prefetchedCount.incrementAndGet();
                if (ttl == null || LocalCachedPartitionedView.this.getCurrentTimeMillis() - ingest.getStamp() < ttlMs) {
                    if (prefetched % 10000L == 0L) {
                        log.info("Prefetched so far {} elements in {} millis", (Object)prefetched, (Object)(LocalCachedPartitionedView.this.getCurrentTimeMillis() - prefetchStartTime));
                    }
                    LocalCachedPartitionedView.this.onCache(ingest, false);
                }
                context.confirm();
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                log.error("Failed to prefetch data", error);
                ExceptionUtils.unchecked((ExceptionUtils.ThrowingRunnable & Serializable)() -> errorDuringPrefetch.put(Optional.of(error)));
                return false;
            }

            @Override
            public void onCompleted() {
                ExceptionUtils.unchecked((ExceptionUtils.ThrowingRunnable & Serializable)() -> errorDuringPrefetch.put(Optional.empty()));
            }
        };
        CommitLogObserver observer = new CommitLogObserver(){
            private long lastCleanup = 0L;

            @Override
            public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
                LocalCachedPartitionedView.this.onCache(ingest, false);
                context.confirm();
                if (ttl != null) {
                    this.lastCleanup = LocalCachedPartitionedView.this.maybeDoCleanup(this.lastCleanup, ttlMs);
                }
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                log.error("Error in caching data. Restarting consumption", error);
                LocalCachedPartitionedView.this.assign(partitions);
                return false;
            }
        };
        LocalCachedPartitionedView localCachedPartitionedView = this;
        synchronized (localCachedPartitionedView) {
            try {
                log.info("Starting prefetching old topic data for partitions {} with preUpdate {}", partitions.stream().map(p -> String.format("%s[%d]", this.getUri(), p.getId())).collect(Collectors.toList()), updateCallback);
                ObserveHandle h = this.reader.observeBulkPartitions(partitions, Position.OLDEST, true, prefetchObserver);
                ((Optional)errorDuringPrefetch.take()).ifPresent(ExceptionUtils::rethrowAsIllegalStateException);
                log.info("Finished prefetching after {} records in {} millis. Starting consumption of updates.", (Object)prefetchedCount.get(), (Object)(this.getCurrentTimeMillis() - prefetchStartTime));
                List<Offset> offsets = h.getCommittedOffsets();
                this.handle.set(this.reader.observeBulkOffsets(offsets, observer));
                this.handle.get().waitUntilReady();
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
    }

    @VisibleForTesting
    long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

    private long maybeDoCleanup(long lastCleanup, long ttlMs) {
        long now = this.getCurrentTimeMillis();
        long cleanTime = now - ttlMs;
        if (cleanTime < lastCleanup) {
            return lastCleanup;
        }
        this.cache.clearStaleRecords(cleanTime);
        return this.getCurrentTimeMillis();
    }

    @Override
    public Collection<Partition> getAssigned() {
        if (this.handle.get() != null) {
            return this.handle.get().getCommittedOffsets().stream().map(Offset::getPartition).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    @Override
    public RandomOffset fetchOffset(RandomAccessReader.Listing type, String key) {
        switch (type) {
            case ATTRIBUTE: {
                return new RawOffset(key);
            }
            case ENTITY: {
                return new IntOffset(this.cache.findPosition(key));
            }
        }
        throw new IllegalArgumentException("Unknown listing type " + (Object)((Object)type));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T> Optional<KeyValue<T>> get(String key, String attribute, AttributeDescriptor<T> desc, long stamp) {
        long deleteStamp = Long.MIN_VALUE;
        TimeBoundedVersionedCache timeBoundedVersionedCache = this.cache;
        synchronized (timeBoundedVersionedCache) {
            Pair<Long, TimeBoundedVersionedCache.Payload> wildcard;
            if (desc.isWildcard() && (wildcard = this.cache.get(key, desc.toAttributePrefix(), stamp)) != null && ((TimeBoundedVersionedCache.Payload)wildcard.getSecond()).getData() == null) {
                deleteStamp = (Long)wildcard.getFirst();
            }
            long filterStamp = deleteStamp;
            return Optional.ofNullable(this.cache.get(key, attribute, stamp)).filter(e -> (Long)e.getFirst() >= filterStamp).flatMap(e -> Optional.ofNullable(this.toKv(key, attribute, (Pair<Long, TimeBoundedVersionedCache.Payload>)e)));
        }
    }

    @Override
    public void scanWildcardAll(String key, RandomOffset offset, long stamp, int limit, Consumer<KeyValue<?>> consumer) {
        String off = offset == null ? "" : ((RawOffset)offset).getOffset();
        this.scanWildcardPrefix(key, "", off, stamp, limit, consumer);
    }

    @Override
    public <T> void scanWildcard(String key, AttributeDescriptor<T> wildcard, RandomOffset offset, long stamp, int limit, Consumer<KeyValue<T>> consumer) {
        String off = offset == null ? wildcard.toAttributePrefix() : ((RawOffset)offset).getOffset();
        this.scanWildcardPrefix(key, wildcard.toAttributePrefix(), off, stamp, limit, consumer);
    }

    private void scanWildcardPrefix(String key, String prefix, String offset, long stamp, int limit, Consumer<KeyValue<?>> consumer) {
        AtomicInteger missing = new AtomicInteger(limit);
        this.cache.scan(key, prefix, offset, stamp, (UnaryFunction<String, String>)(UnaryFunction & Serializable)attr -> {
            AttributeDescriptor desc = this.entity.getAttribute(attr);
            if (desc.isWildcard()) {
                return desc.toAttributePrefix();
            }
            return null;
        }, (BiFunction<String, Pair<Long, TimeBoundedVersionedCache.Payload>, Boolean>)(BiFunction & Serializable)(attr, e) -> {
            KeyValue kv = this.toKv(key, (String)attr, (Pair<Long, TimeBoundedVersionedCache.Payload>)e);
            if (kv != null) {
                if (missing.decrementAndGet() != 0) {
                    consumer.accept(kv);
                } else {
                    return false;
                }
            }
            return true;
        });
    }

    @Override
    public void listEntities(RandomOffset offset, int limit, Consumer<Pair<RandomOffset, String>> consumer) {
        IntOffset off = offset == null ? new IntOffset(0) : (IntOffset)offset;
        AtomicInteger newOff = new AtomicInteger(off.getOffset());
        this.cache.keys(off.getOffset(), limit, (Consumer<String>)(Consumer & Serializable)key -> consumer.accept((Object)Pair.of((Object)new IntOffset(newOff.incrementAndGet()), (Object)key)));
    }

    @Override
    public void close() {
        Optional.ofNullable(this.handle.getAndSet(null)).ifPresent(ObserveHandle::close);
        this.cache.clear();
    }

    @Nullable
    private <T> KeyValue<T> toKv(String key, String attribute, @Nullable Pair<Long, TimeBoundedVersionedCache.Payload> p) {
        Optional attrDesc = this.entity.findAttribute(attribute, true);
        if (attrDesc.isPresent()) {
            return this.toKv(key, attribute, (AttributeDescriptor)attrDesc.get(), p);
        }
        log.warn("Missing attribute {} in entity {}", (Object)attribute, (Object)this.entity);
        return null;
    }

    @Nullable
    private <T> KeyValue<T> toKv(String key, String attribute, AttributeDescriptor<?> attr, @Nullable Pair<Long, TimeBoundedVersionedCache.Payload> p) {
        if (p == null || p.getSecond() == null || ((TimeBoundedVersionedCache.Payload)p.getSecond()).getData() == null) {
            return null;
        }
        if (((TimeBoundedVersionedCache.Payload)p.getSecond()).getSeqId() > 0L) {
            return KeyValue.of(this.entity, attr, ((TimeBoundedVersionedCache.Payload)p.getSecond()).getSeqId(), key, attribute, new RawOffset(attribute), ((TimeBoundedVersionedCache.Payload)p.getSecond()).getData(), null, (Long)p.getFirst());
        }
        return KeyValue.of(this.entity, attr, key, attribute, new RawOffset(attribute), ((TimeBoundedVersionedCache.Payload)p.getSecond()).getData(), null, (long)((Long)p.getFirst()));
    }

    @Override
    public EntityDescriptor getEntityDescriptor() {
        return this.entity;
    }

    @Override
    public void write(StreamElement data, CommitCallback statusCallback) {
        try {
            this.cache(data);
            this.writer.write(data, statusCallback);
        }
        catch (Exception ex) {
            statusCallback.commit(false, ex);
        }
    }

    @Override
    public URI getUri() {
        return this.reader.getUri();
    }

    @Override
    public void cache(StreamElement element) {
        this.onCache(element, true);
    }

    @Override
    public CommitLogReader getUnderlyingReader() {
        return this.reader;
    }

    @Override
    public Optional<ObserveHandle> getRunningHandle() {
        return Optional.ofNullable(this.handle.get());
    }

    @Override
    public CachedView.Factory asFactory() {
        CommitLogReader.Factory<?> readerFactory = this.reader.asFactory();
        AttributeWriterBase.Factory writerFactory = this.writer.asFactory();
        EntityDescriptor entity = this.entity;
        return arg_0 -> LocalCachedPartitionedView.lambda$asFactory$6990a09e$1(entity, readerFactory, (OnlineAttributeWriter.Factory)writerFactory, arg_0);
    }

    private static /* synthetic */ CachedView lambda$asFactory$6990a09e$1(EntityDescriptor entity, CommitLogReader.Factory readerFactory, OnlineAttributeWriter.Factory writerFactory, Repository repo) {
        return new LocalCachedPartitionedView(entity, (CommitLogReader)readerFactory.apply(repo), (OnlineAttributeWriter)writerFactory.apply(repo));
    }

    @VisibleForTesting
    static final class IntOffset
    implements RandomOffset {
        private final int offset;

        @Generated
        public IntOffset(int offset) {
            this.offset = offset;
        }

        @Generated
        public int getOffset() {
            return this.offset;
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof IntOffset)) {
                return false;
            }
            IntOffset other = (IntOffset)o;
            return this.getOffset() == other.getOffset();
        }

        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            result = result * 59 + this.getOffset();
            return result;
        }

        @Generated
        public String toString() {
            return "LocalCachedPartitionedView.IntOffset(offset=" + this.getOffset() + ")";
        }
    }
}

