/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.lgo;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.lgo.LgoAdapter;
import info.bitrich.xchangestream.lgo.LgoStreamingService;
import info.bitrich.xchangestream.lgo.LgoUserBatchSubscription;
import info.bitrich.xchangestream.lgo.domain.LgoGroupedUserUpdate;
import info.bitrich.xchangestream.lgo.domain.LgoMatchOrderEvent;
import info.bitrich.xchangestream.lgo.domain.LgoOrderEvent;
import info.bitrich.xchangestream.lgo.dto.LgoAckUpdate;
import info.bitrich.xchangestream.lgo.dto.LgoSocketPlaceOrder;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.rxjava3.core.Observable;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.dto.trade.MarketOrder;
import org.knowm.xchange.dto.trade.OpenOrders;
import org.knowm.xchange.dto.trade.UserTrade;
import org.knowm.xchange.lgo.LgoAdapters;
import org.knowm.xchange.lgo.dto.key.LgoKey;
import org.knowm.xchange.lgo.dto.order.LgoEncryptedOrder;
import org.knowm.xchange.lgo.dto.order.LgoOrderSignature;
import org.knowm.xchange.lgo.dto.order.LgoPlaceCancelOrder;
import org.knowm.xchange.lgo.dto.order.LgoPlaceOrder;
import org.knowm.xchange.lgo.service.CryptoUtils;
import org.knowm.xchange.lgo.service.LgoKeyService;
import org.knowm.xchange.lgo.service.LgoSignatureService;
import si.mazi.rescu.SynchronizedValueFactory;

/**
 * Streaming trade service for LGO: exposes per-currency-pair order and trade streams built on
 * {@link LgoUserBatchSubscription}, the "afr" channel of received-order events, and order
 * placement/cancellation sent as encrypted, signed messages over the streaming connection.
 */
public class LgoStreamingTradeService implements StreamingTradeService {
    private final LgoStreamingService streamingService;
    private final LgoKeyService keyService;
    private final LgoSignatureService signatureService;
    private final SynchronizedValueFactory<Long> nonceFactory;
    /** One batch subscription per currency pair, created on first use. */
    private final Map<CurrencyPair, LgoUserBatchSubscription> batchSubscriptions = new ConcurrentHashMap<>();
    /** Lazily created shared stream of the "afr" channel; only accessed from {@link #getRawReceivedOrderEvents()}. */
    private Observable<LgoOrderEvent> afrSubscription;

    /**
     * @param streamingService connection used to subscribe to channels and to send order messages
     * @param keyService supplies the key used to encrypt outgoing orders
     * @param signatureService signs the encrypted order payload
     * @param nonceFactory produces the numeric reference attached to each outgoing order
     */
    LgoStreamingTradeService(LgoStreamingService streamingService, LgoKeyService keyService, LgoSignatureService signatureService, SynchronizedValueFactory<Long> nonceFactory) {
        this.streamingService = streamingService;
        this.keyService = keyService;
        this.signatureService = signatureService;
        this.nonceFactory = nonceFactory;
    }

    /**
     * Emits updated orders one at a time by flattening each batch from
     * {@link #getOrderBatchChanges(CurrencyPair)}.
     *
     * @param currencyPair pair to follow
     * @param args unused
     */
    public Observable<Order> getOrderChanges(CurrencyPair currencyPair, Object... args) {
        return this.getOrderBatchChanges(currencyPair).flatMap(Observable::fromIterable);
    }

    /**
     * Emits, for every grouped update of the pair, an {@link OpenOrders} built from the
     * {@link LimitOrder} instances among all currently open orders; other order types are dropped.
     *
     * @param currencyPair pair to follow
     */
    public Observable<OpenOrders> getOpenOrders(CurrencyPair currencyPair) {
        return this.getOrderUpdates(currencyPair)
                .map(update -> update.getAllOpenOrders().values().stream()
                        .filter(LimitOrder.class::isInstance)
                        .map(LimitOrder.class::cast)
                        .collect(Collectors.toList()))
                .map(OpenOrders::new);
    }

    /**
     * Emits, for every grouped update of the pair, the collection of orders updated in it.
     *
     * @param currencyPair pair to follow
     */
    public Observable<Collection<Order>> getOrderBatchChanges(CurrencyPair currencyPair) {
        return this.getOrderUpdates(currencyPair).map(LgoGroupedUserUpdate::getUpdatedOrders);
    }

    /** Returns the publisher of the pair's batch subscription, creating the subscription on first use. */
    private Observable<LgoGroupedUserUpdate> getOrderUpdates(CurrencyPair currencyPair) {
        return this.batchSubscriptions.computeIfAbsent(currencyPair, this::createBatchSubscription).getPublisher();
    }

    private LgoUserBatchSubscription createBatchSubscription(CurrencyPair currencyPair) {
        return LgoUserBatchSubscription.create(this.streamingService, currencyPair);
    }

    /**
     * Emits a {@link UserTrade} for every raw batch event of the pair whose type is {@code "match"}.
     *
     * @param currencyPair pair to follow
     * @param args unused
     */
    public Observable<UserTrade> getUserTrades(CurrencyPair currencyPair, Object... args) {
        return this.getRawBatchOrderEvents(currencyPair)
                .filter(event -> "match".equals(event.getType()))
                // NOTE(review): assumes every event typed "match" is an LgoMatchOrderEvent; otherwise
                // this cast fails inside the stream.
                .map(event -> LgoAdapter.adaptUserTrade(currencyPair, (LgoMatchOrderEvent) event));
    }

    /**
     * Merges the received-order events with the raw batch events of every given pair. With an
     * empty collection only the received-order events are returned.
     *
     * @param currencyPairs pairs whose batch events are merged in; must not be {@code null}
     */
    public Observable<LgoOrderEvent> getRawAllOrderEvents(Collection<CurrencyPair> currencyPairs) {
        Observable<LgoOrderEvent> ackObservable = this.getRawReceivedOrderEvents();
        return currencyPairs.stream()
                .map(this::getRawBatchOrderEvents)
                .reduce(Observable::mergeWith)
                .map(batchEvents -> ackObservable.mergeWith(batchEvents))
                .orElse(ackObservable);
    }

    /**
     * Returns the shared stream of events published on the "afr" channel, creating it on first call.
     *
     * <p>Synchronized so that concurrent first callers all receive the same Observable instance
     * instead of each building its own "afr" pipeline.
     */
    public synchronized Observable<LgoOrderEvent> getRawReceivedOrderEvents() {
        if (this.afrSubscription == null) {
            this.afrSubscription = this.createAfrSubscription();
        }
        return this.afrSubscription;
    }

    /** Builds the "afr" pipeline: each message is parsed as {@link LgoAckUpdate} and its data items emitted individually. */
    private Observable<LgoOrderEvent> createAfrSubscription() {
        ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
        return this.streamingService.subscribeChannel("afr", new Object[0])
                .map(s -> mapper.treeToValue((TreeNode) s, LgoAckUpdate.class))
                .map(LgoAckUpdate::getData)
                .flatMap(Observable::fromIterable)
                .share();
    }

    /**
     * Emits every raw event contained in the pair's grouped updates, one at a time.
     *
     * @param currencyPair pair to follow
     */
    public Observable<LgoOrderEvent> getRawBatchOrderEvents(CurrencyPair currencyPair) {
        return this.getOrderUpdates(currencyPair).map(LgoGroupedUserUpdate::getEvents).flatMap(Observable::fromIterable);
    }

    /**
     * Sends a market order over the streaming connection.
     *
     * @return the reference generated for this order, as a string
     * @throws IOException if the order message cannot be serialized
     */
    public String placeMarketOrder(MarketOrder marketOrder) throws IOException {
        Long ref = this.nonceFactory.createValue();
        LgoPlaceOrder lgoOrder = LgoAdapters.adaptEncryptedMarketOrder(marketOrder);
        return this.placeOrder(ref, lgoOrder);
    }

    /**
     * Sends a limit order over the streaming connection.
     *
     * @return the reference generated for this order, as a string
     * @throws IOException if the order message cannot be serialized
     */
    public String placeLimitOrder(LimitOrder limitOrder) throws IOException {
        Long ref = this.nonceFactory.createValue();
        LgoPlaceOrder lgoOrder = LgoAdapters.adaptLimitOrder(limitOrder);
        return this.placeOrder(ref, lgoOrder);
    }

    /**
     * Sends a cancellation request for the given order id, timestamped with the current time.
     *
     * @return always {@code true} once the message has been handed to the streaming service; the
     *     return value does not reflect whether the cancellation was accepted
     * @throws IOException if the cancel message cannot be serialized
     */
    public boolean cancelOrder(String orderId) throws IOException {
        Long ref = this.nonceFactory.createValue();
        LgoPlaceCancelOrder lgoOrder = new LgoPlaceCancelOrder(ref, orderId, new Date().toInstant());
        this.placeOrder(ref, lgoOrder);
        return true;
    }

    /**
     * Encrypts the order with the selected key, signs the encrypted payload, wraps both in a
     * socket message and sends it as JSON.
     *
     * @param ref reference attached to the message and returned to the caller
     * @return {@code ref} as a string
     * @throws JsonProcessingException if the message cannot be serialized to JSON
     */
    private String placeOrder(Long ref, LgoPlaceOrder lgoOrder) throws JsonProcessingException {
        LgoKey lgoKey = this.keyService.selectKey();
        String encryptedOrder = CryptoUtils.encryptOrder(lgoKey, lgoOrder);
        LgoOrderSignature signature = this.signatureService.signOrder(encryptedOrder);
        LgoEncryptedOrder encrypted = new LgoEncryptedOrder(lgoKey.getId(), encryptedOrder, signature, ref);
        LgoSocketPlaceOrder placeOrder = new LgoSocketPlaceOrder(encrypted);
        String payload = StreamingObjectMapperHelper.getObjectMapper().writeValueAsString(placeOrder);
        this.streamingService.sendMessage(payload);
        return ref.toString();
    }
}

