package info.bitrich.xchangestream.ftx;

import com.google.common.collect.Lists;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.rxjava3.core.Observable;
import java.util.Date;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.ftx.FtxAdapters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/ftx/FtxStreamingMarketDataService.class */
public class FtxStreamingMarketDataService implements StreamingMarketDataService {
    private static final Logger LOG = LoggerFactory.getLogger(FtxStreamingMarketDataService.class);
    private final FtxStreamingService service;

    public FtxStreamingMarketDataService(FtxStreamingService ftxStreamingService) {
        this.service = ftxStreamingService;
    }

    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... objArr) {
        OrderBook orderBook = new OrderBook((Date) null, Lists.newArrayList(), Lists.newArrayList());
        String str = "orderbook:" + FtxAdapters.adaptCurrencyPairToFtxMarket(currencyPair);
        return this.service.subscribeChannel(str, new Object[0]).map(jsonNode -> {
            try {
                return FtxStreamingAdapters.adaptOrderbookMessage(orderBook, currencyPair, jsonNode);
            } catch (IllegalStateException e) {
                LOG.warn("Resubscribing {} channel after adapter error {}", currencyPair, e.getMessage());
                orderBook.getBids().clear();
                orderBook.getAsks().clear();
                this.service.sendMessage(this.service.getUnsubscribeMessage(str, objArr));
                this.service.sendMessage(this.service.getSubscribeMessage(str, objArr));
                return new OrderBook((Date) null, Lists.newArrayList(), Lists.newArrayList(), false);
            }
        }).filter(orderBook2 -> {
            return orderBook2.getBids().size() > 0 && orderBook2.getAsks().size() > 0;
        });
    }

    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... objArr) {
        return this.service.subscribeChannel("ticker:" + FtxAdapters.adaptCurrencyPairToFtxMarket(currencyPair), new Object[0]).map(jsonNode -> {
            return FtxStreamingAdapters.adaptTickerMessage(currencyPair, jsonNode);
        }).filter(ticker -> {
            return ticker != FtxStreamingAdapters.NULL_TICKER;
        });
    }

    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... objArr) {
        return this.service.subscribeChannel("trades:" + FtxAdapters.adaptCurrencyPairToFtxMarket(currencyPair), new Object[0]).flatMapIterable(jsonNode -> {
            return FtxStreamingAdapters.adaptTradesMessage(currencyPair, jsonNode);
        });
    }
}
