/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.lgo;

import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.core.StreamingAccountService;
import info.bitrich.xchangestream.lgo.LgoAdapter;
import info.bitrich.xchangestream.lgo.LgoStreamingService;
import info.bitrich.xchangestream.lgo.domain.LgoGroupedBalanceUpdate;
import info.bitrich.xchangestream.lgo.dto.LgoBalanceUpdate;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.rxjava3.core.Observable;
import java.util.List;
import org.knowm.xchange.currency.Currency;
import org.knowm.xchange.dto.account.Balance;
import org.knowm.xchange.dto.account.Wallet;

public class LgoStreamingAccountService
implements StreamingAccountService {
    private static final String CHANNEL_NAME = "balance";
    private final LgoStreamingService service;
    private volatile Observable<LgoGroupedBalanceUpdate> subscription = null;

    public LgoStreamingAccountService(LgoStreamingService lgoStreamingService) {
        this.service = lgoStreamingService;
    }

    public Observable<Balance> getBalanceChanges(Currency currency, Object ... args) {
        this.ensureSubscription();
        return this.subscription.map(u -> u.getWallet().get(currency));
    }

    public Observable<Wallet> getWallet() {
        this.ensureSubscription();
        return this.subscription.map(u -> Wallet.Builder.from(u.getWallet().values()).build());
    }

    private void ensureSubscription() {
        if (this.subscription == null) {
            this.createSubscription();
        }
    }

    private synchronized void createSubscription() {
        if (this.subscription != null) {
            return;
        }
        ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
        this.subscription = this.service.subscribeChannel(CHANNEL_NAME, new Object[0]).map(s -> (LgoBalanceUpdate)mapper.treeToValue((TreeNode)s, LgoBalanceUpdate.class)).scan((Object)new LgoGroupedBalanceUpdate(), (acc, s) -> {
            List<Balance> updatedBalances = LgoAdapter.adaptBalances(s.getData());
            if (s.getType().equals("snapshot")) {
                return acc.applySnapshot(s.getSeq(), updatedBalances);
            }
            return acc.applyUpdate(s.getSeq(), updatedBalances);
        }).skip(1L).share();
    }
}

