/*
 * Decompiled with CFR 0.152.
 */
package cloud.prefab.client.internal;

import cloud.prefab.client.internal.PrefabHttpClient;
import cloud.prefab.client.internal.RetryDelayCalculator;
import cloud.prefab.domain.Prefab;
import cloud.prefab.sse.SSEHandler;
import cloud.prefab.sse.events.DataEvent;
import cloud.prefab.sse.events.Event;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Base64;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SseConfigStreamingSubscriber {
    private static final Logger LOG = LoggerFactory.getLogger(SseConfigStreamingSubscriber.class);
    private final PrefabHttpClient prefabHttpClient;
    private final Supplier<Long> highwaterMarkSupplier;
    private final Consumer<Prefab.Configs> configsConsumer;
    private final ScheduledExecutorService scheduledExecutorService;

    public SseConfigStreamingSubscriber(PrefabHttpClient prefabHttpClient, Supplier<Long> highwaterMarkSupplier, Consumer<Prefab.Configs> configsConsumer, ScheduledExecutorService scheduledExecutorService) {
        this.prefabHttpClient = prefabHttpClient;
        this.highwaterMarkSupplier = highwaterMarkSupplier;
        this.configsConsumer = configsConsumer;
        this.scheduledExecutorService = scheduledExecutorService;
    }

    public void start() {
        this.restart(0);
    }

    private void restart(int errorCount) {
        Runnable starter = () -> {
            try {
                SSEHandler sseHandler = new SSEHandler();
                FlowSubscriber flowSubscriber = new FlowSubscriber(this.configsConsumer, hasReceivedData -> this.restart(hasReceivedData != false ? 1 : errorCount + 1));
                sseHandler.subscribe(flowSubscriber);
                this.prefabHttpClient.requestConfigSSE(this.highwaterMarkSupplier.get(), sseHandler);
            }
            catch (Exception e) {
                if (e.getMessage().contains("GOAWAY")) {
                    LOG.info("Got GOAWAY on SSE config stream, will restart connection.");
                }
                LOG.warn("Unexpected exception starting SSE config stream, will retry", (Throwable)e);
            }
        };
        if (errorCount == 0) {
            starter.run();
        } else {
            long delayMillis = RetryDelayCalculator.exponentialMillisToNextTry(errorCount, TimeUnit.SECONDS.toMillis(1L), TimeUnit.SECONDS.toMillis(30L));
            LOG.info("Restarting SSE config connection in {} ms", (Object)delayMillis);
            this.scheduledExecutorService.schedule(starter, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    static class FlowSubscriber
    implements Flow.Subscriber<Event> {
        private final Consumer<Prefab.Configs> configConsumer;
        private final Consumer<Boolean> restartHandler;
        private Flow.Subscription subscription;
        private final AtomicBoolean hasReceivedData = new AtomicBoolean(false);

        FlowSubscriber(Consumer<Prefab.Configs> configConsumer, Consumer<Boolean> restartHandler) {
            this.configConsumer = configConsumer;
            this.restartHandler = restartHandler;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        @Override
        public void onNext(Event item) {
            if (item instanceof DataEvent) {
                DataEvent dataEvent = (DataEvent)item;
                try {
                    this.hasReceivedData.set(true);
                    String dataPayload = dataEvent.getData().trim();
                    if (!dataPayload.isEmpty()) {
                        Prefab.Configs configs = Prefab.Configs.parseFrom(Base64.getDecoder().decode(dataPayload));
                        if (!configs.hasConfigServicePointer()) {
                            LOG.debug("Ignoring empty config keep-alive");
                        } else {
                            this.configConsumer.accept(configs);
                        }
                    }
                }
                catch (InvalidProtocolBufferException e) {
                    LOG.warn("Error parsing configs from event name {} - error message {}", (Object)dataEvent.getEventName(), (Object)e.getMessage());
                }
            }
            this.subscription.request(1L);
        }

        @Override
        public void onError(Throwable throwable) {
            LOG.info("Unexpected error encountered", throwable);
            this.restartHandler.accept(this.getHasReceivedData());
        }

        @Override
        public void onComplete() {
            LOG.info("Unexpected stream completion");
            this.restartHandler.accept(this.getHasReceivedData());
        }

        public boolean getHasReceivedData() {
            return this.hasReceivedData.get();
        }
    }
}

