package io.split.engine.sse;

import io.split.client.RequestDecorator;
import io.split.engine.sse.client.RawEvent;
import io.split.engine.sse.client.SSEClient;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.exceptions.EventParsingException;
import io.split.engine.sse.workers.FeatureFlagsWorker;
import io.split.engine.sse.workers.Worker;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import split.com.google.common.annotations.VisibleForTesting;
import split.com.google.common.base.Preconditions;
import split.com.google.common.base.Strings;
import split.org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import split.org.apache.hc.core5.net.URIBuilder;

/* loaded from: input_file:io/split/engine/sse/EventSourceClientImp.class */
/**
 * {@link EventSourceClient} implementation backed by an {@link SSEClient}.
 *
 * <p>Raw events delivered by the SSE client are routed by their event type:
 * {@code "message"} payloads are parsed and handed to the {@link NotificationProcessor},
 * {@code "error"} payloads are parsed and handed to the {@link PushStatusTracker}.
 * Connection status messages reported by the SSE client are forwarded to the
 * {@link PushStatusTracker} as well.
 */
public class EventSourceClientImp implements EventSourceClient {
    private static final Logger _log = LoggerFactory.getLogger(EventSourceClient.class);
    /** Event type carrying an error payload. */
    private static final String ERROR = "error";
    /** Event type carrying a notification payload. */
    private static final String MESSAGE = "message";
    private final String _baseStreamingUrl;
    private final NotificationParser _notificationParser;
    private final NotificationProcessor _notificationProcessor;
    private final SSEClient _sseClient;
    private final PushStatusTracker _pushStatusTracker;
    /** False until the first event after the latest {@link #start} has been seen. */
    private final AtomicBoolean _firstEvent = new AtomicBoolean();

    /**
     * Creates the client and wires an {@link SSEClient} whose event callback is
     * {@link #onMessage(RawEvent)} and whose status callback forwards to the push status tracker.
     *
     * @param str                      base streaming URL; must not be null
     * @param notificationParser       parser for incoming payloads; must not be null
     * @param notificationProcessor    consumer of parsed notifications; must not be null
     * @param pushStatusTracker        receiver of SSE status updates and parsed error events
     * @param closeableHttpClient      HTTP client passed through to the {@link SSEClient}
     * @param telemetryRuntimeProducer telemetry sink passed through to the {@link SSEClient}
     * @param threadFactory            thread factory passed through to the {@link SSEClient}
     * @param requestDecorator         request decorator passed through to the {@link SSEClient}
     * @throws NullPointerException if {@code str}, {@code notificationParser} or
     *                              {@code notificationProcessor} is null
     */
    @VisibleForTesting
    EventSourceClientImp(String str, NotificationParser notificationParser, NotificationProcessor notificationProcessor, PushStatusTracker pushStatusTracker, CloseableHttpClient closeableHttpClient, TelemetryRuntimeProducer telemetryRuntimeProducer, ThreadFactory threadFactory, RequestDecorator requestDecorator) {
        this._baseStreamingUrl = Preconditions.checkNotNull(str);
        this._notificationParser = Preconditions.checkNotNull(notificationParser);
        this._notificationProcessor = Preconditions.checkNotNull(notificationProcessor);
        this._pushStatusTracker = pushStatusTracker;
        // Both callbacks return null because the SSE client expects value-returning functions.
        this._sseClient = new SSEClient(rawEvent -> {
            onMessage(rawEvent);
            return null;
        }, statusMessage -> {
            this._pushStatusTracker.handleSseStatus(statusMessage);
            return null;
        }, closeableHttpClient, telemetryRuntimeProducer, threadFactory, requestDecorator);
    }

    /**
     * Factory that builds the client with a {@link NotificationParserImp} and a
     * {@link NotificationProcessorImp} created from the given workers and tracker.
     *
     * @param str                      base streaming URL; must not be null
     * @param featureFlagsWorker       worker handed to {@link NotificationProcessorImp#build}
     * @param worker                   segment worker handed to {@link NotificationProcessorImp#build}
     * @param pushStatusTracker        tracker shared by the processor and this client
     * @param closeableHttpClient      HTTP client passed through to the {@link SSEClient}
     * @param telemetryRuntimeProducer telemetry sink passed through to the {@link SSEClient}
     * @param threadFactory            thread factory passed through to the {@link SSEClient}
     * @param requestDecorator         request decorator passed through to the {@link SSEClient}
     * @return a new, not yet started client
     */
    public static EventSourceClientImp build(String str, FeatureFlagsWorker featureFlagsWorker, Worker<SegmentQueueDto> worker, PushStatusTracker pushStatusTracker, CloseableHttpClient closeableHttpClient, TelemetryRuntimeProducer telemetryRuntimeProducer, ThreadFactory threadFactory, RequestDecorator requestDecorator) {
        return new EventSourceClientImp(str, new NotificationParserImp(), NotificationProcessorImp.build(featureFlagsWorker, worker, pushStatusTracker), pushStatusTracker, closeableHttpClient, telemetryRuntimeProducer, threadFactory, requestDecorator);
    }

    /**
     * Opens the SSE connection, closing the current one first if it is open.
     *
     * @param str  value sent as the {@code channels} query parameter
     * @param str2 value sent as the {@code accessToken} query parameter
     * @return the result of {@link SSEClient#open}, or {@code false} if the streaming URI
     *         could not be built
     */
    @Override // io.split.engine.sse.EventSourceClient
    public boolean start(String str, String str2) {
        if (this._sseClient.isOpen()) {
            this._sseClient.close();
        }
        try {
            // Re-arm first-event detection for the new connection.
            this._firstEvent.set(false);
            return this._sseClient.open(buildUri(str, str2));
        } catch (URISyntaxException e) {
            _log.error("Error building Streaming URI: " + e.getMessage());
            return false;
        }
    }

    /**
     * Closes the SSE connection if it is open; otherwise only logs that it is already closed.
     */
    @Override // io.split.engine.sse.EventSourceClient
    public void stop() {
        _log.info("Stopping EventSourceClientImp");
        if (this._sseClient.isOpen()) {
            this._sseClient.close();
        } else {
            _log.info("Event Source Client is closed.");
        }
    }

    /**
     * Builds the streaming URI from the base URL plus the {@code channels}, {@code v}
     * (fixed to {@code 1.1}) and {@code accessToken} query parameters.
     *
     * @param str  channels parameter value
     * @param str2 access token parameter value
     * @return the complete streaming URI
     * @throws URISyntaxException if the base URL or the resulting URI is malformed
     */
    private URI buildUri(String str, String str2) throws URISyntaxException {
        return new URIBuilder(this._baseStreamingUrl).addParameter("channels", str).addParameter("v", "1.1").addParameter("accessToken", str2).build();
    }

    /**
     * Handles one raw event from the SSE client.
     *
     * <p>The first event after a {@link #start} that is not an error event is reported to the
     * push status tracker as {@code FIRST_EVENT}. Events with a null or empty payload are
     * otherwise ignored. Any failure while parsing or dispatching is logged at debug level and
     * never propagated to the SSE client.
     *
     * @param rawEvent event as delivered by the SSE client
     */
    private void onMessage(RawEvent rawEvent) {
        try {
            String type = rawEvent.event();
            String payload = rawEvent.data();
            // compareAndSet consumes the "first event" slot even when that event is an error,
            // so FIRST_EVENT is reported at most once per connection.
            if (this._firstEvent.compareAndSet(false, true) && !ERROR.equals(type)) {
                this._pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.FIRST_EVENT);
            }
            if (Strings.isNullOrEmpty(payload)) {
                return;
            }
            _log.debug("Payload received: {}", payload);
            // Constant-first equals keeps the dispatch null-safe: a missing event type is
            // reported as a wrong notification type instead of surfacing as a NullPointerException.
            if (MESSAGE.equals(type)) {
                this._notificationProcessor.process(this._notificationParser.parseMessage(payload));
            } else if (ERROR.equals(type)) {
                this._pushStatusTracker.handleIncomingAblyError(this._notificationParser.parseError(payload));
            } else {
                throw new EventParsingException("Wrong notification type.", payload);
            }
        } catch (EventParsingException e) {
            _log.debug("Error parsing the event: {}. Payload: {}", e.getMessage(), e.getPayload());
        } catch (Exception e2) {
            // Boundary catch: a failure in one event must not break the SSE callback.
            _log.debug(String.format("Error parsing the event id: %s. OnMessage: %s", rawEvent.id(), e2.getMessage()), e2);
        }
    }
}
