package info.bitrich.xchangestream.service.pusher;

import com.pusher.client.Pusher;
import com.pusher.client.PusherOptions;
import com.pusher.client.channel.Channel;
import com.pusher.client.channel.PrivateChannel;
import com.pusher.client.channel.PrivateChannelEventListener;
import com.pusher.client.channel.PusherEvent;
import com.pusher.client.connection.ConnectionEventListener;
import com.pusher.client.connection.ConnectionState;
import com.pusher.client.connection.ConnectionStateChange;
import info.bitrich.xchangestream.service.ConnectableService;
import info.bitrich.xchangestream.service.exception.NotConnectedException;
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/service/pusher/PusherStreamingService.class */
public class PusherStreamingService extends ConnectableService {
    private static final Logger LOG = LoggerFactory.getLogger(PusherStreamingService.class);
    private final Pusher pusher;

    public PusherStreamingService(String str) {
        this.pusher = new Pusher(str);
    }

    public PusherStreamingService(String str, String str2) {
        PusherOptions pusherOptions = new PusherOptions();
        pusherOptions.setCluster(str2);
        this.pusher = new Pusher(str, pusherOptions);
    }

    public PusherStreamingService(String str, PusherOptions pusherOptions) {
        this.pusher = new Pusher(str, pusherOptions);
    }

    protected PusherStreamingService(Pusher pusher) {
        this.pusher = pusher;
    }

    protected Completable openConnection() {
        return Completable.create(completableEmitter -> {
            this.pusher.connect(new ConnectionEventListener() { // from class: info.bitrich.xchangestream.service.pusher.PusherStreamingService.1
                public void onConnectionStateChange(ConnectionStateChange connectionStateChange) {
                    PusherStreamingService.LOG.info("State changed to " + connectionStateChange.getCurrentState() + " from " + connectionStateChange.getPreviousState());
                    if (ConnectionState.CONNECTED.equals(connectionStateChange.getCurrentState())) {
                        completableEmitter.onComplete();
                    }
                }

                public void onError(String str, String str2, Exception exc) {
                    if (exc != null) {
                        completableEmitter.onError(exc);
                    } else {
                        completableEmitter.onError(new RuntimeException("No exception found: [code: " + str2 + "], message: " + str));
                    }
                }
            }, new ConnectionState[]{ConnectionState.ALL});
        });
    }

    public Completable disconnect() {
        return Completable.create(completableEmitter -> {
            this.pusher.disconnect();
            completableEmitter.onComplete();
        });
    }

    public Observable<String> subscribePrivateChannel(String str, String str2) {
        return subscribePrivateChannel(str, Collections.singletonList(str2));
    }

    public Observable<String> subscribeChannel(String str, String str2) {
        return subscribeChannel(str, Collections.singletonList(str2));
    }

    public Observable<String> subscribeChannel(String str, List<String> list) {
        LOG.info("Subscribing to channel {}.", str);
        return Observable.create(observableEmitter -> {
            if (!ConnectionState.CONNECTED.equals(this.pusher.getConnection().getState())) {
                observableEmitter.onError(new NotConnectedException());
                return;
            }
            Channel subscribe = this.pusher.subscribe(str);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                subscribe.bind((String) it.next(), pusherEvent -> {
                    LOG.debug("Incoming data: {}", pusherEvent.getData());
                    observableEmitter.onNext(pusherEvent.getData());
                });
            }
        }).doOnDispose(() -> {
            this.pusher.unsubscribe(str);
        });
    }

    public Observable<String> subscribePrivateChannel(String str, List<String> list) {
        LOG.info("Subscribing to channel {}.", str);
        return Observable.create(observableEmitter -> {
            if (!ConnectionState.CONNECTED.equals(this.pusher.getConnection().getState())) {
                observableEmitter.onError(new NotConnectedException());
                return;
            }
            PrivateChannelEventListener privateChannelEventListener = new PrivateChannelEventListener() { // from class: info.bitrich.xchangestream.service.pusher.PusherStreamingService.2
                public void onAuthenticationFailure(String str2, Exception exc) {
                    PusherStreamingService.LOG.error(exc.getMessage(), exc);
                    observableEmitter.onError(exc);
                }

                public void onSubscriptionSucceeded(String str2) {
                    PusherStreamingService.LOG.info("Subscription successful! :{} ", str2);
                }

                public void onEvent(PusherEvent pusherEvent) {
                    PusherStreamingService.LOG.debug("Incoming data: {}", pusherEvent.getData());
                    observableEmitter.onNext(pusherEvent.getData());
                }
            };
            PrivateChannel subscribePrivate = this.pusher.subscribePrivate(str, privateChannelEventListener, new String[0]);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                subscribePrivate.bind((String) it.next(), privateChannelEventListener);
            }
        }).doOnDispose(() -> {
            LOG.info("Disposing " + str);
            this.pusher.unsubscribe(str);
        });
    }

    public boolean isSocketOpen() {
        return this.pusher.getConnection().getState() == ConnectionState.CONNECTED;
    }

    public void useCompressedMessages(boolean z) {
        throw new UnsupportedOperationException();
    }
}
