package info.bitrich.xchangestream.ftx;

import com.fasterxml.jackson.databind.JsonNode;
import info.bitrich.xchangestream.core.StreamingTradeService;
import io.reactivex.Observable;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.trade.UserTrade;

/* loaded from: input_file:info/bitrich/xchangestream/ftx/FtxStreamingTradeService.class */
public class FtxStreamingTradeService implements StreamingTradeService {
    private final Observable<JsonNode> fills;
    private final Observable<JsonNode> orders;

    public FtxStreamingTradeService(FtxStreamingService ftxStreamingService) {
        this.fills = ftxStreamingService.subscribeChannel("fills", new Object[0]);
        this.orders = ftxStreamingService.subscribeChannel("orders", new Object[0]);
    }

    public Observable<UserTrade> getUserTrades(CurrencyPair currencyPair, Object... objArr) {
        return this.fills.filter(jsonNode -> {
            return jsonNode.hasNonNull("data");
        }).filter(jsonNode2 -> {
            return new CurrencyPair(jsonNode2.get("data").get("market").asText()).equals(currencyPair);
        }).map(FtxStreamingAdapters::adaptUserTrade);
    }

    public Observable<Order> getOrderChanges(CurrencyPair currencyPair, Object... objArr) {
        return this.orders.filter(jsonNode -> {
            return jsonNode.hasNonNull("data");
        }).filter(jsonNode2 -> {
            return new CurrencyPair(jsonNode2.get("data").get("market").asText()).equals(currencyPair);
        }).map(FtxStreamingAdapters::adaptOrders);
    }
}
