package io.split.engine.sse.client;

import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.SocketException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import split.com.google.common.base.Preconditions;
import split.com.google.common.base.Strings;
import split.com.google.common.util.concurrent.ThreadFactoryBuilder;

/* loaded from: input_file:io/split/engine/sse/client/SSEClient.class */
public class SSEClient {
    private static final String SOCKET_CLOSED_MESSAGE = "Socket closed";
    private static final String KEEP_ALIVE_PAYLOAD = ":keepalive\n";
    private static final long CONNECT_TIMEOUT = 30000;
    private static final Logger _log = LoggerFactory.getLogger(SSEClient.class);
    private final CloseableHttpClient _client;
    private final Function<RawEvent, Void> _eventCallback;
    private final Function<StatusMessage, Void> _statusCallback;
    private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
    private final ExecutorService _connectionExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SPLIT-SSEConnection-%d").build());
    private final AtomicReference<ConnectionState> _state = new AtomicReference<>(ConnectionState.CLOSED);
    private final AtomicReference<CloseableHttpResponse> _ongoingResponse = new AtomicReference<>();
    private final AtomicReference<HttpGet> _ongoingRequest = new AtomicReference<>();
    private AtomicBoolean _forcedStop = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/split/engine/sse/client/SSEClient$ConnectionState.class */
    public enum ConnectionState {
        OPEN,
        CLOSED
    }

    /* loaded from: input_file:io/split/engine/sse/client/SSEClient$StatusMessage.class */
    public enum StatusMessage {
        CONNECTED,
        RETRYABLE_ERROR,
        NONRETRYABLE_ERROR,
        INITIALIZATION_IN_PROGRESS,
        FORCED_STOP,
        FIRST_EVENT
    }

    public SSEClient(Function<RawEvent, Void> function, Function<StatusMessage, Void> function2, CloseableHttpClient closeableHttpClient, TelemetryRuntimeProducer telemetryRuntimeProducer) {
        this._eventCallback = function;
        this._statusCallback = function2;
        this._client = closeableHttpClient;
        this._telemetryRuntimeProducer = (TelemetryRuntimeProducer) Preconditions.checkNotNull(telemetryRuntimeProducer);
    }

    public synchronized boolean open(URI uri) {
        if (isOpen()) {
            _log.info("SSEClient already open.");
            return false;
        }
        this._statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this._connectionExecutor.submit(() -> {
            connectAndLoop(uri, countDownLatch);
        });
        try {
            if (countDownLatch.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
                return isOpen();
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            _log.info(e.getMessage());
            return false;
        }
    }

    public boolean isOpen() {
        return ConnectionState.OPEN.equals(this._state.get());
    }

    public synchronized void close() {
        this._forcedStop.set(true);
        if (!this._state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED) || this._ongoingResponse.get() == null) {
            return;
        }
        try {
            this._ongoingRequest.get().abort();
            this._ongoingResponse.get().close();
        } catch (IOException e) {
            _log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
        }
    }

    private void connectAndLoop(URI uri, CountDownLatch countDownLatch) {
        Preconditions.checkNotNull(uri);
        Preconditions.checkNotNull(countDownLatch);
        try {
            if (!establishConnection(uri, countDownLatch)) {
                this._statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
                return;
            }
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this._ongoingResponse.get().getEntity().getContent()));
                while (isOpen() && !Thread.currentThread().isInterrupted()) {
                    try {
                        handleMessage(readMessageAsString(bufferedReader));
                    } catch (SocketException e) {
                        _log.debug(e.getMessage());
                        if (SOCKET_CLOSED_MESSAGE.equals(e.getMessage())) {
                            this._statusCallback.apply(StatusMessage.FORCED_STOP);
                            this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                            try {
                                this._ongoingResponse.get().close();
                            } catch (IOException e2) {
                                _log.debug(e2.getMessage());
                            }
                            this._state.set(ConnectionState.CLOSED);
                            _log.debug("SSEClient finished.");
                            this._forcedStop.set(false);
                            return;
                        }
                        this._statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
                        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                        try {
                            this._ongoingResponse.get().close();
                        } catch (IOException e3) {
                            _log.debug(e3.getMessage());
                        }
                        this._state.set(ConnectionState.CLOSED);
                        _log.debug("SSEClient finished.");
                        this._forcedStop.set(false);
                        return;
                    } catch (IOException e4) {
                        if (!this._forcedStop.get()) {
                            _log.debug(String.format("SSE connection ended abruptly: %s. Retying", e4.getMessage()));
                            this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                            this._statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
                            try {
                                this._ongoingResponse.get().close();
                            } catch (IOException e5) {
                                _log.debug(e5.getMessage());
                            }
                            this._state.set(ConnectionState.CLOSED);
                            _log.debug("SSEClient finished.");
                            this._forcedStop.set(false);
                            return;
                        }
                        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                    }
                }
                try {
                    this._ongoingResponse.get().close();
                } catch (IOException e6) {
                    _log.debug(e6.getMessage());
                }
                this._state.set(ConnectionState.CLOSED);
                _log.debug("SSEClient finished.");
                this._forcedStop.set(false);
            } catch (Exception e7) {
                this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                _log.warn(e7.getMessage(), e7);
                this._statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
                try {
                    this._ongoingResponse.get().close();
                } catch (IOException e8) {
                    _log.debug(e8.getMessage());
                }
                this._state.set(ConnectionState.CLOSED);
                _log.debug("SSEClient finished.");
                this._forcedStop.set(false);
            }
        } catch (Throwable th) {
            try {
                this._ongoingResponse.get().close();
            } catch (IOException e9) {
                _log.debug(e9.getMessage());
            }
            this._state.set(ConnectionState.CLOSED);
            _log.debug("SSEClient finished.");
            this._forcedStop.set(false);
            throw th;
        }
    }

    private boolean establishConnection(URI uri, CountDownLatch countDownLatch) {
        this._ongoingRequest.set(new HttpGet(uri));
        try {
            try {
                this._ongoingResponse.set(this._client.execute(this._ongoingRequest.get()));
                if (this._ongoingResponse.get().getCode() != 200) {
                    countDownLatch.countDown();
                    return false;
                }
                this._state.set(ConnectionState.OPEN);
                this._statusCallback.apply(StatusMessage.CONNECTED);
                countDownLatch.countDown();
                return true;
            } catch (IOException e) {
                _log.error(String.format("Error establishConnection: %s", e));
                countDownLatch.countDown();
                return false;
            }
        } catch (Throwable th) {
            countDownLatch.countDown();
            throw th;
        }
    }

    private static String readMessageAsString(BufferedReader bufferedReader) throws IOException {
        StringBuilder sb = new StringBuilder();
        while (true) {
            String readLine = bufferedReader.readLine();
            if (null == readLine) {
                throw new EOFException("connection closed by remote host");
            }
            if (readLine.isEmpty()) {
                return sb.toString();
            }
            sb.append(readLine).append("\n");
        }
    }

    private void handleMessage(String str) {
        if (Strings.isNullOrEmpty(str) || KEEP_ALIVE_PAYLOAD.equals(str)) {
            _log.debug("Keep Alive event");
        } else {
            this._eventCallback.apply(RawEvent.fromString(str));
        }
    }
}
