/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.lgo;

import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.lgo.LgoAdapter;
import info.bitrich.xchangestream.lgo.LgoStreamingService;
import info.bitrich.xchangestream.lgo.domain.LgoBatchOrderEvent;
import info.bitrich.xchangestream.lgo.domain.LgoGroupedUserUpdate;
import info.bitrich.xchangestream.lgo.dto.LgoUserMessage;
import info.bitrich.xchangestream.lgo.dto.LgoUserSnapshot;
import info.bitrich.xchangestream.lgo.dto.LgoUserUpdate;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.dto.trade.MarketOrder;

class LgoUserBatchSubscription {
    private final Observable<LgoGroupedUserUpdate> downstream;
    private final LgoStreamingService streamingService;
    private final CurrencyPair currencyPair;

    static LgoUserBatchSubscription create(LgoStreamingService streamingService, CurrencyPair currencyPair) {
        return new LgoUserBatchSubscription(streamingService, currencyPair);
    }

    private LgoUserBatchSubscription(LgoStreamingService streamingService, CurrencyPair currencyPair) {
        this.streamingService = streamingService;
        this.currencyPair = currencyPair;
        this.downstream = this.createSubscription();
    }

    Observable<LgoGroupedUserUpdate> getPublisher() {
        return this.downstream;
    }

    private Observable<LgoGroupedUserUpdate> createSubscription() {
        ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
        return this.streamingService.subscribeChannel(LgoAdapter.channelName("user", this.currencyPair), new Object[0]).map(s -> (LgoUserMessage)mapper.treeToValue((TreeNode)s, LgoUserMessage.class)).scan((Object)new LgoGroupedUserUpdate(), (acc, s) -> {
            ArrayList<LgoBatchOrderEvent> events = new ArrayList<LgoBatchOrderEvent>();
            if (s.getType().equals("update")) {
                LgoUserUpdate userUpdate = (LgoUserUpdate)s;
                List<Order> updates = this.updateAllOrders(this.currencyPair, userUpdate.getOrderEvents(), acc.getAllOpenOrders());
                events.addAll(LgoAdapter.adaptOrderEvent(userUpdate.getOrderEvents(), s.getBatchId(), updates));
                return new LgoGroupedUserUpdate(acc.getAllOpenOrders(), updates, events, s.getBatchId(), s.getType());
            }
            Collection<LimitOrder> allOrders = this.handleUserSnapshot(this.currencyPair, (LgoUserSnapshot)s);
            ConcurrentMap<String, Order> ordersById = allOrders.stream().collect(Collectors.toConcurrentMap(Order::getId, this::copyOrder));
            return new LgoGroupedUserUpdate(ordersById, new ArrayList<LimitOrder>(allOrders), events, s.getBatchId(), s.getType());
        }).skip(1L).share();
    }

    private List<Order> updateAllOrders(CurrencyPair currencyPair, List<LgoBatchOrderEvent> orderEvents, Map<String, Order> allOpenOrders) {
        return orderEvents.stream().map(orderEvent -> orderEvent.applyOnOrders(currencyPair, allOpenOrders)).map(this::copyOrder).collect(Collectors.toList());
    }

    private Collection<LimitOrder> handleUserSnapshot(CurrencyPair currencyPair, LgoUserSnapshot s) {
        return LgoAdapter.adaptOrdersSnapshot(s.getSnapshotData(), currencyPair);
    }

    private Order copyOrder(Order order) {
        LimitOrder copy = order instanceof LimitOrder ? LimitOrder.Builder.from((Order)order).build() : MarketOrder.Builder.from((Order)order).build();
        copy.setFee(order.getFee());
        copy.setCumulativeAmount(order.getCumulativeAmount());
        return copy;
    }
}

