package org.yamcs.replication;

import com.google.common.io.ByteStreams;
import com.google.protobuf.TextFormat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.yamcs.AbstractYamcsService;
import org.yamcs.InitException;
import org.yamcs.Spec;
import org.yamcs.YConfiguration;
import org.yamcs.YamcsException;
import org.yamcs.replication.ReplicationSlave;
import org.yamcs.replication.protobuf.Request;
import org.yamcs.replication.protobuf.Response;
import org.yamcs.replication.protobuf.Wakeup;
import org.yamcs.utils.DecodingException;

/* loaded from: input_file:org/yamcs/replication/ReplicationServer.class */
public class ReplicationServer extends AbstractYamcsService {
    int port;
    static final EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap serverBootstrap;
    private Map<String, ReplicationMaster> masters = new HashMap();
    private Map<String, ReplicationSlave> slaves = new HashMap();
    Set<Channel> activeChannels = Collections.newSetFromMap(new ConcurrentHashMap());
    SslContext sslCtx;
    int maxTupleSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/yamcs/replication/ReplicationServer$MyChannelHandler.class */
    public class MyChannelHandler extends ChannelInboundHandlerAdapter {
        ChannelHandlerContext channelHandlerContext;

        MyChannelHandler() {
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            try {
                try {
                    Message decode = Message.decode(((ByteBuf) obj).nioBuffer());
                    ((ByteBuf) obj).release();
                    if (decode.type == 1) {
                        processWakeup((Wakeup) decode.protoMsg);
                    } else if (decode.type == 2) {
                        processRequest((Request) decode.protoMsg);
                    } else {
                        ReplicationServer.this.log.warn("Unexpected message type {} received, closing the connection", Byte.valueOf(decode.type));
                        channelHandlerContext.close();
                    }
                } catch (DecodingException e) {
                    ReplicationServer.this.log.warn("Failed to decode message", e);
                    sendErrorReturn(0, "Failed to decode message: " + e.getMessage());
                    ((ByteBuf) obj).release();
                }
            } catch (Throwable th) {
                ((ByteBuf) obj).release();
                throw th;
            }
        }

        private void processWakeup(Wakeup wakeup) {
            ReplicationServer.this.log.debug("Received wakeup message: {}", TextFormat.shortDebugString(wakeup));
            verifyAuth(wakeup.getAuthToken());
            if (!wakeup.hasYamcsInstance()) {
                sendErrorReturn(0, "instance not present in the request");
                return;
            }
            ReplicationSlave replicationSlave = ReplicationServer.this.slaves.get(wakeup.getYamcsInstance());
            if (replicationSlave == null) {
                ReplicationServer.this.log.warn("No replication slave registered for instance '{}'", wakeup.getYamcsInstance());
                sendErrorReturn(0, "No replication slave registered for instance '" + wakeup.getYamcsInstance() + "''");
                return;
            }
            try {
                ChannelHandler newChannelHandler = replicationSlave.newChannelHandler();
                ChannelPipeline pipeline = this.channelHandlerContext.channel().pipeline();
                pipeline.remove(this);
                pipeline.addLast(new ChannelHandler[]{newChannelHandler});
            } catch (YamcsException e) {
                ReplicationServer.this.log.warn("Got exception when creating a slave handler: " + e);
                sendErrorReturn(0, e.toString());
            }
        }

        private void processRequest(Request request) {
            if (!request.hasYamcsInstance()) {
                sendErrorReturn(0, "instance not present in the request");
                return;
            }
            ReplicationMaster replicationMaster = ReplicationServer.this.masters.get(request.getYamcsInstance());
            if (replicationMaster == null) {
                ReplicationServer.this.log.warn("Received a replication request for non registered master: {}", TextFormat.shortDebugString(request));
                sendErrorReturn(request.getRequestSeq(), "No replication master registered for instance '" + request.getYamcsInstance() + "''");
            } else {
                ReplicationServer.this.log.debug("Received a replication request: {}, starting a new handler on the master", TextFormat.shortDebugString(request));
                ChannelPipeline pipeline = this.channelHandlerContext.channel().pipeline();
                pipeline.remove(this);
                pipeline.addLast(new ChannelHandler[]{replicationMaster.newChannelHandler(request)});
            }
        }

        private void verifyAuth(String str) {
        }

        private void sendErrorReturn(int i, String str) {
            this.channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(Message.get(Response.newBuilder().setRequestSeq(i).setResult(-1).setErrorMsg(str).m502build()).encode()));
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
            ReplicationServer.this.log.debug("New connection from {}", channelHandlerContext.channel().remoteAddress());
        }

        public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
            this.channelHandlerContext = channelHandlerContext;
            ReplicationServer.this.activeChannels.add(channelHandlerContext.channel());
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            ReplicationServer.this.log.debug("Connection {} closed", channelHandlerContext.channel().remoteAddress());
            ReplicationServer.this.activeChannels.remove(channelHandlerContext.channel());
        }
    }

    @Override // org.yamcs.YamcsService
    public Spec getSpec() {
        Spec spec = new Spec();
        spec.addOption("port", Spec.OptionType.INTEGER).withRequired(true);
        spec.addOption("tlsCert", Spec.OptionType.ANY);
        spec.addOption("tlsKey", Spec.OptionType.STRING);
        spec.addOption("maxTupleSize", Spec.OptionType.INTEGER).withDefault(131072);
        return spec;
    }

    @Override // org.yamcs.AbstractYamcsService, org.yamcs.YamcsService
    public void init(String str, String str2, YConfiguration yConfiguration) throws InitException {
        super.init(str, str2, yConfiguration);
        this.port = yConfiguration.getInt("port");
        this.maxTupleSize = yConfiguration.getInt("maxTupleSize");
        if (yConfiguration.containsKey("tlsCert")) {
            String string = yConfiguration.getString("tlsKey");
            List list = yConfiguration.isList("tlsCert") ? yConfiguration.getList("tlsCert") : Arrays.asList(yConfiguration.getString("tlsCert"));
            try {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    InputStream newInputStream = Files.newInputStream(Paths.get((String) it.next(), new String[0]), new OpenOption[0]);
                    try {
                        ByteStreams.copy(newInputStream, byteArrayOutputStream);
                        if (newInputStream != null) {
                            newInputStream.close();
                        }
                    } catch (Throwable th) {
                        if (newInputStream != null) {
                            try {
                                newInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
                try {
                    FileInputStream fileInputStream = new FileInputStream(string);
                    try {
                        this.sslCtx = SslContextBuilder.forServer(byteArrayInputStream, fileInputStream).build();
                        fileInputStream.close();
                        byteArrayInputStream.close();
                    } catch (Throwable th3) {
                        try {
                            fileInputStream.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                        throw th3;
                    }
                } catch (Throwable th5) {
                    try {
                        byteArrayInputStream.close();
                    } catch (Throwable th6) {
                        th5.addSuppressed(th6);
                    }
                    throw th5;
                }
            } catch (SSLException e) {
                throw new InitException("Failed to initialize TLS: " + e.toString());
            } catch (IOException e2) {
                throw new InitException("Failed to process TLS certificates", e2);
            }
        }
    }

    protected void doStart() {
        this.serverBootstrap = new ServerBootstrap();
        this.serverBootstrap.group(workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { // from class: org.yamcs.replication.ReplicationServer.1
            public void initChannel(SocketChannel socketChannel) throws Exception {
                if (ReplicationServer.this.sslCtx != null) {
                    socketChannel.pipeline().addLast(new ChannelHandler[]{ReplicationServer.this.sslCtx.newHandler(socketChannel.alloc())});
                }
                socketChannel.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(ReplicationServer.this.maxTupleSize, 1, 3)});
                socketChannel.pipeline().addLast(new ChannelHandler[]{new MyChannelHandler()});
            }
        }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
        this.log.debug("Starting replication server on port {}", Integer.valueOf(this.port));
        try {
            this.activeChannels.add(this.serverBootstrap.bind(this.port).sync().channel());
            notifyStarted();
        } catch (InterruptedException e) {
            notifyFailed(e);
        }
    }

    protected void doStop() {
        Iterator<Channel> it = this.activeChannels.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        notifyStopped();
    }

    public void registerMaster(ReplicationMaster replicationMaster) {
        this.masters.put(replicationMaster.getYamcsInstance(), replicationMaster);
    }

    public void registerSlave(ReplicationSlave replicationSlave) {
        this.slaves.put(replicationSlave.getYamcsInstance(), replicationSlave);
    }

    public void unregisterSlave(ReplicationSlave replicationSlave) {
        this.slaves.remove(replicationSlave.getYamcsInstance());
    }

    public List<Channel> getActiveChannels(ReplicationMaster replicationMaster) {
        return (List) this.activeChannels.stream().filter(channel -> {
            for (Map.Entry entry : channel.pipeline()) {
                if (entry.getValue() instanceof MasterChannelHandler) {
                    return replicationMaster.equals(((MasterChannelHandler) entry.getValue()).replMaster);
                }
            }
            return false;
        }).collect(Collectors.toList());
    }

    public List<Channel> getActiveChannels(ReplicationSlave replicationSlave) {
        return (List) this.activeChannels.stream().filter(channel -> {
            for (Map.Entry entry : channel.pipeline()) {
                if (entry.getValue() instanceof ReplicationSlave.SlaveChannelHandler) {
                    return replicationSlave.equals(((ReplicationSlave.SlaveChannelHandler) entry.getValue()).replSlave);
                }
            }
            return false;
        }).collect(Collectors.toList());
    }
}
