package info.bitrich.xchangestream.kraken;

import com.google.common.base.MoreObjects;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionStatusMessage;
import info.bitrich.xchangestream.kraken.dto.KrakenSystemStatus;
import info.bitrich.xchangestream.kraken.dto.enums.KrakenEventType;
import info.bitrich.xchangestream.service.netty.ConnectionStateModel;
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.kraken.KrakenExchange;
import org.knowm.xchange.kraken.dto.account.KrakenWebsocketToken;
import org.knowm.xchange.kraken.service.KrakenAccountServiceRaw;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/kraken/KrakenStreamingExchange.class */
public class KrakenStreamingExchange extends KrakenExchange implements StreamingExchange {
    private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingExchange.class);
    private static final String USE_BETA = "Use_Beta";
    private static final String USE_SPREAD_FOR_TICKER = "Spread_For_Ticker";
    private static final String API_URI = "wss://ws.kraken.com";
    private static final String API_AUTH_URI = "wss://ws-auth.kraken.com";
    private static final String API_BETA_URI = "wss://beta-ws.kraken.com";
    private KrakenStreamingService streamingService;
    private KrakenStreamingService privateStreamingService;
    private KrakenStreamingMarketDataService streamingMarketDataService;
    private KrakenStreamingTradeService streamingTradeService;

    public static String pickUri(boolean z, boolean z2) {
        return z2 ? API_BETA_URI : z ? API_AUTH_URI : API_URI;
    }

    protected void initServices() {
        super.initServices();
        Boolean bool = (Boolean) MoreObjects.firstNonNull((Boolean) this.exchangeSpecification.getExchangeSpecificParametersItem(USE_BETA), Boolean.FALSE);
        Boolean bool2 = (Boolean) MoreObjects.firstNonNull((Boolean) this.exchangeSpecification.getExchangeSpecificParametersItem(USE_SPREAD_FOR_TICKER), Boolean.FALSE);
        KrakenAccountServiceRaw accountService = getAccountService();
        this.streamingService = new KrakenStreamingService(this, false, pickUri(false, bool.booleanValue()), () -> {
            return authData(accountService);
        });
        applyStreamingSpecification(getExchangeSpecification(), this.streamingService);
        this.streamingMarketDataService = new KrakenStreamingMarketDataService(this.streamingService, bool2.booleanValue());
        if (StringUtils.isNotEmpty(this.exchangeSpecification.getApiKey())) {
            this.privateStreamingService = new KrakenStreamingService(this, true, pickUri(true, bool.booleanValue()), () -> {
                return authData(accountService);
            });
            applyStreamingSpecification(getExchangeSpecification(), this.privateStreamingService);
        }
        this.streamingTradeService = new KrakenStreamingTradeService(this.privateStreamingService);
    }

    public Completable connect(ProductSubscription... productSubscriptionArr) {
        return this.privateStreamingService != null ? this.privateStreamingService.connect().mergeWith(this.streamingService.connect()) : this.streamingService.connect();
    }

    public Completable disconnect() {
        return this.privateStreamingService != null ? this.privateStreamingService.disconnect().mergeWith(this.streamingService.disconnect()) : this.streamingService.disconnect();
    }

    public boolean isAlive() {
        return this.streamingService.isSocketOpen() && (this.privateStreamingService == null || this.privateStreamingService.isSocketOpen());
    }

    public Observable<Object> connectionSuccess() {
        return this.streamingService.subscribeConnectionSuccess();
    }

    public Observable<Object> disconnectObservable() {
        return this.streamingService.subscribeDisconnect();
    }

    public Observable<Throwable> reconnectFailure() {
        return this.streamingService.subscribeReconnectFailure();
    }

    public Observable<ConnectionStateModel.State> connectionStateObservable() {
        return this.streamingService.subscribeConnectionState();
    }

    public Observable<Object> privateConnectionSuccess() {
        return this.privateStreamingService.subscribeConnectionSuccess();
    }

    public Observable<Throwable> privateReconnectFailure() {
        return this.privateStreamingService.subscribeReconnectFailure();
    }

    public Observable<ConnectionStateModel.State> privateConnectionStateObservable() {
        return this.privateStreamingService.subscribeConnectionState();
    }

    public Observable<Object> privateDisconnectObservable() {
        return this.privateStreamingService.subscribeDisconnect();
    }

    public ExchangeSpecification getDefaultExchangeSpecification() {
        ExchangeSpecification defaultExchangeSpecification = super.getDefaultExchangeSpecification();
        defaultExchangeSpecification.setShouldLoadRemoteMetaData(false);
        return defaultExchangeSpecification;
    }

    public StreamingMarketDataService getStreamingMarketDataService() {
        return this.streamingMarketDataService;
    }

    public StreamingTradeService getStreamingTradeService() {
        return this.streamingTradeService;
    }

    public void useCompressedMessages(boolean z) {
        this.streamingService.useCompressedMessages(z);
    }

    public KrakenWebsocketToken authData(KrakenAccountServiceRaw krakenAccountServiceRaw) {
        if (krakenAccountServiceRaw == null) {
            return null;
        }
        try {
            return krakenAccountServiceRaw.getKrakenWebsocketToken();
        } catch (IOException e) {
            this.logger.warn("Failed attempting to acquire Websocket AuthData needed for private data on websocket.  Will only receive public information via API", e);
            return null;
        }
    }

    public void resubscribeChannels() {
        this.logger.debug("Resubscribing channels");
        this.streamingService.resubscribeChannels();
        if (this.privateStreamingService != null) {
            this.privateStreamingService.resubscribeChannels();
        }
    }

    public Observable<KrakenSystemStatus> getSystemStatusChanges() {
        return this.streamingService.subscribeSystemChannel(KrakenEventType.systemStatus).filter(krakenEvent -> {
            return krakenEvent instanceof KrakenSystemStatus;
        }).map(krakenEvent2 -> {
            return (KrakenSystemStatus) krakenEvent2;
        }).share();
    }

    public Observable<KrakenSubscriptionStatusMessage> getPublicSubscriptionStatusChanges() {
        return this.streamingService.subscribeSystemChannel(KrakenEventType.subscriptionStatus).filter(krakenEvent -> {
            return krakenEvent instanceof KrakenSubscriptionStatusMessage;
        }).map(krakenEvent2 -> {
            return (KrakenSubscriptionStatusMessage) krakenEvent2;
        }).share();
    }

    public Observable<KrakenSubscriptionStatusMessage> getPrivateSubscriptionStatusChanges() {
        return this.privateStreamingService.subscribeSystemChannel(KrakenEventType.subscriptionStatus).filter(krakenEvent -> {
            return krakenEvent instanceof KrakenSubscriptionStatusMessage;
        }).map(krakenEvent2 -> {
            return (KrakenSubscriptionStatusMessage) krakenEvent2;
        }).share();
    }
}
