/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.core;

import com.typesafe.config.Config;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.BatchLogReaders;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.CommitLogReaders;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.ContextProvider;
import cz.o2.proxima.direct.core.DataAccessor;
import cz.o2.proxima.direct.core.DataAccessorFactory;
import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor;
import cz.o2.proxima.direct.core.DirectAttributeFamilyProxyDescriptor;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.core.OnlineAttributeWriters;
import cz.o2.proxima.direct.randomaccess.RandomAccessReader;
import cz.o2.proxima.direct.transaction.ClientTransactionManager;
import cz.o2.proxima.direct.transaction.ServerTransactionManager;
import cz.o2.proxima.direct.transaction.TransactionResourceManager;
import cz.o2.proxima.direct.transaction.TransactionalCachedView;
import cz.o2.proxima.direct.transaction.TransactionalOnlineAttributeWriter;
import cz.o2.proxima.direct.view.CachedView;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Sets;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.AttributeFamilyProxyDescriptor;
import cz.o2.proxima.repository.ConfigRepository;
import cz.o2.proxima.repository.DataOperator;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.TransactionMode;
import cz.o2.proxima.storage.StorageType;
import cz.o2.proxima.storage.ThroughputLimiter;
import cz.o2.proxima.storage.internal.AbstractDataAccessorFactory;
import cz.o2.proxima.storage.internal.DataAccessorLoader;
import cz.o2.proxima.util.Classpath;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectDataOperator
implements DataOperator,
ContextProvider {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DirectDataOperator.class);
    private static final String THROUGHPUT_LIMITER_PREFIX = "direct.throughput-limiter.";
    private static final String KW_CLASS = "class";
    private static final AtomicInteger threadId = new AtomicInteger();
    private static final String ID = UUID.randomUUID().toString();
    private final Repository repo;
    private final Map<AttributeFamilyDescriptor, DirectAttributeFamilyDescriptor> familyMap = Collections.synchronizedMap(new HashMap());
    private final Map<AttributeDescriptor<?>, OnlineAttributeWriter> writers = Collections.synchronizedMap(new HashMap());
    private Factory<ExecutorService> executorFactory = DirectDataOperator.createExecutorFactory();
    private final Context context;
    private final DataAccessorLoader<DirectDataOperator, DataAccessor, DataAccessorFactory> loader;
    private volatile TransactionResourceManager transactionManager;

    private static Factory<ExecutorService> createExecutorFactory() {
        return (Factory & Serializable)() -> Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r);
            t.setName(String.format("DirectDataOperatorThread-%s-%d", ID, threadId.incrementAndGet()));
            t.setUncaughtExceptionHandler((thr, exc) -> {
                log.error("Error running task in thread {}, bailing out...", (Object)thr.getName(), (Object)exc);
                Runtime.getRuntime().exit(1);
            });
            return t;
        });
    }

    DirectDataOperator(Repository repo) {
        this.repo = repo;
        this.context = new Context((UnaryFunction<AttributeFamilyDescriptor, DirectAttributeFamilyDescriptor>)((UnaryFunction & Serializable)this.familyMap::get), this.executorFactory);
        this.loader = DataAccessorLoader.of((Repository)repo, DataAccessorFactory.class);
        this.reload();
    }

    public DirectDataOperator withExecutorFactory(Factory<ExecutorService> factory) {
        this.executorFactory = factory;
        return this;
    }

    public final void reload() {
        this.close();
        this.familyMap.clear();
        this.dependencyOrdered(this.repo.getAllFamilies(true)).forEach(this::addResolvedFamily);
    }

    private List<AttributeFamilyDescriptor> dependencyOrdered(Stream<AttributeFamilyDescriptor> families) {
        HashSet available = new HashSet();
        ArrayList<AttributeFamilyDescriptor> resolved = new ArrayList<AttributeFamilyDescriptor>();
        Set toResolve = families.collect(Collectors.toSet());
        while (!toResolve.isEmpty()) {
            ArrayList remove = new ArrayList();
            ArrayList add = new ArrayList();
            toResolve.stream().filter(af -> !available.contains(af)).forEachOrdered(af -> {
                if (!af.isProxy()) {
                    available.add(af);
                    resolved.add((AttributeFamilyDescriptor)af);
                    remove.add(af);
                } else {
                    AttributeFamilyProxyDescriptor proxy = af.toProxy();
                    if (available.contains(proxy.getTargetFamilyRead()) && available.contains(proxy.getTargetFamilyWrite())) {
                        available.add(af);
                        resolved.add((AttributeFamilyDescriptor)af);
                        remove.add(af);
                    } else if (!available.contains(proxy.getTargetFamilyRead())) {
                        add.add(proxy.getTargetFamilyRead());
                    } else {
                        add.add(proxy.getTargetFamilyWrite());
                    }
                }
            });
            if (add.isEmpty() && remove.isEmpty()) {
                throw new IllegalStateException("Cannot make progress in resolving families " + toResolve.stream().map(AttributeFamilyDescriptor::getName).collect(Collectors.toList()) + ", currently resolved " + available.stream().map(AttributeFamilyDescriptor::getName).collect(Collectors.toList()));
            }
            toResolve.addAll(add);
            remove.forEach(toResolve::remove);
        }
        return resolved;
    }

    private void addResolvedFamily(AttributeFamilyDescriptor family) {
        try {
            if (!this.familyMap.containsKey(family)) {
                log.debug("Adding new family {} to familyMap", (Object)family);
                if (family.isProxy()) {
                    AttributeFamilyProxyDescriptor proxy = family.toProxy();
                    this.familyMap.put(family, DirectAttributeFamilyProxyDescriptor.of(this.repo, this.context, proxy));
                    this.addResolvedFamily(proxy.getTargetFamilyRead());
                    this.addResolvedFamily(proxy.getTargetFamilyWrite());
                } else {
                    DataAccessor accessor = this.findAccessorFor(family);
                    this.familyMap.put(family, new DirectAttributeFamilyDescriptor(this.repo, family, this.context, accessor));
                }
            }
        }
        catch (Exception ex) {
            log.error("Failed to add family {}", (Object)family, (Object)ex);
            throw ex;
        }
    }

    private DataAccessor findAccessorFor(AttributeFamilyDescriptor desc) {
        return this.getAccessorFactory(desc.getStorageUri()).map(f -> (DataAccessor)f.createAccessor(this, desc)).filter(f -> !this.repo.isShouldValidate(Repository.Validate.ACCESSES) || f.isAcceptable(desc)).orElseThrow(() -> new IllegalStateException("No DataAccessor for URI " + desc.getStorageUri() + " found. You might be missing some dependency."));
    }

    @Override
    public Context getContext() {
        return this.context;
    }

    public DirectAttributeFamilyDescriptor resolveRequired(AttributeFamilyDescriptor family) {
        return this.context.resolveRequired(family);
    }

    public Optional<DirectAttributeFamilyDescriptor> resolve(AttributeFamilyDescriptor family) {
        return this.context.resolve(family);
    }

    public Optional<DataAccessorFactory> getAccessorFactory(URI uri) {
        return this.loader.findForUri(uri).map(DelegateDataAccessorFactory::new);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Optional<OnlineAttributeWriter> getWriter(AttributeDescriptor<?> attr) {
        Map<AttributeDescriptor<?>, OnlineAttributeWriter> map = this.writers;
        synchronized (map) {
            OnlineAttributeWriter writer = this.writers.get(attr);
            if (writer == null) {
                if (attr.getEntity().equals("_transaction") && attr.getName().equals("commit")) {
                    this.repo.getAllFamilies(true).filter(af -> af.getEntity().getName().equals(attr.getEntity())).filter(af -> af.getAttributes().contains(attr)).map(af -> this.getFamilyByName(af.getName())).forEach(this::cacheOrRetrieveWriterFor);
                } else {
                    this.repo.getFamiliesForAttribute(attr).stream().filter(af -> af.getType() == StorageType.PRIMARY).filter(af -> !af.getAccess().isReadonly()).findAny().flatMap(this.context::resolve).ifPresent(this::cacheOrRetrieveWriterFor);
                }
                return Optional.ofNullable(this.writers.get(attr));
            }
            return Optional.of(writer);
        }
    }

    public TransactionalOnlineAttributeWriter getGlobalTransactionWriter() {
        return TransactionalOnlineAttributeWriter.global(this);
    }

    private void cacheOrRetrieveWriterFor(DirectAttributeFamilyDescriptor af) {
        Optional<AttributeWriterBase> maybeWriter = af.getWriter();
        if (maybeWriter.isPresent()) {
            TransactionalOnlineAttributeWriter maybeTransactionalWriter = null;
            OnlineAttributeWriter familyWriter = maybeWriter.get().online();
            if (af.getAttributes().stream().anyMatch(a -> a.getTransactionMode() != TransactionMode.NONE)) {
                maybeTransactionalWriter = this.wrapTransactionalWriterAround(familyWriter);
            }
            for (AttributeDescriptor<?> a2 : af.getAttributes()) {
                if (a2.getTransactionMode() == TransactionMode.NONE) {
                    this.writers.put(a2, OnlineAttributeWriters.synchronizedWriter(familyWriter));
                    continue;
                }
                this.writers.put(a2, maybeTransactionalWriter);
            }
        }
    }

    private TransactionalOnlineAttributeWriter wrapTransactionalWriterAround(OnlineAttributeWriter familyWriter) {
        return TransactionalOnlineAttributeWriter.of(this, familyWriter);
    }

    public Optional<CommitLogReader> getCommitLogReader(Collection<AttributeDescriptor<?>> attrs) {
        return this.getFamilyForAttributes(attrs, (UnaryFunction<DirectAttributeFamilyDescriptor, Boolean>)((UnaryFunction & Serializable)DirectAttributeFamilyDescriptor::hasCommitLogReader)).flatMap(DirectAttributeFamilyDescriptor::getCommitLogReader);
    }

    @SafeVarargs
    public final Optional<CommitLogReader> getCommitLogReader(AttributeDescriptor<?> ... attrs) {
        return this.getCommitLogReader(Arrays.asList(attrs));
    }

    public Optional<BatchLogReader> getBatchLogReader(Collection<AttributeDescriptor<?>> attrs) {
        return this.getFamilyForAttributes(attrs, (UnaryFunction<DirectAttributeFamilyDescriptor, Boolean>)((UnaryFunction & Serializable)DirectAttributeFamilyDescriptor::hasBatchReader)).flatMap(DirectAttributeFamilyDescriptor::getBatchReader);
    }

    @SafeVarargs
    public final Optional<BatchLogReader> getBatchLogReader(AttributeDescriptor<?> ... attrs) {
        return this.getBatchLogReader(Arrays.asList(attrs));
    }

    public Optional<CachedView> getCachedView(Collection<AttributeDescriptor<?>> attrs) {
        boolean hasTransactions = attrs.stream().anyMatch(a -> a.getTransactionMode() != TransactionMode.NONE);
        Optional<CachedView> view = this.getFamilyForAttributes(attrs, (UnaryFunction<DirectAttributeFamilyDescriptor, Boolean>)((UnaryFunction & Serializable)DirectAttributeFamilyDescriptor::hasCachedView)).flatMap(DirectAttributeFamilyDescriptor::getCachedView);
        if (hasTransactions) {
            return view.map(v -> new TransactionalCachedView(this, (CachedView)v));
        }
        return view;
    }

    @SafeVarargs
    public final Optional<CachedView> getCachedView(AttributeDescriptor<?> ... attrs) {
        return this.getCachedView(Arrays.asList(attrs));
    }

    public Optional<RandomAccessReader> getRandomAccess(Collection<AttributeDescriptor<?>> attrs) {
        return this.getFamilyForAttributes(attrs, (UnaryFunction<DirectAttributeFamilyDescriptor, Boolean>)((UnaryFunction & Serializable)DirectAttributeFamilyDescriptor::hasRandomAccessReader)).flatMap(DirectAttributeFamilyDescriptor::getRandomAccessReader);
    }

    @SafeVarargs
    public final Optional<RandomAccessReader> getRandomAccess(AttributeDescriptor<?> ... attrs) {
        return this.getRandomAccess(Arrays.asList(attrs));
    }

    private Optional<DirectAttributeFamilyDescriptor> getFamilyForAttributes(Collection<AttributeDescriptor<?>> attrs, UnaryFunction<DirectAttributeFamilyDescriptor, Boolean> mask) {
        return attrs.stream().map(a -> this.getFamiliesForAttribute((AttributeDescriptor<?>)a).stream().filter(arg_0 -> ((UnaryFunction)mask).apply(arg_0)).collect(Collectors.toSet())).reduce(Sets::intersection).flatMap(s -> s.stream().findAny());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        Map<AttributeDescriptor<?>, OnlineAttributeWriter> map = this.writers;
        synchronized (map) {
            this.writers.values().stream().distinct().forEach(AttributeWriterBase::close);
            this.writers.clear();
        }
        Optional.ofNullable(this.transactionManager).ifPresent(TransactionResourceManager::close);
    }

    public Set<DirectAttributeFamilyDescriptor> getFamiliesForAttribute(AttributeDescriptor<?> desc) {
        return this.repo.getFamiliesForAttribute(desc).stream().map(this::resolveRequired).collect(Collectors.toSet());
    }

    public Stream<DirectAttributeFamilyDescriptor> getAllFamilies() {
        return this.repo.getAllFamilies().map(this::resolveRequired);
    }

    public DirectAttributeFamilyDescriptor getFamilyByName(String name) {
        return this.findFamilyByName(name).orElseThrow(() -> new IllegalArgumentException("Family " + name + " not found"));
    }

    public Optional<DirectAttributeFamilyDescriptor> findFamilyByName(String name) {
        return this.repo.findFamilyByName(name).flatMap(f -> Optional.ofNullable(this.familyMap.get(f)));
    }

    public ClientTransactionManager getClientTransactionManager() {
        return this.getTransactionManager();
    }

    public ServerTransactionManager getServerTransactionManager() {
        return this.getTransactionManager();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TransactionResourceManager getTransactionManager() {
        if (this.transactionManager == null) {
            DirectDataOperator directDataOperator = this;
            synchronized (directDataOperator) {
                if (this.transactionManager == null) {
                    Config config = ((ConfigRepository)this.repo).getConfig();
                    Map<String, Object> cfg = config.hasPath("transactions") ? config.getObject("transactions").unwrapped() : Collections.emptyMap();
                    this.transactionManager = new TransactionResourceManager(this, cfg);
                }
            }
        }
        return this.transactionManager;
    }

    public Repository getRepository() {
        return this.repo;
    }

    @VisibleForTesting
    public static class DelegateDataAccessorFactory
    implements DataAccessorFactory {
        private static final long serialVersionUID = 1L;
        private final DataAccessorFactory delegate;

        public DelegateDataAccessorFactory(DataAccessorFactory delegate) {
            this.delegate = delegate;
        }

        public void setup(Repository repo) {
            this.delegate.setup(repo);
        }

        public AbstractDataAccessorFactory.Accept accepts(URI uri) {
            return this.delegate.accepts(uri);
        }

        public DataAccessor createAccessor(DirectDataOperator operator, AttributeFamilyDescriptor familyDescriptor) {
            return new ForwardingDataAccessor((DataAccessor)this.delegate.createAccessor(operator, familyDescriptor), familyDescriptor.getCfg());
        }

        @Generated
        public DataAccessorFactory getDelegate() {
            return this.delegate;
        }

        private static class ForwardingDataAccessor
        implements DataAccessor {
            private static final long serialVersionUID = 1L;
            private final DataAccessor delegate;
            @Nullable
            private final ThroughputLimiter limiter;

            public ForwardingDataAccessor(DataAccessor delegate, Map<String, Object> cfg) {
                this.delegate = delegate;
                this.limiter = this.configureLimiter(cfg);
                log.info("Created new {} for {}", (Object)this, (Object)delegate.getUri());
            }

            @Nullable
            private ThroughputLimiter configureLimiter(Map<String, Object> cfg) {
                Map<String, Object> prefixed = cfg.entrySet().stream().filter(e -> ((String)e.getKey()).startsWith(DirectDataOperator.THROUGHPUT_LIMITER_PREFIX)).map(e -> Pair.of((Object)((String)e.getKey()).substring(DirectDataOperator.THROUGHPUT_LIMITER_PREFIX.length()), e.getValue())).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
                return Optional.ofNullable(prefixed.get(DirectDataOperator.KW_CLASS)).map(Object::toString).map(cls -> (ThroughputLimiter)ExceptionUtils.uncheckedFactory((ExceptionUtils.ThrowingFactory & Serializable)() -> (ThroughputLimiter)Classpath.newInstance((String)cls, ThroughputLimiter.class))).map(l -> {
                    l.setup(prefixed);
                    return l;
                }).orElse(null);
            }

            public URI getUri() {
                return this.delegate.getUri();
            }

            @Override
            public Optional<AttributeWriterBase> getWriter(Context context) {
                return this.delegate.getWriter(context);
            }

            @Override
            public Optional<CommitLogReader> getCommitLogReader(Context context) {
                return this.delegate.getCommitLogReader(context).map(reader -> CommitLogReaders.withThroughputLimit(reader, this.limiter));
            }

            @Override
            public Optional<RandomAccessReader> getRandomAccessReader(Context context) {
                return this.delegate.getRandomAccessReader(context);
            }

            @Override
            public Optional<BatchLogReader> getBatchLogReader(Context context) {
                return this.delegate.getBatchLogReader(context).map(reader -> BatchLogReaders.withLimitedThroughput(reader, this.limiter));
            }

            @Override
            public Optional<CachedView> getCachedView(Context context) {
                return this.delegate.getCachedView(context);
            }

            @Override
            public boolean isAcceptable(AttributeFamilyDescriptor familyDescriptor) {
                return this.delegate.isAcceptable(familyDescriptor);
            }

            public String toString() {
                return MoreObjects.toStringHelper((Object)this).add("delegate", (Object)this.delegate).add("limiter", (Object)this.limiter).toString();
            }
        }
    }
}

