/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.server;

import com.google.common.base.Strings;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.TextFormat;
import cz.o2.proxima.proto.service.IngestServiceGrpc;
import cz.o2.proxima.proto.service.Rpc;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.server.IngestServer;
import cz.o2.proxima.server.metrics.Metrics;
import cz.o2.proxima.storage.StreamElement;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IngestService
extends IngestServiceGrpc.IngestServiceImplBase {
    private static final Logger log = LoggerFactory.getLogger(IngestService.class);
    private final Repository repo;
    private final ScheduledExecutorService scheduler;

    public IngestService(Repository repo, ScheduledExecutorService scheduler) {
        this.repo = repo;
        this.scheduler = scheduler;
    }

    private void processSingleIngest(Rpc.Ingest request, Consumer<Rpc.Status> consumer) {
        if (log.isDebugEnabled()) {
            log.debug("Processing input ingest {}", (Object)TextFormat.shortDebugString((MessageOrBuilder)request));
        }
        Consumer<Rpc.Status> loggingConsumer = rpc -> {
            log.info("Input ingest {}: {}, {}", new Object[]{TextFormat.shortDebugString((MessageOrBuilder)request), rpc.getStatus(), rpc.getStatus() == 200 ? "OK" : rpc.getStatusMessage()});
            consumer.accept((Rpc.Status)rpc);
        };
        Metrics.INGESTS.increment();
        try {
            if (!this.writeRequest(request, loggingConsumer)) {
                Metrics.INVALID_REQUEST.increment();
            }
        }
        catch (Exception err) {
            log.error("Error processing user request {}", (Object)request, (Object)err);
            loggingConsumer.accept(IngestServer.status(request.getUuid(), 500, err.getMessage()));
        }
    }

    private boolean writeRequest(Rpc.Ingest request, Consumer<Rpc.Status> consumer) {
        if (Strings.isNullOrEmpty((String)request.getKey()) || Strings.isNullOrEmpty((String)request.getEntity()) || Strings.isNullOrEmpty((String)request.getAttribute())) {
            consumer.accept(IngestServer.status(request.getUuid(), 400, "Missing required fields in input message"));
            return false;
        }
        Optional entity = this.repo.findEntity(request.getEntity());
        if (!entity.isPresent()) {
            consumer.accept(IngestServer.notFound(request.getUuid(), "Entity " + request.getEntity() + " not found"));
            return false;
        }
        Optional attr = ((EntityDescriptor)entity.get()).findAttribute(request.getAttribute());
        if (!attr.isPresent()) {
            consumer.accept(IngestServer.notFound(request.getUuid(), "Attribute " + request.getAttribute() + " of entity " + ((EntityDescriptor)entity.get()).getName() + " not found"));
            return false;
        }
        return IngestServer.ingestRequest(this.repo, IngestService.toStreamElement(request, (EntityDescriptor)entity.get(), (AttributeDescriptor)attr.get()), request.getUuid(), consumer);
    }

    public void ingest(Rpc.Ingest request, StreamObserver<Rpc.Status> responseObserver) {
        Metrics.INGEST_SINGLE.increment();
        this.processSingleIngest(request, status -> {
            responseObserver.onNext(status);
            responseObserver.onCompleted();
        });
    }

    public StreamObserver<Rpc.Ingest> ingestSingle(StreamObserver<Rpc.Status> responseObserver) {
        return new IngestObserver(responseObserver);
    }

    public StreamObserver<Rpc.IngestBulk> ingestBulk(StreamObserver<Rpc.StatusBulk> responseObserver) {
        return new IngestBulkObserver(responseObserver);
    }

    private static StreamElement toStreamElement(Rpc.Ingest request, EntityDescriptor entity, AttributeDescriptor attr) {
        long stamp;
        long l = stamp = request.getStamp() == 0L ? System.currentTimeMillis() : request.getStamp();
        if (request.getDelete()) {
            return attr.isWildcard() && attr.getName().equals(request.getAttribute()) ? StreamElement.deleteWildcard((EntityDescriptor)entity, (AttributeDescriptor)attr, (String)request.getUuid(), (String)request.getKey(), (long)stamp) : StreamElement.delete((EntityDescriptor)entity, (AttributeDescriptor)attr, (String)request.getUuid(), (String)request.getKey(), (String)request.getAttribute(), (long)stamp);
        }
        return StreamElement.update((EntityDescriptor)entity, (AttributeDescriptor)attr, (String)request.getUuid(), (String)request.getKey(), (String)request.getAttribute(), (long)stamp, (byte[])request.getValue().toByteArray());
    }

    private class IngestBulkObserver
    implements StreamObserver<Rpc.IngestBulk> {
        final StreamObserver<Rpc.StatusBulk> responseObserver;
        final Queue<Rpc.Status> statusQueue = new ConcurrentLinkedQueue<Rpc.Status>();
        final AtomicBoolean completed = new AtomicBoolean(false);
        final AtomicInteger inflightRequests = new AtomicInteger();
        final AtomicLong lastFlushNanos = new AtomicLong(System.nanoTime());
        final Rpc.StatusBulk.Builder builder = Rpc.StatusBulk.newBuilder();
        static final long MAX_SLEEP_NANOS = 100000000L;
        static final int MAX_QUEUED_STATUSES = 500;
        Runnable flushTask = this.createFlushTask();
        ScheduledFuture<?> flushFuture = IngestService.access$200(IngestService.this).scheduleAtFixedRate(this.flushTask, 100000000L, 100000000L, TimeUnit.NANOSECONDS);

        IngestBulkObserver(StreamObserver<Rpc.StatusBulk> responseObserver) {
            this.responseObserver = responseObserver;
        }

        private Runnable createFlushTask() {
            return () -> {
                try {
                    Rpc.StatusBulk.Builder builder = this.builder;
                    synchronized (builder) {
                        while (this.statusQueue.size() > 500) {
                            this.peekQueueToBuilderAndFlush();
                        }
                        long now = System.nanoTime();
                        if (now - this.lastFlushNanos.get() >= 100000000L) {
                            while (!this.statusQueue.isEmpty()) {
                                this.peekQueueToBuilderAndFlush();
                            }
                        }
                        if (this.builder.getStatusCount() > 0) {
                            this.responseObserver.onNext((Object)this.builder.build());
                            this.builder.clear();
                        }
                        if (this.completed.get() && this.inflightRequests.get() == 0 && this.statusQueue.isEmpty()) {
                            this.responseObserver.onCompleted();
                        }
                    }
                }
                catch (Exception ex) {
                    log.error("Failed to send bulk status", (Throwable)ex);
                }
            };
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void peekQueueToBuilderAndFlush() {
            Rpc.StatusBulk.Builder builder = this.builder;
            synchronized (builder) {
                this.builder.addStatus(this.statusQueue.poll());
                if (this.builder.getStatusCount() >= 1000) {
                    this.flush();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void flush() {
            Rpc.StatusBulk.Builder builder = this.builder;
            synchronized (builder) {
                this.lastFlushNanos.set(System.nanoTime());
                Rpc.StatusBulk bulk = this.builder.build();
                if (bulk.getStatusCount() > 0) {
                    this.responseObserver.onNext((Object)bulk);
                }
                this.builder.clear();
            }
        }

        public void onNext(Rpc.IngestBulk bulk) {
            Metrics.INGEST_BULK.increment();
            Metrics.BULK_SIZE.increment((double)bulk.getIngestCount());
            this.inflightRequests.addAndGet(bulk.getIngestCount());
            bulk.getIngestList().stream().forEach(r -> IngestService.this.processSingleIngest(r, status -> {
                this.statusQueue.add((Rpc.Status)status);
                if (this.statusQueue.size() >= 500) {
                    IngestService.this.scheduler.execute(this.flushTask);
                }
                if (this.inflightRequests.decrementAndGet() == 0) {
                    AtomicInteger atomicInteger = this.inflightRequests;
                    synchronized (atomicInteger) {
                        this.inflightRequests.notifyAll();
                    }
                }
            }));
        }

        public void onError(Throwable error) {
            log.error("Error from client", error);
            this.responseObserver.onError(error);
            this.flushFuture.cancel(true);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @SuppressFBWarnings(value={"JLM_JSR166_UTILCONCURRENT_MONITORENTER"}, justification="The synchronization on `inflighRequests` is used only for waiting before the flush thread finishes (wait() - notify())")
        public void onCompleted() {
            this.completed.set(true);
            this.flushFuture.cancel(true);
            AtomicInteger atomicInteger = this.inflightRequests;
            synchronized (atomicInteger) {
                while (this.inflightRequests.get() > 0) {
                    try {
                        this.inflightRequests.wait(100L);
                    }
                    catch (InterruptedException ex) {
                        log.warn("Interrupted while waiting to send responses to client", (Throwable)ex);
                        Thread.currentThread().interrupt();
                    }
                }
            }
            while (!this.statusQueue.isEmpty()) {
                this.peekQueueToBuilderAndFlush();
            }
            this.flush();
            this.responseObserver.onCompleted();
        }
    }

    private class IngestObserver
    implements StreamObserver<Rpc.Ingest> {
        final StreamObserver<Rpc.Status> responseObserver;
        final AtomicInteger inflightRequests = new AtomicInteger(0);
        final Object responseObserverLock = new Object();

        IngestObserver(StreamObserver<Rpc.Status> responseObserver) {
            this.responseObserver = responseObserver;
        }

        public void onNext(Rpc.Ingest request) {
            Metrics.INGEST_SINGLE.increment();
            this.inflightRequests.incrementAndGet();
            IngestService.this.processSingleIngest(request, status -> {
                Object object = this.responseObserverLock;
                synchronized (object) {
                    this.responseObserver.onNext(status);
                }
                if (this.inflightRequests.decrementAndGet() == 0) {
                    object = this.inflightRequests;
                    synchronized (object) {
                        this.inflightRequests.notifyAll();
                    }
                }
            });
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onError(Throwable thrwbl) {
            log.error("Error on channel", thrwbl);
            Object object = this.responseObserverLock;
            synchronized (object) {
                this.responseObserver.onError(thrwbl);
            }
        }

        public void onCompleted() {
            this.inflightRequests.accumulateAndGet(0, (a, b) -> {
                Object object;
                int res = a + b;
                if (res > 0) {
                    object = this.inflightRequests;
                    synchronized (object) {
                        try {
                            while (this.inflightRequests.get() > 0) {
                                this.inflightRequests.wait();
                            }
                        }
                        catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
                object = this.responseObserverLock;
                synchronized (object) {
                    this.responseObserver.onCompleted();
                }
                return res;
            });
        }
    }
}

