/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.lgo;

import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.lgo.LgoLevel2BatchSubscription;
import info.bitrich.xchangestream.lgo.LgoStreamingService;
import info.bitrich.xchangestream.lgo.LgoTradeBatchSubscription;
import io.reactivex.rxjava3.core.Observable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Trade;

public class LgoStreamingMarketDataService
implements StreamingMarketDataService {
    private final LgoStreamingService service;
    private final Map<CurrencyPair, LgoLevel2BatchSubscription> level2Subscriptions = new ConcurrentHashMap<CurrencyPair, LgoLevel2BatchSubscription>();
    private final Map<CurrencyPair, LgoTradeBatchSubscription> tradeSubscriptions = new ConcurrentHashMap<CurrencyPair, LgoTradeBatchSubscription>();

    LgoStreamingMarketDataService(LgoStreamingService lgoStreamingService) {
        this.service = lgoStreamingService;
    }

    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object ... args) {
        return this.level2Subscriptions.computeIfAbsent(currencyPair, this::createLevel2Subscription).getSubscription();
    }

    private LgoLevel2BatchSubscription createLevel2Subscription(CurrencyPair currencyPair) {
        return LgoLevel2BatchSubscription.create(this.service, currencyPair);
    }

    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object ... args) {
        return this.tradeSubscriptions.computeIfAbsent(currencyPair, this::createTradeSubscription).getSubscription();
    }

    private LgoTradeBatchSubscription createTradeSubscription(CurrencyPair currencyPair) {
        return LgoTradeBatchSubscription.create(this.service, currencyPair);
    }
}

