package info.bitrich.xchangestream.poloniex2;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.poloniex2.dto.OrderbookInsertEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexOrderbook;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketAdapter;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketOrderbookInsertEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketOrderbookModifiedEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketTickerTransaction;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketTradeEvent;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.poloniex.PoloniexAdapters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/poloniex2/PoloniexStreamingMarketDataService.class */
/**
 * {@link StreamingMarketDataService} implementation backed by a {@link PoloniexStreamingService}.
 *
 * <p>Tickers for all pairs come from a single channel ({@link #TICKER_CHANNEL_ID}) whose
 * subscription is created lazily and shared; order books and trades are derived from the
 * per-currency-pair channel.
 */
public class PoloniexStreamingMarketDataService implements StreamingMarketDataService {
    private static final Logger LOG = LoggerFactory.getLogger(PoloniexStreamingMarketDataService.class);
    private static final String TICKER_CHANNEL_ID = "1002";
    private final PoloniexStreamingService service;
    private final Supplier<Observable<Ticker>> streamingTickers;

    /**
     * Creates the market data service.
     *
     * @param poloniexStreamingService streaming service used for every channel subscription
     * @param map lookup from the pair id carried by ticker messages to the matching
     *     {@link CurrencyPair}
     */
    public PoloniexStreamingMarketDataService(PoloniexStreamingService poloniexStreamingService, Map<Integer, CurrencyPair> map) {
        this.service = poloniexStreamingService;
        ObjectMapper objectMapper = StreamingObjectMapperHelper.getObjectMapper();
        // Memoized so the ticker channel is subscribed at most once, on first use; share() lets
        // every getTicker() caller reuse that single upstream subscription.
        this.streamingTickers = Suppliers.memoize(() ->
            poloniexStreamingService.subscribeChannel(TICKER_CHANNEL_ID, new Object[0])
                .map(jsonNode -> {
                    PoloniexWebSocketTickerTransaction transaction =
                        objectMapper.treeToValue(jsonNode, PoloniexWebSocketTickerTransaction.class);
                    // May be null when the id is not present in the supplied map.
                    CurrencyPair currencyPair = map.get(Integer.valueOf(transaction.getPairId()));
                    return PoloniexAdapters.adaptPoloniexTicker(transaction.toPoloniexTicker(currencyPair), currencyPair);
                })
                .share());
    }

    /**
     * Streams the order book of a currency pair.
     *
     * <p>Each batch of events from the pair channel is folded, in order, into the running order
     * book: an insert event replaces the book, a modified event updates it. Nothing is emitted
     * until a book exists.
     *
     * @param currencyPair pair whose channel is subscribed
     * @param objArr unused
     * @return observable emitting the adapted order book after every processed batch
     */
    @Override
    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... objArr) {
        return this.service.subscribeCurrencyPairChannel(currencyPair)
            // The explicit type witness keeps the scan state typed as Optional<PoloniexOrderbook>
            // instead of Optional<Object>.
            .scan(Optional.<PoloniexOrderbook>empty(), (orderbook, events) ->
                events.stream()
                    .filter(event -> event instanceof PoloniexWebSocketOrderbookInsertEvent
                        || event instanceof PoloniexWebSocketOrderbookModifiedEvent)
                    // Fold over the running accumulator so that a snapshot followed by updates
                    // inside the same batch is applied in sequence.
                    .reduce(orderbook, this::getPoloniexOrderbook, (left, right) -> {
                        throw new UnsupportedOperationException("No parallel execution");
                    }))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .map(poloniexOrderbook -> PoloniexAdapters.adaptPoloniexDepth(poloniexOrderbook.toPoloniexDepth(), currencyPair));
    }

    /**
     * Streams tickers for a single currency pair, filtered out of the shared all-pairs stream.
     *
     * @param currencyPair pair to keep; a {@code null} pair matches nothing
     * @param objArr unused
     * @return observable of tickers whose currency pair equals {@code currencyPair}
     */
    @Override
    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... objArr) {
        // Compare from the requested pair's side: a ticker whose pair id was missing from the
        // id map carries a null pair and must not break the shared stream with an NPE.
        return this.streamingTickers.get()
            .filter(ticker -> currencyPair != null && currencyPair.equals(ticker.getCurrencyPair()));
    }

    /**
     * Streams trades for a currency pair.
     *
     * @param currencyPair pair whose channel is subscribed
     * @param objArr unused
     * @return observable emitting one {@link Trade} per trade event seen on the pair channel
     */
    @Override
    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... objArr) {
        return this.service.subscribeCurrencyPairChannel(currencyPair)
            // Flatten each batch into individual events, then keep only the trade events.
            .flatMapIterable(events -> events)
            .filter(PoloniexWebSocketTradeEvent.class::isInstance)
            .map(PoloniexWebSocketTradeEvent.class::cast)
            .share()
            .map(tradeEvent -> PoloniexWebSocketAdapter.convertPoloniexWebSocketTradeEventToTrade(tradeEvent, currencyPair));
    }

    /**
     * Applies one order book event to the current order book state.
     *
     * @param optional current order book, empty until the first insert event has been seen
     * @param poloniexWebSocketEvent insert event (event type {@code "i"}) or modified event
     * @return a fresh order book for an insert event; otherwise the same {@code optional}, whose
     *     contained book has been modified in place
     * @throws IllegalStateException if a modified event arrives while {@code optional} is empty
     */
    private Optional<PoloniexOrderbook> getPoloniexOrderbook(Optional<PoloniexOrderbook> optional, PoloniexWebSocketEvent poloniexWebSocketEvent) {
        if ("i".equals(poloniexWebSocketEvent.getEventType())) {
            OrderbookInsertEvent insert = ((PoloniexWebSocketOrderbookInsertEvent) poloniexWebSocketEvent).getInsert();
            // NOTE(review): 0 and 1 presumably select the two book sides (asks/bids) - confirm
            // against OrderbookInsertEvent.toDepthLevels.
            return Optional.of(new PoloniexOrderbook(insert.toDepthLevels(0), insert.toDepthLevels(1)));
        }
        PoloniexOrderbook current = optional.orElseThrow(
            () -> new IllegalStateException("Orderbook update received before initial snapshot"));
        current.modify(((PoloniexWebSocketOrderbookModifiedEvent) poloniexWebSocketEvent).getModifiedEvent());
        return optional;
    }
}
