package info.bitrich.xchangestream.bybit;

import com.fasterxml.jackson.databind.ObjectMapper;
import dto.marketdata.BybitOrderbook;
import dto.marketdata.BybitPublicOrder;
import dto.trade.BybitTrade;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.knowm.xchange.bybit.BybitAdapters;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.OrderBookUpdate;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.instrument.Instrument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/bybit/BybitStreamingMarketDataService.class */
public class BybitStreamingMarketDataService implements StreamingMarketDataService {
    private final BybitStreamingService streamingService;
    public static final String TRADE = "publicTrade.";
    public static final String ORDERBOOK = "orderbook.";
    public static final String TICKER = "tickers.";
    private final Logger LOG = LoggerFactory.getLogger(BybitStreamingMarketDataService.class);
    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
    private final Map<String, OrderBook> orderBookMap = new HashMap();
    private final Map<Instrument, PublishSubject<List<OrderBookUpdate>>> orderBookUpdatesSubscriptions = new ConcurrentHashMap();

    public BybitStreamingMarketDataService(BybitStreamingService bybitStreamingService) {
        this.streamingService = bybitStreamingService;
    }

    public Observable<OrderBook> getOrderBook(Instrument instrument, Object... objArr) {
        String str = "50";
        AtomicLong atomicLong = new AtomicLong();
        if (objArr.length > 0 && objArr[0] != null) {
            str = objArr[0].toString();
        }
        String str2 = ORDERBOOK + str + "." + BybitAdapters.convertToBybitSymbol(instrument);
        return this.streamingService.subscribeChannel(str2, new Object[0]).flatMap(jsonNode -> {
            BybitOrderbook bybitOrderbook = (BybitOrderbook) this.mapper.treeToValue(jsonNode, BybitOrderbook.class);
            String dataType = bybitOrderbook.getDataType();
            if (!dataType.equalsIgnoreCase("snapshot")) {
                return dataType.equalsIgnoreCase("delta") ? applyDeltaSnapshot(str2, instrument, bybitOrderbook, atomicLong) : Observable.fromIterable(new LinkedList());
            }
            OrderBook adaptOrderBook = BybitStreamAdapters.adaptOrderBook(bybitOrderbook, instrument);
            atomicLong.set(bybitOrderbook.getData().getU().intValue());
            this.orderBookMap.put(str2, adaptOrderBook);
            return Observable.just(adaptOrderBook);
        });
    }

    private Observable<OrderBook> applyDeltaSnapshot(String str, Instrument instrument, BybitOrderbook bybitOrderbook, AtomicLong atomicLong) {
        OrderBook orDefault = this.orderBookMap.getOrDefault(str, null);
        if (orDefault == null) {
            this.LOG.error("Failed to get orderBook, channelUniqueId= {}", str);
            return Observable.fromIterable(new LinkedList());
        }
        if (atomicLong.incrementAndGet() != bybitOrderbook.getData().getU().intValue()) {
            this.LOG.error("orderBookUpdate id sequence failed, expected {}, in fact {}", atomicLong, bybitOrderbook.getData().getU());
            return Observable.fromIterable(new LinkedList());
        }
        this.LOG.debug("orderBookUpdate id {}, seq {} ", bybitOrderbook.getData().getU(), bybitOrderbook.getData().getSeq());
        List<BybitPublicOrder> ask = bybitOrderbook.getData().getAsk();
        List<BybitPublicOrder> bid = bybitOrderbook.getData().getBid();
        Date date = new Date(Long.parseLong(bybitOrderbook.getTs()));
        ask.forEach(bybitPublicOrder -> {
            orDefault.update(BybitStreamAdapters.adaptOrderBookOrder(bybitPublicOrder, instrument, Order.OrderType.ASK, date));
        });
        bid.forEach(bybitPublicOrder2 -> {
            orDefault.update(BybitStreamAdapters.adaptOrderBookOrder(bybitPublicOrder2, instrument, Order.OrderType.BID, date));
        });
        if (this.orderBookUpdatesSubscriptions.get(instrument) != null) {
            orderBookUpdatesSubscriptions(instrument, ask, bid, date);
        }
        return Observable.just(orDefault);
    }

    public Observable<List<OrderBookUpdate>> getOrderBookUpdates(Instrument instrument, Object... objArr) {
        return this.orderBookUpdatesSubscriptions.computeIfAbsent(instrument, instrument2 -> {
            return PublishSubject.create();
        });
    }

    private void orderBookUpdatesSubscriptions(Instrument instrument, List<BybitPublicOrder> list, List<BybitPublicOrder> list2, Date date) {
        ArrayList arrayList = new ArrayList();
        for (BybitPublicOrder bybitPublicOrder : list) {
            arrayList.add(new OrderBookUpdate(Order.OrderType.ASK, new BigDecimal(bybitPublicOrder.getSize()), instrument, new BigDecimal(bybitPublicOrder.getPrice()), date, new BigDecimal(bybitPublicOrder.getSize())));
        }
        for (BybitPublicOrder bybitPublicOrder2 : list2) {
            arrayList.add(new OrderBookUpdate(Order.OrderType.BID, new BigDecimal(bybitPublicOrder2.getSize()), instrument, new BigDecimal(bybitPublicOrder2.getPrice()), date, new BigDecimal(bybitPublicOrder2.getSize())));
        }
        this.orderBookUpdatesSubscriptions.get(instrument).onNext(arrayList);
    }

    public Observable<Trade> getTrades(Instrument instrument, Object... objArr) {
        return this.streamingService.subscribeChannel(TRADE + BybitAdapters.convertToBybitSymbol(instrument), new Object[0]).filter(jsonNode -> {
            return jsonNode.has("data");
        }).flatMap(jsonNode2 -> {
            return Observable.fromIterable(BybitStreamAdapters.adaptTrades((List) this.mapper.treeToValue(jsonNode2.get("data"), this.mapper.getTypeFactory().constructCollectionType(List.class, BybitTrade.class)), instrument).getTrades());
        });
    }
}
