package org.rx.crawler;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.redisson.api.RPriorityBlockingQueue;
import org.redisson.api.RSetCache;
import org.redisson.api.RTopic;
import org.rx.core.ResetEventWait;
import org.rx.core.Sys;
import org.rx.exception.TraceHandler;
import org.rx.redis.RedisCache;
import org.rx.spring.BeanRegister;
import org.rx.util.function.TripleAction;
import org.rx.util.function.TripleFunc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@ConditionalOnProperty(name = {BeanRegister.REDIS_PROP_NAME})
@Component
/* loaded from: input_file:org/rx/crawler/BrowserAsyncTopic.class */
public class BrowserAsyncTopic {
    private static final Logger log = LoggerFactory.getLogger(BrowserAsyncTopic.class);
    public static final String QUEUE_NAME = "BAsyncQueue";
    public static final String TOPIC_NAME = "BAsyncTopic";
    public static final String IN_PUBLISH_NAME = "BAsyncPublish";
    private final RedisCache<?, ?> redisCache;
    private RPriorityBlockingQueue<BrowserAsyncRequest> queue;
    private RTopic topic;
    private RSetCache<Integer> publishSet;
    private final ConcurrentHashMap<UUID, AsyncFuture> callbacks = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/rx/crawler/BrowserAsyncTopic$AsyncFuture.class */
    private class AsyncFuture<T> implements Future<T> {
        private final UUID asyncId;
        private final Object callback;
        private final ResetEventWait waiter = new ResetEventWait();
        private volatile boolean done;
        private volatile Throwable exception;
        private T result;

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return BrowserAsyncTopic.this.callbacks.remove(this.asyncId) != null;
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return !BrowserAsyncTopic.this.callbacks.containsKey(this.asyncId);
        }

        @Override // java.util.concurrent.Future
        public T get() throws ExecutionException {
            try {
                return get(-1L, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                BrowserAsyncTopic.log.warn("ignore", e);
                return null;
            }
        }

        @Override // java.util.concurrent.Future
        public T get(long j, TimeUnit timeUnit) throws ExecutionException, TimeoutException {
            if (!this.waiter.waitOne(TimeUnit.MILLISECONDS.convert(j, timeUnit))) {
                throw new TimeoutException();
            }
            if (this.exception != null) {
                throw new ExecutionException(this.exception);
            }
            return this.result;
        }

        public AsyncFuture(UUID uuid, Object obj) {
            this.asyncId = uuid;
            this.callback = obj;
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return this.done;
        }
    }

    @PostConstruct
    public void init() {
        this.queue = this.redisCache.getClient().getPriorityBlockingQueue(QUEUE_NAME);
        this.topic = this.redisCache.getClient().getTopic(TOPIC_NAME);
        this.publishSet = this.redisCache.getClient().getSetCache(IN_PUBLISH_NAME);
        this.topic.addListener(BrowserAsyncResponse.class, (charSequence, browserAsyncResponse) -> {
            RemoteBrowser wrap;
            log.info("Async consume response {}", Sys.toJsonString(browserAsyncResponse));
            try {
                AsyncFuture asyncFuture = this.callbacks.get(browserAsyncResponse.getRequest().getAsyncId());
                if (asyncFuture != null && !asyncFuture.isCancelled()) {
                    try {
                        if (!asyncFuture.isDone()) {
                            try {
                                wrap = RemoteBrowser.wrap(browserAsyncResponse.getEndpoint());
                                try {
                                } catch (Throwable th) {
                                    if (wrap != null) {
                                        try {
                                            wrap.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    }
                                    throw th;
                                }
                            } catch (Throwable th3) {
                                TraceHandler.INSTANCE.log("Async {} error", new Object[]{asyncFuture.asyncId, th3});
                                asyncFuture.exception = th3;
                                this.callbacks.remove(asyncFuture.asyncId);
                                asyncFuture.done = true;
                                asyncFuture.waiter.set();
                            }
                            if (asyncFuture.isCancelled()) {
                                if (wrap != null) {
                                    wrap.close();
                                }
                                this.publishSet.remove(Integer.valueOf(browserAsyncResponse.getEndpoint().getPort()));
                                return;
                            }
                            if (asyncFuture.callback instanceof TripleFunc) {
                                asyncFuture.result = ((TripleFunc) asyncFuture.callback).invoke(wrap, browserAsyncResponse.getRequest().getUrl());
                                if (wrap != null) {
                                    wrap.close();
                                }
                                this.callbacks.remove(asyncFuture.asyncId);
                                asyncFuture.done = true;
                                asyncFuture.waiter.set();
                                this.publishSet.remove(Integer.valueOf(browserAsyncResponse.getEndpoint().getPort()));
                                return;
                            }
                            ((TripleAction) asyncFuture.callback).invoke(wrap, browserAsyncResponse.getRequest().getUrl());
                            if (wrap != null) {
                                wrap.close();
                            }
                            this.callbacks.remove(asyncFuture.asyncId);
                            asyncFuture.done = true;
                            asyncFuture.waiter.set();
                            this.publishSet.remove(Integer.valueOf(browserAsyncResponse.getEndpoint().getPort()));
                        }
                    } finally {
                        this.callbacks.remove(asyncFuture.asyncId);
                        asyncFuture.done = true;
                        asyncFuture.waiter.set();
                    }
                }
            } finally {
                this.publishSet.remove(Integer.valueOf(browserAsyncResponse.getEndpoint().getPort()));
            }
        });
        log.info("register BrowserAsyncTopic ok");
    }

    public void add(@NonNull BrowserAsyncRequest browserAsyncRequest) {
        if (browserAsyncRequest == null) {
            throw new NullPointerException("request is marked non-null but is null");
        }
        this.queue.add(browserAsyncRequest);
    }

    public Future listen(UUID uuid, TripleAction<RemoteBrowser, String> tripleAction) {
        AsyncFuture asyncFuture = new AsyncFuture(uuid, tripleAction);
        this.callbacks.put(uuid, asyncFuture);
        return asyncFuture;
    }

    public <T> Future<T> listen(UUID uuid, TripleFunc<RemoteBrowser, String, T> tripleFunc) {
        AsyncFuture asyncFuture = new AsyncFuture(uuid, tripleFunc);
        this.callbacks.put(uuid, asyncFuture);
        return asyncFuture;
    }

    public List<BrowserAsyncRequest> poll(int i) {
        return this.queue.poll(i);
    }

    public BrowserAsyncRequest poll() {
        return (BrowserAsyncRequest) this.queue.poll();
    }

    public boolean isPublishing(int i) {
        return this.publishSet.contains(Integer.valueOf(i));
    }

    public void publish(BrowserAsyncResponse browserAsyncResponse) {
        if (browserAsyncResponse == null || browserAsyncResponse.getRequest() == null || browserAsyncResponse.getRequest().getAsyncId() == null || browserAsyncResponse.getEndpoint() == null) {
            log.warn("Async publish invalid response {}", Sys.toJsonString(browserAsyncResponse));
        } else {
            this.publishSet.add(Integer.valueOf(browserAsyncResponse.getEndpoint().getPort()), 6L, TimeUnit.SECONDS);
            this.topic.publish(browserAsyncResponse);
        }
    }

    public BrowserAsyncTopic(RedisCache<?, ?> redisCache) {
        this.redisCache = redisCache;
    }
}
