package org.factcast.client.grpc;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Generated;
import lombok.NonNull;
import org.factcast.client.grpc.FactCastGrpcClientProperties;
import org.factcast.core.Fact;
import org.factcast.core.FactStreamPosition;
import org.factcast.core.subscription.FactStreamInfo;
import org.factcast.core.subscription.Subscription;
import org.factcast.core.subscription.SubscriptionClosedException;
import org.factcast.core.subscription.SubscriptionRequestTO;
import org.factcast.core.subscription.observer.FactObserver;
import org.factcast.core.util.ExceptionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/factcast/client/grpc/ResilientGrpcSubscription.class */
/**
 * A {@link Subscription} that wraps a subscription obtained from a {@link GrpcFactStore} and
 * re-subscribes when the wrapped subscription reports an error that {@link Resilience} considers
 * retryable.
 *
 * <p>The position of the last fact handed to the original observer is remembered, so that a
 * re-subscription continues after that fact instead of starting over.
 */
public class ResilientGrpcSubscription implements Subscription {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ResilientGrpcSubscription.class);
    private final GrpcFactStore store;
    private final SubscriptionRequestTO originalRequest;
    private final FactObserver originalObserver;
    private final FactObserver delegatingObserver;
    /** Position of the last fact successfully passed to the original observer; null until the first fact. */
    private final AtomicReference<FactStreamPosition> lastPosition = new AtomicReference<>();
    private final SubscriptionHolder currentSubscription = new SubscriptionHolder();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    @VisibleForTesting
    private final Resilience resilience;

    /**
     * Observer handed to the store. It forwards every callback to the original observer and adds
     * position tracking (in {@code onNext}) and the resubscribe logic (in {@code onError}).
     */
    class DelegatingFactObserver implements FactObserver {
        DelegatingFactObserver() {
        }

        /**
         * Forwards the fact to the original observer and afterwards records its position. Facts
         * arriving after {@code close()} are dropped with a warning.
         */
        public void onNext(@NonNull Fact fact) {
            Objects.requireNonNull(fact, "element is marked non-null but is null");
            if (isClosed.get()) {
                log.warn("Fact arrived after call to .close() [a few of them is ok...]");
                return;
            }
            originalObserver.onNext(fact);
            // only remember the position once the original observer returned without throwing
            lastPosition.set(FactStreamPosition.from(fact));
        }

        public void onCatchup() {
            originalObserver.onCatchup();
        }

        public void onComplete() {
            originalObserver.onComplete();
        }

        public void onFastForward(@NonNull FactStreamPosition factStreamPosition) {
            Objects.requireNonNull(factStreamPosition, "factIdToFfwdTo is marked non-null but is null");
            originalObserver.onFastForward(factStreamPosition);
        }

        /**
         * Detaches and closes the current subscription, resets the store and then either gives up
         * (via {@code fail}) or sleeps for the configured interval and reconnects.
         *
         * <p>NOTE(review): this path does not look at {@code isClosed}; an error delivered after
         * {@code close()} still resets the store and may resubscribe - confirm this is intended.
         */
        public void onError(@NonNull Throwable th) {
            Objects.requireNonNull(th, "exception is marked non-null but is null");
            log.info("Closing subscription due to onError triggered.  ({})", originalRequest, th);
            closeAndDetachSubscription();
            store.reset();
            if (!resilience.shouldRetry(th)) {
                // fail() notifies the original observer and always throws
                fail(th);
                return;
            }
            log.info("Trying to resubscribe ({})", originalRequest);
            resilience.sleepForInterval();
            reConnect();
        }

        public void onFactStreamInfo(@NonNull FactStreamInfo factStreamInfo) {
            Objects.requireNonNull(factStreamInfo, "info is marked non-null but is null");
            originalObserver.onFactStreamInfo(factStreamInfo);
        }
    }

    /**
     * Holds the currently active subscription (or null while disconnected) and lets callers block
     * until one becomes available. All access is guarded by the monitor of the internal
     * {@link AtomicReference}, which is also used for wait/notify.
     */
    @VisibleForTesting
    public class SubscriptionHolder {
        private final AtomicReference<Subscription> currentSubscription = new AtomicReference<>();

        SubscriptionHolder() {
        }

        /**
         * Same as {@link #getAndBlock(long)} with a wait time of 0, i.e. without a time limit.
         */
        @NonNull
        public Subscription getAndBlock() throws TimeoutException {
            return getAndBlock(0L);
        }

        /**
         * Returns the current subscription, waiting for one to be set if necessary.
         *
         * <p>If the calling thread is interrupted while waiting, waiting continues and the
         * interrupt status is restored before this method returns or throws. (Re-setting the
         * flag inside the loop would make every following {@code wait} fail immediately and turn
         * the loop into a busy spin.)
         *
         * @param j maximum time to wait in milliseconds; 0 means wait without a time limit
         * @return the current, non-null subscription
         * @throws TimeoutException if {@code j} is not 0 and no subscription became available in time
         * @throws SubscriptionClosedException if the enclosing subscription is closed
         */
        @NonNull
        public Subscription getAndBlock(long j) throws TimeoutException {
            final long deadline = System.currentTimeMillis() + j;
            boolean interrupted = false;
            try {
                synchronized (this.currentSubscription) {
                    do {
                        ResilientGrpcSubscription.this.assertSubscriptionStateNotClosed();
                        if (this.currentSubscription.get() == null) {
                            long remaining = j == 0 ? 0L : deadline - System.currentTimeMillis();
                            if (j != 0 && remaining < 1) {
                                throw new TimeoutException("Timeout while acquiring subscription");
                            }
                            try {
                                this.currentSubscription.wait(remaining);
                            } catch (InterruptedException e) {
                                // defer the interrupt until we leave the method, see Javadoc
                                interrupted = true;
                            }
                        }
                    } while (this.currentSubscription.get() == null);
                    return this.currentSubscription.get();
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Replaces the held subscription and returns the previous one; waiting threads are not notified. */
        Subscription getAndSet(Subscription subscription) {
            synchronized (this.currentSubscription) {
                return this.currentSubscription.getAndSet(subscription);
            }
        }

        /** Sets the held subscription and wakes up all threads blocked in {@code getAndBlock}. */
        void set(Subscription subscription) {
            synchronized (this.currentSubscription) {
                this.currentSubscription.set(subscription);
                this.currentSubscription.notifyAll();
            }
        }

        /** Returns the held subscription without blocking; may be null. */
        public Subscription get() {
            synchronized (this.currentSubscription) {
                return this.currentSubscription.get();
            }
        }

        /** Wakes up all waiting threads so that they re-check the closed state. */
        public void unblock() {
            synchronized (this.currentSubscription) {
                this.currentSubscription.notifyAll();
            }
        }
    }

    /** A two-argument consumer that is allowed to throw {@link TimeoutException}. */
    @FunctionalInterface
    public interface ThrowingBiConsumer<T, U> {
        void accept(T t, U u) throws TimeoutException;
    }

    /**
     * Creates the subscription and immediately connects to the store.
     *
     * @param grpcFactStore store used to (re-)subscribe
     * @param subscriptionRequestTO the request to subscribe with
     * @param factObserver observer that receives the forwarded callbacks
     * @param resilienceConfiguration configuration passed to {@link Resilience}
     */
    public ResilientGrpcSubscription(@NonNull GrpcFactStore grpcFactStore, @NonNull SubscriptionRequestTO subscriptionRequestTO, @NonNull FactObserver factObserver, @NonNull FactCastGrpcClientProperties.ResilienceConfiguration resilienceConfiguration) {
        Objects.requireNonNull(grpcFactStore, "store is marked non-null but is null");
        Objects.requireNonNull(subscriptionRequestTO, "req is marked non-null but is null");
        Objects.requireNonNull(factObserver, "obs is marked non-null but is null");
        Objects.requireNonNull(resilienceConfiguration, "config is marked non-null but is null");
        this.store = grpcFactStore;
        this.resilience = new Resilience(resilienceConfiguration);
        this.originalObserver = factObserver;
        this.originalRequest = subscriptionRequestTO;
        this.delegatingObserver = new DelegatingFactObserver();
        connect();
    }

    public Subscription awaitCatchup() throws SubscriptionClosedException {
        return delegate(subscription -> {
            subscription.awaitCatchup();
        });
    }

    public Subscription awaitCatchup(long j) throws SubscriptionClosedException, TimeoutException {
        return delegate((subscription, waitMillis) -> {
            subscription.awaitCatchup(waitMillis);
        }, j);
    }

    public Subscription awaitComplete() throws SubscriptionClosedException {
        return delegate(subscription -> {
            subscription.awaitComplete();
        });
    }

    public Subscription awaitComplete(long j) throws SubscriptionClosedException, TimeoutException {
        return delegate((subscription, waitMillis) -> {
            subscription.awaitComplete(waitMillis);
        }, j);
    }

    /**
     * Marks this subscription as closed and closes the wrapped subscription, if any. Only the
     * first call has an effect.
     */
    public void close() {
        if (this.isClosed.compareAndSet(false, true)) {
            closeAndDetachSubscription();
        }
    }

    /**
     * Runs the given action against the current subscription, retrying while {@link Resilience}
     * allows it and the time budget is not used up.
     *
     * @param throwingBiConsumer action receiving the subscription and the remaining wait time in milliseconds
     * @param j overall time budget in milliseconds
     * @return this
     * @throws TimeoutException if the action timed out or the budget was used up
     * @throws SubscriptionClosedException if this subscription is closed
     */
    @VisibleForTesting
    ResilientGrpcSubscription delegate(ThrowingBiConsumer<Subscription, Long> throwingBiConsumer, long j) throws TimeoutException {
        final long startedAt = System.currentTimeMillis();
        do {
            assertSubscriptionStateNotClosed();
            long remaining = j - (System.currentTimeMillis() - startedAt);
            if (j > 0 && remaining < 1) {
                // a remaining time of exactly 0 would be read as "wait without limit" by getAndBlock
                throw new TimeoutException();
            }
            try {
                throwingBiConsumer.accept(this.currentSubscription.getAndBlock(remaining), Long.valueOf(remaining));
                return this;
            } catch (TimeoutException e) {
                throw e;
            } catch (Exception e) {
                if (!this.resilience.shouldRetry(e)) {
                    throw e;
                }
            }
        } while (System.currentTimeMillis() - startedAt <= j);
        throw new TimeoutException();
    }

    /**
     * Runs the given action against the current subscription, waiting without a time limit for a
     * subscription to be available and retrying for as long as {@link Resilience} allows it.
     *
     * @param consumer action to run against the current subscription
     * @return this
     * @throws SubscriptionClosedException if this subscription is closed
     */
    @VisibleForTesting
    ResilientGrpcSubscription delegate(Consumer<Subscription> consumer) {
        while (true) {
            assertSubscriptionStateNotClosed();
            try {
                consumer.accept(this.currentSubscription.getAndBlock());
                return this;
            } catch (Exception e) {
                if (!this.resilience.shouldRetry(e)) {
                    throw ExceptionHelper.toRuntime(e);
                }
            }
        }
    }

    /**
     * @throws SubscriptionClosedException if {@code close()} has been called
     */
    public void assertSubscriptionStateNotClosed() {
        if (this.isClosed.get()) {
            throw new SubscriptionClosedException("Subscription already closed  (" + this.originalRequest + ")");
        }
    }

    private synchronized void connect() {
        log.debug("Connecting ({})", this.originalRequest);
        doConnect();
    }

    @VisibleForTesting
    synchronized void reConnect() {
        log.debug("Reconnecting ({})", this.originalRequest);
        this.store.initializeIfNecessary();
        doConnect();
    }

    /**
     * Registers a connection attempt and, unless a subscription is already held, subscribes at
     * the store with a copy of the original request that starts after the last seen fact (if
     * there is one). Any exception from subscribing is routed to {@code fail}.
     */
    @VisibleForTesting
    protected void doConnect() {
        this.resilience.registerAttempt();
        SubscriptionRequestTO request = SubscriptionRequestTO.forFacts(this.originalRequest);
        FactStreamPosition last = this.lastPosition.get();
        if (last != null) {
            request.startingAfter(last.factId());
        }
        if (this.currentSubscription.get() == null) {
            try {
                this.currentSubscription.set(this.store.internalSubscribe(request, this.delegatingObserver));
            } catch (Exception e) {
                fail(e);
            }
        }
    }

    /**
     * Removes the held subscription and closes it; exceptions thrown while closing are logged and
     * ignored.
     */
    public void closeAndDetachSubscription() {
        Subscription detached = this.currentSubscription.getAndSet(null);
        if (detached == null) {
            return;
        }
        try {
            detached.close();
        } catch (Exception e) {
            log.warn("Ignoring Exception while closing a subscription ({})", this.originalRequest, e);
        }
    }

    /**
     * Gives up: closes this subscription, wakes up blocked callers, reports the error to the
     * original observer and finally rethrows it as a runtime exception. Never returns normally.
     */
    @VisibleForTesting
    void fail(Throwable th) {
        log.error("Too many failures, giving up. ({})", this.originalRequest);
        close();
        this.currentSubscription.unblock();
        this.originalObserver.onError(th);
        throw ExceptionHelper.toRuntime(th);
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public Resilience resilience() {
        return this.resilience;
    }
}
