package info.bitrich.xchangestream.lgo;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.lgo.domain.LgoBatchOrderEvent;
import info.bitrich.xchangestream.lgo.domain.LgoGroupedUserUpdate;
import info.bitrich.xchangestream.lgo.dto.LgoUserMessage;
import info.bitrich.xchangestream.lgo.dto.LgoUserSnapshot;
import info.bitrich.xchangestream.lgo.dto.LgoUserUpdate;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.dto.trade.MarketOrder;

/* loaded from: input_file:info/bitrich/xchangestream/lgo/LgoUserBatchSubscription.class */
class LgoUserBatchSubscription {
    private final Observable<LgoGroupedUserUpdate> downstream = createSubscription();
    private final LgoStreamingService streamingService;
    private final CurrencyPair currencyPair;

    /* JADX INFO: Access modifiers changed from: package-private */
    public static LgoUserBatchSubscription create(LgoStreamingService lgoStreamingService, CurrencyPair currencyPair) {
        return new LgoUserBatchSubscription(lgoStreamingService, currencyPair);
    }

    private LgoUserBatchSubscription(LgoStreamingService lgoStreamingService, CurrencyPair currencyPair) {
        this.streamingService = lgoStreamingService;
        this.currencyPair = currencyPair;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Observable<LgoGroupedUserUpdate> getPublisher() {
        return this.downstream;
    }

    private Observable<LgoGroupedUserUpdate> createSubscription() {
        ObjectMapper objectMapper = StreamingObjectMapperHelper.getObjectMapper();
        return this.streamingService.subscribeChannel(LgoAdapter.channelName("user", this.currencyPair), new Object[0]).map(jsonNode -> {
            return (LgoUserMessage) objectMapper.readValue(jsonNode.toString(), LgoUserMessage.class);
        }).scan(new LgoGroupedUserUpdate(), (lgoGroupedUserUpdate, lgoUserMessage) -> {
            ArrayList arrayList = new ArrayList();
            if (!lgoUserMessage.getType().equals("update")) {
                Collection<LimitOrder> handleUserSnapshot = handleUserSnapshot(this.currencyPair, (LgoUserSnapshot) lgoUserMessage);
                return new LgoGroupedUserUpdate((ConcurrentMap) handleUserSnapshot.stream().collect(Collectors.toConcurrentMap((v0) -> {
                    return v0.getId();
                }, (v1) -> {
                    return copyOrder(v1);
                })), new ArrayList(handleUserSnapshot), arrayList, lgoUserMessage.getBatchId(), lgoUserMessage.getType());
            }
            LgoUserUpdate lgoUserUpdate = (LgoUserUpdate) lgoUserMessage;
            List<Order> updateAllOrders = updateAllOrders(this.currencyPair, lgoUserUpdate.getOrderEvents(), lgoGroupedUserUpdate.getAllOpenOrders());
            arrayList.addAll(LgoAdapter.adaptOrderEvent(lgoUserUpdate.getOrderEvents(), Long.valueOf(lgoUserMessage.getBatchId()), updateAllOrders));
            return new LgoGroupedUserUpdate(lgoGroupedUserUpdate.getAllOpenOrders(), updateAllOrders, arrayList, lgoUserMessage.getBatchId(), lgoUserMessage.getType());
        }).skip(1L).share();
    }

    private List<Order> updateAllOrders(CurrencyPair currencyPair, List<LgoBatchOrderEvent> list, Map<String, Order> map) {
        return (List) list.stream().map(lgoBatchOrderEvent -> {
            return lgoBatchOrderEvent.applyOnOrders(currencyPair, map);
        }).map(this::copyOrder).collect(Collectors.toList());
    }

    private Collection<LimitOrder> handleUserSnapshot(CurrencyPair currencyPair, LgoUserSnapshot lgoUserSnapshot) {
        return LgoAdapter.adaptOrdersSnapshot(lgoUserSnapshot.getSnapshotData(), currencyPair);
    }

    private Order copyOrder(Order order) {
        LimitOrder build = order instanceof LimitOrder ? LimitOrder.Builder.from(order).build() : MarketOrder.Builder.from(order).build();
        build.setFee(order.getFee());
        build.setCumulativeAmount(order.getCumulativeAmount());
        return build;
    }
}
