/*
 * Decompiled with CFR 0.152.
 */
package org.drasyl.handler.arq.gobackn;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;
import io.netty.util.ReferenceCountUtil;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.drasyl.handler.arq.gobackn.GoBackNArqAck;
import org.drasyl.handler.arq.gobackn.GoBackNArqData;
import org.drasyl.handler.arq.gobackn.PendingQueueWindow;
import org.drasyl.handler.arq.gobackn.SimpleWindow;
import org.drasyl.handler.arq.gobackn.Window;
import org.drasyl.util.UnsignedInteger;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

public class GoBackNArqSenderHandler
extends ChannelDuplexHandler {
    private static final Logger LOG = LoggerFactory.getLogger(GoBackNArqSenderHandler.class);
    private final int windowSize;
    private Window window;
    private PendingWriteQueue overflow;
    private UnsignedInteger base;
    private UnsignedInteger nextSeqNum;
    private final Duration retryTimeout;
    private ScheduledFuture<?> retryTask;
    private final boolean windowShouldAffectWritability;

    public GoBackNArqSenderHandler(int windowSize, Duration retryTimeout, UnsignedInteger base, UnsignedInteger nextSeqNum, boolean windowShouldAffectWritability) {
        this.windowSize = windowSize;
        this.retryTimeout = retryTimeout;
        this.base = base;
        this.nextSeqNum = nextSeqNum;
        this.windowShouldAffectWritability = windowShouldAffectWritability;
    }

    public GoBackNArqSenderHandler(int windowSize, Duration retryTimeout) {
        this(windowSize, retryTimeout, UnsignedInteger.MIN_VALUE, UnsignedInteger.MIN_VALUE, false);
    }

    public GoBackNArqSenderHandler(int windowSize, Duration retryTimeout, boolean windowShouldAffectWritability) {
        this(windowSize, retryTimeout, UnsignedInteger.MIN_VALUE, UnsignedInteger.MIN_VALUE, windowShouldAffectWritability);
    }

    public void handlerAdded(ChannelHandlerContext ctx) {
        Supplier[] supplierArray = new Supplier[3];
        supplierArray[0] = () -> ((Channel)ctx.channel()).id();
        supplierArray[1] = () -> this.windowSize;
        supplierArray[2] = this.retryTimeout::toMillis;
        LOG.trace("[{}] Used windows size of {} and retry timeout of {}ms", supplierArray);
        this.window = this.windowShouldAffectWritability ? new PendingQueueWindow(ctx, this.windowSize) : new SimpleWindow(this.windowSize);
        this.overflow = new PendingWriteQueue(ctx);
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        this.window.removeAndFailAll(new ClosedChannelException());
        this.overflow.removeAndFailAll((Throwable)new ClosedChannelException());
        ctx.fireChannelInactive();
        this.stopTimer();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof GoBackNArqAck) {
            GoBackNArqAck ack = (GoBackNArqAck)msg;
            LOG.trace("[{}] Got {}", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> ack);
            if (ack.sequenceNo().safeIncrement().equals((Object)this.nextSeqNum)) {
                this.stopTimer();
            } else {
                this.resetTimer(ctx);
            }
            if (ack.sequenceNo().getValue() >= this.base.getValue() && ack.sequenceNo().getValue() < this.nextSeqNum.getValue()) {
                long cumAck = ack.sequenceNo().getValue() - this.base.getValue() + 1L;
                this.base = ack.sequenceNo().safeIncrement();
                this.succeedWrites(ctx, cumAck);
            } else if (this.base.getValue() > this.nextSeqNum.getValue()) {
                long cumAck = UnsignedInteger.MAX_VALUE.getValue() - this.base.getValue() + (ack.sequenceNo().getValue() + 1L);
                this.base = ack.sequenceNo().safeIncrement();
                this.succeedWrites(ctx, cumAck);
            } else {
                LOG.trace("[{}] Got unexpected (maybe out-of-order) {}. Drop it.", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> ack);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg instanceof GoBackNArqData) {
            this.overflow.add(msg, promise);
            this.writeData(ctx);
        } else {
            ctx.write(msg, promise);
        }
    }

    private void succeedWrites(ChannelHandlerContext ctx, long cumAck) {
        for (long i = 0L; i < cumAck; ++i) {
            this.window.remove().trySuccess();
        }
        this.writeData(ctx);
        ctx.flush();
    }

    private void writeData(ChannelHandlerContext ctx) {
        int freeSpace = Math.min(this.window.getFreeSpace(), this.overflow.size());
        for (int i = 0; i < freeSpace; ++i) {
            Object o = this.overflow.current();
            if (o == null) {
                this.overflow.remove();
                continue;
            }
            GoBackNArqData msg = (GoBackNArqData)o;
            msg.content().retain();
            ChannelPromise promise = this.overflow.remove();
            if (promise.isDone()) {
                ReferenceCountUtil.safeRelease((Object)msg);
                continue;
            }
            this.window.add(msg, promise);
            this.send(ctx, msg, this.nextSeqNum);
            if (this.base.equals((Object)this.nextSeqNum)) {
                this.resetTimer(ctx);
            }
            this.nextSeqNum = this.nextSeqNum.safeIncrement();
        }
    }

    private void send(ChannelHandlerContext ctx, GoBackNArqData msg, UnsignedInteger seqNo) {
        if (!ctx.channel().isActive()) {
            this.window.removeAndFailAll(new ClosedChannelException());
            this.overflow.removeAndFailAll((Throwable)new ClosedChannelException());
            this.stopTimer();
        } else {
            GoBackNArqData data = new GoBackNArqData(seqNo, msg.content().retain());
            LOG.trace("[{}] Write {}", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> data);
            ctx.write((Object)data);
        }
    }

    private void resetTimer(ChannelHandlerContext ctx) {
        LOG.trace("[{}] Reset timer", () -> ((ChannelId)ctx.channel().id()).asShortText());
        this.stopTimer();
        this.retryTask = ctx.executor().schedule(() -> this.resend(ctx), this.retryTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void stopTimer() {
        if (this.retryTask != null) {
            LOG.trace("Reset timer");
            this.retryTask.cancel(true);
            this.retryTask = null;
        }
    }

    private void resend(ChannelHandlerContext ctx) {
        UnsignedInteger seqNo = UnsignedInteger.of((long)this.base.getValue());
        if (this.window.size() != 0) {
            LOG.info("[{}] ACKs got timeout. Resend complete window of size {}", () -> ((ChannelId)ctx.channel().id()).asShortText(), this.window::size);
            for (Window.Frame frame : this.window.getQueue()) {
                if (frame.getPromise().isDone()) {
                    this.window.remove();
                    continue;
                }
                this.send(ctx, frame.getMsg(), seqNo);
                seqNo = seqNo.safeIncrement();
            }
            ctx.flush();
            this.resetTimer(ctx);
        }
    }
}

