package org.factcast.server.grpc;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import lombok.NonNull;
import org.factcast.core.Fact;
import org.factcast.core.subscription.FactStreamInfo;
import org.factcast.core.subscription.observer.FactObserver;
import org.factcast.grpc.api.conv.ProtoConverter;
import org.factcast.grpc.api.gen.FactStoreProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/factcast/server/grpc/GrpcObserverAdapter.class */
public class GrpcObserverAdapter implements FactObserver {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GrpcObserverAdapter.class);
    private final ProtoConverter converter;

    @NonNull
    private final String id;

    @NonNull
    private final StreamObserver<FactStoreProto.MSG_Notification> observer;
    private final int catchupBatchSize;

    @NonNull
    private final ServerExceptionLogger serverExceptionLogger;

    @VisibleForTesting
    private final ServerKeepalive keepalive;
    private final ArrayList<Fact> stagedFacts;
    private final boolean supportsFastForward;
    private final long keepaliveInMilliseconds;
    private final AtomicBoolean caughtUp;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/factcast/server/grpc/GrpcObserverAdapter$ServerKeepalive.class */
    public class ServerKeepalive {
        private Timer t = new Timer("server-keepalive-" + System.currentTimeMillis(), true);

        ServerKeepalive() {
            reschedule();
        }

        @VisibleForTesting
        synchronized void reschedule() {
            if (this.t != null) {
                this.t.schedule(new TimerTask() { // from class: org.factcast.server.grpc.GrpcObserverAdapter.ServerKeepalive.1
                    @Override // java.util.TimerTask, java.lang.Runnable
                    public void run() {
                        GrpcObserverAdapter.this.observer.onNext(GrpcObserverAdapter.this.converter.createKeepaliveNotification());
                        ServerKeepalive.this.reschedule();
                    }
                }, GrpcObserverAdapter.this.keepaliveInMilliseconds);
            }
        }

        @VisibleForTesting
        synchronized void shutdown() {
            this.t.cancel();
        }
    }

    public GrpcObserverAdapter(@NonNull String str, @NonNull StreamObserver<FactStoreProto.MSG_Notification> streamObserver, @NonNull GrpcRequestMetadata grpcRequestMetadata, @NonNull ServerExceptionLogger serverExceptionLogger, long j) {
        this.converter = new ProtoConverter();
        this.caughtUp = new AtomicBoolean(false);
        Objects.requireNonNull(str, "id is marked non-null but is null");
        Objects.requireNonNull(streamObserver, "observer is marked non-null but is null");
        Objects.requireNonNull(grpcRequestMetadata, "meta is marked non-null but is null");
        Objects.requireNonNull(serverExceptionLogger, "serverExceptionLogger is marked non-null but is null");
        this.id = str;
        this.observer = streamObserver;
        this.catchupBatchSize = grpcRequestMetadata.catchupBatch().orElse(1);
        this.supportsFastForward = grpcRequestMetadata.supportsFastForward();
        this.keepaliveInMilliseconds = j;
        this.stagedFacts = new ArrayList<>(this.catchupBatchSize);
        this.serverExceptionLogger = serverExceptionLogger;
        if (j > 0) {
            this.keepalive = new ServerKeepalive();
        } else {
            this.keepalive = null;
        }
    }

    @VisibleForTesting
    GrpcObserverAdapter(@NonNull String str, @NonNull StreamObserver<FactStoreProto.MSG_Notification> streamObserver, @NonNull ServerExceptionLogger serverExceptionLogger) {
        this(str, streamObserver, GrpcRequestMetadata.forTest(), serverExceptionLogger, 0L);
        Objects.requireNonNull(str, "id is marked non-null but is null");
        Objects.requireNonNull(streamObserver, "observer is marked non-null but is null");
        Objects.requireNonNull(serverExceptionLogger, "serverExceptionLogger is marked non-null but is null");
    }

    @VisibleForTesting
    @Deprecated
    GrpcObserverAdapter(@NonNull String str, @NonNull StreamObserver<FactStoreProto.MSG_Notification> streamObserver) {
        this(str, streamObserver, GrpcRequestMetadata.forTest(), new ServerExceptionLogger(), 0L);
        Objects.requireNonNull(str, "id is marked non-null but is null");
        Objects.requireNonNull(streamObserver, "observer is marked non-null but is null");
    }

    @VisibleForTesting
    GrpcObserverAdapter(@NonNull String str, @NonNull StreamObserver<FactStoreProto.MSG_Notification> streamObserver, GrpcRequestMetadata grpcRequestMetadata) {
        this(str, streamObserver, grpcRequestMetadata, new ServerExceptionLogger(), 0L);
        Objects.requireNonNull(str, "id is marked non-null but is null");
        Objects.requireNonNull(streamObserver, "observer is marked non-null but is null");
    }

    @VisibleForTesting
    GrpcObserverAdapter(@NonNull String str, @NonNull StreamObserver<FactStoreProto.MSG_Notification> streamObserver, long j) {
        this(str, streamObserver, GrpcRequestMetadata.forTest(), new ServerExceptionLogger(), j);
        Objects.requireNonNull(str, "id is marked non-null but is null");
        Objects.requireNonNull(streamObserver, "observer is marked non-null but is null");
    }

    @VisibleForTesting
    GrpcObserverAdapter(@NonNull String str, @NonNull StreamObserver<FactStoreProto.MSG_Notification> streamObserver, @NonNull GrpcRequestMetadata grpcRequestMetadata, @NonNull ServerExceptionLogger serverExceptionLogger) {
        this(str, streamObserver, grpcRequestMetadata, serverExceptionLogger, 0L);
        Objects.requireNonNull(str, "id is marked non-null but is null");
        Objects.requireNonNull(streamObserver, "observer is marked non-null but is null");
        Objects.requireNonNull(grpcRequestMetadata, "meta is marked non-null but is null");
        Objects.requireNonNull(serverExceptionLogger, "serverExceptionLogger is marked non-null but is null");
    }

    public void onComplete() {
        disableKeepalive();
        flush();
        log.debug("{} onComplete – sending complete notification", this.id);
        this.observer.onNext(this.converter.createCompleteNotification());
        tryComplete();
    }

    private void disableKeepalive() {
        if (this.keepalive != null) {
            this.keepalive.shutdown();
        }
    }

    public void onError(@NonNull Throwable th) {
        Objects.requireNonNull(th, "e is marked non-null but is null");
        disableKeepalive();
        flush();
        this.serverExceptionLogger.log(th, this.id);
        this.observer.onError(ServerExceptionHelper.translate(th));
    }

    private void tryComplete() {
        try {
            this.observer.onCompleted();
        } catch (Throwable th) {
            log.trace("{} Expected exception on completion {}", this.id, th.getMessage());
        }
    }

    public void onCatchup() {
        flush();
        log.debug("{} onCatchup – sending catchup notification", this.id);
        this.observer.onNext(this.converter.createCatchupNotification());
        this.caughtUp.set(true);
    }

    private void flush() {
        if (this.stagedFacts.isEmpty()) {
            return;
        }
        log.trace("{} flushing batch of {} facts", this.id, Integer.valueOf(this.stagedFacts.size()));
        this.observer.onNext(this.converter.createNotificationFor(this.stagedFacts));
        this.stagedFacts.clear();
    }

    public void onNext(@NonNull Fact fact) {
        Objects.requireNonNull(fact, "element is marked non-null but is null");
        if (this.catchupBatchSize <= 1 || this.caughtUp.get()) {
            this.observer.onNext(this.converter.createNotificationFor(fact));
            return;
        }
        if (this.stagedFacts.size() >= this.catchupBatchSize) {
            flush();
        }
        this.stagedFacts.add(fact);
    }

    public void onFastForward(@NonNull UUID uuid) {
        Objects.requireNonNull(uuid, "factIdToFfwdTo is marked non-null but is null");
        if (this.supportsFastForward) {
            log.debug("{} sending ffwd notification to fact id {}", this.id, uuid);
            this.observer.onNext(this.converter.createNotificationForFastForward(uuid));
        }
    }

    public void onFactStreamInfo(FactStreamInfo factStreamInfo) {
        this.observer.onNext(this.converter.createInfoNotification(factStreamInfo));
    }

    public void shutdown() {
        disableKeepalive();
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    protected ServerKeepalive keepalive() {
        return this.keepalive;
    }
}
