/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.server;

import com.google.common.collect.Sets;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.proto.service.Rpc;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.TransformationDescriptor;
import cz.o2.proxima.server.IngestService;
import cz.o2.proxima.server.RetrieveService;
import cz.o2.proxima.server.TransformationObserver;
import cz.o2.proxima.server.metrics.Metrics;
import cz.o2.proxima.storage.AttributeWriterBase;
import cz.o2.proxima.storage.BulkAttributeWriter;
import cz.o2.proxima.storage.OnlineAttributeWriter;
import cz.o2.proxima.storage.StorageFilter;
import cz.o2.proxima.storage.StorageType;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.AbstractRetryableLogObserver;
import cz.o2.proxima.storage.commitlog.BulkLogObserver;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.LogObserver;
import cz.o2.proxima.storage.commitlog.Offset;
import cz.o2.proxima.storage.commitlog.RetryableBulkObserver;
import cz.o2.proxima.storage.commitlog.RetryableLogObserver;
import cz.o2.proxima.transform.Transformation;
import cz.o2.proxima.util.Pair;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IngestServer {
    private static final Logger log = LoggerFactory.getLogger(IngestServer.class);
    static final int CORES = Math.max(2, Runtime.getRuntime().availableProcessors());
    final Executor executor = new ThreadPoolExecutor(CORES, 10 * CORES, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10 * CORES));
    final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(5);
    final Repository repo;
    final Config cfg;
    final boolean ignoreErrors;
    RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(3).withBackoff(3000L, 20000L, TimeUnit.MILLISECONDS, 2.0);

    public static void main(String[] args) throws Throwable {
        IngestServer server = args.length == 0 ? new IngestServer(ConfigFactory.load().resolve()) : new IngestServer(ConfigFactory.parseFile((File)new File(args[0])).resolve());
        server.run();
    }

    protected IngestServer(Config cfg) {
        this.cfg = cfg;
        this.repo = Repository.of((Config)cfg);
        if (log.isDebugEnabled()) {
            this.repo.getAllEntities().forEach(e -> e.getAllAttributes(true).stream().forEach(a -> log.debug("Configured attribute {}", a)));
        }
        if (this.repo.isEmpty()) {
            throw new IllegalArgumentException("No valid entities found in provided config!");
        }
        this.ignoreErrors = cfg.hasPath("ingest.ignore-errors") && cfg.getBoolean("ingest.ignore-errors");
    }

    static boolean ingestRequest(Repository repo, StreamElement ingest, String uuid, Consumer<Rpc.Status> responseConsumer) {
        boolean valid;
        EntityDescriptor entityDesc = ingest.getEntityDescriptor();
        AttributeDescriptor attributeDesc = ingest.getAttributeDescriptor();
        OnlineAttributeWriter writer = (OnlineAttributeWriter)repo.getWriter(attributeDesc).orElseThrow(() -> new IllegalStateException("Writer for attribute " + attributeDesc.getName() + " not found"));
        if (writer == null) {
            log.warn("Missing writer for request {}", (Object)ingest);
            responseConsumer.accept(IngestServer.status(uuid, 503, "No writer for attribute " + attributeDesc.getName()));
            return false;
        }
        boolean bl = valid = ingest.isDelete() || attributeDesc.getValueSerializer().isValid(ingest.getValue());
        if (!valid) {
            log.info("Request {} is not valid", (Object)ingest);
            responseConsumer.accept(IngestServer.status(uuid, 412, "Invalid scheme for " + entityDesc.getName() + "." + attributeDesc.getName()));
            return false;
        }
        if (ingest.isDelete()) {
            if (ingest.isDeleteWildcard()) {
                Metrics.DELETE_WILDCARD_REQUESTS.increment();
            } else {
                Metrics.DELETE_REQUESTS.increment();
            }
        } else {
            Metrics.UPDATE_REQUESTS.increment();
        }
        Metrics.COMMIT_LOG_APPEND.increment();
        log.debug("Writing {} to commit log {}", (Object)ingest, (Object)writer.getUri());
        writer.write(ingest, (s, exc) -> {
            if (s) {
                responseConsumer.accept(IngestServer.ok(uuid));
            } else {
                responseConsumer.accept(IngestServer.status(uuid, 500, exc.getMessage()));
            }
        });
        return true;
    }

    static Rpc.Status notFound(String uuid, String what) {
        return Rpc.Status.newBuilder().setUuid(uuid).setStatus(404).setStatusMessage(what).build();
    }

    static Rpc.Status ok(String uuid) {
        return Rpc.Status.newBuilder().setStatus(200).setUuid(uuid).build();
    }

    static Rpc.Status status(String uuid, int status, String message) {
        return Rpc.Status.newBuilder().setUuid(uuid).setStatus(status).setStatusMessage(message).build();
    }

    private void run() {
        int port = this.cfg.hasPath("ingest.server.port") ? this.cfg.getInt("ingest.server.port") : 4001;
        Server server = ServerBuilder.forPort((int)port).executor(this.executor).addService((BindableService)new IngestService(this.repo, this.scheduler)).addService((BindableService)new RetrieveService(this.repo)).build();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Gracefully shuting down server.");
            server.shutdown();
        }));
        Metrics.register();
        this.startConsumerThreads();
        try {
            server.start();
            log.info("Successfully started server 0.0.0.0:{}", (Object)server.getPort());
            server.awaitTermination();
            log.info("Server shutdown.");
        }
        catch (Exception ex) {
            IngestServer.die("Failed to start the server", ex);
        }
    }

    protected void startConsumerThreads() {
        Map<AttributeFamilyDescriptor, Set<AttributeFamilyDescriptor>> familyToCommitLog = this.indexFamilyToCommitLogs();
        log.info("Starting consumer threads for familyToCommitLog {}", familyToCommitLog);
        familyToCommitLog.forEach((family, logs) -> {
            for (AttributeFamilyDescriptor commitLogFamily : logs) {
                if (!family.getAccess().isReadonly()) {
                    CommitLogReader commitLog = (CommitLogReader)commitLogFamily.getCommitLogReader().orElseThrow(() -> new IllegalStateException("Failed to find commit-log reader in family " + commitLogFamily));
                    AttributeWriterBase writer = (AttributeWriterBase)family.getWriter().orElseThrow(() -> new IllegalStateException("Unable to get writer for family " + family.getName() + "."));
                    StorageFilter filter = family.getFilter();
                    HashSet allowedAttributes = new HashSet(family.getAttributes());
                    String name = "consumer-" + family.getName();
                    this.registerWriterTo(name, commitLog, allowedAttributes, filter, writer, this.retryPolicy);
                    log.info("Started consumer {} consuming from log {} with URI {} into {} attributes {}", new Object[]{name, commitLog, commitLog.getUri(), writer.getUri(), allowedAttributes});
                    continue;
                }
                log.debug("Not starting thread for read-only family {}", family);
            }
        });
        this.repo.getTransformations().forEach(this::runTransformer);
    }

    private void runTransformer(String name, TransformationDescriptor transform) {
        AttributeFamilyDescriptor family = transform.getAttributes().stream().map(a -> this.repo.getFamiliesForAttribute(a).stream().filter(af -> af.getAccess().canReadCommitLog()).collect(Collectors.toSet())).reduce(Sets::intersection).filter(s -> !s.isEmpty()).map(s -> s.stream().filter(f -> f.getCommitLogReader().isPresent()).findAny().orElse(null)).filter(Objects::nonNull).orElseThrow(() -> new IllegalArgumentException("Cannot obtain attribute family for " + transform.getAttributes()));
        Transformation t = transform.getTransformation();
        StorageFilter f = transform.getFilter();
        String consumer = "transformer-" + name;
        CommitLogReader reader = (CommitLogReader)family.getCommitLogReader().orElseThrow(() -> new IllegalStateException("Unable to get reader for family " + family.getName() + "."));
        this.startTransformationObserver(consumer, reader, t, f, name);
        log.info("Started transformer {} reading from {} using {}", new Object[]{consumer, reader.getUri(), t.getClass()});
    }

    private void startTransformationObserver(String consumer, CommitLogReader reader, Transformation transformation, StorageFilter filter, String name) {
        new TransformationObserver(3, consumer, reader, this.repo, name, transformation, filter).start();
    }

    private Map<AttributeFamilyDescriptor, Set<AttributeFamilyDescriptor>> indexFamilyToCommitLogs() {
        Map<AttributeDescriptor, AttributeFamilyDescriptor> attrToCommitLog = this.repo.getAllFamilies().filter(af -> af.getType() == StorageType.PRIMARY).flatMap(af -> af.getAttributes().stream().map(attr -> Pair.of((Object)attr, (Object)af))).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
        return this.repo.getAllFamilies().filter(af -> af.getType() == StorageType.REPLICA).map(af -> {
            if (af.getSource().isPresent()) {
                String source = (String)af.getSource().get();
                return Pair.of((Object)af, Collections.singleton(this.repo.getAllFamilies().filter(af2 -> af2.getName().equals(source)).findAny().orElseThrow(() -> new IllegalArgumentException("Unknown family " + source))));
            }
            return Pair.of((Object)af, af.getAttributes().stream().map(attr -> {
                AttributeFamilyDescriptor commitFamily = (AttributeFamilyDescriptor)attrToCommitLog.get(attr);
                Optional writer = this.repo.getWriter(attr);
                if (commitFamily == null && writer.isPresent()) {
                    throw new IllegalStateException("Missing source commit log family for " + attr);
                }
                return commitFamily;
            }).filter(Objects::nonNull).collect(Collectors.toSet()));
        }).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
    }

    private void registerWriterTo(String consumerName, CommitLogReader commitLog, Set<AttributeDescriptor<?>> allowedAttributes, StorageFilter filter, AttributeWriterBase writerBase, RetryPolicy retry) {
        AbstractRetryableLogObserver observer;
        log.info("Registering {} writer to {} from commit log {}", new Object[]{writerBase.getType(), writerBase.getUri(), commitLog.getUri()});
        if (writerBase.getType() == AttributeWriterBase.Type.ONLINE) {
            OnlineAttributeWriter writer = writerBase.online();
            observer = this.getOnlineObserver(consumerName, commitLog, allowedAttributes, filter, writer);
        } else {
            BulkAttributeWriter writer = writerBase.bulk();
            observer = this.getBulkObserver(consumerName, commitLog, allowedAttributes, filter, writer, retry);
        }
        observer.start();
    }

    private AbstractRetryableLogObserver getBulkObserver(final String consumerName, final CommitLogReader commitLog, final Set<AttributeDescriptor<?>> allowedAttributes, final StorageFilter filter, final BulkAttributeWriter writer, final RetryPolicy retry) {
        return new RetryableBulkObserver(3, consumerName, commitLog){

            public boolean onNextInternal(StreamElement ingest, BulkLogObserver.OffsetCommitter committer) {
                boolean allowed = allowedAttributes.contains(ingest.getAttributeDescriptor());
                log.debug("Consumer {}: received new ingest element {}", (Object)consumerName, (Object)ingest);
                if (allowed && filter.apply(ingest)) {
                    Failsafe.with((RetryPolicy)retry).run(() -> this.ingestBulkInternal(ingest, committer));
                } else {
                    Metrics.COMMIT_UPDATE_DISCARDED.increment();
                    log.debug("Consumer {]: discarding write of {} to {} because of {}, with allowedAttributes {} and filter class {}", new Object[]{consumerName, ingest, writer.getUri(), allowed ? "applied filter" : "invalid attribute", allowedAttributes, filter.getClass()});
                }
                return true;
            }

            protected void failure() {
                IngestServer.die(String.format("Consumer %s: too many errors retrying the consumption of commit log %s. Killing self.", consumerName, commitLog.getUri()));
            }

            public void onRestart(List<Offset> offsets) {
                log.info("Consumer {}: restarting bulk processing of {} from {}, rollbacking the writer", new Object[]{consumerName, writer.getUri(), offsets});
                writer.rollback();
            }

            private void ingestBulkInternal(StreamElement ingest, BulkLogObserver.OffsetCommitter committer) {
                log.debug("Consumer {}: writing element {} into {}", new Object[]{consumerName, ingest, writer});
                writer.write(ingest, (succ, exc) -> IngestServer.this.confirmWrite(consumerName, ingest, (AttributeWriterBase)writer, succ, exc, () -> ((BulkLogObserver.OffsetCommitter)committer).confirm(), arg_0 -> ((BulkLogObserver.OffsetCommitter)committer).fail(arg_0)));
            }
        };
    }

    private AbstractRetryableLogObserver getOnlineObserver(final String consumerName, final CommitLogReader commitLog, final Set<AttributeDescriptor<?>> allowedAttributes, final StorageFilter filter, final OnlineAttributeWriter writer) {
        return new RetryableLogObserver(3, consumerName, commitLog){

            public boolean onNextInternal(StreamElement ingest, LogObserver.OffsetCommitter committer) {
                boolean allowed = allowedAttributes.contains(ingest.getAttributeDescriptor());
                log.debug("Consumer {}: received new stream element {}", (Object)consumerName, (Object)ingest);
                if (allowed && filter.apply(ingest)) {
                    Failsafe.with((RetryPolicy)IngestServer.this.retryPolicy).run(() -> this.ingestOnlineInternal(ingest, committer));
                } else {
                    Metrics.COMMIT_UPDATE_DISCARDED.increment();
                    log.debug("Consumer {}: discarding write of {} to {} because of {}, with allowedAttributes {} and filter class {}", new Object[]{consumerName, ingest, writer.getUri(), allowed ? "applied filter" : "invalid attribute", allowedAttributes, filter.getClass()});
                    committer.confirm();
                }
                return true;
            }

            protected void failure() {
                IngestServer.die(String.format("Consumer %s: too many errors retrying the consumption of commit log %s. Killing self.", consumerName, commitLog.getUri()));
            }

            private void ingestOnlineInternal(StreamElement ingest, LogObserver.OffsetCommitter committer) {
                log.debug("Consumer {}: writing element {} into {}", new Object[]{consumerName, ingest, writer});
                writer.write(ingest, (success, exc) -> IngestServer.this.confirmWrite(consumerName, ingest, (AttributeWriterBase)writer, success, exc, () -> ((LogObserver.OffsetCommitter)committer).confirm(), arg_0 -> ((LogObserver.OffsetCommitter)committer).fail(arg_0)));
            }
        };
    }

    private void confirmWrite(String consumerName, StreamElement ingest, AttributeWriterBase writer, boolean success, Throwable exc, Runnable onSuccess, Consumer<Throwable> onError) {
        if (!success) {
            log.error("Consumer {}: failed to write ingest {} to {}", new Object[]{consumerName, ingest, writer.getUri(), exc});
            Metrics.NON_COMMIT_WRITES_RETRIES.increment();
            if (this.ignoreErrors) {
                log.error("Consumer {}: retries exhausted trying to ingest {} to {}. Configured to ignore. Skipping.", new Object[]{consumerName, ingest, writer.getUri()});
                onSuccess.run();
            } else {
                onError.accept(exc);
            }
        } else {
            if (ingest.isDelete()) {
                Metrics.NON_COMMIT_LOG_DELETES.increment();
            } else {
                Metrics.NON_COMMIT_LOG_UPDATES.increment();
            }
            onSuccess.run();
        }
    }

    static void die(String message) {
        IngestServer.die(message, null);
    }

    static void die(String message, @Nullable Throwable error) {
        try {
            Thread.sleep((long)(Math.random() * 10000.0));
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        if (error == null) {
            log.error(message);
        } else {
            log.error(message, error);
        }
        System.exit(1);
    }

    public static int getCORES() {
        return CORES;
    }

    public Executor getExecutor() {
        return this.executor;
    }

    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    public Repository getRepo() {
        return this.repo;
    }

    public Config getCfg() {
        return this.cfg;
    }

    public boolean isIgnoreErrors() {
        return this.ignoreErrors;
    }

    public RetryPolicy getRetryPolicy() {
        return this.retryPolicy;
    }
}

