package estonlabs.cxtl.common.stream.managed;

import estonlabs.cxtl.common.DateFormatUtils;
import estonlabs.cxtl.common.codec.Codec;
import estonlabs.cxtl.common.stream.managed.InboundMessage;
import estonlabs.cxtl.common.stream.managed.OutboundMessage;
import estonlabs.cxtl.common.stream.pojo.PojoWsSession;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;

/* loaded from: input_file:estonlabs/cxtl/common/stream/managed/ManagedWsSession.class */
public class ManagedWsSession<SEND extends OutboundMessage, RECEIVE extends InboundMessage> extends PojoWsSession<SEND, RECEIVE> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ManagedWsSession.class);
    private final SimpleDateFormat format;
    private final AtomicInteger pingsSinceLastPong;
    private final AtomicInteger failedReconnects;
    private final AtomicBoolean isReconnecting;
    private final AtomicBoolean isOpen;
    private Runnable ping;
    private Supplier<SEND> createPong;
    private Supplier<SEND> createLogin;
    private final List<SEND> recoveryMessages;
    private long nextReconnectMs;
    private final String id;
    private long pingWindow;
    private boolean expectPingResponse;
    private long staleWindow;
    private long nextPing;
    private long lastReceived;
    private final Map<String, Disposable> timerDisposables;

    public ManagedWsSession(String str, Codec<Object> codec, Class<RECEIVE> cls) {
        super(codec, cls);
        this.format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        this.pingsSinceLastPong = new AtomicInteger(0);
        this.failedReconnects = new AtomicInteger(0);
        this.isReconnecting = new AtomicBoolean(false);
        this.isOpen = new AtomicBoolean(true);
        this.recoveryMessages = new CopyOnWriteArrayList();
        this.nextReconnectMs = 0L;
        this.staleWindow = -1L;
        this.nextPing = 0L;
        this.lastReceived = 0L;
        this.timerDisposables = new ConcurrentHashMap();
        this.id = str;
    }

    public void enableRestartFeedAfter(long j) {
        setTimer("restartFeedAfter", j, this::restartFeed);
    }

    public void enableStaleFeedCheck(long j) {
        this.staleWindow = j;
        setTimer("staleWindow", this.staleWindow, this::checkForStaleFeed);
    }

    public void enablePing(long j, Runnable runnable, boolean z) {
        this.pingWindow = j;
        this.ping = runnable;
        this.expectPingResponse = z;
        setTimer("pingWindow", j, this::ping);
    }

    public void enablePing(long j, Supplier<SEND> supplier) {
        enablePing(j, () -> {
            OutboundMessage outboundMessage = (OutboundMessage) supplier.get();
            if (this.underlyingSession.isOpen()) {
                send(outboundMessage);
            } else {
                LOGGER.debug("{} Skipping ping, the underlying session is not open", this.id);
            }
        }, true);
    }

    @Override // estonlabs.cxtl.common.stream.pojo.PojoWsSession, estonlabs.cxtl.common.stream.core.WsSession, reactor.core.Disposable
    public void dispose() {
        close();
        this.timerDisposables.values().forEach((v0) -> {
            v0.dispose();
        });
        this.timerDisposables.clear();
        super.dispose();
    }

    public void enablePong(Supplier<SEND> supplier) {
        this.createPong = supplier;
    }

    public void enableLogin(Supplier<SEND> supplier) {
        this.createLogin = supplier;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // estonlabs.cxtl.common.stream.pojo.PojoWsSession
    public void processToSend(SEND send) {
        if (send.getMessageType().isRecoverable()) {
            this.recoveryMessages.add(send);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // estonlabs.cxtl.common.stream.pojo.PojoWsSession
    public boolean processReceived(RECEIVE receive) {
        this.lastReceived = System.currentTimeMillis();
        InboundMessage.MessageType messageType = receive.getMessageType();
        if (messageType == InboundMessage.MessageType.PONG) {
            this.pingsSinceLastPong.set(0);
            LOGGER.debug("Received pong {}, ping count is now {}", receive, this.pingsSinceLastPong);
        } else if (messageType == InboundMessage.MessageType.PING) {
            if (this.createPong == null) {
                LOGGER.error("Received ping {}, but no pong supplier provided.", receive);
                return false;
            }
            sendDirectly(this.createPong.get());
            return false;
        }
        return messageType == InboundMessage.MessageType.DATA;
    }

    @Override // estonlabs.cxtl.common.stream.pojo.PojoWsSession, estonlabs.cxtl.common.stream.core.WsSession
    public void close() {
        this.recoveryMessages.clear();
        this.isOpen.set(false);
        super.close();
        LOGGER.debug("{} Closed session session={}, isOpen={}", this.id, this.underlyingSession, Boolean.valueOf(this.isOpen.get()));
    }

    public void restart() {
        super.close();
    }

    private void checkForStaleFeed() {
        long currentTimeMillis = System.currentTimeMillis();
        long j = this.lastReceived + this.staleWindow;
        if (this.lastReceived <= 0 || j >= currentTimeMillis) {
            return;
        }
        LOGGER.error(" {} STALE FEED!!!  Will restart the feed: now={} vs last= {}", this.id, DateFormatUtils.toTime(currentTimeMillis), DateFormatUtils.toTime(this.lastReceived));
        restartFeed();
    }

    private void restartFeed() {
        try {
            LOGGER.debug("{} Restarting feed", this.id);
            restartConnection();
        } catch (Exception e) {
            LOGGER.error("Error restarting feed {}, will try once more", this.id, e);
            restartConnection();
        }
    }

    private void ping() {
        if (this.underlyingSession == null || !this.isOpen.get()) {
            LOGGER.debug("{} Skipping ping, the underlying session is not initiated yet or the session is closed, session={}, isOpen={}", this.id, this.underlyingSession, Boolean.valueOf(this.isOpen.get()));
            return;
        }
        if (this.expectPingResponse) {
            if (!this.underlyingSession.isOpen() || this.pingsSinceLastPong.get() > 2) {
                restartConnection();
            } else if (this.pingsSinceLastPong.get() > 0) {
                LOGGER.warn("{} missed {} pings.", this.id, Integer.valueOf(this.pingsSinceLastPong.get()));
            }
            this.pingsSinceLastPong.incrementAndGet();
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis < this.nextPing) {
            LOGGER.debug("{} Skipping ping, not time to ping yet", this.id);
            return;
        }
        try {
            this.ping.run();
        } catch (Exception e) {
            LOGGER.error("Error sending ping, will try to reconnect.", (Throwable) e);
            restartConnection();
        }
        this.nextPing = currentTimeMillis + this.pingWindow;
    }

    @Override // estonlabs.cxtl.common.stream.pojo.PojoWsSession, estonlabs.cxtl.common.stream.core.WebsocketListener
    public void processOpened() {
        LOGGER.debug("{} Session is open", this.id);
        this.failedReconnects.set(0);
        this.pingsSinceLastPong.set(0);
        this.nextReconnectMs = 0L;
        this.isOpen.set(true);
        if (this.createLogin != null) {
            LOGGER.debug("{} Logging in", this.id);
            doSendDirectly(this.createLogin.get());
            LOGGER.debug("{} Waiting for  response", this.id);
            Thread.sleep(1000L);
        }
        recoverSubscriptions();
    }

    private void recoverSubscriptions() {
        for (SEND send : this.recoveryMessages) {
            LOGGER.debug("Recovering subscription {}", send);
            doSendDirectly(send);
        }
    }

    private void doSendDirectly(SEND send) {
        sendDirectly(send);
    }

    @Override // estonlabs.cxtl.common.stream.pojo.PojoWsSession, estonlabs.cxtl.common.stream.core.WebsocketListener
    public void processError(Throwable th) {
        super.processError(th);
        restartConnection();
    }

    private void restartConnection() {
        if (System.currentTimeMillis() < this.nextReconnectMs || this.isReconnecting.getAndSet(true)) {
            LOGGER.debug("{} Not ready to reconnect", this.id);
            return;
        }
        try {
            LOGGER.debug("{} Reconnecting!", this.id);
            try {
                LOGGER.debug("{} Closing", this.id);
                this.underlyingSession.close();
                Thread.sleep(1000L);
            } catch (Exception e) {
                LOGGER.error("{}: Error closing the session, will ignore and continue", this.id, e);
            }
            try {
                LOGGER.debug("{} Connecting", this.id);
                this.underlyingSession.connect();
                Thread.sleep(1000L);
            } catch (Exception e2) {
                int incrementAndGet = this.failedReconnects.incrementAndGet();
                this.nextReconnectMs = System.currentTimeMillis() + ((this.failedReconnects.get() / 3) * 5000);
                LOGGER.error("{}: Failed to reconnect {} times, will try again at {}", this.id, Integer.valueOf(incrementAndGet), this.format.format(Long.valueOf(this.nextReconnectMs)), e2);
            }
        } finally {
            this.isReconnecting.set(false);
        }
    }

    public void setTimer(String str, long j, Runnable runnable) {
        Disposable remove = this.timerDisposables.remove(str);
        if (remove != null) {
            remove.dispose();
        }
        if (j > 0) {
            this.timerDisposables.put(str, PeriodicTimer.INSTANCE.register(runnable));
        }
    }
}
