package cz.o2.proxima.server;

import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.proto.service.IngestServiceGrpc;
import cz.o2.proxima.proto.service.RetrieveServiceGrpc;
import cz.o2.proxima.proto.service.Rpc;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.Transformation;
import cz.o2.proxima.repository.TransformationDescriptor;
import cz.o2.proxima.server.metrics.Metrics;
import cz.o2.proxima.storage.AttributeWriterBase;
import cz.o2.proxima.storage.BulkAttributeWriter;
import cz.o2.proxima.storage.OnlineAttributeWriter;
import cz.o2.proxima.storage.StorageFilter;
import cz.o2.proxima.storage.StorageType;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.AbstractRetryableLogObserver;
import cz.o2.proxima.storage.commitlog.BulkLogObserver;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.LogObserver;
import cz.o2.proxima.storage.commitlog.RetryableBulkObserver;
import cz.o2.proxima.storage.commitlog.RetryableLogObserver;
import cz.o2.proxima.storage.randomaccess.KeyValue;
import cz.o2.proxima.storage.randomaccess.RandomAccessReader;
import cz.o2.proxima.util.Pair;
import cz.seznam.euphoria.shadow.com.google.common.base.Strings;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.SyncFailsafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/server/IngestServer.class */
public class IngestServer {
    /** Logger shared by the server and its nested gRPC services. */
    private static final Logger log = LoggerFactory.getLogger(IngestServer.class);
    /** Repository created from the configuration in the constructor. */
    final Repository repo;
    /** Configuration this server was constructed with. */
    final Config cfg;
    /** Value of {@code Constants.CFG_IGNORE_ERRORS} in the configuration; {@code false} when the path is absent. */
    final boolean ignoreErrors;
    // NOTE(review): not referenced in the visible part of this file; presumably a lower bound on worker threads - confirm.
    final int minCores = 2;
    /** Executor handed to the gRPC server in run(): 2 core threads, at most 20, 10 s keep-alive, direct hand-off via SynchronousQueue. */
    final Executor executor = new ThreadPoolExecutor(2, 20, 10, TimeUnit.SECONDS, new SynchronousQueue());
    /** Five-thread scheduler on which bulk ingest streams run their status flush task. */
    final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(5);
    /** Retry policy passed to registerWriterTo: at most 3 retries, backoff from 3000 ms up to 20000 ms with factor 2.0. */
    RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(3).withBackoff(3000, 20000, TimeUnit.MILLISECONDS, 2.0d);

    /* loaded from: input_file:cz/o2/proxima/server/IngestServer$IngestService.class */
    public class IngestService extends IngestServiceGrpc.IngestServiceImplBase {
        public IngestService() {
        }

        public void ingest(Rpc.Ingest ingest, StreamObserver<Rpc.Status> streamObserver) {
            Metrics.INGEST_SINGLE.increment();
            IngestServer.this.processSingleIngest(ingest, status -> {
                synchronized (streamObserver) {
                    streamObserver.onNext(status);
                    streamObserver.onCompleted();
                }
            });
        }

        public StreamObserver<Rpc.Ingest> ingestSingle(final StreamObserver<Rpc.Status> streamObserver) {
            final AtomicInteger atomicInteger = new AtomicInteger(0);
            return new StreamObserver<Rpc.Ingest>() { // from class: cz.o2.proxima.server.IngestServer.IngestService.1
                public void onNext(Rpc.Ingest ingest) {
                    Metrics.INGEST_SINGLE.increment();
                    atomicInteger.incrementAndGet();
                    IngestServer ingestServer = IngestServer.this;
                    StreamObserver streamObserver2 = streamObserver;
                    AtomicInteger atomicInteger2 = atomicInteger;
                    ingestServer.processSingleIngest(ingest, status -> {
                        synchronized (streamObserver2) {
                            streamObserver2.onNext(status);
                        }
                        if (atomicInteger2.decrementAndGet() == 0) {
                            synchronized (atomicInteger2) {
                                atomicInteger2.notify();
                            }
                        }
                    });
                }

                public void onError(Throwable th) {
                    IngestServer.log.error("Error on channel", th);
                    synchronized (streamObserver) {
                        streamObserver.onError(th);
                    }
                }

                public void onCompleted() {
                    AtomicInteger atomicInteger2 = atomicInteger;
                    AtomicInteger atomicInteger3 = atomicInteger;
                    StreamObserver streamObserver2 = streamObserver;
                    atomicInteger2.accumulateAndGet(0, (i, i2) -> {
                        int i = i + i2;
                        if (i > 0) {
                            synchronized (atomicInteger3) {
                                try {
                                    atomicInteger3.wait();
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }
                        synchronized (streamObserver2) {
                            streamObserver2.onCompleted();
                        }
                        return i;
                    });
                }
            };
        }

        public StreamObserver<Rpc.IngestBulk> ingestBulk(final StreamObserver<Rpc.StatusBulk> streamObserver) {
            return new StreamObserver<Rpc.IngestBulk>() { // from class: cz.o2.proxima.server.IngestServer.IngestService.2
                final Queue<Rpc.Status> statusQueue = new ConcurrentLinkedQueue();
                final AtomicBoolean completed = new AtomicBoolean(false);
                final AtomicInteger inflightRequests = new AtomicInteger();
                final AtomicLong lastFlushNanos = new AtomicLong(System.nanoTime());
                final Rpc.StatusBulk.Builder builder = Rpc.StatusBulk.newBuilder();
                final long maxSleepNanos = 100000000;
                final int maxQueuedStatuses = 500;
                Runnable flushTask;
                ScheduledFuture<?> flushFuture;

                {
                    StreamObserver streamObserver2 = streamObserver;
                    this.flushTask = () -> {
                        try {
                            synchronized (this.builder) {
                                while (this.statusQueue.size() > 500) {
                                    peekQueueToBuilderAndFlush();
                                }
                                if (System.nanoTime() - this.lastFlushNanos.get() >= 100000000) {
                                    while (!this.statusQueue.isEmpty()) {
                                        peekQueueToBuilderAndFlush();
                                    }
                                }
                                if (this.builder.getStatusCount() > 0) {
                                    streamObserver2.onNext(this.builder.build());
                                    this.builder.clear();
                                }
                                if (this.completed.get() && this.inflightRequests.get() == 0 && this.statusQueue.isEmpty()) {
                                    streamObserver2.onCompleted();
                                }
                            }
                        } catch (Exception e) {
                            IngestServer.log.error("Failed to send bulk status", e);
                        }
                    };
                    this.flushFuture = IngestServer.this.scheduler.scheduleAtFixedRate(this.flushTask, 100000000L, 100000000L, TimeUnit.NANOSECONDS);
                }

                private void peekQueueToBuilderAndFlush() {
                    synchronized (this.builder) {
                        this.builder.addStatus(this.statusQueue.poll());
                        if (this.builder.getStatusCount() >= 1000) {
                            flush();
                        }
                    }
                }

                private void flush() {
                    synchronized (this.builder) {
                        this.lastFlushNanos.set(System.nanoTime());
                        Rpc.StatusBulk build = this.builder.build();
                        if (build.getStatusCount() > 0) {
                            streamObserver.onNext(build);
                        }
                        this.builder.clear();
                    }
                }

                public void onNext(Rpc.IngestBulk ingestBulk) {
                    Metrics.INGEST_BULK.increment();
                    Metrics.BULK_SIZE.increment(ingestBulk.getIngestCount());
                    this.inflightRequests.addAndGet(ingestBulk.getIngestCount());
                    ingestBulk.getIngestList().stream().forEach(ingest -> {
                        IngestServer.this.processSingleIngest(ingest, status -> {
                            this.statusQueue.add(status);
                            if (this.statusQueue.size() >= 500) {
                                IngestServer.this.scheduler.execute(this.flushTask);
                            }
                            if (this.inflightRequests.decrementAndGet() == 0) {
                                synchronized (this.inflightRequests) {
                                    this.inflightRequests.notify();
                                }
                            }
                        });
                    });
                }

                public void onError(Throwable th) {
                    IngestServer.log.error("Error from client", th);
                    streamObserver.onError(th);
                    this.flushFuture.cancel(true);
                }

                @SuppressFBWarnings(value = {"JLM_JSR166_UTILCONCURRENT_MONITORENTER"}, justification = "The synchronization on `inflighRequests` is used only for waiting before the flush thread finishes (wait() - notify())")
                public void onCompleted() {
                    this.completed.set(true);
                    this.flushFuture.cancel(true);
                    synchronized (this.inflightRequests) {
                        while (this.inflightRequests.get() != 0) {
                            try {
                                this.inflightRequests.wait(100L);
                            } catch (InterruptedException e) {
                                IngestServer.log.warn("Interrupted while waiting to send responses to client");
                            }
                        }
                    }
                    while (!this.statusQueue.isEmpty()) {
                        peekQueueToBuilderAndFlush();
                    }
                    flush();
                    streamObserver.onCompleted();
                }
            };
        }
    }

    /* loaded from: input_file:cz/o2/proxima/server/IngestServer$RetrieveService.class */
    /**
     * gRPC service for reads. Both endpoints answer with a single response message
     * carrying an HTTP-like status code (200, 400, 404 or 500) and then complete the stream.
     */
    public class RetrieveService extends RetrieveServiceGrpc.RetrieveServiceImplBase {

        /** Carries the status code and message of a failed request out of the processing code. */
        private class Status extends Exception {
            final int status;
            final String message;

            Status(int i, String str) {
                this.status = i;
                this.message = str;
            }
        }

        public RetrieveService() {
        }

        /**
         * Lists values of a wildcard attribute for the given entity key.
         *
         * @param listRequest request; entity, key and wildcard prefix are mandatory
         * @param streamObserver receives exactly one {@link Rpc.ListResponse} and is then completed
         */
        public void listAttributes(Rpc.ListRequest listRequest, StreamObserver<Rpc.ListResponse> streamObserver) {
            try {
                Metrics.LIST_REQUESTS.increment();
                IngestServer.log.info("Processing listAttributes {}", TextFormat.shortDebugString(listRequest));
                if (listRequest.getEntity().isEmpty() || listRequest.getKey().isEmpty() || listRequest.getWildcardPrefix().isEmpty()) {
                    throw new Status(400, "Missing some required fields");
                }
                EntityDescriptor entity = requireEntity(listRequest.getEntity());
                AttributeDescriptor attributeDescriptor = (AttributeDescriptor) entity.findAttribute(listRequest.getWildcardPrefix() + ".*").orElseThrow(() -> {
                    return new Status(404, "Entity " + listRequest.getEntity() + " does not have wildcard attribute " + listRequest.getWildcardPrefix());
                });
                RandomAccessReader reader = requireRandomAccessReader(attributeDescriptor);
                Rpc.ListResponse.Builder response = Rpc.ListResponse.newBuilder().setStatus(200);
                // a non-positive limit in the request is passed to the reader as -1
                int limit = listRequest.getLimit() > 0 ? listRequest.getLimit() : -1;
                reader.scanWildcard(listRequest.getKey(), attributeDescriptor, reader.fetchOffset(RandomAccessReader.Listing.ATTRIBUTE, listRequest.getOffset()), limit, keyValue -> {
                    response.addValue(Rpc.ListResponse.AttrValue.newBuilder().setAttribute(keyValue.getAttribute()).setValue(ByteString.copyFrom(keyValue.getValueBytes())));
                });
                streamObserver.onNext(response.build());
                streamObserver.onCompleted();
            } catch (Status e) {
                streamObserver.onNext(Rpc.ListResponse.newBuilder().setStatus(e.status).setStatusMessage(e.message).build());
                streamObserver.onCompleted();
            } catch (Exception e2) {
                IngestServer.log.error("Failed to process request {}", listRequest, e2);
                streamObserver.onNext(Rpc.ListResponse.newBuilder().setStatus(500).setStatusMessage(describe(e2)).build());
                streamObserver.onCompleted();
            }
        }

        /**
         * Reads the value of a single attribute for the given entity key.
         *
         * @param getRequest request; entity, key and attribute are mandatory
         * @param streamObserver receives exactly one {@link Rpc.GetResponse} and is then completed
         */
        public void get(Rpc.GetRequest getRequest, StreamObserver<Rpc.GetResponse> streamObserver) {
            Metrics.GET_REQUESTS.increment();
            IngestServer.log.info("Processing get {}", TextFormat.shortDebugString(getRequest));
            try {
                if (getRequest.getEntity().isEmpty() || getRequest.getKey().isEmpty() || getRequest.getAttribute().isEmpty()) {
                    throw new Status(400, "Missing some required fields");
                }
                EntityDescriptor entity = requireEntity(getRequest.getEntity());
                AttributeDescriptor attributeDescriptor = (AttributeDescriptor) entity.findAttribute(getRequest.getAttribute()).orElseThrow(() -> {
                    return new Status(404, "Entity " + getRequest.getEntity() + " does not have attribute " + getRequest.getAttribute());
                });
                RandomAccessReader reader = requireRandomAccessReader(attributeDescriptor);
                KeyValue keyValue = (KeyValue) reader.get(getRequest.getKey(), getRequest.getAttribute(), attributeDescriptor).orElseThrow(() -> {
                    return new Status(404, "Key " + getRequest.getKey() + " and/or attribute " + getRequest.getAttribute() + " not found");
                });
                streamObserver.onNext(Rpc.GetResponse.newBuilder().setStatus(200).setValue(ByteString.copyFrom(keyValue.getValueBytes())).build());
                streamObserver.onCompleted();
            } catch (Status e) {
                streamObserver.onNext(Rpc.GetResponse.newBuilder().setStatus(e.status).setStatusMessage(e.message).build());
                streamObserver.onCompleted();
            } catch (Exception e2) {
                IngestServer.log.error("Failed to process request {}", getRequest, e2);
                streamObserver.onNext(Rpc.GetResponse.newBuilder().setStatus(500).setStatusMessage(describe(e2)).build());
                streamObserver.onCompleted();
            }
        }

        /** Looks up the entity in the repository or fails with a 404 status. */
        private EntityDescriptor requireEntity(String name) throws Status {
            return (EntityDescriptor) IngestServer.this.repo.findEntity(name).orElseThrow(() -> {
                return new Status(404, "Entity " + name + " not found");
            });
        }

        /** Returns the random access reader of any family of the attribute that has one, or fails with a 400 status. */
        private RandomAccessReader requireRandomAccessReader(AttributeDescriptor attributeDescriptor) throws Status {
            return (RandomAccessReader) IngestServer.this.repo.getFamiliesForAttribute(attributeDescriptor).stream().filter(family -> {
                return family.getRandomAccessReader().isPresent();
            }).map(family -> {
                return (RandomAccessReader) family.getRandomAccessReader().get();
            }).findAny().orElseThrow(() -> {
                return new Status(400, "Attribute " + attributeDescriptor + " has no random reader");
            });
        }

        /**
         * Returns a non-null description of the exception. Needed because exceptions such as
         * NullPointerException often have a null message and the generated protobuf
         * setter rejects null, which would throw from inside the error handler.
         */
        private String describe(Exception e) {
            String message = e.getMessage();
            return message != null ? message : e.toString();
        }
    }

    /**
     * Entry point. With no arguments the configuration is loaded through
     * {@code ConfigFactory.load()}; otherwise the first argument is parsed as a
     * configuration file. The resolved configuration is used to create and run the server.
     *
     * @param strArr command line arguments; only the first one is read
     * @throws Exception whatever server construction or {@code run()} throws
     */
    public static void main(String[] strArr) throws Exception {
        final Config config;
        if (strArr.length == 0) {
            config = ConfigFactory.load().resolve();
        } else {
            File configFile = new File(strArr[0]);
            config = ConfigFactory.parseFile(configFile).resolve();
        }
        IngestServer server = new IngestServer(config);
        server.run();
    }

    protected IngestServer(Config config) {
        this.cfg = config;
        this.repo = Repository.of(config);
        if (this.repo.isEmpty()) {
            throw new IllegalArgumentException("No valid entities found in provided config!");
        }
        this.ignoreErrors = config.hasPath(Constants.CFG_IGNORE_ERRORS) ? config.getBoolean(Constants.CFG_IGNORE_ERRORS) : false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes one ingest request and reports its outcome through the consumer.
     * Requests rejected by {@code writeRequest} are counted as invalid; any exception
     * is logged and reported to the consumer as status 500.
     *
     * @param ingest the request to process
     * @param consumer callback receiving the resulting status
     */
    public void processSingleIngest(Rpc.Ingest ingest, Consumer<Rpc.Status> consumer) {
        log.info("Processing input ingest {}", TextFormat.shortDebugString(ingest));
        Metrics.INGESTS.increment();
        try {
            final boolean accepted = writeRequest(ingest, consumer);
            if (accepted) {
                return;
            }
            Metrics.INVALID_REQUEST.increment();
        } catch (Exception failure) {
            log.error("Error processing user request {}", ingest, failure);
            Rpc.Status errorStatus = status(ingest.getUuid(), 500, failure.getMessage());
            consumer.accept(errorStatus);
        }
    }

    /**
     * Validates the request against the repository and, when the entity and attribute
     * exist, converts it to a stream element and hands it to {@code ingestRequest}.
     *
     * @param ingest the request; key, entity and attribute must be non-empty
     * @param consumer callback receiving status 400 (missing fields) or 404 (unknown
     *     entity or attribute); otherwise passed on to {@code ingestRequest}
     * @return {@code false} when the request was rejected here, otherwise the result of
     *     {@code ingestRequest}
     * @throws IOException propagated from {@code ingestRequest}
     */
    private boolean writeRequest(Rpc.Ingest ingest, Consumer<Rpc.Status> consumer) throws IOException {
        final boolean incomplete = Strings.isNullOrEmpty(ingest.getKey())
                || Strings.isNullOrEmpty(ingest.getEntity())
                || Strings.isNullOrEmpty(ingest.getAttribute());
        if (incomplete) {
            consumer.accept(status(ingest.getUuid(), 400, "Missing required fields in input message"));
            return false;
        }

        Optional maybeEntity = this.repo.findEntity(ingest.getEntity());
        if (!maybeEntity.isPresent()) {
            consumer.accept(notFound(ingest.getUuid(), "Entity " + ingest.getEntity() + " not found"));
            return false;
        }
        EntityDescriptor entity = (EntityDescriptor) maybeEntity.get();

        Optional maybeAttribute = entity.findAttribute(ingest.getAttribute());
        if (!maybeAttribute.isPresent()) {
            consumer.accept(notFound(ingest.getUuid(), "Attribute " + ingest.getAttribute() + " of entity " + entity.getName() + " not found"));
            return false;
        }
        AttributeDescriptor attribute = (AttributeDescriptor) maybeAttribute.get();

        StreamElement element = toStreamElement(ingest, entity, attribute);
        return ingestRequest(element, ingest.getUuid(), consumer);
    }

    /**
     * Converts an ingest request into a stream element.
     *
     * <p>A zero stamp in the request is replaced by the current time. A delete request
     * becomes a wildcard delete when the descriptor is a wildcard and the requested
     * attribute equals the descriptor's name, otherwise a plain delete; every other
     * request becomes an update carrying the request's value bytes.
     *
     * @param ingest the request
     * @param entityDescriptor descriptor of the target entity
     * @param attributeDescriptor descriptor of the target attribute
     * @return the corresponding update, delete or wildcard-delete element
     */
    private static StreamElement toStreamElement(Rpc.Ingest ingest, EntityDescriptor entityDescriptor, AttributeDescriptor attributeDescriptor) {
        final long stamp = ingest.getStamp() != 0 ? ingest.getStamp() : System.currentTimeMillis();
        if (!ingest.getDelete()) {
            return StreamElement.update(entityDescriptor, attributeDescriptor, ingest.getUuid(), ingest.getKey(), ingest.getAttribute(), stamp, ingest.getValue().toByteArray());
        }
        final boolean wholeWildcard = attributeDescriptor.isWildcard() && attributeDescriptor.getName().equals(ingest.getAttribute());
        if (wholeWildcard) {
            return StreamElement.deleteWildcard(entityDescriptor, attributeDescriptor, ingest.getUuid(), ingest.getKey(), stamp);
        }
        return StreamElement.delete(entityDescriptor, attributeDescriptor, ingest.getUuid(), ingest.getKey(), ingest.getAttribute(), stamp);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes the element through the attribute's writer and reports the outcome.
     *
     * @param streamElement element to write
     * @param str uuid echoed in every status passed to the consumer
     * @param consumer callback receiving 503 (no online writer), 412 (value rejected by
     *     the attribute's value serializer), or - from the writer's commit callback -
     *     200 on success and 500 on failure
     * @return {@code true} when the element was handed to the writer, {@code false} when
     *     it was rejected beforehand
     * @throws IOException declared by the original signature
     */
    public boolean ingestRequest(StreamElement streamElement, String str, Consumer<Rpc.Status> consumer) throws IOException {
        final EntityDescriptor entity = streamElement.getEntityDescriptor();
        final AttributeDescriptor attribute = streamElement.getAttributeDescriptor();
        final OnlineAttributeWriter writer = attribute.getWriter();

        final boolean hasOnlineWriter = writer != null && writer.online() != null;
        if (!hasOnlineWriter) {
            log.warn("Missing writer for request {}", streamElement);
            consumer.accept(status(str, 503, "No writer for attribute " + attribute.getName()));
            return false;
        }

        // deletes carry no value to validate
        final boolean acceptable = streamElement.isDelete() || attribute.getValueSerializer().isValid(streamElement.getValue());
        if (!acceptable) {
            log.info("Request {} is not valid", streamElement);
            consumer.accept(status(str, 412, "Invalid scheme for " + entity.getName() + "." + attribute.getName()));
            return false;
        }

        if (streamElement.isDelete()) {
            if (streamElement.isDeleteWildcard()) {
                Metrics.DELETE_WILDCARD_REQUESTS.increment();
            } else {
                Metrics.DELETE_REQUESTS.increment();
            }
        } else {
            Metrics.UPDATE_REQUESTS.increment();
        }
        Metrics.COMMIT_LOG_APPEND.increment();

        log.debug("Writing request {} to commit log {}", streamElement, writer.getURI());
        writer.write(streamElement, (succeeded, error) -> {
            Rpc.Status outcome = succeeded ? ok(str) : status(str, 500, error.getMessage());
            consumer.accept(outcome);
        });
        return true;
    }

    /**
     * Builds a status with code 404.
     *
     * @param str uuid of the request
     * @param str2 status message
     * @return the 404 status
     */
    private Rpc.Status notFound(String str, String str2) {
        final int notFoundCode = 404;
        return status(str, notFoundCode, str2);
    }

    /**
     * Builds a status with code 200 and no message.
     *
     * @param str uuid of the request
     * @return the 200 status
     */
    private Rpc.Status ok(String str) {
        Rpc.Status.Builder success = Rpc.Status.newBuilder();
        success.setUuid(str);
        success.setStatus(200);
        return success.build();
    }

    /**
     * Builds a status message.
     *
     * @param str uuid of the request
     * @param i status code
     * @param str2 status message; {@code null} is treated as an empty message, because
     *     callers pass {@code Throwable.getMessage()} here, which may be null, and the
     *     generated protobuf setter throws NullPointerException on null
     * @return the status
     */
    private Rpc.Status status(String str, int i, String str2) {
        String message = str2 == null ? "" : str2;
        return Rpc.Status.newBuilder().setUuid(str).setStatus(i).setStatusMessage(message).build();
    }

    /**
     * Builds the gRPC server with both services, registers a shutdown hook and metrics,
     * starts the consumer threads and then serves until the server terminates.
     * The port is read from {@code Constants.CFG_PORT}, falling back to
     * {@code Constants.DEFALT_PORT}. Any exception while starting or serving is passed to
     * {@code die}.
     *
     * @throws Exception propagated from metric registration or consumer start-up
     */
    private void run() throws Exception {
        final int port = this.cfg.hasPath(Constants.CFG_PORT) ? this.cfg.getInt(Constants.CFG_PORT) : Constants.DEFALT_PORT;
        final Server server = ServerBuilder.forPort(port)
                .executor(this.executor)
                .addService(new IngestService())
                .addService(new RetrieveService())
                .build();

        Thread shutdownHook = new Thread(() -> {
            log.info("Gracefully shuting down server.");
            server.shutdown();
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        Metrics.register();
        startConsumerThreads();

        try {
            server.start();
            log.info("Successfully started server 0.0.0.0:{}", server.getPort());
            server.awaitTermination();
            log.info("Server shutdown.");
        } catch (Exception startFailure) {
            die("Failed to start the server", startFailure);
        }
    }

    /**
     * Registers a writer for every (family, commit log family) pair returned by
     * {@code indexFamilyToCommitLogs()} whose family is not read-only, then starts a
     * transformer for every transformation in the repository.
     *
     * @throws InterruptedException declared by the original signature
     * @throws IllegalStateException when a commit log family has no commit log reader
     */
    protected void startConsumerThreads() throws InterruptedException {
        final Map<AttributeFamilyDescriptor, Set<AttributeFamilyDescriptor>> familyToCommitLogs = indexFamilyToCommitLogs();
        log.info("Starting consumer threads for familyToCommitLog {}", familyToCommitLogs);

        familyToCommitLogs.forEach((family, commitLogFamilies) -> {
            for (AttributeFamilyDescriptor commitLogFamily : commitLogFamilies) {
                // the reader is resolved before the read-only check, so a missing reader fails in either case
                CommitLogReader reader = (CommitLogReader) commitLogFamily.getCommitLogReader().orElseThrow(() -> {
                    return new IllegalStateException("Failed validation on consistency of attribute families. Fix code!");
                });
                if (family.getAccess().isReadonly()) {
                    log.debug("Not starting thread for read-only family {}", family);
                    continue;
                }
                AttributeWriterBase writer = (AttributeWriterBase) family.getWriter().get();
                StorageFilter filter = family.getFilter();
                Set<AttributeDescriptor<?>> attributes = (Set) family.getAttributes().stream().collect(Collectors.toSet());
                String consumerName = "consumer-" + family.getName();
                // NOTE(review): this renames the thread executing this method, not a new consumer thread - confirm intent
                Thread.currentThread().setName(consumerName);
                registerWriterTo(consumerName, reader, attributes, filter, writer, this.retryPolicy);
                log.info("Started consumer thread {} consuming from log {} with URI {} into {} attributes {}", consumerName, reader, reader.getURI(), writer.getURI(), attributes);
            }
        });

        this.repo.getTransformations().forEach((name, descriptor) -> runTransformer(name, descriptor));
    }

    /* JADX WARN: Type inference failed for: r0v22, types: [cz.o2.proxima.server.IngestServer$2] */
    /**
     * Starts a commit log observer that applies the transformation to every element
     * accepted by the transformation's filter and writes the produced elements through
     * {@code ingestRequest}.
     *
     * <p>The commit log is taken from a family common to all attributes of the
     * transformation: for each attribute the families whose access allows reading the
     * commit log are collected, the sets are intersected, and any family of the result
     * that has a commit log reader is used.
     *
     * @param str name of the transformation; the observer is named {@code "transformer-" + str}
     * @param transformationDescriptor descriptor providing attributes, filter and transformation
     * @throws IllegalArgumentException when no suitable family can be found
     */
    private void runTransformer(String str, TransformationDescriptor transformationDescriptor) {
        AttributeFamilyDescriptor attributeFamilyDescriptor = (AttributeFamilyDescriptor) transformationDescriptor.getAttributes().stream().map(attributeDescriptor -> {
            return (Set) this.repo.getFamiliesForAttribute(attributeDescriptor).stream().filter(attributeFamilyDescriptor2 -> {
                return attributeFamilyDescriptor2.getAccess().canReadCommitLog();
            }).collect(Collectors.toSet());
        }).reduce(Sets::intersection).filter(set -> {
            return !set.isEmpty();
        }).map(set2 -> {
            return (AttributeFamilyDescriptor) set2.stream().filter(attributeFamilyDescriptor2 -> {
                return attributeFamilyDescriptor2.getCommitLogReader().isPresent();
            }).findAny().orElse(null);
        }).filter(attributeFamilyDescriptor2 -> {
            return attributeFamilyDescriptor2 != null;
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Cannot obtain attribute family for " + transformationDescriptor.getAttributes());
        });
        final Transformation transformation = transformationDescriptor.getTransformation();
        final StorageFilter filter = transformationDescriptor.getFilter();
        String str2 = "transformer-" + str;
        CommitLogReader commitLogReader = (CommitLogReader) attributeFamilyDescriptor.getCommitLogReader().get();
        // NOTE(review): the first constructor argument (3) is presumably the retry limit - confirm against RetryableLogObserver
        new RetryableLogObserver(3, str2, commitLogReader) {
            protected void failure() {
                IngestServer.this.die(String.format("Failed to transform using %s. Bailing out.", transformation));
            }

            public boolean onNextInternal(StreamElement streamElement, LogObserver.ConfirmCallback confirmCallback) {
                if (!filter.apply(streamElement)) {
                    IngestServer.log.debug("Skipping transformation of {} by filter", streamElement);
                    return true;
                }
                // Outstanding work counter: starts at 1 for the apply() call itself, is
                // incremented for every emitted element and decremented for every successful
                // write. Whoever brings it to 0 confirms the input element. On failure it is
                // forced to -1 so that later decrements can no longer reach 0.
                AtomicInteger atomicInteger = new AtomicInteger(1);
                try {
                    transformation.apply(streamElement, streamElement2 -> {
                        atomicInteger.incrementAndGet();
                        try {
                            IngestServer.log.info("Writing transformed element {}", streamElement2);
                            IngestServer.this.ingestRequest(streamElement2, streamElement2.getUuid(), status -> {
                                if (status.getStatus() != 200) {
                                    atomicInteger.set(-1);
                                    confirmCallback.fail(new RuntimeException(String.format("Received invalid status %d:%s", Integer.valueOf(status.getStatus()), status.getStatusMessage())));
                                } else if (atomicInteger.decrementAndGet() == 0) {
                                    confirmCallback.confirm();
                                }
                            });
                        } catch (Exception e) {
                            atomicInteger.set(-1);
                            confirmCallback.fail(e);
                        }
                    });
                    // release the count held for apply(); confirms here when all writes already finished
                    if (atomicInteger.decrementAndGet() == 0) {
                        confirmCallback.confirm();
                    }
                    return true;
                } catch (Exception e) {
                    atomicInteger.set(-1);
                    confirmCallback.fail(e);
                    return true;
                }
            }
        }.start();
        log.info("Started transformer {} reading from {} using {}", new Object[]{str2, commitLogReader.getURI(), transformation.getClass()});
    }

    /**
     * Builds an index from each {@code REPLICA} attribute family to the set of
     * {@code PRIMARY} families that list at least one of its attributes.
     *
     * <p>First every attribute of every {@code PRIMARY} family is mapped to that family.
     * Then, for each {@code REPLICA} family, its attributes are looked up in that map;
     * attributes without a primary family are dropped from the result as long as their
     * {@code getWriter()} is {@code null}.
     *
     * @return map keyed by replica family, valued by the primary families found for it
     * @throws IllegalStateException if an attribute of a replica family has no primary
     *     family while its {@code getWriter()} is non-null
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Map<AttributeFamilyDescriptor, Set<AttributeFamilyDescriptor>> indexFamilyToCommitLogs() {
        // attribute -> the PRIMARY family that lists it
        final Map primaryByAttribute = (Map) this.repo.getAllFamilies()
                .filter(family -> family.getType() == StorageType.PRIMARY)
                .flatMap(primaryFamily -> primaryFamily.getAttributes().stream()
                        .map(attribute -> Pair.of(attribute, primaryFamily)))
                .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));

        return (Map) this.repo.getAllFamilies()
                .filter(family -> family.getType() == StorageType.REPLICA)
                .map(replicaFamily -> Pair.of(replicaFamily, replicaFamily.getAttributes().stream()
                        .map(attribute -> {
                            // every name in this pipeline is distinct: a lambda may not
                            // redeclare a parameter or local of an enclosing lambda
                            AttributeFamilyDescriptor source = (AttributeFamilyDescriptor) primaryByAttribute.get(attribute);
                            if (source == null && attribute.getWriter() != null) {
                                throw new IllegalStateException("Missing source commit log family for " + attribute);
                            }
                            // may be null here; nulls are removed by the filter below
                            return source;
                        })
                        .filter(resolved -> resolved != null)
                        .collect(Collectors.toSet())))
                .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
    }

    /**
     * Logs the registration and starts an observer that feeds the given writer
     * from the given commit log.
     *
     * <p>An {@code ONLINE} writer is wrapped by {@link #getOnlineWriter}; any other
     * writer type is wrapped by {@link #getBulkWriter}.
     *
     * @param str name handed on to the created observer
     * @param commitLogReader commit log the observer reads from
     * @param set attributes the writer is allowed to receive
     * @param storageFilter filter applied to each element before it is written
     * @param attributeWriterBase writer to register
     * @param retryPolicy retry policy handed on to the observer factory methods
     */
    private void registerWriterTo(
            String str,
            CommitLogReader commitLogReader,
            Set<AttributeDescriptor<?>> set,
            StorageFilter storageFilter,
            AttributeWriterBase attributeWriterBase,
            RetryPolicy retryPolicy) {
        log.info(
                "Registering {} writer to {} from commit log {}",
                attributeWriterBase.getType(),
                attributeWriterBase.getURI(),
                commitLogReader.getURI());

        final AbstractRetryableLogObserver observer;
        if (attributeWriterBase.getType() == AttributeWriterBase.Type.ONLINE) {
            observer = getOnlineWriter(str, commitLogReader, set, storageFilter, attributeWriterBase.online(), retryPolicy);
        } else {
            observer = getBulkWriter(str, commitLogReader, set, storageFilter, attributeWriterBase.bulk(), retryPolicy);
        }
        observer.start();
    }

    /**
     * Creates an observer that forwards elements read from a commit log to a bulk writer.
     *
     * <p>An element is written only when its attribute descriptor is contained in
     * {@code set} and {@code storageFilter} accepts it; the write runs inside Failsafe
     * configured with this server's {@code retryPolicy} field. Any other element is counted
     * as discarded and skipped without issuing a commit.
     *
     * <p>NOTE(review): the {@code retryPolicy} parameter is never read here, the instance
     * field is used instead. Confirm this is intended.
     *
     * @param str name passed to the {@link RetryableBulkObserver} constructor
     * @param commitLogReader commit log the observer is created for
     * @param set attributes allowed to be written
     * @param storageFilter filter an element has to pass to be written
     * @param bulkAttributeWriter target writer
     * @param retryPolicy currently unused (see note above)
     * @return the observer; the caller is expected to start it
     */
    private AbstractRetryableLogObserver getBulkWriter(
            String str,
            final CommitLogReader commitLogReader,
            final Set<AttributeDescriptor<?>> set,
            final StorageFilter storageFilter,
            final BulkAttributeWriter bulkAttributeWriter,
            RetryPolicy retryPolicy) {
        // the leading 3 is presumably a retry limit of the observer -- TODO confirm
        return new RetryableBulkObserver(3, str, commitLogReader) {
            // Handles one element; every path returns true.
            public boolean onNextInternal(StreamElement streamElement, BulkLogObserver.BulkCommitter bulkCommitter) {
                final boolean allowedAttribute = set.contains(streamElement.getAttributeDescriptor());
                IngestServer.log.debug("Received new ingest element {}", streamElement);

                // the filter is consulted only for allowed attributes
                if (!allowedAttribute || !storageFilter.apply(streamElement)) {
                    Metrics.COMMIT_UPDATE_DISCARDED.increment();
                    IngestServer.log.debug(
                            "Discarding write of {} to {} because of {}, with allowedAttributes {} and filter class {}",
                            streamElement,
                            bulkAttributeWriter.getURI(),
                            allowedAttribute ? "applied filter" : "invalid attribute",
                            set,
                            storageFilter.getClass());
                    return true;
                }

                Failsafe.with(IngestServer.this.retryPolicy).run(() -> {
                    IngestServer.log.debug("Writing element {} into {}", streamElement, bulkAttributeWriter);
                    bulkAttributeWriter.write(streamElement, (written, cause) -> {
                        if (!written) {
                            IngestServer.log.error("Failed to write ingest {} to {}", streamElement, bulkAttributeWriter.getURI(), cause);
                            Metrics.NON_COMMIT_WRITES_RETRIES.increment();
                            if (IngestServer.this.ignoreErrors) {
                                // configured to skip failed elements: commit anyway
                                IngestServer.log.error("Retries exhausted trying to ingest {} to {}. Configured to ignore. Skipping.", streamElement, bulkAttributeWriter.getURI());
                                bulkCommitter.commit();
                            } else {
                                bulkCommitter.fail(cause);
                            }
                            return;
                        }
                        if (streamElement.isDelete()) {
                            Metrics.NON_COMMIT_LOG_DELETES.increment();
                        } else {
                            Metrics.NON_COMMIT_LOG_UPDATES.increment();
                        }
                        bulkCommitter.commit();
                    });
                });
                return true;
            }

            // Terminates the process through die(String).
            protected void failure() {
                IngestServer.this.die(String.format("Too many errors retrying the consumption of commit log %s. Killing self.", commitLogReader.getURI()));
            }

            // Rolls the writer back when processing is restarted.
            public void onRestart() {
                IngestServer.log.info("Restarting bulk processing of {}, rollbacking the writer", bulkAttributeWriter.getURI());
                bulkAttributeWriter.rollback();
            }
        };
    }

    /**
     * Creates an observer that forwards elements read from a commit log to an online writer.
     *
     * <p>An element is written only when its attribute descriptor is contained in
     * {@code set} and {@code storageFilter} accepts it; the write runs inside Failsafe
     * configured with this server's {@code retryPolicy} field. Any other element is counted
     * as discarded and confirmed right away.
     *
     * <p>NOTE(review): the {@code retryPolicy} parameter is never read here, the instance
     * field is used instead. Confirm this is intended.
     *
     * @param str name passed to the {@link RetryableLogObserver} constructor
     * @param commitLogReader commit log the observer is created for
     * @param set attributes allowed to be written
     * @param storageFilter filter an element has to pass to be written
     * @param onlineAttributeWriter target writer
     * @param retryPolicy currently unused (see note above)
     * @return the observer; the caller is expected to start it
     */
    private AbstractRetryableLogObserver getOnlineWriter(
            String str,
            final CommitLogReader commitLogReader,
            final Set<AttributeDescriptor<?>> set,
            final StorageFilter storageFilter,
            final OnlineAttributeWriter onlineAttributeWriter,
            RetryPolicy retryPolicy) {
        // the leading 3 is presumably a retry limit of the observer -- TODO confirm
        return new RetryableLogObserver(3, str, commitLogReader) {
            // Handles one element; every path returns true.
            public boolean onNextInternal(StreamElement streamElement, LogObserver.ConfirmCallback confirmCallback) {
                final boolean allowedAttribute = set.contains(streamElement.getAttributeDescriptor());
                IngestServer.log.debug("Received new ingest element {}", streamElement);

                // the filter is consulted only for allowed attributes
                if (!allowedAttribute || !storageFilter.apply(streamElement)) {
                    Metrics.COMMIT_UPDATE_DISCARDED.increment();
                    IngestServer.log.debug(
                            "Discarding write of {} to {} because of {}, with allowedAttributes {} and filter class {}",
                            streamElement,
                            onlineAttributeWriter.getURI(),
                            allowedAttribute ? "applied filter" : "invalid attribute",
                            set,
                            storageFilter.getClass());
                    // nothing to write, so the element is confirmed immediately
                    confirmCallback.confirm();
                    return true;
                }

                Failsafe.with(IngestServer.this.retryPolicy).run(() -> {
                    IngestServer.log.debug("Writing element {} into {}", streamElement, onlineAttributeWriter);
                    onlineAttributeWriter.write(streamElement, (written, cause) -> {
                        if (!written) {
                            IngestServer.log.error("Failed to write ingest {} to {}", streamElement, onlineAttributeWriter.getURI(), cause);
                            Metrics.NON_COMMIT_WRITES_RETRIES.increment();
                            if (IngestServer.this.ignoreErrors) {
                                // configured to skip failed elements: confirm anyway
                                IngestServer.log.error("Retries exhausted trying to ingest {} to {}. Configured to ignore. Skipping.", streamElement, onlineAttributeWriter.getURI());
                                confirmCallback.confirm();
                            } else {
                                confirmCallback.fail(cause);
                            }
                            return;
                        }
                        if (streamElement.isDelete()) {
                            Metrics.NON_COMMIT_LOG_DELETES.increment();
                        } else {
                            Metrics.NON_COMMIT_LOG_UPDATES.increment();
                        }
                        confirmCallback.confirm();
                    });
                });
                return true;
            }

            // Terminates the process through die(String).
            protected void failure() {
                IngestServer.this.die(String.format("Too many errors retrying the consumption of commit log %s. Killing self.", commitLogReader.getURI()));
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Terminates the process with the given message and no associated exception.
     *
     * @param str message that gets logged before the process exits
     */
    public void die(String str) {
        final Throwable noCause = null;
        die(str, noCause);
    }

    /**
     * Logs the given message as an error and terminates the JVM with exit status 1.
     *
     * <p>Before logging, the calling thread sleeps for a random time below ten seconds.
     * If that sleep is interrupted, the interrupt flag is restored and shutdown proceeds.
     *
     * @param str message to log
     * @param th exception logged together with the message, or {@code null} for none
     */
    private void die(String str, @Nullable Throwable th) {
        try {
            // random delay in [0, 10s); presumably staggers simultaneous exits -- TODO confirm
            Thread.sleep((long) (Math.random() * 10000.0d));
        } catch (InterruptedException e) {
            // do not swallow the interrupt: re-set the flag, then continue to exit
            Thread.currentThread().interrupt();
        }
        if (th == null) {
            log.error(str);
        } else {
            log.error(str, th);
        }
        System.exit(1);
    }

    /**
     * Returns the minimal core count of this server.
     *
     * @return always {@code 2}
     */
    public int getMinCores() {
        final int minCores = 2;
        return minCores;
    }

    /**
     * Returns the executor held by this server.
     *
     * @return value of the {@code executor} field
     */
    public Executor getExecutor() {
        return executor;
    }

    /**
     * Returns the scheduled executor service held by this server.
     *
     * @return value of the {@code scheduler} field
     */
    public ScheduledExecutorService getScheduler() {
        return scheduler;
    }

    /**
     * Returns the repository this server works with.
     *
     * @return value of the {@code repo} field
     */
    public Repository getRepo() {
        return repo;
    }

    /**
     * Returns the configuration held by this server.
     *
     * @return value of the {@code cfg} field
     */
    public Config getCfg() {
        return cfg;
    }

    /**
     * Tells whether failed writes are skipped instead of being reported as failures.
     *
     * <p>When {@code true}, the write callbacks of the commit-log observers log the
     * failure and still confirm (or commit) the element.
     *
     * @return value of the {@code ignoreErrors} field
     */
    public boolean isIgnoreErrors() {
        return ignoreErrors;
    }

    /**
     * Returns the retry policy that the commit-log observers pass to Failsafe
     * when writing elements.
     *
     * @return value of the {@code retryPolicy} field
     */
    public RetryPolicy getRetryPolicy() {
        return retryPolicy;
    }
}
