/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.gemini;

import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.MoreObjects;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.gemini.GeminiAdaptersX;
import info.bitrich.xchangestream.gemini.GeminiStreamingService;
import info.bitrich.xchangestream.gemini.dto.GeminiLimitOrder;
import info.bitrich.xchangestream.gemini.dto.GeminiOrderbook;
import info.bitrich.xchangestream.gemini.dto.GeminiWebSocketTransaction;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.subjects.PublishSubject;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
import org.knowm.xchange.gemini.v1.GeminiAdapters;
import org.knowm.xchange.gemini.v1.dto.marketdata.GeminiTrade;
import org.knowm.xchange.instrument.Instrument;

public class GeminiStreamingMarketDataService
implements StreamingMarketDataService {
    private final GeminiStreamingService service;
    private final Map<CurrencyPair, GeminiOrderbook> orderbooks = new HashMap<CurrencyPair, GeminiOrderbook>();
    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

    public GeminiStreamingMarketDataService(GeminiStreamingService service) {
        this.service = service;
    }

    private boolean filterEventsByReason(JsonNode message, String type, String reason) {
        boolean hasEvents = false;
        if (message.has("events")) {
            for (JsonNode jsonEvent : message.get("events")) {
                boolean reasonResult;
                boolean bl = reasonResult = reason == null || jsonEvent.has("reason") && jsonEvent.get("reason").asText().equals(reason);
                if (!jsonEvent.get("type").asText().equals(type) || !reasonResult) continue;
                hasEvents = true;
                break;
            }
        }
        return hasEvents;
    }

    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object ... args) {
        int maxDepth = (Integer)MoreObjects.firstNonNull((Object)(args.length > 0 ? args[0] : null), (Object)1);
        Observable subscribedOrderbookSnapshot = this.service.subscribeChannel(currencyPair, maxDepth, maxDepth).filter(s -> this.filterEventsByReason((JsonNode)s, "change", "initial") || this.filterEventsByReason((JsonNode)s, "change", "place") || this.filterEventsByReason((JsonNode)s, "change", "cancel") || this.filterEventsByReason((JsonNode)s, "change", "trade")).filter(s -> this.orderbooks.get(currencyPair) != null || this.filterEventsByReason((JsonNode)s, "change", "initial")).map(s -> {
            if (this.filterEventsByReason((JsonNode)s, "change", "initial")) {
                GeminiWebSocketTransaction transaction = (GeminiWebSocketTransaction)this.mapper.treeToValue((TreeNode)s, GeminiWebSocketTransaction.class);
                GeminiOrderbook orderbook = transaction.toGeminiOrderbook(currencyPair);
                this.orderbooks.put(currencyPair, orderbook);
                return orderbook;
            }
            if (this.filterEventsByReason((JsonNode)s, "change", "place") || this.filterEventsByReason((JsonNode)s, "change", "cancel") || this.filterEventsByReason((JsonNode)s, "change", "trade")) {
                GeminiWebSocketTransaction transaction = (GeminiWebSocketTransaction)this.mapper.treeToValue((TreeNode)s, GeminiWebSocketTransaction.class);
                GeminiLimitOrder[] levels = transaction.toGeminiLimitOrdersUpdate();
                GeminiOrderbook orderbook = this.orderbooks.get(currencyPair);
                orderbook.updateLevels(levels);
                return orderbook;
            }
            throw new NotYetImplementedForExchangeException(" Unknown message type, even after filtering: " + s.toString());
        });
        return subscribedOrderbookSnapshot.map(geminiOrderbook -> GeminiAdaptersX.toOrderbook(geminiOrderbook, maxDepth, new Date()));
    }

    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object ... args) {
        return PublishSubject.create(emitter -> this.getOrderBook(currencyPair, args).subscribe(orderBook -> {
            LimitOrder firstBid = (LimitOrder)orderBook.getBids().iterator().next();
            LimitOrder firstAsk = (LimitOrder)orderBook.getAsks().iterator().next();
            emitter.onNext((Object)new Ticker.Builder().instrument((Instrument)currencyPair).bid(firstBid.getLimitPrice()).bidSize(firstBid.getOriginalAmount()).ask(firstAsk.getLimitPrice()).askSize(firstAsk.getOriginalAmount()).timestamp(firstBid.getTimestamp().after(firstAsk.getTimestamp()) ? firstBid.getTimestamp() : firstAsk.getTimestamp()).build());
        }, arg_0 -> ((ObservableEmitter)emitter).onError(arg_0)));
    }

    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object ... args) {
        Observable subscribedTrades = this.service.subscribeChannel(currencyPair, args).filter(s -> this.filterEventsByReason((JsonNode)s, "trade", null)).map(s -> {
            GeminiWebSocketTransaction transaction = (GeminiWebSocketTransaction)this.mapper.treeToValue((TreeNode)s, GeminiWebSocketTransaction.class);
            return transaction.toGeminiTrades();
        });
        return subscribedTrades.flatMapIterable(s -> GeminiAdapters.adaptTrades((GeminiTrade[])s, (CurrencyPair)currencyPair).getTrades());
    }
}

