package org.openqa.selenium.events.zeromq;

import com.google.common.collect.EvictingQueue;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.openqa.selenium.events.Event;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.events.EventListener;
import org.openqa.selenium.events.EventName;
import org.openqa.selenium.events.zeromq.ZeroMqEventBus;
import org.openqa.selenium.grid.security.Secret;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.json.Json;
import org.openqa.selenium.json.JsonOutput;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/openqa/selenium/events/zeromq/UnboundZmqEventBus.class */
public class UnboundZmqEventBus implements EventBus {
    static final EventName REJECTED_EVENT = new EventName("selenium-rejected-event");
    private static final Logger LOG = Logger.getLogger(EventBus.class.getName());
    private static final Json JSON = new Json();
    private final ExecutorService socketPollingExecutor;
    private final ExecutorService socketPublishingExecutor;
    private final ExecutorService listenerNotificationExecutor;
    private final String encodedSecret;
    private ZMQ.Poller poller;
    private ZMQ.Socket pub;
    private ZMQ.Socket sub;
    private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
    private final Map<EventName, List<Consumer<Event>>> listeners = new ConcurrentHashMap();
    private final Queue<UUID> recentMessages = EvictingQueue.create(128);

    /* loaded from: input_file:org/openqa/selenium/events/zeromq/UnboundZmqEventBus$PollingRunnable.class */
    private class PollingRunnable implements Runnable {
        private Secret secret;

        public PollingRunnable(Secret secret) {
            this.secret = secret;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    int poll = UnboundZmqEventBus.this.poller.poll(150L);
                    UnboundZmqEventBus.this.pollingStarted.lazySet(true);
                    for (int i = 0; i < poll; i++) {
                        if (UnboundZmqEventBus.this.poller.pollin(i)) {
                            ZMQ.Socket socket = UnboundZmqEventBus.this.poller.getSocket(i);
                            EventName eventName = new EventName(new String(socket.recv(), StandardCharsets.UTF_8));
                            Secret secret = (Secret) UnboundZmqEventBus.JSON.toType(new String(socket.recv(), StandardCharsets.UTF_8), Secret.class);
                            UUID fromString = UUID.fromString(new String(socket.recv(), StandardCharsets.UTF_8));
                            String str = new String(socket.recv(), StandardCharsets.UTF_8);
                            if (UnboundZmqEventBus.this.recentMessages.contains(fromString)) {
                                return;
                            }
                            Event event = new Event(fromString, eventName, UnboundZmqEventBus.JSON.toType(str, Object.class));
                            UnboundZmqEventBus.this.recentMessages.add(fromString);
                            if (!Secret.matches(this.secret, secret)) {
                                UnboundZmqEventBus.LOG.log(Level.SEVERE, "Received message without a valid secret. Rejecting. {0} -> {1}", new Object[]{event, str});
                                notifyListeners(UnboundZmqEventBus.REJECTED_EVENT, new Event(UnboundZmqEventBus.REJECTED_EVENT, new ZeroMqEventBus.RejectedEvent(eventName, str)));
                                return;
                            }
                            notifyListeners(eventName, event);
                        }
                    }
                } catch (Exception e) {
                    if (!(e.getCause() instanceof AssertionError)) {
                        UnboundZmqEventBus.LOG.log(Level.WARNING, e, () -> {
                            return "Caught exception while polling for event bus messages: " + e.getMessage();
                        });
                        throw e;
                    }
                }
            }
        }

        private void notifyListeners(EventName eventName, Event event) {
            ((List) UnboundZmqEventBus.this.listeners.getOrDefault(eventName, new ArrayList())).forEach(consumer -> {
                UnboundZmqEventBus.this.listenerNotificationExecutor.submit(() -> {
                    try {
                        consumer.accept(event);
                    } catch (Exception e) {
                        UnboundZmqEventBus.LOG.log(Level.WARNING, e, () -> {
                            return "Caught exception from listener: " + consumer;
                        });
                    }
                });
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public UnboundZmqEventBus(ZContext zContext, String str, String str2, Secret secret) {
        Require.nonNull("Secret", secret);
        StringBuilder sb = new StringBuilder();
        JsonOutput newOutput = JSON.newOutput(sb);
        try {
            newOutput.setPrettyPrint(false).writeClassName(false).write(secret);
            if (newOutput != null) {
                newOutput.close();
            }
            this.encodedSecret = sb.toString();
            this.socketPollingExecutor = Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable);
                thread.setName("Event Bus Poller");
                thread.setDaemon(true);
                return thread;
            });
            this.socketPublishingExecutor = Executors.newSingleThreadExecutor(runnable2 -> {
                Thread thread = new Thread(runnable2);
                thread.setName("Event Bus Publisher");
                thread.setDaemon(true);
                return thread;
            });
            this.listenerNotificationExecutor = Executors.newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors() / 2, 2), runnable3 -> {
                Thread thread = new Thread(runnable3);
                thread.setName("Event Bus Listener Notifier");
                thread.setDaemon(true);
                return thread;
            });
            String format = String.format("Connecting to %s and %s", str, str2);
            LOG.info(format);
            Failsafe.with(new RetryPolicy[]{new RetryPolicy().withMaxAttempts(5).withDelay(5L, 10L, ChronoUnit.SECONDS).onFailedAttempt(executionAttemptedEvent -> {
                LOG.log(Level.WARNING, String.format("%s failed", format));
            }).onRetry(executionAttemptedEvent2 -> {
                LOG.log(Level.WARNING, String.format("Failure #%s. Retrying.", Integer.valueOf(executionAttemptedEvent2.getAttemptCount())));
            }).onRetriesExceeded(executionCompletedEvent -> {
                LOG.log(Level.WARNING, "Connection aborted.");
            })}).run(() -> {
                this.sub = zContext.createSocket(SocketType.SUB);
                this.sub.setIPv6(isSubAddressIPv6(str));
                this.sub.connect(str);
                this.sub.subscribe(new byte[0]);
                this.pub = zContext.createSocket(SocketType.PUB);
                this.pub.setIPv6(isSubAddressIPv6(str2));
                this.pub.connect(str2);
            });
            this.poller = zContext.createPoller(1);
            this.poller.register((ZMQ.Socket) Objects.requireNonNull(this.sub), 1);
            LOG.info("Sockets created");
            this.socketPollingExecutor.submit(new PollingRunnable(secret));
            while (!this.pollingStarted.get()) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            LOG.info("Event bus ready");
        } catch (Throwable th) {
            if (newOutput != null) {
                try {
                    newOutput.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.openqa.selenium.status.HasReadyState
    public boolean isReady() {
        return !this.socketPollingExecutor.isShutdown();
    }

    private boolean isSubAddressIPv6(String str) {
        try {
            URI uri = new URI(str);
            if ("inproc".equals(uri.getScheme())) {
                return false;
            }
            return InetAddress.getByName(uri.getHost()) instanceof Inet6Address;
        } catch (URISyntaxException | UnknownHostException e) {
            LOG.log(Level.WARNING, String.format("Could not determine if the address %s is IPv6 or IPv4", str), e);
            return false;
        }
    }

    @Override // org.openqa.selenium.events.EventBus
    public void addListener(EventListener<?> eventListener) {
        Require.nonNull("Listener", eventListener);
        this.listeners.computeIfAbsent(eventListener.getEventName(), eventName -> {
            return new LinkedList();
        }).add(eventListener);
    }

    @Override // org.openqa.selenium.events.EventBus
    public void fire(Event event) {
        Require.nonNull("Event to send", event);
        this.socketPublishingExecutor.execute(() -> {
            this.pub.sendMore(event.getType().getName().getBytes(StandardCharsets.UTF_8));
            this.pub.sendMore(this.encodedSecret.getBytes(StandardCharsets.UTF_8));
            this.pub.sendMore(event.getId().toString().getBytes(StandardCharsets.UTF_8));
            this.pub.send(event.getRawData().getBytes(StandardCharsets.UTF_8));
        });
    }

    @Override // org.openqa.selenium.events.EventBus, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.socketPollingExecutor.shutdownNow();
        this.socketPublishingExecutor.shutdownNow();
        this.listenerNotificationExecutor.shutdownNow();
        this.poller.close();
        if (this.sub != null) {
            this.sub.close();
        }
        if (this.pub != null) {
            this.pub.close();
        }
    }
}
