package de.unistuttgart.isw.sfsc.commonjava.zmq.reactor;

import de.unistuttgart.isw.sfsc.commonjava.protocol.pubsub.DataProtocol;
import de.unistuttgart.isw.sfsc.commonjava.protocol.pubsub.SubProtocol;
import de.unistuttgart.isw.sfsc.commonjava.util.Handle;
import de.unistuttgart.isw.sfsc.commonjava.util.Listeners;
import de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable;
import de.unistuttgart.isw.sfsc.commonjava.util.OneShotRunnable;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZLoop;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:de/unistuttgart/isw/sfsc/commonjava/zmq/reactor/ZmqExecutor.class */
public class ZmqExecutor implements NotThrowingAutoCloseable {
    private static final byte[] NOTIFICATION = new byte[0];
    private final ZContext zContext;
    private final Queue<Runnable> commandQueue = new ConcurrentLinkedQueue();
    private final BlockingQueue<Object> notificationQueue = new LinkedTransferQueue();
    private final Listeners<Runnable> shutdownListeners = new Listeners<>();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final String address = "inproc://" + UUID.randomUUID();
    private final CommandExecutor commandExecutor = new CommandExecutor();
    private final NotificationInjector notificationInjector = new NotificationInjector();

    /* loaded from: input_file:de/unistuttgart/isw/sfsc/commonjava/zmq/reactor/ZmqExecutor$CommandExecutor.class */
    class CommandExecutor implements NotThrowingAutoCloseable {
        private final ExecutorService executorService = Executors.newSingleThreadExecutor();
        private final CountDownLatch bound = new CountDownLatch(1);
        private final ZContext commandExecutorZContext;
        private final ZLoop zLoop;

        CommandExecutor() {
            this.commandExecutorZContext = ZContext.shadow(ZmqExecutor.this.zContext);
            this.zLoop = new ZLoop(this.commandExecutorZContext);
        }

        void start() {
            this.executorService.execute(() -> {
                ZMQ.Socket createSocket = this.commandExecutorZContext.createSocket(SocketType.PAIR);
                createSocket.bind(ZmqExecutor.this.address);
                this.bound.countDown();
                this.zLoop.addPoller(new ZMQ.PollItem(createSocket, 1), (zLoop, pollItem, obj) -> {
                    try {
                        createSocket.recv();
                        ZmqExecutor.this.commandQueue.remove().run();
                        return 0;
                    } catch (ZMQException e) {
                        ZmqExecutor.this.close();
                        Thread.currentThread().interrupt();
                        return 0;
                    }
                }, (Object) null);
                this.zLoop.start();
                this.commandExecutorZContext.close();
            });
        }

        Future<ReactiveSocket> createReactiveSocket(SocketType socketType, int i) {
            FutureTask futureTask = new FutureTask(() -> {
                QueuingHandler queuingHandler = new QueuingHandler(i);
                ZMQ.Socket createSocket = this.commandExecutorZContext.createSocket(socketType);
                ZMQ.PollItem pollItem = new ZMQ.PollItem(createSocket, 1);
                this.zLoop.addPoller(pollItem, queuingHandler, (Object) null);
                ZmqExecutor zmqExecutor = ZmqExecutor.this;
                return new ReactiveSocketImpl(zmqExecutor::execute, createSocket, queuingHandler.getInbox(), () -> {
                    closeSocket(pollItem);
                });
            });
            ZmqExecutor.this.execute(futureTask);
            return futureTask;
        }

        private void closeSocket(ZMQ.PollItem pollItem) {
            this.zLoop.removePoller(pollItem);
            this.commandExecutorZContext.destroySocket(pollItem.getSocket());
        }

        void awaitBinding() throws InterruptedException {
            this.bound.await();
        }

        @Override // de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable, java.lang.AutoCloseable
        public void close() {
            this.executorService.shutdownNow();
        }
    }

    /* loaded from: input_file:de/unistuttgart/isw/sfsc/commonjava/zmq/reactor/ZmqExecutor$NotificationInjector.class */
    class NotificationInjector implements NotThrowingAutoCloseable {
        private final ExecutorService executorService = Executors.newSingleThreadExecutor();

        NotificationInjector() {
        }

        void start() {
            this.executorService.execute(() -> {
                ZContext shadow = ZContext.shadow(ZmqExecutor.this.zContext);
                ZMQ.Socket createSocket = shadow.createSocket(SocketType.PAIR);
                createSocket.connect(ZmqExecutor.this.address);
                while (!Thread.interrupted()) {
                    try {
                        ZmqExecutor.this.notificationQueue.take();
                        createSocket.send(ZmqExecutor.NOTIFICATION);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (ZMQException e2) {
                        ZmqExecutor.this.close();
                        Thread.currentThread().interrupt();
                    }
                }
                shadow.close();
            });
        }

        @Override // de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable, java.lang.AutoCloseable
        public void close() {
            this.executorService.shutdownNow();
        }
    }

    ZmqExecutor(ZContext zContext) {
        this.zContext = zContext;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ZmqExecutor create(ZContext zContext) throws InterruptedException {
        ZmqExecutor zmqExecutor = new ZmqExecutor(zContext);
        zmqExecutor.commandExecutor.start();
        zmqExecutor.commandExecutor.awaitBinding();
        zmqExecutor.notificationInjector.start();
        return zmqExecutor;
    }

    public Future<ReactiveSocket> createPublisher() {
        return this.commandExecutor.createReactiveSocket(SocketType.XPUB, SubProtocol.frameCount());
    }

    public Future<ReactiveSocket> createSubscriber() {
        return this.commandExecutor.createReactiveSocket(SocketType.XSUB, DataProtocol.frameCount());
    }

    public Handle addShutdownListener(Runnable runnable) {
        OneShotRunnable oneShotRunnable = new OneShotRunnable(runnable);
        Handle add = this.shutdownListeners.add(oneShotRunnable);
        if (this.closed.get()) {
            oneShotRunnable.run();
            add.close();
        }
        return add;
    }

    void execute(Runnable runnable) {
        this.commandQueue.add(runnable);
        this.notificationQueue.add(NOTIFICATION);
    }

    @Override // de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            new Thread(() -> {
                this.notificationInjector.close();
                this.commandExecutor.close();
                this.zContext.close();
                this.shutdownListeners.forEach((v0) -> {
                    v0.run();
                });
            }).start();
        }
    }
}
