package info.bitrich.xchangestream.service.netty;

import info.bitrich.xchangestream.service.ConnectableService;
import info.bitrich.xchangestream.service.exception.NotConnectedException;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.SocketUtils;
import io.netty.util.internal.StringUtil;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.subjects.PublishSubject;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/service/netty/NettyStreamingService.class */
public abstract class NettyStreamingService<T> extends ConnectableService {
    private final Logger LOG;
    protected static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(10);
    protected static final Duration DEFAULT_RETRY_DURATION = Duration.ofSeconds(15);
    protected static final int DEFAULT_IDLE_TIMEOUT = 15;
    private final int maxFramePayloadLength;
    private final URI uri;
    private final AtomicBoolean isManualDisconnect;
    private Channel webSocketChannel;
    private final Duration retryDuration;
    private final Duration connectionTimeout;
    private final int idleTimeoutSeconds;
    private volatile NioEventLoopGroup eventLoopGroup;
    protected final Map<String, NettyStreamingService<T>.Subscription> channels;
    private boolean compressedMessages;
    private final PublishSubject<Throwable> reconnFailEmitters;
    private final PublishSubject<Object> connectionSuccessEmitters;
    private final PublishSubject<Object> disconnectEmitters;
    private final PublishSubject<Object> subjectIdle;
    private boolean acceptAllCertificates;
    private boolean enableLoggingHandler;
    private boolean autoReconnect;
    private LogLevel loggingHandlerLevel;
    private String socksProxyHost;
    private Integer socksProxyPort;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:info/bitrich/xchangestream/service/netty/NettyStreamingService$NettyWebSocketClientHandler.class */
    public class NettyWebSocketClientHandler extends WebSocketClientHandler {
        protected NettyWebSocketClientHandler(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketClientHandler.WebSocketMessageHandler webSocketMessageHandler) {
            super(webSocketClientHandshaker, webSocketMessageHandler);
        }

        @Override // info.bitrich.xchangestream.service.netty.WebSocketClientHandler
        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            if (NettyStreamingService.this.isManualDisconnect.compareAndSet(true, false)) {
                return;
            }
            super.channelInactive(channelHandlerContext);
            NettyStreamingService.this.disconnectEmitters.onNext(new Object());
            NettyStreamingService.this.LOG.info("Reopening websocket because it was closed");
            NettyStreamingService.this.scheduleReconnect();
        }

        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            if ((obj instanceof IdleStateEvent) && ((IdleStateEvent) obj).state().equals(IdleState.READER_IDLE)) {
                NettyStreamingService.this.onIdle(channelHandlerContext);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:info/bitrich/xchangestream/service/netty/NettyStreamingService$Subscription.class */
    public class Subscription {
        final ObservableEmitter<T> emitter;
        final String channelName;
        final Object[] args;

        public Subscription(ObservableEmitter<T> observableEmitter, String str, Object[] objArr) {
            this.emitter = observableEmitter;
            this.channelName = str;
            this.args = objArr;
        }
    }

    public NettyStreamingService(String str) {
        this(str, 65536);
    }

    public NettyStreamingService(String str, int i) {
        this(str, i, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_RETRY_DURATION);
    }

    public NettyStreamingService(String str, int i, Duration duration, Duration duration2) {
        this(str, i, duration, duration2, DEFAULT_IDLE_TIMEOUT);
    }

    public NettyStreamingService(String str, int i, Duration duration, Duration duration2, int i2) {
        this.LOG = LoggerFactory.getLogger(getClass());
        this.isManualDisconnect = new AtomicBoolean();
        this.channels = new ConcurrentHashMap();
        this.compressedMessages = false;
        this.reconnFailEmitters = PublishSubject.create();
        this.connectionSuccessEmitters = PublishSubject.create();
        this.disconnectEmitters = PublishSubject.create();
        this.subjectIdle = PublishSubject.create();
        this.acceptAllCertificates = false;
        this.enableLoggingHandler = false;
        this.autoReconnect = true;
        this.loggingHandlerLevel = LogLevel.DEBUG;
        this.maxFramePayloadLength = i;
        this.retryDuration = duration2;
        this.connectionTimeout = duration;
        this.idleTimeoutSeconds = i2;
        try {
            this.uri = new URI(str);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Error parsing URI " + str, e);
        }
    }

    protected Completable openConnection() {
        return Completable.create(completableEmitter -> {
            SslContext sslContext;
            try {
                this.LOG.info("Connecting to {}", this.uri.toString());
                String scheme = this.uri.getScheme() == null ? "ws" : this.uri.getScheme();
                final String host = this.uri.getHost();
                if (host == null) {
                    throw new IllegalArgumentException("Host cannot be null.");
                }
                int port = this.uri.getPort() == -1 ? "ws".equalsIgnoreCase(scheme) ? 80 : "wss".equalsIgnoreCase(scheme) ? 443 : -1 : this.uri.getPort();
                if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
                    throw new IllegalArgumentException("Only WS(S) is supported.");
                }
                if ("wss".equalsIgnoreCase(scheme)) {
                    SslContextBuilder forClient = SslContextBuilder.forClient();
                    if (this.acceptAllCertificates) {
                        forClient.trustManager(InsecureTrustManagerFactory.INSTANCE);
                    }
                    sslContext = forClient.build();
                } else {
                    sslContext = null;
                }
                final WebSocketClientHandler webSocketClientHandler = getWebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(this.uri, WebSocketVersion.V13, (String) null, true, getCustomHeaders(), this.maxFramePayloadLength), this::messageHandler);
                if (this.eventLoopGroup == null || this.eventLoopGroup.isShutdown()) {
                    this.eventLoopGroup = new NioEventLoopGroup(2);
                }
                final SslContext sslContext2 = sslContext;
                final int i = port;
                new Bootstrap().group(this.eventLoopGroup).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(Math.toIntExact(this.connectionTimeout.toMillis()))).option(ChannelOption.SO_KEEPALIVE, true).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { // from class: info.bitrich.xchangestream.service.netty.NettyStreamingService.1
                    /* JADX INFO: Access modifiers changed from: protected */
                    public void initChannel(SocketChannel socketChannel) {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        if (NettyStreamingService.this.socksProxyHost != null) {
                            pipeline.addLast(new ChannelHandler[]{new Socks5ProxyHandler(SocketUtils.socketAddress(NettyStreamingService.this.socksProxyHost, NettyStreamingService.this.socksProxyPort.intValue()))});
                        }
                        if (sslContext2 != null) {
                            pipeline.addLast(new ChannelHandler[]{sslContext2.newHandler(socketChannel.alloc(), host, i)});
                        }
                        pipeline.addLast(new ChannelHandler[]{new HttpClientCodec()});
                        if (NettyStreamingService.this.enableLoggingHandler) {
                            pipeline.addLast(new ChannelHandler[]{new LoggingHandler(NettyStreamingService.this.loggingHandlerLevel)});
                        }
                        if (NettyStreamingService.this.compressedMessages) {
                            pipeline.addLast(new ChannelHandler[]{WebSocketClientCompressionHandler.INSTANCE});
                        }
                        pipeline.addLast(new ChannelHandler[]{new HttpObjectAggregator(8192)});
                        if (NettyStreamingService.this.idleTimeoutSeconds > 0) {
                            pipeline.addLast(new ChannelHandler[]{new IdleStateHandler(NettyStreamingService.this.idleTimeoutSeconds, 0, 0)});
                        }
                        ChannelHandler webSocketClientExtensionHandler = NettyStreamingService.this.getWebSocketClientExtensionHandler();
                        if (webSocketClientExtensionHandler != null) {
                            pipeline.addLast(new ChannelHandler[]{webSocketClientExtensionHandler});
                        }
                        pipeline.addLast(new ChannelHandler[]{webSocketClientHandler});
                    }
                }).connect(this.uri.getHost(), port).addListener(channelFuture -> {
                    this.webSocketChannel = channelFuture.channel();
                    if (channelFuture.isSuccess()) {
                        webSocketClientHandler.handshakeFuture().addListener(future -> {
                            if (future.isSuccess()) {
                                completableEmitter.onComplete();
                            } else {
                                this.webSocketChannel.disconnect().addListener(future -> {
                                    completableEmitter.onError(future.cause());
                                });
                            }
                        });
                    } else {
                        completableEmitter.onError(channelFuture.cause());
                        scheduleReconnect();
                    }
                });
            } catch (Exception e) {
                completableEmitter.onError(e);
                scheduleReconnect();
            }
        }).doOnError(th -> {
            if (th instanceof WebSocketHandshakeException) {
                this.LOG.warn("Problem with connection: {} - {}", th.getClass(), th.getMessage());
            } else {
                this.LOG.warn("Problem with connection", th);
            }
            this.reconnFailEmitters.onNext(th);
        }).doOnComplete(() -> {
            resubscribeChannels();
            this.connectionSuccessEmitters.onNext(new Object());
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleReconnect() {
        if (this.autoReconnect) {
            this.LOG.info("Scheduling reconnection");
            this.webSocketChannel.eventLoop().schedule(() -> {
                return connect().subscribe(() -> {
                    this.LOG.info("Reconnection complete");
                }, th -> {
                    this.LOG.error("Reconnection failed: {}", th.getMessage());
                });
            }, this.retryDuration.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    protected DefaultHttpHeaders getCustomHeaders() {
        return new DefaultHttpHeaders();
    }

    public Completable disconnect() {
        this.isManualDisconnect.set(true);
        return Completable.create(completableEmitter -> {
            if (this.webSocketChannel == null || !this.webSocketChannel.isOpen()) {
                this.LOG.warn("Disconnect called but already disconnected");
                completableEmitter.onComplete();
            } else {
                this.webSocketChannel.writeAndFlush(new CloseWebSocketFrame()).addListener(future -> {
                    this.channels.clear();
                    this.eventLoopGroup.shutdownGracefully(2L, this.idleTimeoutSeconds, TimeUnit.SECONDS).addListener(future -> {
                        this.LOG.info("Disconnected");
                        this.disconnectEmitters.onNext(new Object());
                        completableEmitter.onComplete();
                    });
                });
            }
        });
    }

    protected abstract String getChannelNameFromMessage(T t) throws IOException;

    public abstract String getSubscribeMessage(String str, Object... objArr) throws IOException;

    public abstract String getUnsubscribeMessage(String str) throws IOException;

    public String getSubscriptionUniqueId(String str, Object... objArr) {
        return str;
    }

    protected void sendMessageRateLimiterAcquire() {
    }

    public abstract void messageHandler(String str);

    public void sendMessage(String str) {
        this.LOG.debug("Sending message: {}", str);
        if (this.webSocketChannel == null || !this.webSocketChannel.isOpen()) {
            this.LOG.warn("WebSocket is not open! Call connect first.");
            return;
        }
        if (!this.webSocketChannel.isWritable()) {
            this.LOG.warn("Cannot send data to WebSocket as it is not writable.");
        } else if (str != null) {
            sendMessageRateLimiterAcquire();
            this.webSocketChannel.writeAndFlush(new TextWebSocketFrame(str));
        }
    }

    public Observable<Throwable> subscribeReconnectFailure() {
        return this.reconnFailEmitters.share();
    }

    public Observable<Object> subscribeConnectionSuccess() {
        return this.connectionSuccessEmitters.share();
    }

    public Observable<Object> subscribeDisconnect() {
        return this.disconnectEmitters.share();
    }

    public Observable<T> subscribeChannel(String str, Object... objArr) {
        String subscriptionUniqueId = getSubscriptionUniqueId(str, objArr);
        this.LOG.info("Subscribing to channel {}", subscriptionUniqueId);
        return Observable.create(observableEmitter -> {
            if (this.webSocketChannel == null || !this.webSocketChannel.isOpen()) {
                observableEmitter.onError(new NotConnectedException());
            }
            this.channels.computeIfAbsent(subscriptionUniqueId, str2 -> {
                Subscription subscription = new Subscription(observableEmitter, str, objArr);
                try {
                    sendMessage(getSubscribeMessage(str, objArr));
                } catch (Exception e) {
                    observableEmitter.onError(e);
                }
                return subscription;
            });
        }).doOnDispose(() -> {
            if (this.channels.remove(subscriptionUniqueId) != null) {
                try {
                    sendMessage(getUnsubscribeMessage(subscriptionUniqueId));
                } catch (IOException e) {
                    this.LOG.debug("Failed to unsubscribe channel: {} {}", subscriptionUniqueId, e.toString());
                } catch (Exception e2) {
                    this.LOG.warn("Failed to unsubscribe channel: {}", subscriptionUniqueId, e2);
                }
            }
        }).share();
    }

    public void resubscribeChannels() {
        for (Map.Entry<String, NettyStreamingService<T>.Subscription> entry : this.channels.entrySet()) {
            try {
                NettyStreamingService<T>.Subscription value = entry.getValue();
                sendMessage(getSubscribeMessage(value.channelName, value.args));
            } catch (IOException e) {
                this.LOG.error("Failed to reconnect channel: {}", entry.getKey());
            }
        }
    }

    protected String getChannel(T t) {
        try {
            return getChannelNameFromMessage(t);
        } catch (IOException e) {
            this.LOG.error("Cannot parse channel from message: {}", t);
            return "";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleMessage(T t) {
        String channel = getChannel(t);
        if (StringUtil.isNullOrEmpty(channel)) {
            return;
        }
        handleChannelMessage(channel, t);
    }

    protected void handleError(T t, Throwable th) {
        String channel = getChannel(t);
        if (StringUtil.isNullOrEmpty(channel)) {
            this.LOG.error("handleError cannot parse channel from message: {}", t);
        } else {
            handleChannelError(channel, th);
        }
    }

    protected void handleIdle(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.writeAndFlush(new PingWebSocketFrame());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onIdle(ChannelHandlerContext channelHandlerContext) {
        this.subjectIdle.onNext(1);
        handleIdle(channelHandlerContext);
    }

    public Observable<Object> subscribeIdle() {
        return this.subjectIdle.share();
    }

    protected void handleChannelMessage(String str, T t) {
        NettyStreamingService<T>.Subscription subscription = this.channels.get(str);
        if (subscription == null) {
            this.LOG.debug("Channel has been closed {}.", str);
            return;
        }
        ObservableEmitter<T> observableEmitter = subscription.emitter;
        if (observableEmitter == null) {
            this.LOG.debug("No subscriber for channel {}.", str);
        } else {
            observableEmitter.onNext(t);
        }
    }

    protected void handleChannelError(String str, Throwable th) {
        NettyStreamingService<T>.Subscription subscription = this.channels.get(str);
        if (subscription == null) {
            this.LOG.debug("Channel {} has been closed.", str);
            return;
        }
        ObservableEmitter<T> observableEmitter = subscription.emitter;
        if (observableEmitter == null) {
            this.LOG.debug("No subscriber for channel {}.", str);
        } else {
            observableEmitter.onError(th);
        }
    }

    protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
        return WebSocketClientCompressionHandler.INSTANCE;
    }

    protected WebSocketClientHandler getWebSocketClientHandler(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketClientHandler.WebSocketMessageHandler webSocketMessageHandler) {
        return new NettyWebSocketClientHandler(webSocketClientHandshaker, webSocketMessageHandler);
    }

    public boolean isSocketOpen() {
        return this.webSocketChannel != null && this.webSocketChannel.isOpen();
    }

    public void useCompressedMessages(boolean z) {
        this.compressedMessages = z;
    }

    public void setAcceptAllCertificates(boolean z) {
        this.acceptAllCertificates = z;
    }

    public void setEnableLoggingHandler(boolean z) {
        this.enableLoggingHandler = z;
    }

    public void setLoggingHandlerLevel(LogLevel logLevel) {
        this.loggingHandlerLevel = logLevel;
    }

    public void setSocksProxyHost(String str) {
        this.socksProxyHost = str;
    }

    public void setSocksProxyPort(Integer num) {
        this.socksProxyPort = num;
    }

    public void setAutoReconnect(boolean z) {
        this.autoReconnect = z;
    }
}
