package io.reactivex.netty.examples.tcp.interval;

import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.server.RxServer;
import java.util.concurrent.TimeUnit;
import rx.Notification;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;

/* loaded from: input_file:io/reactivex/netty/examples/tcp/interval/TcpIntervalServer.class */
/**
 * Example TCP server that, on request, streams an increasing counter to a client.
 *
 * <p>Every line received on a connection is trimmed and treated as a command:
 * <ul>
 *   <li>a line starting with {@code subscribe:} starts writing {@code interval => N} lines to the
 *       connection, one per configured interval;</li>
 *   <li>a line starting with {@code unsubscribe:} stops the intervals started on that connection;</li>
 *   <li>an empty line is ignored;</li>
 *   <li>anything else is answered with an error message listing the known commands.</li>
 * </ul>
 */
public final class TcpIntervalServer {
    /** Port used when the server is launched through {@link #main(String[])}. */
    static final int DEFAULT_PORT = 8101;

    /** Interval, in milliseconds, used by {@link #main(String[])} when no argument is given. */
    private static final int DEFAULT_INTERVAL_MILLIS = 1000;

    /** Prefix of the command that starts the interval stream. */
    private static final String SUBSCRIBE_COMMAND = "subscribe:";

    /** Prefix of the command that stops the interval stream. */
    private static final String UNSUBSCRIBE_COMMAND = "unsubscribe:";

    private final int port;
    private final int interval;

    /**
     * Creates a server description; nothing is bound until {@link #createServer()} is called and
     * the returned server is started.
     *
     * @param port     TCP port handed to {@code RxNetty.createTcpServer}
     * @param interval delay between two interval messages, in milliseconds
     */
    public TcpIntervalServer(int port, int interval) {
        this.port = port;
        this.interval = interval;
    }

    /**
     * Builds the text-based TCP server with the command handling described on this class.
     *
     * <p>The server is only created here, not started; starting it is left to the caller (see
     * {@link #main(String[])}).
     *
     * @return the configured server
     */
    public RxServer<String, String> createServer() {
        RxServer<String, String> server = RxNetty.createTcpServer(
                this.port,
                PipelineConfigurators.textOnlyConfigurator(),
                new ConnectionHandler<String, String>() {
                    @Override
                    public Observable<Void> handle(final ObservableConnection<String, String> connection) {
                        System.out.println("--- Connection Started ---");

                        // Trimmed view of everything the client sends; used both for command
                        // dispatch and as the stop signal of a running interval stream.
                        final Observable<String> commands = connection.getInput().map(new Func1<String, String>() {
                            @Override
                            public String call(String message) {
                                return message.trim();
                            }
                        });

                        return commands.flatMap(new Func1<String, Observable<Void>>() {
                            @Override
                            public Observable<Void> call(String command) {
                                if (command.startsWith(SUBSCRIBE_COMMAND)) {
                                    System.out.println("-------------------------------------");
                                    System.out.println("Received 'subscribe' from client so starting interval ...");
                                    // Match the stop signal with startsWith, exactly like the
                                    // dispatch below, so that every message logged as an
                                    // unsubscribe really ends the interval stream.
                                    return getIntervalObservable(connection).takeUntil(commands.filter(new Func1<String, Boolean>() {
                                        @Override
                                        public Boolean call(String candidate) {
                                            return candidate.startsWith(UNSUBSCRIBE_COMMAND);
                                        }
                                    }));
                                }
                                if (command.startsWith(UNSUBSCRIBE_COMMAND)) {
                                    // Only reports the command; the actual stop happens through
                                    // the takeUntil attached when the interval was started.
                                    System.out.println("Received 'unsubscribe' from client so stopping interval (or ignoring if nothing subscribed) ...");
                                    return Observable.empty();
                                }
                                // Unsubscribe commands have already returned above, so only
                                // blank input needs to be excluded from the error reply.
                                if (!command.isEmpty()) {
                                    // NOTE(review): the Observable returned by writeAndFlush is discarded
                                    // here; presumably the write is initiated eagerly - confirm against
                                    // the RxNetty version in use.
                                    connection.writeAndFlush("\nERROR => Unknown command: " + command + "\nCommands => subscribe:, unsubscribe:\n");
                                }
                                return Observable.empty();
                            }
                        }).finallyDo(new Action0() {
                            @Override
                            public void call() {
                                System.out.println("--- Connection Closed ---");
                            }
                        });
                    }
                });
        System.out.println("TCP interval server started...");
        return server;
    }

    /**
     * Returns a stream that writes {@code interval => N} to the connection once per configured
     * interval and ends as soon as one of those writes reports an error.
     *
     * <p>Each write is materialized so that a failure arrives as a notification, which the
     * {@code takeWhile} below turns into normal completion instead of an error.
     *
     * @param observableConnection connection the interval messages are written to
     * @return a stream emitting one {@code null} item per completed write notification
     */
    public Observable<Void> getIntervalObservable(final ObservableConnection<String, String> observableConnection) {
        return Observable.interval(this.interval, TimeUnit.MILLISECONDS)
                .flatMap(new Func1<Long, Observable<Notification<Void>>>() {
                    @Override
                    public Observable<Notification<Void>> call(Long tick) {
                        System.out.println("Writing interval: " + tick);
                        return observableConnection.writeAndFlush("interval => " + tick + '\n').materialize();
                    }
                })
                .takeWhile(new Func1<Notification<Void>, Boolean>() {
                    @Override
                    public Boolean call(Notification<Void> notification) {
                        // Stop at the first failed write.
                        return !notification.isOnError();
                    }
                })
                .map(new Func1<Notification<Void>, Void>() {
                    @Override
                    public Void call(Notification<Void> notification) {
                        // The notification itself carries no value of interest.
                        return null;
                    }
                });
    }

    /**
     * Creates a server on {@link #DEFAULT_PORT} and calls {@code startAndWait()} on it.
     *
     * @param args optional first element: the interval in milliseconds (defaults to 1000)
     * @throws NumberFormatException if the first argument is not a valid integer
     */
    public static void main(String[] args) {
        int intervalMillis = DEFAULT_INTERVAL_MILLIS;
        if (args.length > 0) {
            intervalMillis = Integer.parseInt(args[0]);
        }
        new TcpIntervalServer(DEFAULT_PORT, intervalMillis).createServer().startAndWait();
    }
}
