package info.bitrich.xchangestream.okex;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import info.bitrich.xchangestream.okex.dto.OkexLoginMessage;
import info.bitrich.xchangestream.okex.dto.OkexSubscribeMessage;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import info.bitrich.xchangestream.service.netty.NettyStreamingService;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.exceptions.ExchangeException;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
import org.knowm.xchange.okex.dto.OkexInstType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/okex/OkexStreamingService.class */
/**
 * JSON-over-WebSocket streaming service for OKEx.
 *
 * <p>On top of the inherited {@link JsonNettyStreamingService} behaviour this class:
 * <ul>
 *   <li>sends a login message after connecting when an API key is configured,</li>
 *   <li>sends a plain-text {@code "ping"} every 15 seconds and ignores plain-text {@code "pong"} messages,</li>
 *   <li>builds subscribe/unsubscribe messages from channel names of the form
 *       {@code <channel><instId>} (for example {@code TRADES + "BTC-USDT"}),</li>
 *   <li>lets callers register a handler that is notified when the channel becomes inactive.</li>
 * </ul>
 */
public class OkexStreamingService extends JsonNettyStreamingService {
    private static final Logger LOG = LoggerFactory.getLogger(OkexStreamingService.class);
    private static final String LOGIN_SIGN_METHOD = "GET";
    private static final String LOGIN_SIGN_REQUEST_PATH = "/users/self/verify";
    private static final String SUBSCRIBE = "subscribe";
    private static final String UNSUBSCRIBE = "unsubscribe";
    private static final String HMAC_ALGORITHM = "HmacSHA256";
    private static final String PASSPHRASE_PARAMETER = "passphrase";
    private static final String PING_MESSAGE = "ping";
    private static final String PONG_MESSAGE = "pong";
    private static final long PING_INTERVAL_SECONDS = 15L;
    public static final String TRADES = "trades";
    public static final String ORDERBOOK = "books";
    public static final String ORDERBOOK5 = "books5";
    public static final String FUNDING_RATE = "funding-rate";
    public static final String TICKERS = "tickers";
    public static final String USERTRADES = "orders";

    /**
     * Supported channels in lookup order. {@link #ORDERBOOK5} must come before
     * {@link #ORDERBOOK} because every name containing "books5" also contains "books".
     */
    private static final String[] TOPIC_CHANNELS = {ORDERBOOK5, ORDERBOOK, TRADES, TICKERS, USERTRADES, FUNDING_RATE};

    /** Timer that drives the keep-alive pings. */
    private final Observable<Long> pingPongSrc = Observable.interval(PING_INTERVAL_SECONDS, PING_INTERVAL_SECONDS, TimeUnit.SECONDS);
    /** Optional listener notified from {@code channelInactive}; {@code null} when none is registered. */
    private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler;
    /** Subscription to {@link #pingPongSrc}; {@code null} until the first successful connect. */
    private Disposable pingPongSubscription;
    private final ExchangeSpecification xSpec;

    /* loaded from: input_file:info/bitrich/xchangestream/okex/OkexStreamingService$OkxWebSocketClientHandler.class */
    /**
     * Client handler that additionally forwards channel-inactive events to the handler registered
     * through {@link OkexStreamingService#setChannelInactiveHandler}.
     */
    class OkxWebSocketClientHandler extends NettyStreamingService<JsonNode>.NettyWebSocketClientHandler {
        public OkxWebSocketClientHandler(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketClientHandler.WebSocketMessageHandler webSocketMessageHandler) {
            // The superclass is an inner class of NettyStreamingService: its enclosing instance
            // (this OkexStreamingService) is supplied implicitly and must not be passed as an argument.
            super(webSocketClientHandshaker, webSocketMessageHandler);
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            super.channelActive(channelHandlerContext);
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            super.channelInactive(channelHandlerContext);
            // Read the field once so a concurrent setChannelInactiveHandler(null) cannot cause an NPE.
            WebSocketClientHandler.WebSocketMessageHandler inactiveHandler = OkexStreamingService.this.channelInactiveHandler;
            if (inactiveHandler != null) {
                inactiveHandler.onMessage("WebSocket Client disconnected!");
            }
        }
    }

    /**
     * @param str the API URL handed to the parent streaming service
     * @param exchangeSpecification source of the API key, secret key and the {@code "passphrase"}
     *     exchange-specific parameter used by {@link #login()}
     */
    public OkexStreamingService(String str, ExchangeSpecification exchangeSpecification) {
        super(str);
        this.xSpec = exchangeSpecification;
    }

    /**
     * Connects via the parent implementation, then logs in (only when an API key is configured)
     * and (re)starts the keep-alive ping timer.
     *
     * @return a {@link Completable} that completes once login and ping setup have been issued, or
     *     fails with the exception raised while doing so
     */
    public Completable connect() {
        // Completable.create is used instead of a bare lambda: andThen is overloaded for several
        // single-method "source" interfaces, so an implicitly typed lambda is ambiguous there.
        return super.connect().andThen(Completable.create(emitter -> {
            try {
                if (this.xSpec.getApiKey() != null) {
                    login();
                }
                disposePingPong();
                this.pingPongSubscription = this.pingPongSrc.subscribe(
                        tick -> sendMessage(PING_MESSAGE),
                        // Without an error consumer a failing ping would surface as an unhandled RxJava error.
                        error -> LOG.warn("Keep-alive ping failed; pings are stopped until the next connect()", error));
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }));
    }

    /**
     * Sends the login message. The signature is the Base64-encoded HMAC-SHA256, keyed with the
     * secret key, of {@code <epoch seconds> + "GET" + "/users/self/verify"}.
     *
     * @throws JsonProcessingException if the login message cannot be serialized
     * @throws ExchangeException if the secret key or the {@code "passphrase"} parameter is missing,
     *     or the secret key is rejected by the HMAC implementation
     */
    public void login() throws JsonProcessingException {
        String secretKey = this.xSpec.getSecretKey();
        if (secretKey == null || secretKey.isEmpty()) {
            throw new ExchangeException("Cannot log in: API secret key is not configured");
        }
        Object passphrase = this.xSpec.getExchangeSpecificParametersItem(PASSPHRASE_PARAMETER);
        if (passphrase == null) {
            throw new ExchangeException("Cannot log in: exchange specific parameter 'passphrase' is not configured");
        }
        String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
        String signature = sign(secretKey, timestamp + LOGIN_SIGN_METHOD + LOGIN_SIGN_REQUEST_PATH);
        OkexLoginMessage loginMessage = new OkexLoginMessage();
        loginMessage.getArgs().add(new OkexLoginMessage.LoginArg(this.xSpec.getApiKey(), passphrase.toString(), timestamp, signature));
        sendMessage(this.objectMapper.writeValueAsString(loginMessage));
    }

    /** Returns the Base64-encoded HMAC-SHA256 of {@code payload}, keyed with {@code secretKey} (both UTF-8). */
    private static String sign(String secretKey, String payload) {
        try {
            Mac mac = Mac.getInstance(HMAC_ALGORITHM);
            mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), HMAC_ALGORITHM));
            return Base64.getEncoder().encodeToString(mac.doFinal(payload.getBytes(StandardCharsets.UTF_8)));
        } catch (InvalidKeyException | NoSuchAlgorithmException e) {
            throw new ExchangeException("Invalid API secret", e);
        }
    }

    /**
     * Parses an incoming text frame as JSON and dispatches it to {@code handleMessage}. Array
     * payloads are dispatched element by element when {@code processArrayMessageSeparately()}
     * returns {@code true}. Plain-text {@code "pong"} frames are ignored; any other unparsable
     * frame is logged and dropped.
     *
     * @param str the raw text frame
     */
    public void messageHandler(String str) {
        LOG.debug("Received message: {}", str);
        if (PONG_MESSAGE.equals(str)) {
            // Expected reply to our keep-alive ping; it is not JSON, so skip parsing altogether.
            return;
        }
        try {
            JsonNode root = this.objectMapper.readTree(str);
            if (processArrayMessageSeparately() && root.isArray()) {
                for (JsonNode element : root) {
                    handleMessage(element);
                }
            } else {
                handleMessage(root);
            }
        } catch (IOException e) {
            LOG.error("Error parsing incoming message to JSON: {}", str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Derives the channel name from {@code arg.channel} and {@code arg.instId}.
     *
     * @param jsonNode the incoming message
     * @return {@code channel + instId}, or an empty string when the message has no {@code arg}
     *     object carrying both fields
     */
    public String getChannelNameFromMessage(JsonNode jsonNode) {
        JsonNode arg = jsonNode.get("arg");
        if (arg != null && arg.has("channel") && arg.has("instId")) {
            return arg.get("channel").asText() + arg.get("instId").asText();
        }
        return "";
    }

    /**
     * @param str channel name as accepted by the topic lookup ({@code <channel><instId>})
     * @param objArr ignored
     * @return the serialized {@code "subscribe"} message for that channel
     * @throws IOException if serialization fails
     * @throws NotYetImplementedForExchangeException if the channel is not supported
     */
    public String getSubscribeMessage(String str, Object... objArr) throws IOException {
        return this.objectMapper.writeValueAsString(new OkexSubscribeMessage(SUBSCRIBE, Collections.singletonList(getTopic(str))));
    }

    /**
     * @param str channel name as accepted by the topic lookup ({@code <channel><instId>})
     * @param objArr ignored
     * @return the serialized {@code "unsubscribe"} message for that channel
     * @throws IOException if serialization fails
     * @throws NotYetImplementedForExchangeException if the channel is not supported
     */
    public String getUnsubscribeMessage(String str, Object... objArr) throws IOException {
        return this.objectMapper.writeValueAsString(new OkexSubscribeMessage(UNSUBSCRIBE, Collections.singletonList(getTopic(str))));
    }

    /**
     * Maps a channel name to its subscription topic: the first supported channel contained in
     * {@code str} becomes the topic's channel and the remainder of {@code str} (with every
     * occurrence of that channel removed) becomes the instrument id. Only the
     * {@link #USERTRADES} channel carries an instrument type ({@link OkexInstType#ANY}).
     *
     * @throws NotYetImplementedForExchangeException if {@code str} contains no supported channel
     */
    private OkexSubscribeMessage.SubscriptionTopic getTopic(String str) {
        for (String channel : TOPIC_CHANNELS) {
            if (str.contains(channel)) {
                OkexInstType instType = USERTRADES.equals(channel) ? OkexInstType.ANY : null;
                return new OkexSubscribeMessage.SubscriptionTopic(channel, instType, null, str.replace(channel, ""));
            }
        }
        throw new NotYetImplementedForExchangeException("ChannelName: " + str + " has not implemented yet on " + getClass().getSimpleName());
    }

    /** Supplies the {@link OkxWebSocketClientHandler} so channel-inactive events reach the registered handler. */
    protected WebSocketClientHandler getWebSocketClientHandler(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketClientHandler.WebSocketMessageHandler webSocketMessageHandler) {
        LOG.info("Registering OkxWebSocketClientHandler");
        return new OkxWebSocketClientHandler(webSocketClientHandshaker, webSocketMessageHandler);
    }

    /**
     * Registers the handler notified with {@code "WebSocket Client disconnected!"} when the
     * channel becomes inactive.
     *
     * @param webSocketMessageHandler the handler, or {@code null} to clear it
     */
    public void setChannelInactiveHandler(WebSocketClientHandler.WebSocketMessageHandler webSocketMessageHandler) {
        this.channelInactiveHandler = webSocketMessageHandler;
    }

    /** Stops the keep-alive ping timer if it is currently running; otherwise does nothing. */
    public void pingPongDisconnectIfConnected() {
        disposePingPong();
    }

    /** Disposes the ping subscription when one exists and is still active. */
    private void disposePingPong() {
        Disposable current = this.pingPongSubscription;
        if (current != null && !current.isDisposed()) {
            current.dispose();
        }
    }
}
