package io.reactivex.netty.examples.http.sse;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import rx.Notification;
import rx.Observable;
import rx.functions.Func1;

/* loaded from: input_file:io/reactivex/netty/examples/http/sse/HttpSseServer.class */
/**
 * Example HTTP server that streams Server-Sent Events (SSE) at a fixed interval.
 *
 * <p>Every incoming request, whatever its content, is answered with a stream of events whose
 * payload is {@code "hello N"}, where {@code N} is the tick value emitted by
 * {@link Observable#interval(long, TimeUnit)}.
 */
public final class HttpSseServer {
    /** Port used when the example is launched through {@link #main(String[])}. */
    static final int DEFAULT_PORT = 8096;
    /** Delay between events, in milliseconds, used by {@link #main(String[])}. */
    static final int DEFAULT_INTERVAL = 1000;

    private final int port;
    private final int interval;

    /**
     * Creates a server description; nothing is bound until {@link #createServer()} is started.
     *
     * @param port     TCP port handed to {@code RxNetty.createHttpServer}
     * @param interval delay between two consecutive events, in milliseconds
     */
    public HttpSseServer(int port, int interval) {
        this.port = port;
        this.interval = interval;
    }

    /**
     * Builds the HTTP server with the SSE pipeline configurator and a handler that streams
     * interval events on every request.
     *
     * @return the created server; this method does not start it, the caller is expected to
     */
    public HttpServer<ByteBuf, ServerSentEvent> createServer() {
        HttpServer<ByteBuf, ServerSentEvent> server = RxNetty.createHttpServer(
                this.port,
                new RequestHandler<ByteBuf, ServerSentEvent>() {
                    @Override
                    public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                                   HttpServerResponse<ServerSentEvent> response) {
                        // The request is deliberately ignored: every client gets the same stream.
                        return HttpSseServer.this.getIntervalObservable(response);
                    }
                },
                PipelineConfigurators.serveSseConfigurator());
        System.out.println("HTTP Server Sent Events server started...");
        return server;
    }

    /**
     * Produces the response stream for one client: one SSE write per interval tick.
     *
     * <p>Public visibility is retained so existing callers keep compiling.
     *
     * @param httpServerResponse response the events are written to
     * @return an observable that emits {@code null} for each notification of the write stream
     *         and completes, rather than erroring, once a write fails
     */
    public Observable<Void> getIntervalObservable(final HttpServerResponse<ServerSentEvent> httpServerResponse) {
        return Observable.interval(this.interval, TimeUnit.MILLISECONDS)
                .flatMap(new Func1<Long, Observable<Void>>() {
                    @Override
                    public Observable<Void> call(Long tick) {
                        System.out.println("Writing SSE event for interval: " + tick);
                        // SSE streams are UTF-8 by definition; never rely on the platform default charset.
                        byte[] payload = ("hello " + tick).getBytes(StandardCharsets.UTF_8);
                        ByteBuf data = httpServerResponse.getAllocator().buffer().writeBytes(payload);
                        return httpServerResponse.writeAndFlush(new ServerSentEvent(data));
                    }
                })
                // Turn errors into values so a failed write can end the stream via takeWhile
                // instead of propagating onError to the subscriber.
                .materialize()
                .takeWhile(new Func1<Notification<Void>, Boolean>() {
                    @Override
                    public Boolean call(Notification<Void> notification) {
                        if (notification.isOnError()) {
                            System.out.println("Write to client failed, stopping response sending.");
                            notification.getThrowable().printStackTrace(System.err);
                            return Boolean.FALSE;
                        }
                        return Boolean.TRUE;
                    }
                })
                // Callers only care about completion, so the notification payload is discarded.
                .map(new Func1<Notification<Void>, Void>() {
                    @Override
                    public Void call(Notification<Void> notification) {
                        return null;
                    }
                });
    }

    /**
     * Starts the example on {@link #DEFAULT_PORT} with {@link #DEFAULT_INTERVAL} and blocks
     * in {@code startAndWait()}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new HttpSseServer(DEFAULT_PORT, DEFAULT_INTERVAL).createServer().startAndWait();
    }
}
