package org.drasyl.handler.arq.stopandwait;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;
import java.nio.channels.ClosedChannelException;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.drasyl.util.Preconditions;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

/* loaded from: input_file:org/drasyl/handler/arq/stopandwait/StopAndWaitArqHandler.class */
/**
 * Channel handler that performs the Stop-and-Wait ARQ exchange for a channel.
 *
 * <p>Outbound {@link StopAndWaitArqData} messages are queued and transmitted one at a time. The
 * message at the head of the queue is re-sent every {@code retryTimeout} milliseconds until a
 * {@link StopAndWaitArqAck} arrives whose sequence number differs from the one of the queued
 * message; only then is the write promise succeeded and the next message transmitted.
 *
 * <p>Inbound {@link StopAndWaitArqData} carrying the expected sequence number is acknowledged and
 * passed on; any other inbound DATA is released and only acknowledged. Messages that are neither
 * DATA nor ACK pass through untouched in both directions.
 *
 * <p>NOTE(review): fields are accessed without synchronization; this presumably relies on all
 * callbacks running on the channel's event loop - confirm before sharing an instance.
 */
public class StopAndWaitArqHandler extends ChannelDuplexHandler {
    private static final Logger LOG = LoggerFactory.getLogger(StopAndWaitArqHandler.class);

    /** Write promise of every queued DATA message, keyed by message identity. */
    private final Map<Object, ChannelPromise> promises = new IdentityHashMap<>();
    /** Queued outbound DATA messages; created in {@link #handlerAdded(ChannelHandlerContext)}. */
    private PendingWriteQueue pendingWrites;
    /** Minimum delay between two transmissions, in milliseconds. */
    private int retryTimeout;
    /** Sequence number the next accepted inbound DATA message must carry. */
    private boolean expectedInboundSequenceNo;
    /** {@code true} while a transmission is outstanding that has not been acknowledged yet. */
    private boolean writeAttempted;
    /** {@link System#nanoTime()} reading of the latest transmission; valid if {@link #writeAttempted}. */
    private long lastWriteAttemptNanos;
    /** Most recently transmitted message; only maintained (and used) for trace logging. */
    private Object lastWrite;

    /**
     * Creates a new handler.
     *
     * @param retryTimeout              delay in milliseconds between two transmissions; validated
     *                                  by {@code Preconditions.requirePositive}
     * @param expectedInboundSequenceNo sequence number the first accepted inbound DATA must carry
     */
    public StopAndWaitArqHandler(int retryTimeout, boolean expectedInboundSequenceNo) {
        setRetryTimeout(retryTimeout);
        setExpectedInboundSequenceNo(expectedInboundSequenceNo);
    }

    /**
     * Creates a new handler that expects the first inbound DATA to carry sequence number
     * {@code false}.
     *
     * @param retryTimeout delay in milliseconds between two transmissions
     */
    public StopAndWaitArqHandler(int retryTimeout) {
        this(retryTimeout, false);
    }

    /**
     * Creates the queue holding the not yet acknowledged writes for the given context.
     *
     * @param ctx context this handler has been added to
     */
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.pendingWrites = new PendingWriteQueue(ctx);
    }

    /**
     * Fails every queued write with a {@link ClosedChannelException} and forwards the event.
     *
     * @param ctx handler context
     */
    public void channelInactive(ChannelHandlerContext ctx) {
        discardPendingWrites(ctx, new ClosedChannelException());
        ctx.fireChannelInactive();
    }

    /**
     * Dispatches inbound messages: DATA and ACK are processed by this handler, everything else is
     * forwarded unchanged.
     *
     * @param ctx handler context
     * @param msg inbound message
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof StopAndWaitArqData) {
            handleData(ctx, (StopAndWaitArqData) msg);
        }
        else if (msg instanceof StopAndWaitArqAck) {
            handleAck(ctx, (StopAndWaitArqAck) msg);
        }
        else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Queues outbound DATA for acknowledged delivery; any other message is written directly.
     *
     * <p>The promise of a DATA message is completed once the matching ACK has been received (or the
     * write has failed), not when the message has been handed to the next handler.
     *
     * @param ctx     handler context
     * @param msg     outbound message
     * @param promise promise of the write
     */
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (!(msg instanceof StopAndWaitArqData)) {
            ctx.write(msg, promise);
            return;
        }

        final StopAndWaitArqData data = (StopAndWaitArqData) msg;
        promises.put(data, promise);
        pendingWrites.add(data, promise);
        // drop the bookkeeping entry as soon as the write is completed, however that happens
        promise.addListener(future -> promises.remove(data));
    }

    /**
     * Attempts to transmit the head of the queue and then forwards the flush.
     *
     * @param ctx handler context
     * @throws Exception declared for compatibility with the overridden signature
     */
    public void flush(ChannelHandlerContext ctx) throws Exception {
        writeNextPending(ctx);
        ctx.flush();
    }

    /**
     * Sets the delay between two transmissions.
     *
     * @param retryTimeout delay in milliseconds; validated by {@code Preconditions.requirePositive}
     */
    public void setRetryTimeout(int retryTimeout) {
        this.retryTimeout = Preconditions.requirePositive(retryTimeout);
    }

    /**
     * Sets the sequence number the next accepted inbound DATA message must carry.
     *
     * @param expectedInboundSequenceNo expected sequence number
     */
    public void setExpectedInboundSequenceNo(boolean expectedInboundSequenceNo) {
        this.expectedInboundSequenceNo = expectedInboundSequenceNo;
    }

    /** Processes inbound DATA: accept and forward it if in sequence, otherwise release it. */
    private void handleData(ChannelHandlerContext ctx, StopAndWaitArqData data) {
        if (data.sequenceNo() != expectedInboundSequenceNo) {
            LOG.trace("[{}] Got unexpected {}. Drop it.", ctx.channel().id()::asShortText, () -> data);
            data.release();
            // the ACK is sent again; presumably the peer missed the previous one
            writeAck(ctx);
            return;
        }

        LOG.trace("[{}] Got expected {}. Pass through.", ctx.channel().id()::asShortText, () -> data);
        // flip first: the ACK announces the sequence number that is expected next
        expectedInboundSequenceNo = !expectedInboundSequenceNo;
        writeAck(ctx);
        ctx.fireChannelRead(data);
    }

    /** Processes inbound ACK; it is consumed here and never forwarded. */
    private void handleAck(ChannelHandlerContext ctx, StopAndWaitArqAck ack) {
        final Boolean outboundSequenceNo = outboundSequenceNo();
        // an ACK is only meaningful if something is queued and it names the *other* sequence number
        if (outboundSequenceNo == null || outboundSequenceNo.booleanValue() == ack.sequenceNo()) {
            LOG.trace("[{}] Got unexpected {}. Drop it.", ctx.channel().id()::asShortText, () -> ack);
            return;
        }

        LOG.trace("[{}] Got expected {}. Succeed current DATA.", ctx.channel().id()::asShortText, () -> ack);
        succeedCurrentWrite(ctx);
        writeNextPending(ctx);
    }

    /**
     * Returns the sequence number of the message at the head of the queue.
     *
     * @return the sequence number, or {@code null} if nothing is queued
     */
    private Boolean outboundSequenceNo() {
        final StopAndWaitArqData currentWrite = (StopAndWaitArqData) pendingWrites.current();
        return currentWrite != null ? Boolean.valueOf(currentWrite.sequenceNo()) : null;
    }

    /**
     * Transmits the head of the queue unless it has been transmitted less than
     * {@code retryTimeout} milliseconds ago. Entries whose promise is already completed (or
     * unknown) are removed on the way. Every completed transmission schedules another invocation
     * of this method after {@code retryTimeout} milliseconds, which re-sends the message if it is
     * still queued by then.
     */
    private void writeNextPending(ChannelHandlerContext ctx) {
        if (!ctx.channel().isActive()) {
            discardPendingWrites(ctx, new ClosedChannelException());
            return;
        }

        while (true) {
            final StopAndWaitArqData currentWrite = (StopAndWaitArqData) pendingWrites.current();
            if (currentWrite == null) {
                return;
            }

            final ChannelPromise promise = promises.get(currentWrite);
            if (promise == null || promise.isDone()) {
                // already completed elsewhere (e.g. failed write); skip to the next entry
                pendingWrites.remove();
                continue;
            }

            // Monotonic clock: a wall-clock adjustment must not suppress a scheduled retry,
            // because nothing re-arms the timer when this check returns early.
            final long now = System.nanoTime();
            if (writeAttempted && now - lastWriteAttemptNanos < TimeUnit.MILLISECONDS.toNanos(retryTimeout)) {
                return;
            }
            writeAttempted = true;
            lastWriteAttemptNanos = now;

            if (LOG.isTraceEnabled()) {
                if (lastWrite == currentWrite) {
                    LOG.trace("[{}] Got no ACK for current DATA. Send again.", ctx.channel().id()::asShortText);
                }
                lastWrite = currentWrite;
            }
            LOG.trace("[{}] Write {}", ctx.channel().id()::asShortText, () -> currentWrite);

            // send a retained duplicate so the queued original stays available for a re-send
            ctx.writeAndFlush(currentWrite.retainedDuplicate()).addListener(future -> {
                if (!future.isSuccess()) {
                    promise.tryFailure(future.cause());
                    LOG.trace("[{}] Unable to write {}:", ctx.channel().id()::asShortText, () -> currentWrite, future::cause);
                }
                ctx.executor().schedule(() -> writeNextPending(ctx), retryTimeout, TimeUnit.MILLISECONDS);
            });
            return;
        }
    }

    /**
     * Removes the head of the queue, succeeds its promise, and clears the retry timestamp so the
     * next message can be transmitted without delay.
     */
    private void succeedCurrentWrite(ChannelHandlerContext ctx) {
        final StopAndWaitArqData currentWrite = (StopAndWaitArqData) pendingWrites.current();
        if (currentWrite != null) {
            LOG.trace("[{}] Succeed {}.", ctx.channel().id()::asShortText, () -> currentWrite);
            pendingWrites.remove().trySuccess();
        }
        writeAttempted = false;
    }

    /** Removes every queued write and fails its promise with {@code cause}. */
    private void discardPendingWrites(ChannelHandlerContext ctx, Throwable cause) {
        LOG.trace("[{}] Discard {} pending writes:", ctx.channel().id()::asShortText, pendingWrites::size, () -> cause);
        pendingWrites.removeAndFailAll(cause);
    }

    /**
     * Sends the ACK matching the currently expected inbound sequence number, provided the channel
     * is still open. The channel is closed if the ACK cannot be written.
     */
    private void writeAck(ChannelHandlerContext ctx) {
        if (!ctx.channel().isOpen()) {
            return;
        }

        final StopAndWaitArqAck ack = expectedInboundSequenceNo
                ? StopAndWaitArqAck.STOP_AND_WAIT_ACK_1
                : StopAndWaitArqAck.STOP_AND_WAIT_ACK_0;
        LOG.trace("[{}] Write {}", ctx.channel().id()::asShortText, () -> ack);

        ctx.writeAndFlush(ack).addListener(future -> {
            if (future.isSuccess()) {
                return;
            }
            // The listener parameter is a plain Future (no channel() accessor); the write was
            // issued on ctx, so ctx.channel() is the channel of this operation.
            LOG.trace("[{}] Unable to send {}:", ctx.channel().id()::asShortText, () -> ack, future::cause);
            ctx.channel().close();
        });
    }
}
