/*
 * Decompiled with CFR 0.152.
 */
package estonlabs.cxtl.common.stream.managed;

import estonlabs.cxtl.common.DateFormatUtils;
import estonlabs.cxtl.common.codec.Codec;
import estonlabs.cxtl.common.stream.managed.InboundMessage;
import estonlabs.cxtl.common.stream.managed.OutboundMessage;
import estonlabs.cxtl.common.stream.managed.PeriodicTimer;
import estonlabs.cxtl.common.stream.pojo.PojoWsSession;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;

public class ManagedWsSession<SEND extends OutboundMessage, RECEIVE extends InboundMessage>
extends PojoWsSession<SEND, RECEIVE> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ManagedWsSession.class);
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private final AtomicInteger pingsSinceLastPong = new AtomicInteger(0);
    private final AtomicInteger failedReconnects = new AtomicInteger(0);
    private final AtomicBoolean isReconnecting = new AtomicBoolean(false);
    private final AtomicBoolean isOpen = new AtomicBoolean(true);
    private Runnable ping;
    private Supplier<SEND> createPong;
    private Supplier<SEND> createLogin;
    private final List<SEND> recoveryMessages = new CopyOnWriteArrayList<SEND>();
    private long nextReconnectMs = 0L;
    private final String id;
    private long pingWindow;
    private boolean expectPingResponse;
    private long staleWindow = -1L;
    private long nextPing = 0L;
    private long lastReceived = 0L;
    private final Map<String, Disposable> timerDisposables = new ConcurrentHashMap<String, Disposable>();

    public ManagedWsSession(String id, Codec<Object> codec, Class<RECEIVE> inboundType) {
        super(codec, inboundType);
        this.id = id;
    }

    public void enableRestartFeedAfter(long restartFeedAfter) {
        this.setTimer("restartFeedAfter", restartFeedAfter, this::restartFeed);
    }

    public void enableStaleFeedCheck(long staleFeedWindow) {
        this.staleWindow = staleFeedWindow;
        this.setTimer("staleWindow", this.staleWindow, this::checkForStaleFeed);
    }

    public void enablePing(long pingWindow, Runnable ping2, boolean expectPingResponse) {
        this.pingWindow = pingWindow;
        this.ping = ping2;
        this.expectPingResponse = expectPingResponse;
        this.setTimer("pingWindow", pingWindow, this::ping);
    }

    public void enablePing(long pingWindow, Supplier<SEND> createPing) {
        this.enablePing(pingWindow, () -> {
            OutboundMessage ping2 = (OutboundMessage)createPing.get();
            if (this.underlyingSession.isOpen()) {
                this.send(ping2);
            } else {
                LOGGER.debug("{} Skipping ping, the underlying session is not open", (Object)this.id);
            }
        }, true);
    }

    @Override
    public void dispose() {
        this.close();
        this.timerDisposables.values().forEach(Disposable::dispose);
        this.timerDisposables.clear();
        super.dispose();
    }

    public void enablePong(Supplier<SEND> createPong) {
        this.createPong = createPong;
    }

    public void enableLogin(Supplier<SEND> createLogin) {
        this.createLogin = createLogin;
    }

    @Override
    protected void processToSend(SEND toSend) {
        if (toSend.getMessageType().isRecoverable()) {
            this.recoveryMessages.add(toSend);
        }
    }

    @Override
    protected boolean processReceived(RECEIVE received) {
        this.lastReceived = System.currentTimeMillis();
        InboundMessage.MessageType messageType = received.getMessageType();
        if (messageType == InboundMessage.MessageType.PONG) {
            this.pingsSinceLastPong.set(0);
            LOGGER.debug("Received pong {}, ping count is now {}", (Object)received, (Object)this.pingsSinceLastPong);
        } else if (messageType == InboundMessage.MessageType.PING) {
            if (this.createPong == null) {
                LOGGER.error("Received ping {}, but no pong supplier provided.", (Object)received);
            } else {
                this.sendDirectly((OutboundMessage)this.createPong.get());
            }
            return false;
        }
        return messageType == InboundMessage.MessageType.DATA;
    }

    @Override
    public void close() {
        this.recoveryMessages.clear();
        this.isOpen.set(false);
        super.close();
        LOGGER.debug("{} Closed session session={}, isOpen={}", this.id, this.underlyingSession, this.isOpen.get());
    }

    public void restart() {
        super.close();
    }

    private void checkForStaleFeed() {
        long now = System.currentTimeMillis();
        long lastAcceptableMs = this.lastReceived + this.staleWindow;
        if (this.lastReceived > 0L && lastAcceptableMs < now) {
            LOGGER.error(" {} STALE FEED!!!  Will restart the feed: now={} vs last= {}", this.id, DateFormatUtils.toTime(now), DateFormatUtils.toTime(this.lastReceived));
            this.restartFeed();
        }
    }

    private void restartFeed() {
        try {
            LOGGER.debug("{} Restarting feed", (Object)this.id);
            this.restartConnection();
        }
        catch (Exception e) {
            LOGGER.error("Error restarting feed {}, will try once more", (Object)this.id, (Object)e);
            this.restartConnection();
        }
    }

    private void ping() {
        long now;
        if (this.underlyingSession == null || !this.isOpen.get()) {
            LOGGER.debug("{} Skipping ping, the underlying session is not initiated yet or the session is closed, session={}, isOpen={}", this.id, this.underlyingSession, this.isOpen.get());
            return;
        }
        if (this.expectPingResponse) {
            if (!this.underlyingSession.isOpen() || this.pingsSinceLastPong.get() > 2) {
                this.restartConnection();
            } else if (this.pingsSinceLastPong.get() > 0) {
                LOGGER.warn("{} missed {} pings.", (Object)this.id, (Object)this.pingsSinceLastPong.get());
            }
            this.pingsSinceLastPong.incrementAndGet();
        }
        if ((now = System.currentTimeMillis()) < this.nextPing) {
            LOGGER.debug("{} Skipping ping, not time to ping yet", (Object)this.id);
            return;
        }
        try {
            this.ping.run();
        }
        catch (Exception e) {
            LOGGER.error("Error sending ping, will try to reconnect.", e);
            this.restartConnection();
        }
        this.nextPing = now + this.pingWindow;
    }

    @Override
    public void processOpened() {
        LOGGER.debug("{} Session is open", (Object)this.id);
        this.failedReconnects.set(0);
        this.pingsSinceLastPong.set(0);
        this.nextReconnectMs = 0L;
        this.isOpen.set(true);
        if (this.createLogin != null) {
            LOGGER.debug("{} Logging in", (Object)this.id);
            this.doSendDirectly((OutboundMessage)this.createLogin.get());
            LOGGER.debug("{} Waiting for  response", (Object)this.id);
            Thread.sleep(1000L);
        }
        this.underlyingListener.processOpened();
        this.recoverSubscriptions();
    }

    private void recoverSubscriptions() {
        for (OutboundMessage s : this.recoveryMessages) {
            LOGGER.debug("Recovering subscription {}", (Object)s);
            this.doSendDirectly(s);
        }
    }

    private void doSendDirectly(SEND s) {
        this.sendDirectly(s);
    }

    @Override
    public void processError(Throwable t) {
        super.processError(t);
        this.restartConnection();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restartConnection() {
        long now = System.currentTimeMillis();
        if (now >= this.nextReconnectMs && !this.isReconnecting.getAndSet(true)) {
            try {
                LOGGER.debug("{} Reconnecting!", (Object)this.id);
                try {
                    LOGGER.debug("{} Closing", (Object)this.id);
                    this.underlyingSession.close();
                    Thread.sleep(1000L);
                }
                catch (Exception e) {
                    LOGGER.error("{}: Error closing the session, will ignore and continue", (Object)this.id, (Object)e);
                }
                try {
                    LOGGER.debug("{} Connecting", (Object)this.id);
                    this.underlyingSession.connect();
                    Thread.sleep(1000L);
                }
                catch (Exception e) {
                    int failed = this.failedReconnects.incrementAndGet();
                    now = System.currentTimeMillis();
                    long ratio = this.failedReconnects.get() / 3;
                    this.nextReconnectMs = now + ratio * 5000L;
                    LOGGER.error("{}: Failed to reconnect {} times, will try again at {}", this.id, failed, this.format.format(this.nextReconnectMs), e);
                }
            }
            finally {
                this.isReconnecting.set(false);
            }
        } else {
            LOGGER.debug("{} Not ready to reconnect", (Object)this.id);
        }
    }

    public void setTimer(String key, long period, Runnable runnable2) {
        Disposable d = this.timerDisposables.remove(key);
        if (d != null) {
            d.dispose();
        }
        if (period > 0L) {
            this.timerDisposables.put(key, PeriodicTimer.INSTANCE.register(period, runnable2));
        }
    }
}

