package com.zhipu.oapi.service.v4.realtime;

import com.zhipu.oapi.Constants;
import java.io.Closeable;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zhipu/oapi/service/v4/realtime/OkHttpRealtimeClient.class */
public final class OkHttpRealtimeClient implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(OkHttpRealtimeClient.class);
    private final OkHttpClient client;
    private final CommunicationProvider communicationProvider;
    private final AtomicBoolean isDisposed;
    private final AtomicReference<WebSocket> webSocketRef;
    private final Consumer<RealtimeServerEvent> serverEventHandler;
    private final ConnectivityMonitor connectivityMonitor;
    private final boolean closeClientOnClose;

    /* loaded from: input_file:com/zhipu/oapi/service/v4/realtime/OkHttpRealtimeClient$CommunicationProvider.class */
    public interface CommunicationProvider {
        String getWebSocketUrl();

        String getAuthToken();
    }

    /* loaded from: input_file:com/zhipu/oapi/service/v4/realtime/OkHttpRealtimeClient$ConnectivityMonitor.class */
    public static final class ConnectivityMonitor {
        private final AtomicReference<ConnectivityState> clientState = new AtomicReference<>(ConnectivityState.STOPPED);

        public ConnectivityState get() {
            return this.clientState.get();
        }

        public boolean changeStateOn(ConnectivityState connectivityState, ConnectivityState connectivityState2) {
            boolean compareAndSet = this.clientState.compareAndSet(connectivityState, connectivityState2);
            if (compareAndSet) {
                OkHttpRealtimeClient.logger.info("状态变更：{} -> {}", connectivityState, connectivityState2);
            }
            return compareAndSet;
        }

        public boolean changeStateOnAnyOf(EnumSet<ConnectivityState> enumSet, ConnectivityState connectivityState) {
            ConnectivityState connectivityState2;
            do {
                connectivityState2 = this.clientState.get();
                if (!enumSet.contains(connectivityState2)) {
                    return false;
                }
            } while (!this.clientState.compareAndSet(connectivityState2, connectivityState));
            OkHttpRealtimeClient.logger.info("状态变更：{} -> {}", connectivityState2, connectivityState);
            return true;
        }

        public void changeState(ConnectivityState connectivityState) {
            OkHttpRealtimeClient.logger.info("状态变更：{} -> {}", this.clientState.getAndSet(connectivityState), connectivityState);
        }
    }

    /* loaded from: input_file:com/zhipu/oapi/service/v4/realtime/OkHttpRealtimeClient$ConnectivityState.class */
    public enum ConnectivityState {
        STOPPED,
        CONNECTING,
        CONNECTED,
        DISCONNECTED,
        STOPPING,
        CLOSED
    }

    public OkHttpRealtimeClient(CommunicationProvider communicationProvider, Consumer<RealtimeServerEvent> consumer, OkHttpClient okHttpClient) {
        this.isDisposed = new AtomicBoolean(false);
        this.webSocketRef = new AtomicReference<>();
        this.connectivityMonitor = new ConnectivityMonitor();
        this.client = okHttpClient;
        this.communicationProvider = communicationProvider;
        this.serverEventHandler = consumer;
        this.closeClientOnClose = false;
    }

    public OkHttpRealtimeClient(CommunicationProvider communicationProvider, Consumer<RealtimeServerEvent> consumer, OkHttpClient okHttpClient, boolean z) {
        this.isDisposed = new AtomicBoolean(false);
        this.webSocketRef = new AtomicReference<>();
        this.connectivityMonitor = new ConnectivityMonitor();
        this.client = okHttpClient;
        this.communicationProvider = communicationProvider;
        this.serverEventHandler = consumer;
        this.closeClientOnClose = false;
    }

    public OkHttpRealtimeClient(CommunicationProvider communicationProvider, Consumer<RealtimeServerEvent> consumer) {
        this.isDisposed = new AtomicBoolean(false);
        this.webSocketRef = new AtomicReference<>();
        this.connectivityMonitor = new ConnectivityMonitor();
        Dispatcher dispatcher = new Dispatcher(Executors.newFixedThreadPool(4));
        dispatcher.setMaxRequests(4);
        dispatcher.setMaxRequestsPerHost(2);
        this.client = new OkHttpClient.Builder().connectTimeout(5L, TimeUnit.SECONDS).readTimeout(20L, TimeUnit.SECONDS).writeTimeout(20L, TimeUnit.SECONDS).callTimeout(40L, TimeUnit.SECONDS).pingInterval(10L, TimeUnit.SECONDS).dispatcher(dispatcher).build();
        this.communicationProvider = communicationProvider;
        this.serverEventHandler = consumer;
        this.closeClientOnClose = true;
    }

    public void start() {
        if (this.isDisposed.get()) {
            throw new IllegalStateException("客户端已关闭");
        }
        if (!this.connectivityMonitor.changeStateOnAnyOf(EnumSet.of(ConnectivityState.STOPPED, ConnectivityState.DISCONNECTED), ConnectivityState.CONNECTING)) {
            throw new IllegalStateException("无法在状态 " + this.connectivityMonitor.get() + " 下启动连接");
        }
        WebSocketListener webSocketListener = new WebSocketListener() { // from class: com.zhipu.oapi.service.v4.realtime.OkHttpRealtimeClient.1
            public void onOpen(WebSocket webSocket, Response response) {
                OkHttpRealtimeClient.logger.info("WebSocket连接已建立");
                OkHttpRealtimeClient.this.webSocketRef.set(webSocket);
                OkHttpRealtimeClient.this.connectivityMonitor.changeState(ConnectivityState.CONNECTED);
            }

            public void onMessage(WebSocket webSocket, String str) {
                OkHttpRealtimeClient.logger.debug("收到消息: {}", str);
                RealtimeServerEvent fromJsonToServerEvent = JasonUtil.fromJsonToServerEvent(str);
                if (fromJsonToServerEvent == null) {
                    OkHttpRealtimeClient.logger.error("无法解析服务器事件: {}", str);
                } else {
                    OkHttpRealtimeClient.this.serverEventHandler.accept(fromJsonToServerEvent);
                }
            }

            public void onClosed(WebSocket webSocket, int i, String str) {
                OkHttpRealtimeClient.logger.info("连接正常关闭，原因：{}", str);
                OkHttpRealtimeClient.this.webSocketRef.set(null);
                if (isStoppingState()) {
                    return;
                }
                OkHttpRealtimeClient.this.connectivityMonitor.changeState(ConnectivityState.DISCONNECTED);
            }

            public void onFailure(WebSocket webSocket, Throwable th, Response response) {
                OkHttpRealtimeClient.logger.error("连接异常", th);
                if (response != null) {
                    OkHttpRealtimeClient.logger.error("异常响应码：{}，响应内容：{}", Integer.valueOf(response.code()), response.body() != null ? response.body().toString() : "空内容");
                }
                OkHttpRealtimeClient.this.webSocketRef.set(null);
                if (isStoppingState()) {
                    return;
                }
                OkHttpRealtimeClient.this.connectivityMonitor.changeState(ConnectivityState.DISCONNECTED);
            }

            private boolean isStoppingState() {
                ConnectivityState connectivityState = OkHttpRealtimeClient.this.connectivityMonitor.get();
                return connectivityState == ConnectivityState.STOPPING || connectivityState == ConnectivityState.CLOSED;
            }
        };
        Request build = new Request.Builder().url(this.communicationProvider.getWebSocketUrl()).addHeader(Constants.authHeaderKey, "Bearer " + this.communicationProvider.getAuthToken()).build();
        build.url().redact();
        this.client.newWebSocket(build, webSocketListener);
        this.client.dispatcher().executorService().submit(() -> {
            logger.info("WebSocket连接线程已启动");
        });
    }

    public void stop() {
        if (this.connectivityMonitor.get() == ConnectivityState.CLOSED) {
            throw new IllegalStateException("客户端已关闭");
        }
        if (!this.connectivityMonitor.changeStateOn(ConnectivityState.CONNECTED, ConnectivityState.STOPPING)) {
            logger.warn("停止失败，当前状态：{}", this.connectivityMonitor.get());
            return;
        }
        WebSocket webSocket = this.webSocketRef.get();
        if (webSocket != null) {
            webSocket.close(1000, "正常关闭");
        }
    }

    public void waitForConnection() throws InterruptedException {
        while (this.connectivityMonitor.get() != ConnectivityState.CONNECTED) {
            Thread.sleep(100L);
        }
    }

    public void sendMessage(RealtimeClientEvent realtimeClientEvent) {
        ConnectivityState connectivityState = this.connectivityMonitor.get();
        if (connectivityState != ConnectivityState.CONNECTED) {
            throw new IllegalStateException("连接未就绪，当前状态：" + connectivityState);
        }
        WebSocket webSocket = this.webSocketRef.get();
        if (webSocket == null) {
            throw new IllegalStateException("WebSocket连接未建立");
        }
        String jsonFromClientEvent = JasonUtil.toJsonFromClientEvent(realtimeClientEvent);
        if (jsonFromClientEvent == null) {
            logger.error("无法序列化客户端事件: type={}, event_id={}", realtimeClientEvent.getType(), realtimeClientEvent.getEventId());
        } else {
            webSocket.send(jsonFromClientEvent);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.isDisposed.compareAndSet(false, true)) {
            this.connectivityMonitor.changeState(ConnectivityState.CLOSED);
            WebSocket webSocket = this.webSocketRef.get();
            if (webSocket != null) {
                webSocket.close(1000, "客户端关闭");
                this.webSocketRef.set(null);
            }
            if (this.closeClientOnClose) {
                this.client.dispatcher().executorService().shutdown();
            }
        }
    }
}
