package reactor.io.net.impl.zmq.tcp;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.Environment;
import reactor.bus.registry.Registration;
import reactor.bus.registry.Registries;
import reactor.bus.registry.Registry;
import reactor.bus.selector.Selectors;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.codec.StandardCodecs;
import reactor.io.net.ChannelStream;
import reactor.io.net.NetStreams;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.ReactorPeer;
import reactor.io.net.Spec;
import reactor.io.net.impl.zmq.ZeroMQClientSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQServerSocketOptions;
import reactor.io.net.tcp.TcpClient;
import reactor.io.net.tcp.TcpServer;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Streams;

/* loaded from: input_file:reactor/io/net/impl/zmq/tcp/ZeroMQ.class */
/**
 * Factory for ZeroMQ-backed {@link TcpClient} and {@link TcpServer} peers.
 *
 * <p>Every peer created through one instance shares the same {@link Environment},
 * {@link Dispatcher}, {@link ZContext} and {@link Codec}. Created peers are tracked in an
 * internal list so that {@link #shutdown()} can shut all of them down.
 *
 * <p>The peer list is only accessed while holding its own monitor; the codec and the
 * shutdown flag are {@code volatile}.
 *
 * @param <T> type of the objects read from and written to the created channels
 */
public class ZeroMQ<T> {
    /** Failure message used when a socket is requested after {@link #shutdown()}. */
    private static final String SHUTDOWN_MESSAGE = "This ZeroMQ instance has been shut down";

    /** Maps the value of each static {@code int} constant declared on {@link ZMQ} to its field name. */
    private static final Registry<Integer, String> SOCKET_TYPES = Registries.create();

    static {
        for (Field field : ZMQ.class.getDeclaredFields()) {
            // Only static int fields can be read with a null receiver; Field.getInt(null) on an
            // instance field would throw NullPointerException and abort class initialisation.
            boolean staticInt = java.lang.reflect.Modifier.isStatic(field.getModifiers())
                    && Integer.TYPE.isAssignableFrom(field.getType());
            if (!staticInt) {
                continue;
            }
            field.setAccessible(true);
            try {
                SOCKET_TYPES.register(Selectors.$(Integer.valueOf(field.getInt(null))), field.getName());
            } catch (IllegalAccessException ignored) {
                // Best effort: a constant that cannot be read is simply left out of the
                // name lookup; findSocketTypeName then returns "" for its value.
            }
        }
    }

    private final Environment env;
    private final Dispatcher dispatcher;
    private final ZContext zmqCtx;
    private final List<ReactorPeer<T, T, ChannelStream<T, T>>> peers;
    private volatile Codec<Buffer, T, T> codec;
    private volatile boolean shutdown;

    /**
     * Creates an instance that uses the environment's default dispatcher.
     *
     * @param environment environment handed to every created peer
     */
    public ZeroMQ(Environment environment) {
        this(environment, environment.getDefaultDispatcher());
    }

    /**
     * Creates an instance that uses the dispatcher registered in the environment under the given name.
     *
     * @param environment environment handed to every created peer
     * @param str         name of the dispatcher to look up in {@code environment}
     */
    public ZeroMQ(Environment environment, String str) {
        this(environment, environment.getDispatcher(str));
    }

    /**
     * Creates an instance with an explicit dispatcher, a fresh {@link ZContext} and the
     * pass-through codec.
     *
     * @param environment environment handed to every created peer
     * @param dispatcher  dispatcher handed to every created peer and used for the returned promises
     */
    public ZeroMQ(Environment environment, Dispatcher dispatcher) {
        this.peers = new ArrayList<ReactorPeer<T, T, ChannelStream<T, T>>>();
        this.codec = StandardCodecs.PASS_THROUGH_CODEC;
        this.shutdown = false;
        this.env = environment;
        this.dispatcher = dispatcher;
        this.zmqCtx = new ZContext();
        // NOTE(review): linger value is presumably in milliseconds - confirm against ZContext docs.
        this.zmqCtx.setLinger(100);
    }

    /**
     * Looks up the name of a {@link ZMQ} constant by its integer value.
     *
     * <p>The lookup table is built from every static {@code int} field declared on {@link ZMQ},
     * so when several constants share a value the first registration returned by the registry wins.
     *
     * @param i integer value to look up
     * @return the name of a matching constant, or an empty string when none is registered
     */
    public static String findSocketTypeName(int i) {
        List matches = SOCKET_TYPES.select(Integer.valueOf(i));
        if (matches.isEmpty()) {
            return "";
        }
        return (String) ((Registration) matches.get(0)).getObject();
    }

    /**
     * Sets the codec applied to peers created after this call.
     *
     * @param codec codec to use for subsequently created peers
     * @return this instance, for chaining
     */
    public ZeroMQ<T> codec(Codec<Buffer, T, T> codec) {
        this.codec = codec;
        return this;
    }

    /**
     * Creates a {@code DEALER} client socket.
     *
     * @param str address(es) to connect to
     * @return promise fulfilled with the channel once the peer hands it to its handler
     */
    public Promise<ChannelStream<T, T>> dealer(String str) {
        return createClient(str, ZMQ.DEALER);
    }

    /**
     * Creates a {@code PUSH} client socket.
     *
     * @param str address(es) to connect to
     * @return promise fulfilled with the channel once the peer hands it to its handler
     */
    public Promise<ChannelStream<T, T>> push(String str) {
        return createClient(str, ZMQ.PUSH);
    }

    /**
     * Creates a {@code PULL} server socket.
     *
     * @param str address(es) to listen on
     * @return promise fulfilled with the channel once the peer hands it to its handler
     */
    public Promise<ChannelStream<T, T>> pull(String str) {
        return createServer(str, ZMQ.PULL);
    }

    /**
     * Creates a {@code REQ} client socket.
     *
     * @param str address(es) to connect to
     * @return promise fulfilled with the channel once the peer hands it to its handler
     */
    public Promise<ChannelStream<T, T>> request(String str) {
        return createClient(str, ZMQ.REQ);
    }

    /**
     * Creates a {@code REP} server socket.
     *
     * @param str address(es) to listen on
     * @return promise fulfilled with the channel once the peer hands it to its handler
     */
    public Promise<ChannelStream<T, T>> reply(String str) {
        return createServer(str, ZMQ.REP);
    }

    /**
     * Creates a {@code ROUTER} server socket.
     *
     * @param str address(es) to listen on
     * @return promise fulfilled with the channel once the peer hands it to its handler
     */
    public Promise<ChannelStream<T, T>> router(String str) {
        return createServer(str, ZMQ.ROUTER);
    }

    /**
     * Creates, starts and tracks a ZeroMQ client peer of the given socket type.
     *
     * @param str address(es) passed to the client socket options as connect addresses
     * @param i   ZeroMQ socket type constant
     * @return promise fulfilled with the channel once the peer hands it to its handler
     * @throws IllegalArgumentException presumably, via {@link Assert#isTrue}, if this instance
     *                                  has already been shut down
     */
    public Promise<ChannelStream<T, T>> createClient(final String str, final int i) {
        Assert.isTrue(!this.shutdown, SHUTDOWN_MESSAGE);
        TcpClient<T, T> client = NetStreams.tcpClient((Class<? extends TcpClient>) ZeroMQTcpClient.class, new NetStreams.TcpClientFactory<T, T>() {
            @Override
            public Spec.TcpClientSpec<T, T> apply(Spec.TcpClientSpec<T, T> tcpClientSpec) {
                return ((Spec.TcpClientSpec) ((Spec.TcpClientSpec) tcpClientSpec.env(ZeroMQ.this.env)).dispatcher(ZeroMQ.this.dispatcher)).codec(ZeroMQ.this.codec).options(new ZeroMQClientSocketOptions().context(ZeroMQ.this.zmqCtx).connectAddresses(str).socketType(i));
            }
        });
        return startAndTrack(client);
    }

    /**
     * Creates, starts and tracks a ZeroMQ server peer of the given socket type.
     *
     * @param str address(es) passed to the server socket options as listen addresses
     * @param i   ZeroMQ socket type constant
     * @return promise fulfilled with the channel once the peer hands it to its handler
     * @throws IllegalArgumentException presumably, via {@link Assert#isTrue}, if this instance
     *                                  has already been shut down
     */
    public Promise<ChannelStream<T, T>> createServer(final String str, final int i) {
        Assert.isTrue(!this.shutdown, SHUTDOWN_MESSAGE);
        TcpServer<T, T> server = NetStreams.tcpServer((Class<? extends TcpServer>) ZeroMQTcpServer.class, new NetStreams.TcpServerFactory<T, T>() {
            @Override
            public Spec.TcpServerSpec<T, T> apply(Spec.TcpServerSpec<T, T> tcpServerSpec) {
                return (Spec.TcpServerSpec) ((Spec.TcpServerSpec) ((Spec.TcpServerSpec) tcpServerSpec.env(ZeroMQ.this.env)).dispatcher(ZeroMQ.this.dispatcher)).codec(ZeroMQ.this.codec).options(new ZeroMQServerSocketOptions().context(ZeroMQ.this.zmqCtx).listenAddresses(str).socketType(i));
            }
        });
        return startAndTrack(server);
    }

    /**
     * Shuts down every tracked peer and removes each one from the tracking list once its
     * shutdown succeeds. Calling this more than once has no further effect.
     *
     * <p>The shared {@link ZContext} is not destroyed by this method.
     */
    public void shutdown() {
        final List<ReactorPeer<T, T, ChannelStream<T, T>>> snapshot;
        synchronized (this.peers) {
            // Test-and-set under the lock so that concurrent callers cannot both proceed.
            if (this.shutdown) {
                return;
            }
            this.shutdown = true;
            snapshot = new ArrayList<ReactorPeer<T, T, ChannelStream<T, T>>>(this.peers);
        }
        Streams.from(snapshot).flatMap(new Function<ReactorPeer, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(final ReactorPeer reactorPeer) {
                return reactorPeer.shutdown().onSuccess(new Consumer() {
                    @Override
                    public void accept(Object obj) {
                        // The callback may run on another thread; guard the ArrayList like
                        // every other access to it.
                        synchronized (ZeroMQ.this.peers) {
                            ZeroMQ.this.peers.remove(reactorPeer);
                        }
                    }
                });
            }
        }).consume();
    }

    /**
     * Starts the peer with a handler that publishes the received channel through a promise,
     * then records the peer so {@link #shutdown()} can find it.
     */
    private Promise<ChannelStream<T, T>> startAndTrack(ReactorPeer<T, T, ChannelStream<T, T>> peer) {
        final Promise<ChannelStream<T, T>> ready = Promises.ready(this.env, this.dispatcher);
        peer.start(new ReactorChannelHandler<T, T, ChannelStream<T, T>>() {
            @Override
            public Publisher<Void> apply(ChannelStream<T, T> channelStream) {
                ready.onNext(channelStream);
                // Never-completing publisher returned to the peer as the handler's result.
                return Streams.never();
            }
        });
        synchronized (this.peers) {
            this.peers.add(peer);
        }
        return ready;
    }
}
