package com.github.davidmoten.rx.slf4j;

import com.github.davidmoten.rx.slf4j.Logging;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.observers.Subscribers;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

/* loaded from: input_file:com/github/davidmoten/rx/slf4j/OperatorLogging.class */
public class OperatorLogging<T> implements Observable.Operator<T, T> {
    private final Logging.Parameters<T> p;
    private Subscription sub;

    public OperatorLogging(Logging.Parameters<T> parameters) {
        this.p = parameters;
    }

    public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
        subscriber.add(Subscriptions.create(new Action0() { // from class: com.github.davidmoten.rx.slf4j.OperatorLogging.1
            public void call() {
                if (OperatorLogging.this.p.getUnsubscribedMessage() != null) {
                    Logging.log(OperatorLogging.this.p.getLogger(), OperatorLogging.this.p.getUnsubscribedMessage(), OperatorLogging.this.p.getUnsubscribedLevel(), null);
                }
            }
        }));
        Subscriber<? super T> from = Subscribers.from(createObserver(this.p.getSubject(), subscriber));
        subscriber.add(from);
        this.sub = this.p.getObservable().subscribe(new Observer<Logging.Parameters.Message<T>>() { // from class: com.github.davidmoten.rx.slf4j.OperatorLogging.2
            public void onCompleted() {
                OperatorLogging.this.sub.unsubscribe();
            }

            public void onError(Throwable th) {
                th.printStackTrace();
                OperatorLogging.this.sub.unsubscribe();
            }

            public void onNext(Logging.Parameters.Message<T> message) {
            }
        });
        subscriber.add(this.sub);
        if (this.p.getSubscribedMessage() != null) {
            Logging.log(this.p.getLogger(), this.p.getSubscribedMessage(), this.p.getSubscribedLevel(), null);
        }
        return from;
    }

    static <T> Observer<T> createObserver(final PublishSubject<T> publishSubject, final Subscriber<? super T> subscriber) {
        return new Observer<T>() { // from class: com.github.davidmoten.rx.slf4j.OperatorLogging.3
            public void onCompleted() {
                publishSubject.onCompleted();
                subscriber.onCompleted();
            }

            public void onError(Throwable th) {
                publishSubject.onError(th);
                subscriber.onError(th);
            }

            public void onNext(T t) {
                publishSubject.onNext(t);
                subscriber.onNext(t);
            }
        };
    }
}
