/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.gemini;

import com.fasterxml.jackson.databind.JsonNode;
import info.bitrich.xchangestream.gemini.GeminiProductStreamingService;
import info.bitrich.xchangestream.service.netty.ConnectionStateModel;
import io.reactivex.Observable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.knowm.xchange.currency.CurrencyPair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GeminiStreamingService {
    private static final Logger LOG = LoggerFactory.getLogger(GeminiStreamingService.class);
    private final String baseUri;
    private final Map<CurrencyPair, GeminiProductStreamingService> productStreamingServices = new ConcurrentHashMap<CurrencyPair, GeminiProductStreamingService>();
    private final Map<CurrencyPair, Observable<JsonNode>> productSubscriptions = new ConcurrentHashMap<CurrencyPair, Observable<JsonNode>>();
    private final Subject<ConnectionStateModel.State> stateSubject = BehaviorSubject.create();

    public GeminiStreamingService(String baseUri) {
        this.baseUri = baseUri;
    }

    public Observable<JsonNode> subscribeChannel(CurrencyPair currencyPair, Object ... args) {
        if (!this.productStreamingServices.containsKey(currencyPair)) {
            String symbolUri = this.baseUri + currencyPair.base.toString() + currencyPair.counter.toString();
            GeminiProductStreamingService productStreamingService = new GeminiProductStreamingService(symbolUri, currencyPair);
            productStreamingService.connect().blockingAwait();
            Observable productSubscription = productStreamingService.subscribeChannel(currencyPair.toString(), args);
            this.productStreamingServices.put(currencyPair, productStreamingService);
            this.productSubscriptions.put(currencyPair, (Observable<JsonNode>)productSubscription);
            productStreamingService.subscribeConnectionState().subscribe(this.stateSubject);
        }
        return this.productSubscriptions.get(currencyPair);
    }

    public boolean isAlive() {
        return this.productStreamingServices.values().stream().allMatch(ps -> ps.isSocketOpen());
    }

    public Observable<ConnectionStateModel.State> connectionStateObservable() {
        return this.stateSubject.share();
    }
}

