package org.factcast.server.grpc;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.InputStream;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import lombok.NonNull;
import net.devh.boot.grpc.server.service.GrpcService;
import org.factcast.core.store.FactStore;
import org.factcast.core.store.StateToken;
import org.factcast.core.subscription.SubscriptionRequestTO;
import org.factcast.grpc.api.Capabilities;
import org.factcast.grpc.api.CompressionCodecs;
import org.factcast.grpc.api.ConditionalPublishRequest;
import org.factcast.grpc.api.StateForRequest;
import org.factcast.grpc.api.conv.ProtoConverter;
import org.factcast.grpc.api.conv.ProtocolVersion;
import org.factcast.grpc.api.conv.ServerConfig;
import org.factcast.grpc.api.gen.FactStoreProto;
import org.factcast.grpc.api.gen.RemoteFactStoreGrpc;
import org.factcast.server.grpc.auth.FactCastRole;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.annotation.Secured;

/**
 * gRPC service that exposes a {@link FactStore} through the generated
 * {@code RemoteFactStore} service base class.
 *
 * <p>Requests are translated from their protobuf representation with a {@link ProtoConverter},
 * delegated to the store, and the result is translated back and pushed to the supplied
 * {@link StreamObserver}. Unary calls report failures through {@link StreamObserver#onError}.
 *
 * <p>The class is secured with {@link FactCastRole#READ}; methods that carry their own
 * {@code @Secured} annotation require {@link FactCastRole#WRITE} instead.
 */
@GrpcService
@Secured({FactCastRole.READ})
/* loaded from: input_file:org/factcast/server/grpc/FactStoreGrpcService.class */
public class FactStoreGrpcService extends RemoteFactStoreGrpc.RemoteFactStoreImplBase {
    private final FactStore store;
    private final CompressionCodecs codecs = new CompressionCodecs();
    private final ProtoConverter converter = new ProtoConverter();

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FactStoreGrpcService.class);

    /** Protocol version announced to clients during {@link #handshake}. */
    static final ProtocolVersion PROTOCOL_VERSION = ProtocolVersion.of(1, 1, 0);

    /** Source of the running number used to label subscriptions in {@link #resetDebugInfo}. */
    static final AtomicLong subscriptionIdStore = new AtomicLong();

    /**
     * Looks up a single fact by its id and answers with an optional fact message.
     *
     * @param msg_uuid protobuf-encoded id of the fact to fetch
     * @param streamObserver receives the (possibly empty) result, or the error if anything fails
     */
    public void fetchById(FactStoreProto.MSG_UUID msg_uuid, StreamObserver<FactStoreProto.MSG_OptionalFact> streamObserver) {
        try {
            enableResponseCompression(streamObserver);
            UUID factId = this.converter.fromProto(msg_uuid);
            log.trace("fetchById {}", factId);
            // Raw Optional: the element type is not imported in this file.
            Optional fetched = this.store.fetchById(factId);
            log.debug("fetchById({}) was {}found", factId, fetched.isPresent() ? "" : "NOT ");
            streamObserver.onNext(this.converter.toProto(fetched));
            streamObserver.onCompleted();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
    }

    /**
     * Publishes the facts contained in the request to the store and answers with an empty message.
     *
     * <p>Conversion of the request happens inside the guarded section, so a malformed request is
     * reported through {@code onError} like any other publishing failure.
     *
     * @param mSG_Facts the facts to publish; must not be {@code null}
     * @param streamObserver receives the empty acknowledgement, or the error if publishing fails
     * @throws NullPointerException if {@code mSG_Facts} is {@code null}
     */
    @Secured({FactCastRole.WRITE})
    public void publish(@NonNull FactStoreProto.MSG_Facts mSG_Facts, StreamObserver<FactStoreProto.MSG_Empty> streamObserver) {
        if (mSG_Facts == null) {
            throw new NullPointerException("request is marked non-null but is null");
        }
        try {
            // The pipeline itself is fully typed; only the local is raw because the element type
            // is not imported in this file.
            List facts = mSG_Facts.getFactList().stream()
                    .map(this.converter::fromProto)
                    .collect(Collectors.toList());
            int size = facts.size();
            // Plural "s" for everything except exactly one fact (including zero).
            log.debug("publish {} fact{}", size, size != 1 ? "s" : "");
            log.trace("publish {}", facts);
            log.trace("store publish {}", facts);
            this.store.publish(facts);
            log.trace("store publish done");
            streamObserver.onNext(FactStoreProto.MSG_Empty.getDefaultInstance());
            streamObserver.onCompleted();
        } catch (Throwable th) {
            log.error("Problem while publishing: ", th);
            streamObserver.onError(th);
        }
    }

    /**
     * Registers a subscription on the store and forwards each fact to the client as a notification.
     *
     * <p>If the request is flagged {@code idOnly}, notifications are built from the fact id only,
     * otherwise from the whole fact.
     *
     * @param mSG_SubscriptionRequest protobuf-encoded subscription request
     * @param streamObserver the client stream notifications are written to
     */
    public void subscribe(FactStoreProto.MSG_SubscriptionRequest mSG_SubscriptionRequest, StreamObserver<FactStoreProto.MSG_Notification> streamObserver) {
        enableResponseCompression(streamObserver);
        SubscriptionRequestTO request = this.converter.fromProto(mSG_SubscriptionRequest);
        resetDebugInfo(request);
        // NOTE(review): the cast assumes the runtime always hands in a ServerCallStreamObserver
        // for server-side calls - confirm.
        BlockingStreamObserver blockingObserver = new BlockingStreamObserver(request.toString(), (ServerCallStreamObserver) streamObserver);
        boolean idOnly = request.idOnly();
        this.store.subscribe(request, new GrpcObserverAdapter(request.toString(), blockingObserver, fact -> {
            return idOnly ? this.converter.createNotificationFor(fact.id()) : this.converter.createNotificationFor(fact);
        }));
    }

    /**
     * Switches on message compression for the response when the observer supports it.
     *
     * @param streamObserver the response observer of the current call
     */
    private void enableResponseCompression(StreamObserver<?> streamObserver) {
        if (streamObserver instanceof ServerCallStreamObserver) {
            ((ServerCallStreamObserver<?>) streamObserver).setMessageCompression(true);
        }
    }

    /**
     * Answers with the server configuration: {@link #PROTOCOL_VERSION} plus the collected
     * server properties.
     *
     * @param mSG_Empty unused empty request
     * @param streamObserver receives the server configuration, or the error if anything fails
     */
    public void handshake(FactStoreProto.MSG_Empty mSG_Empty, StreamObserver<FactStoreProto.MSG_ServerConfig> streamObserver) {
        try {
            streamObserver.onNext(this.converter.toProto(ServerConfig.of(PROTOCOL_VERSION, collectProperties())));
            streamObserver.onCompleted();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
    }

    /**
     * Builds the property map sent during handshake: the implementation version and the
     * available compression codecs.
     *
     * @return a freshly created, mutable map of handshake properties
     */
    private Map<String, String> collectProperties() {
        HashMap<String, String> properties = new HashMap<>();
        retrieveImplementationVersion(properties);
        properties.put(Capabilities.CODECS.toString(), this.codecs.available());
        log.info("Handshake properties: {} ", properties);
        return properties;
    }

    /**
     * Stores the implementation version under {@link Capabilities#FACTCAST_IMPL_VERSION}.
     *
     * <p>The version is read from the {@code version} entry of the resource returned by
     * {@link #getProjectProperties()}. This is best-effort: if the resource is missing, cannot be
     * read, or has no such entry, {@code "UNKNOWN"} is stored instead.
     *
     * @param hashMap the map the version entry is added to
     */
    @VisibleForTesting
    void retrieveImplementationVersion(HashMap<String, String> hashMap) {
        String implVersion = "UNKNOWN";
        URL projectProperties = getProjectProperties();
        if (projectProperties != null) {
            // try-with-resources makes sure the stream is closed on every path.
            try (InputStream in = projectProperties.openStream()) {
                if (in != null) {
                    Properties buildProperties = new Properties();
                    buildProperties.load(in);
                    String version = buildProperties.getProperty("version");
                    if (version != null) {
                        implVersion = version;
                    }
                }
            } catch (Exception e) {
                // Failing to determine the version must never break the handshake.
                log.debug("Unable to read implementation version from {}", projectProperties, e);
            }
        }
        hashMap.put(Capabilities.FACTCAST_IMPL_VERSION.toString(), implVersion);
    }

    /**
     * Locates the {@code pom.properties} resource of this module on the classpath.
     *
     * @return the resource URL, or {@code null} if the resource is not found
     */
    @VisibleForTesting
    URL getProjectProperties() {
        return FactStoreGrpcService.class.getResource("/META-INF/maven/org.factcast/factcast-server-grpc/pom.properties");
    }

    /**
     * Assigns a fresh {@code grpc-sub#<n>} label as debug info to the request and logs it.
     *
     * @param subscriptionRequestTO the request to label
     */
    private void resetDebugInfo(SubscriptionRequestTO subscriptionRequestTO) {
        String subscriptionLabel = "grpc-sub#" + subscriptionIdStore.incrementAndGet();
        // The log line is written before the label is applied to the request.
        log.info("subscribing {} for {} defined as {}", subscriptionLabel, subscriptionRequestTO, subscriptionRequestTO.dump());
        subscriptionRequestTO.debugInfo(subscriptionLabel);
    }

    /**
     * Answers with the serial the store reports for the given fact id.
     *
     * @param msg_uuid protobuf-encoded fact id
     * @param streamObserver receives the optional serial, or the error if anything fails
     */
    public void serialOf(FactStoreProto.MSG_UUID msg_uuid, StreamObserver<FactStoreProto.MSG_OptionalSerial> streamObserver) {
        try {
            streamObserver.onNext(this.converter.toProto(this.store.serialOf(this.converter.fromProto(msg_uuid))));
            streamObserver.onCompleted();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
    }

    /**
     * Answers with the namespaces enumerated by the store.
     *
     * @param mSG_Empty unused empty request
     * @param streamObserver receives the namespaces, or the error if anything fails
     */
    public void enumerateNamespaces(FactStoreProto.MSG_Empty mSG_Empty, StreamObserver<FactStoreProto.MSG_StringSet> streamObserver) {
        try {
            streamObserver.onNext(this.converter.toProto(this.store.enumerateNamespaces()));
            streamObserver.onCompleted();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
    }

    /**
     * Answers with the types the store enumerates for the string carried by the request.
     *
     * @param mSG_String the string handed to the store's type enumeration
     * @param streamObserver receives the types, or the error if anything fails
     */
    public void enumerateTypes(FactStoreProto.MSG_String mSG_String, StreamObserver<FactStoreProto.MSG_StringSet> streamObserver) {
        enableResponseCompression(streamObserver);
        try {
            streamObserver.onNext(this.converter.toProto(this.store.enumerateTypes(this.converter.fromProto(mSG_String))));
            streamObserver.onCompleted();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
    }

    /**
     * Hands the request's facts and token to the store's {@code publishIfUnchanged} and answers
     * with its result.
     *
     * @param mSG_ConditionalPublishRequest the facts together with the token
     * @param streamObserver receives the result, or the error if anything fails
     */
    @Secured({FactCastRole.WRITE})
    public void publishConditional(FactStoreProto.MSG_ConditionalPublishRequest mSG_ConditionalPublishRequest, StreamObserver<FactStoreProto.MSG_ConditionalPublishResult> streamObserver) {
        try {
            ConditionalPublishRequest request = this.converter.fromProto(mSG_ConditionalPublishRequest);
            streamObserver.onNext(this.converter.toProto(this.store.publishIfUnchanged(request.facts(), request.token())));
            streamObserver.onCompleted();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
    }

    /**
     * Asks the store for the state of the requested aggregate ids (and optional namespace) and
     * answers with the uuid of the returned state.
     *
     * @param mSG_StateForRequest aggregate ids and an optional namespace
     * @param streamObserver receives the uuid, or the error if anything fails
     */
    @Secured({FactCastRole.WRITE})
    public void stateFor(FactStoreProto.MSG_StateForRequest mSG_StateForRequest, StreamObserver<FactStoreProto.MSG_UUID> streamObserver) {
        try {
            StateForRequest request = this.converter.fromProto(mSG_StateForRequest);
            streamObserver.onNext(this.converter.toProto(this.store.stateFor(request.aggIds(), Optional.ofNullable(request.ns())).uuid()));
            streamObserver.onCompleted();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
    }

    /**
     * Invalidates the state token identified by the given uuid and answers with an empty message.
     *
     * @param msg_uuid protobuf-encoded uuid of the state token
     * @param streamObserver receives the empty acknowledgement, or the error if anything fails
     */
    @Secured({FactCastRole.WRITE})
    public void invalidate(FactStoreProto.MSG_UUID msg_uuid, StreamObserver<FactStoreProto.MSG_Empty> streamObserver) {
        try {
            this.store.invalidate(new StateToken(this.converter.fromProto(msg_uuid)));
            streamObserver.onNext(FactStoreProto.MSG_Empty.getDefaultInstance());
            streamObserver.onCompleted();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
    }

    /**
     * Answers with the current time as reported by the store.
     *
     * @param mSG_Empty unused empty request
     * @param streamObserver receives the current time, or the error if anything fails
     */
    public void currentTime(FactStoreProto.MSG_Empty mSG_Empty, StreamObserver<FactStoreProto.MSG_CurrentDatabaseTime> streamObserver) {
        try {
            streamObserver.onNext(this.converter.toProto(this.store.currentTime()));
            streamObserver.onCompleted();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
    }

    /**
     * Creates the service on top of the given store.
     *
     * @param factStore the store every call is delegated to
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public FactStoreGrpcService(FactStore factStore) {
        this.store = factStore;
    }
}
