package info.bitrich.xchangestream.lgo;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.lgo.domain.LgoGroupedUserUpdate;
import info.bitrich.xchangestream.lgo.domain.LgoMatchOrderEvent;
import info.bitrich.xchangestream.lgo.domain.LgoOrderEvent;
import info.bitrich.xchangestream.lgo.dto.LgoAckUpdate;
import info.bitrich.xchangestream.lgo.dto.LgoSocketPlaceOrder;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.dto.trade.MarketOrder;
import org.knowm.xchange.dto.trade.OpenOrders;
import org.knowm.xchange.dto.trade.UserTrade;
import org.knowm.xchange.lgo.LgoAdapters;
import org.knowm.xchange.lgo.dto.key.LgoKey;
import org.knowm.xchange.lgo.dto.order.LgoEncryptedOrder;
import org.knowm.xchange.lgo.dto.order.LgoPlaceCancelOrder;
import org.knowm.xchange.lgo.dto.order.LgoPlaceOrder;
import org.knowm.xchange.lgo.service.CryptoUtils;
import org.knowm.xchange.lgo.service.LgoKeyService;
import org.knowm.xchange.lgo.service.LgoSignatureService;
import si.mazi.rescu.SynchronizedValueFactory;

/* loaded from: input_file:info/bitrich/xchangestream/lgo/LgoStreamingTradeService.class */
public class LgoStreamingTradeService implements StreamingTradeService {
    private final LgoStreamingService streamingService;
    private final LgoKeyService keyService;
    private final LgoSignatureService signatureService;
    private final SynchronizedValueFactory<Long> nonceFactory;
    private final Map<CurrencyPair, LgoUserBatchSubscription> batchSubscriptions = new ConcurrentHashMap();
    private Observable<LgoOrderEvent> afrSubscription;

    /* JADX INFO: Access modifiers changed from: package-private */
    public LgoStreamingTradeService(LgoStreamingService lgoStreamingService, LgoKeyService lgoKeyService, LgoSignatureService lgoSignatureService, SynchronizedValueFactory<Long> synchronizedValueFactory) {
        this.streamingService = lgoStreamingService;
        this.keyService = lgoKeyService;
        this.signatureService = lgoSignatureService;
        this.nonceFactory = synchronizedValueFactory;
    }

    public Observable<Order> getOrderChanges(CurrencyPair currencyPair, Object... objArr) {
        return getOrderBatchChanges(currencyPair).flatMap((v0) -> {
            return Observable.fromIterable(v0);
        });
    }

    public Observable<OpenOrders> getOpenOrders(CurrencyPair currencyPair) {
        return getOrderUpdates(currencyPair).map(lgoGroupedUserUpdate -> {
            return (List) lgoGroupedUserUpdate.getAllOpenOrders().values().stream().filter(order -> {
                return order instanceof LimitOrder;
            }).map(order2 -> {
                return (LimitOrder) order2;
            }).collect(Collectors.toList());
        }).map(OpenOrders::new);
    }

    public Observable<Collection<Order>> getOrderBatchChanges(CurrencyPair currencyPair) {
        return getOrderUpdates(currencyPair).map((v0) -> {
            return v0.getUpdatedOrders();
        });
    }

    private Observable<LgoGroupedUserUpdate> getOrderUpdates(CurrencyPair currencyPair) {
        return this.batchSubscriptions.computeIfAbsent(currencyPair, this::createBatchSubscription).getPublisher();
    }

    private LgoUserBatchSubscription createBatchSubscription(CurrencyPair currencyPair) {
        return LgoUserBatchSubscription.create(this.streamingService, currencyPair);
    }

    public Observable<UserTrade> getUserTrades(CurrencyPair currencyPair, Object... objArr) {
        return getRawBatchOrderEvents(currencyPair).filter(lgoOrderEvent -> {
            return "match".equals(lgoOrderEvent.getType());
        }).map(lgoOrderEvent2 -> {
            return LgoAdapter.adaptUserTrade(currencyPair, (LgoMatchOrderEvent) lgoOrderEvent2);
        });
    }

    public Observable<LgoOrderEvent> getRawAllOrderEvents(Collection<CurrencyPair> collection) {
        Observable<LgoOrderEvent> rawReceivedOrderEvents = getRawReceivedOrderEvents();
        Optional reduce = collection.stream().map(this::getRawBatchOrderEvents).reduce((v0, v1) -> {
            return v0.mergeWith(v1);
        });
        Objects.requireNonNull(rawReceivedOrderEvents);
        return (Observable) reduce.map((v1) -> {
            return r1.mergeWith(v1);
        }).orElse(rawReceivedOrderEvents);
    }

    public Observable<LgoOrderEvent> getRawReceivedOrderEvents() {
        if (this.afrSubscription == null) {
            createAfrSubscription();
        }
        return this.afrSubscription;
    }

    private void createAfrSubscription() {
        ObjectMapper objectMapper = StreamingObjectMapperHelper.getObjectMapper();
        this.afrSubscription = this.streamingService.subscribeChannel("afr", new Object[0]).map(jsonNode -> {
            return (LgoAckUpdate) objectMapper.readValue(jsonNode.toString(), LgoAckUpdate.class);
        }).map((v0) -> {
            return v0.getData();
        }).flatMap((v0) -> {
            return Observable.fromIterable(v0);
        }).share();
    }

    public Observable<LgoOrderEvent> getRawBatchOrderEvents(CurrencyPair currencyPair) {
        return getOrderUpdates(currencyPair).map((v0) -> {
            return v0.getEvents();
        }).flatMap((v0) -> {
            return Observable.fromIterable(v0);
        });
    }

    public String placeMarketOrder(MarketOrder marketOrder) throws IOException {
        return placeOrder((Long) this.nonceFactory.createValue(), LgoAdapters.adaptEncryptedMarketOrder(marketOrder));
    }

    public String placeLimitOrder(LimitOrder limitOrder) throws IOException {
        return placeOrder((Long) this.nonceFactory.createValue(), LgoAdapters.adaptLimitOrder(limitOrder));
    }

    public boolean cancelOrder(String str) throws IOException {
        Long l = (Long) this.nonceFactory.createValue();
        placeOrder(l, new LgoPlaceCancelOrder(l.longValue(), str, new Date().toInstant()));
        return true;
    }

    private String placeOrder(Long l, LgoPlaceOrder lgoPlaceOrder) throws JsonProcessingException {
        LgoKey selectKey = this.keyService.selectKey();
        String encryptOrder = CryptoUtils.encryptOrder(selectKey, lgoPlaceOrder);
        this.streamingService.sendMessage(StreamingObjectMapperHelper.getObjectMapper().writeValueAsString(new LgoSocketPlaceOrder(new LgoEncryptedOrder(selectKey.getId(), encryptOrder, this.signatureService.signOrder(encryptOrder), l.longValue()))));
        return l.toString();
    }
}
