package org.yamcs.replication;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ScheduledFuture;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.yamcs.logging.Log;
import org.yamcs.replication.ReplicationMaster;
import org.yamcs.replication.protobuf.Request;
import org.yamcs.replication.protobuf.Response;
import org.yamcs.replication.protobuf.TimeMessage;
import org.yamcs.replication.protobuf.Wakeup;
import org.yamcs.time.TimeService;
import org.yamcs.utils.DecodingException;

/* loaded from: input_file:org/yamcs/replication/MasterChannelHandler.class */
public class MasterChannelHandler extends ChannelInboundHandlerAdapter {
    final ReplicationMaster replMaster;
    final TimeService timeService;
    final Log log;
    Request req;
    private ChannelHandlerContext channelHandlerContext;
    ChannelFuture dataHandlingFuture;
    ReplicationFile currentFile;
    long nextTxToSend;
    ReplicationTail fileTail;
    ReplicationMaster.SlaveServer slaveServer;
    private ScheduledFuture<?> timeMsgFuture;

    public MasterChannelHandler(TimeService timeService, ReplicationMaster replicationMaster, ReplicationMaster.SlaveServer slaveServer) {
        this.replMaster = replicationMaster;
        this.slaveServer = slaveServer;
        this.req = null;
        this.timeService = timeService;
        this.log = new Log(MasterChannelHandler.class, replicationMaster.getYamcsInstance());
    }

    public MasterChannelHandler(TimeService timeService, ReplicationMaster replicationMaster, Request request) {
        this.replMaster = replicationMaster;
        this.req = request;
        this.timeService = timeService;
        this.log = new Log(MasterChannelHandler.class, replicationMaster.getYamcsInstance());
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        ByteBuf byteBuf = (ByteBuf) obj;
        try {
            doChannelRead(channelHandlerContext, byteBuf);
            byteBuf.release();
        } catch (Throwable th) {
            byteBuf.release();
            throw th;
        }
    }

    private void doChannelRead(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        try {
            Message decode = Message.decode(byteBuf.nioBuffer());
            if (decode.type == 2) {
                this.req = decode.protoMsg;
                processRequest();
            } else {
                if (decode.type != 3) {
                    this.log.warn("Unexpected message type {} received, closing the connection", Byte.valueOf(decode.type));
                    channelHandlerContext.close();
                    return;
                }
                Response response = decode.protoMsg;
                if (response.getResult() == 0) {
                    this.log.info("Received response {}", response);
                } else {
                    this.log.warn("Received negative response: {}, closing the connection", response.getErrorMsg());
                    channelHandlerContext.close();
                }
            }
        } catch (DecodingException e) {
            this.log.warn("Failed to decode message", e);
            channelHandlerContext.close();
        }
    }

    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.handlerAdded(channelHandlerContext);
        this.channelHandlerContext = channelHandlerContext;
        if (this.req != null) {
            processRequest();
        }
    }

    public void shutdown() {
        this.channelHandlerContext.close();
        if (this.timeMsgFuture != null) {
            this.timeMsgFuture.cancel(true);
        }
    }

    private void processRequest() {
        if (this.dataHandlingFuture != null) {
            this.dataHandlingFuture.cancel(true);
        }
        if (this.req.hasStartTxId()) {
            this.nextTxToSend = this.req.getStartTxId();
        } else {
            this.log.info("The slave did not provide a startTxId, starting from 0");
            this.nextTxToSend = 0L;
        }
        scheduleTimeMsgs();
        goToNextFile();
    }

    private void scheduleTimeMsgs() {
        this.timeMsgFuture = this.channelHandlerContext.executor().scheduleAtFixedRate(this::sendTimeMsg, 0L, this.replMaster.timeMsgFreqMillis, TimeUnit.MILLISECONDS);
    }

    void goToNextFile() {
        this.log.trace("Looking for a new file for transaction {}", Long.valueOf(this.nextTxToSend));
        this.currentFile = this.replMaster.getFile(this.nextTxToSend);
        if (this.currentFile == null) {
            this.log.warn("next TX to send {} is in the future, checking back in 60 seconds", Long.valueOf(this.nextTxToSend));
            ReplicationServer.workerGroup.schedule(() -> {
                goToNextFile();
            }, 60L, TimeUnit.SECONDS);
            return;
        }
        this.log.trace("Found file with firstTxId={} nextTxId={}", Long.valueOf(this.currentFile.getFirstId()), Long.valueOf(this.currentFile.getNextTxId()));
        if (this.nextTxToSend < this.currentFile.getFirstId()) {
            this.log.warn("Requested start from {} but the first available transaction is {}. Replaying from there", Long.valueOf(this.nextTxToSend), Long.valueOf(this.currentFile.getFirstId()));
            this.nextTxToSend = this.currentFile.getFirstId();
        } else if (this.nextTxToSend > this.currentFile.getFirstId()) {
            Iterator<ByteBuffer> metadataIterator = this.currentFile.metadataIterator();
            while (metadataIterator.hasNext()) {
                ByteBuffer next = metadataIterator.next();
                long j = next.getLong(next.position() + 8);
                if (j >= this.nextTxToSend) {
                    break;
                }
                this.log.debug("Sending metadata TX{} length: {} ", Long.valueOf(j), Integer.valueOf(next.remaining()));
                this.channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(next));
            }
        }
        this.fileTail = null;
        sendMoreData();
    }

    void sendMoreData() {
        if (this.channelHandlerContext.channel().isActive()) {
            if (this.fileTail == null) {
                this.fileTail = this.currentFile.tail(this.nextTxToSend);
            } else {
                this.currentFile.getNewData(this.fileTail);
            }
            this.log.trace("nextTxToSend: {}, FileTail: {} ", Long.valueOf(this.nextTxToSend), this.fileTail);
            if (this.fileTail.nextTxId != this.nextTxToSend) {
                this.dataHandlingFuture = this.channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(this.fileTail.buf)).addListener(future -> {
                    this.fileTail.buf.position(this.fileTail.buf.limit());
                    this.nextTxToSend = this.fileTail.nextTxId;
                    sendMoreData();
                });
            } else if (this.fileTail.eof) {
                goToNextFile();
            } else {
                ReplicationServer.workerGroup.schedule(() -> {
                    sendMoreData();
                }, 200L, TimeUnit.MILLISECONDS);
            }
        }
    }

    public long getNextTxId() {
        return this.nextTxToSend;
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        this.log.warn("Caught exception {}", th.getMessage());
    }

    private void sendTimeMsg() {
        this.channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(Message.get(TimeMessage.newBuilder().setLocalTime(System.currentTimeMillis()).setMissionTime(this.timeService.getMissionTime()).setSpeed(this.timeService.getSpeed()).m528build()).encode()));
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        this.log.debug("Connection {} opened, sending a wakeup message", channelHandlerContext.channel().remoteAddress());
        channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(Message.get(Wakeup.newBuilder().setYamcsInstance(this.slaveServer.instance).m575build()).encode()));
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.log.info("Replication connection {} closed", channelHandlerContext.channel().remoteAddress());
        super.channelInactive(channelHandlerContext);
        if (this.dataHandlingFuture != null) {
            this.dataHandlingFuture.cancel(true);
        }
    }
}
