package info.bitrich.xchangestream.gemini;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.MoreObjects;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.gemini.dto.GeminiLimitOrder;
import info.bitrich.xchangestream.gemini.dto.GeminiOrderbook;
import info.bitrich.xchangestream.gemini.dto.GeminiWebSocketTransaction;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
import org.knowm.xchange.gemini.v1.GeminiAdapters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/gemini/GeminiStreamingMarketDataService.class */
public class GeminiStreamingMarketDataService implements StreamingMarketDataService {
    private static final Logger LOG = LoggerFactory.getLogger(GeminiStreamingMarketDataService.class);
    private final GeminiStreamingService service;
    private final Map<CurrencyPair, GeminiOrderbook> orderbooks = new HashMap();
    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

    public GeminiStreamingMarketDataService(GeminiStreamingService geminiStreamingService) {
        this.service = geminiStreamingService;
    }

    private boolean filterEventsByReason(JsonNode jsonNode, String str, String str2) {
        boolean z = false;
        if (jsonNode.has("events")) {
            Iterator it = jsonNode.get("events").iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                JsonNode jsonNode2 = (JsonNode) it.next();
                boolean z2 = str2 == null || (jsonNode2.has("reason") && jsonNode2.get("reason").asText().equals(str2));
                if (jsonNode2.get("type").asText().equals(str) && z2) {
                    z = true;
                    break;
                }
            }
        }
        return z;
    }

    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... objArr) {
        int intValue = ((Integer) MoreObjects.firstNonNull(objArr.length > 0 ? objArr[0] : null, 0)).intValue();
        return this.service.subscribeChannel(currencyPair, Integer.valueOf(intValue), Integer.valueOf(intValue)).filter(jsonNode -> {
            return filterEventsByReason(jsonNode, "change", "initial") || filterEventsByReason(jsonNode, "change", "place") || filterEventsByReason(jsonNode, "change", "cancel") || filterEventsByReason(jsonNode, "change", "trade");
        }).filter(jsonNode2 -> {
            return this.orderbooks.get(currencyPair) != null || filterEventsByReason(jsonNode2, "change", "initial");
        }).map(jsonNode3 -> {
            if (filterEventsByReason(jsonNode3, "change", "initial")) {
                GeminiOrderbook geminiOrderbook = ((GeminiWebSocketTransaction) this.mapper.treeToValue(jsonNode3, GeminiWebSocketTransaction.class)).toGeminiOrderbook(currencyPair);
                this.orderbooks.put(currencyPair, geminiOrderbook);
                return geminiOrderbook;
            }
            if (!filterEventsByReason(jsonNode3, "change", "place") && !filterEventsByReason(jsonNode3, "change", "cancel") && !filterEventsByReason(jsonNode3, "change", "trade")) {
                throw new NotYetImplementedForExchangeException(" Unknown message type, even after filtering: " + jsonNode3.toString());
            }
            GeminiLimitOrder[] geminiLimitOrdersUpdate = ((GeminiWebSocketTransaction) this.mapper.treeToValue(jsonNode3, GeminiWebSocketTransaction.class)).toGeminiLimitOrdersUpdate();
            GeminiOrderbook geminiOrderbook2 = this.orderbooks.get(currencyPair);
            geminiOrderbook2.updateLevels(geminiLimitOrdersUpdate);
            return geminiOrderbook2;
        }).map(geminiOrderbook -> {
            return GeminiAdaptersX.toOrderbook(geminiOrderbook, intValue, new Date());
        });
    }

    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... objArr) {
        return PublishSubject.create(observableEmitter -> {
            Observable<OrderBook> orderBook = getOrderBook(currencyPair, objArr);
            Consumer consumer = orderBook2 -> {
                LimitOrder limitOrder = (LimitOrder) orderBook2.getBids().iterator().next();
                LimitOrder limitOrder2 = (LimitOrder) orderBook2.getAsks().iterator().next();
                observableEmitter.onNext(new Ticker.Builder().currencyPair(currencyPair).bid(limitOrder.getLimitPrice()).bidSize(limitOrder.getOriginalAmount()).ask(limitOrder2.getLimitPrice()).askSize(limitOrder2.getOriginalAmount()).timestamp(limitOrder.getTimestamp().after(limitOrder2.getTimestamp()) ? limitOrder.getTimestamp() : limitOrder2.getTimestamp()).build());
            };
            Objects.requireNonNull(observableEmitter);
            orderBook.subscribe(consumer, observableEmitter::onError);
        });
    }

    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... objArr) {
        return this.service.subscribeChannel(currencyPair, objArr).filter(jsonNode -> {
            return filterEventsByReason(jsonNode, "trade", null);
        }).map(jsonNode2 -> {
            return ((GeminiWebSocketTransaction) this.mapper.treeToValue(jsonNode2, GeminiWebSocketTransaction.class)).toGeminiTrades();
        }).flatMapIterable(geminiTradeArr -> {
            return GeminiAdapters.adaptTrades(geminiTradeArr, currencyPair).getTrades();
        });
    }
}
