/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.annotation.rxjava;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.annotation.rxjava.RxJavaProcessor;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

@Deprecated
public class SubjectMessageHandler
extends AbstractMessageProducingHandler
implements SmartLifecycle {
    private final Log logger = LogFactory.getLog(((Object)((Object)this)).getClass());
    private final RxJavaProcessor processor;
    private volatile Subject subject;
    private volatile Subscription subscription;
    private volatile boolean running;

    public SubjectMessageHandler(RxJavaProcessor processor) {
        Assert.notNull((Object)processor, (String)"RxJava processor must not be null.");
        this.processor = processor;
    }

    public synchronized void start() {
        if (!this.running) {
            this.subject = new SerializedSubject((Subject)PublishSubject.create());
            Observable outputStream = this.processor.process(this.subject);
            this.subscription = outputStream.subscribe((Action1)new Action1<Object>(){

                public void call(Object outputObject) {
                    if (ClassUtils.isAssignable(Message.class, outputObject.getClass())) {
                        SubjectMessageHandler.this.getOutputChannel().send((Message)outputObject);
                    } else {
                        SubjectMessageHandler.this.getOutputChannel().send(MessageBuilder.withPayload((Object)outputObject).build());
                    }
                }
            }, (Action1)new Action1<Throwable>(){

                public void call(Throwable throwable) {
                    SubjectMessageHandler.this.logger.error((Object)throwable.getMessage(), throwable);
                }
            }, new Action0(){

                public void call() {
                    SubjectMessageHandler.this.logger.info((Object)("Subscription close for [" + SubjectMessageHandler.this.subscription + "]"));
                }
            });
            this.running = true;
        }
    }

    public synchronized boolean isRunning() {
        return this.running;
    }

    public boolean isAutoStartup() {
        return false;
    }

    public void stop(Runnable callback) {
        if (this.running) {
            this.stop();
            if (callback != null) {
                callback.run();
            }
        }
    }

    public int getPhase() {
        return 0;
    }

    protected void handleMessageInternal(Message<?> message) throws Exception {
        this.subject.onNext(message.getPayload());
    }

    public synchronized void stop() {
        if (this.running) {
            this.subject.onCompleted();
            this.subscription.unsubscribe();
            this.subscription = null;
            this.subject = null;
            this.running = false;
        }
    }
}

