package io.vproxy.base.selector.wrap.streamed;

import io.vproxy.base.Config;
import io.vproxy.base.selector.Handler;
import io.vproxy.base.selector.HandlerContext;
import io.vproxy.base.selector.PeriodicEvent;
import io.vproxy.base.selector.SelectorEventLoop;
import io.vproxy.base.selector.wrap.arqudp.ArqUDPBasedFDs;
import io.vproxy.base.selector.wrap.arqudp.ArqUDPServerSocketFD;
import io.vproxy.base.selector.wrap.arqudp.ArqUDPSocketFD;
import io.vproxy.base.selector.wrap.udp.UDPBasedFDs;
import io.vproxy.base.util.LogType;
import io.vproxy.base.util.Logger;
import io.vproxy.base.util.log.ProbeType;
import io.vproxy.vfd.EventSet;
import io.vproxy.vfd.IPPort;
import io.vproxy.vfd.ServerSocketFD;
import io.vproxy.vfd.SocketFD;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/* loaded from: input_file:io/vproxy/base/selector/wrap/streamed/StreamedArqUDPServerFDs.class */
public class StreamedArqUDPServerFDs implements UDPBasedFDs {
    private final ArqUDPBasedFDs fds;
    private final SelectorEventLoop loop;
    private final IPPort local;
    private ArqUDPServerSocketFD fd;
    private final Supplier<StreamedFDHandler> handlerSupplier;
    private final Map<ArqUDPSocketFD, PeriodicEvent> keepaliveEvents = new HashMap();
    private final Map<ArqUDPSocketFD, StreamedFDHandler> currentHandlers = new HashMap();
    private final StreamedServerSocketFD[] serverPtr = new StreamedServerSocketFD[1];

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamedArqUDPServerFDs(ArqUDPBasedFDs arqUDPBasedFDs, SelectorEventLoop selectorEventLoop, IPPort iPPort, Supplier<StreamedFDHandler> supplier) throws IOException {
        this.fds = arqUDPBasedFDs;
        this.loop = selectorEventLoop;
        this.local = iPPort;
        this.handlerSupplier = supplier;
        init();
    }

    private void init() throws IOException {
        initProbe();
        start();
    }

    private void initProbe() {
        if (Config.probe.contains(ProbeType.STREAMED_ARQ_UDP_RECORD)) {
            this.loop.period(30000, () -> {
                String formatToIPPortString = this.local.formatToIPPortString();
                Iterator<Map.Entry<ArqUDPSocketFD, StreamedFDHandler>> it = this.currentHandlers.entrySet().iterator();
                while (it.hasNext()) {
                    try {
                        Logger.probe("accepted: " + formatToIPPortString + " <- " + it.next().getKey().getRemoteAddress().formatToIPPortString());
                    } catch (Throwable th) {
                        Logger.shouldNotHappen("got exception when probing", th);
                    }
                }
            });
        }
    }

    private void stop() {
        Iterator<PeriodicEvent> it = this.keepaliveEvents.values().iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
        this.keepaliveEvents.clear();
        if (this.fd != null) {
            this.loop.remove(this.fd);
            try {
                this.fd.close();
            } catch (IOException e) {
                Logger.shouldNotHappen("closing fd " + this.fd + " failed", e);
            }
        }
        Iterator<StreamedFDHandler> it2 = this.currentHandlers.values().iterator();
        while (it2.hasNext()) {
            it2.next().clear();
        }
        this.currentHandlers.clear();
    }

    private void restart() {
        stop();
        try {
            start();
        } catch (IOException e) {
            Logger.shouldNotHappen("starting streamed arq udp client failed", e);
        }
    }

    private void start() throws IOException {
        boolean z = true;
        try {
            this.fd = this.fds.openServerSocketFD(this.loop);
            this.fd.bind(this.local);
            this.loop.add(this.fd, EventSet.read(), null, new Handler<ArqUDPServerSocketFD>() { // from class: io.vproxy.base.selector.wrap.streamed.StreamedArqUDPServerFDs.1
                static final /* synthetic */ boolean $assertionsDisabled;

                @Override // io.vproxy.base.selector.Handler
                public void accept(HandlerContext<ArqUDPServerSocketFD> handlerContext) {
                    while (true) {
                        try {
                            ArqUDPSocketFD accept = handlerContext.getChannel().accept();
                            if (accept == null) {
                                return;
                            }
                            StreamedFDHandler streamedFDHandler = StreamedArqUDPServerFDs.this.handlerSupplier.get();
                            streamedFDHandler.init(accept, StreamedArqUDPServerFDs.this.loop, this::ready, this::invalid, this::accepted);
                            try {
                                StreamedArqUDPServerFDs.this.loop.add(accept, EventSet.read(), null, streamedFDHandler);
                                StreamedArqUDPServerFDs.this.currentHandlers.put(accept, streamedFDHandler);
                                SelectorEventLoop selectorEventLoop = StreamedArqUDPServerFDs.this.loop;
                                Objects.requireNonNull(streamedFDHandler);
                                StreamedArqUDPServerFDs.this.keepaliveEvents.put(accept, selectorEventLoop.period(30000, streamedFDHandler::keepalive));
                            } catch (IOException e) {
                                Logger.error(LogType.EVENT_LOOP_ADD_FAIL, "adding fd " + accept + " to event loop failed", e);
                                try {
                                    accept.close();
                                    return;
                                } catch (IOException e2) {
                                    Logger.error(LogType.CONN_ERROR, "closing fd " + accept + " failed", e2);
                                    return;
                                }
                            }
                        } catch (IOException e3) {
                            Logger.error(LogType.CONN_ERROR, "accepting fd " + StreamedArqUDPServerFDs.this.fd + " failed", e3);
                            return;
                        }
                    }
                }

                private void ready(ArqUDPSocketFD arqUDPSocketFD) {
                    StreamedFDHandler streamedFDHandler = StreamedArqUDPServerFDs.this.currentHandlers.get(arqUDPSocketFD);
                    if (!$assertionsDisabled && streamedFDHandler == null) {
                        throw new AssertionError();
                    }
                    Logger.alert("streamed arq udp is ready: " + streamedFDHandler.getClass().getSimpleName() + "(" + arqUDPSocketFD + ")");
                }

                private void invalid(ArqUDPSocketFD arqUDPSocketFD) {
                    StreamedArqUDPServerFDs.this.loop.remove(arqUDPSocketFD);
                    try {
                        arqUDPSocketFD.close();
                    } catch (IOException e) {
                        Logger.error(LogType.CONN_ERROR, "closing fd " + arqUDPSocketFD + " failed", e);
                    }
                    StreamedFDHandler remove = StreamedArqUDPServerFDs.this.currentHandlers.remove(arqUDPSocketFD);
                    if (remove == null) {
                        Logger.shouldNotHappen("currentHandlers map does not contain key fd " + arqUDPSocketFD);
                    } else {
                        remove.clear();
                    }
                    PeriodicEvent remove2 = StreamedArqUDPServerFDs.this.keepaliveEvents.remove(arqUDPSocketFD);
                    if (remove2 == null) {
                        Logger.shouldNotHappen("keepaliveEvents map does not contain key fd " + arqUDPSocketFD);
                    } else {
                        remove2.cancel();
                    }
                }

                private boolean accepted(StreamedFD streamedFD) {
                    StreamedServerSocketFD streamedServerSocketFD = null;
                    synchronized (StreamedArqUDPServerFDs.this.serverPtr) {
                        if (StreamedArqUDPServerFDs.this.serverPtr[0] != null) {
                            streamedServerSocketFD = StreamedArqUDPServerFDs.this.serverPtr[0];
                        }
                    }
                    if (streamedServerSocketFD == null) {
                        return false;
                    }
                    streamedServerSocketFD.accepted(streamedFD);
                    return true;
                }

                @Override // io.vproxy.base.selector.Handler
                public void connected(HandlerContext<ArqUDPServerSocketFD> handlerContext) {
                    Logger.shouldNotHappen("connected should not fire");
                }

                @Override // io.vproxy.base.selector.Handler
                public void readable(HandlerContext<ArqUDPServerSocketFD> handlerContext) {
                    Logger.shouldNotHappen("readable should not fire");
                }

                @Override // io.vproxy.base.selector.Handler
                public void writable(HandlerContext<ArqUDPServerSocketFD> handlerContext) {
                    Logger.shouldNotHappen("writable should not fire");
                }

                @Override // io.vproxy.base.selector.Handler
                public void removed(HandlerContext<ArqUDPServerSocketFD> handlerContext) {
                    Logger.error(LogType.IMPROPER_USE, "the streamed arq udp server fd " + StreamedArqUDPServerFDs.this.fd + " is removed from loop");
                    StreamedArqUDPServerFDs.this.restart();
                }

                static {
                    $assertionsDisabled = !StreamedArqUDPServerFDs.class.desiredAssertionStatus();
                }
            });
            z = false;
            if (0 == 0 || this.fd == null) {
                return;
            }
            this.fd.close();
        } catch (Throwable th) {
            if (z && this.fd != null) {
                this.fd.close();
            }
            throw th;
        }
    }

    @Override // io.vproxy.base.selector.wrap.udp.UDPBasedFDs
    public ServerSocketFD openServerSocketFD(SelectorEventLoop selectorEventLoop) throws IOException {
        if (this.fd == null || !this.fd.isOpen()) {
            throw new IOException("not valid");
        }
        return new StreamedServerSocketFD(this.fd, selectorEventLoop, this.local, this.serverPtr);
    }

    @Override // io.vproxy.base.selector.wrap.udp.UDPBasedFDs
    public SocketFD openSocketFD(SelectorEventLoop selectorEventLoop) {
        throw new UnsupportedOperationException();
    }
}
