package com.gs.fw.common.mithra.notification.server;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.collections.impl.set.mutable.UnifiedSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/gs/fw/common/mithra/notification/server/ServerSocketHandler.class */
/**
 * Server-side handler for one client connection of the notification server.
 *
 * <p>The handler owns the client socket, a reader thread and (once the connection is
 * established or re-established) a writer thread. It validates the packet sequence of
 * incoming notify messages, hands them to the {@link NotificationServer} for broadcast,
 * and queues outgoing messages for the subjects this client subscribed to.
 *
 * <p>Threading, as far as this class shows: {@link #abort(String, Throwable)} is
 * synchronized on the handler, {@code subscribedSubjects} is guarded by its own monitor,
 * and {@code abortTime}/{@code startTime} are volatile. The packet sequencing fields are
 * not guarded — presumably they are only touched by this handler's reader thread; confirm
 * against {@code ServerReaderThread}.
 */
public class ServerSocketHandler extends SocketHandler {
    public static final int SERVER_PING_PERIOD = 60000;
    public static final int CLIENT_PING_PERIOD = 120000;

    /** Source of client ids handed out by {@link #assignNewClientId()}; shared by all handlers. */
    private static final AtomicInteger clientIdPool = new AtomicInteger(0);

    private int clientId;
    private Socket socket;
    private OutputStream socketOutputStream;
    private final NotificationServer server;
    private ServerReaderThread readerThread;
    private ServerWriterThread writerThread;

    // Sequencing state for incoming notify messages (see respondToNotify).
    private int lastIncompleteMessageId;
    private String lastSubject;
    private int lastClonedMessageId;

    private volatile long abortTime;
    private volatile long startTime;
    protected Logger logger = LoggerFactory.getLogger(ServerSocketHandler.class);

    /** True when the next notify packet must be packet 0 of a new message. */
    private boolean expectingNewMessage = true;
    private int lastPacketNumber = -1;

    /** Subjects this client subscribed to; every access synchronizes on the set itself. */
    private final UnifiedSet<String> subscribedSubjects = new UnifiedSet<>();

    /**
     * Creates a handler for an accepted client socket. Nothing is started until
     * {@link #start()} is called.
     *
     * @param socket the client socket this handler reads from and writes to
     * @param notificationServer the server this handler reports to
     */
    public ServerSocketHandler(Socket socket, NotificationServer notificationServer) {
        this.socket = socket;
        this.server = notificationServer;
    }

    /**
     * Records the start time, builds the diagnostic message from the remote address,
     * configures the socket and starts the reader thread.
     *
     * <p>If the socket cannot be configured or its streams cannot be obtained, the socket
     * is closed and the handler is removed from the server's unestablished handlers.
     */
    public void start() {
        this.startTime = System.currentTimeMillis();
        InetSocketAddress remote = (InetSocketAddress) this.socket.getRemoteSocketAddress();
        this.diagnosticMessage = " Host: " + remote.getAddress().getHostAddress() + " (" + remote.getHostName() + "): " + remote.getPort();
        try {
            this.socket.setTcpNoDelay(true);
            this.socket.setKeepAlive(true);
            // Read timeout of 240000 ms, i.e. twice the client ping period.
            this.socket.setSoTimeout(2 * CLIENT_PING_PERIOD);
            InputStream in = getInputStreamFromSocket();
            this.socketOutputStream = getOutputStreamFromSocket();
            this.readerThread = new ServerReaderThread(in, this);
            this.readerThread.start();
        } catch (IOException e) {
            this.logger.error("could not get stream from socket", e);
            closeSocket();
            this.server.removeUnestablished(this);
        }
    }

    /** @return the {@code System.currentTimeMillis()} value recorded by {@link #start()} */
    public long getStartTime() {
        return this.startTime;
    }

    /**
     * @return the output stream of the underlying socket
     * @throws IOException if the socket cannot provide its output stream
     */
    protected OutputStream getOutputStreamFromSocket() throws IOException {
        return this.socket.getOutputStream();
    }

    /**
     * @return the input stream of the underlying socket
     * @throws IOException if the socket cannot provide its input stream
     */
    protected InputStream getInputStreamFromSocket() throws IOException {
        return this.socket.getInputStream();
    }

    /** @return the client id assigned on establish, or adopted on re-establish */
    public int getClientId() {
        return this.clientId;
    }

    /** @return the {@code System.currentTimeMillis()} value recorded by the last {@link #abort} */
    public long getAbortTime() {
        return this.abortTime;
    }

    /**
     * Tears the connection down after a failure: records the abort time, aborts both
     * threads, logs the reason, closes the socket and reports the abort to the server.
     *
     * @param str log message prefix; the diagnostic message is appended to it
     * @param th the cause, logged with the message
     */
    public synchronized void abort(String str, Throwable th) {
        this.abortTime = System.currentTimeMillis();
        abortThreads();
        this.logger.warn(str + this.diagnosticMessage, th);
        closeSocket();
        this.readerThread = null;
        this.writerThread = null;
        this.server.abort(this);
    }

    /**
     * Closes the socket if it is still set and clears the reference. A failure to close
     * is logged and otherwise ignored.
     */
    public void closeSocket() {
        // Read the field once: this method is not synchronized, and a concurrent caller
        // may null the field between a null check and the close() call.
        Socket current = this.socket;
        try {
            if (current != null) {
                current.close();
            }
        } catch (IOException e) {
            this.logger.warn("could not close socket " + this.diagnosticMessage, e);
        }
        this.socket = null;
    }

    /** Aborts the reader and writer threads if they exist; does not clear the fields. */
    private void abortThreads() {
        // Snapshot the fields so a concurrent abort()/respondToShutdown() that nulls them
        // cannot cause a NullPointerException between the check and the call.
        ServerReaderThread reader = this.readerThread;
        ServerWriterThread writer = this.writerThread;
        if (reader != null) {
            reader.abort();
        }
        if (writer != null) {
            writer.abort();
        }
    }

    /** Creates and starts the writer thread on the socket's output stream. */
    private void createWriterThread() {
        this.writerThread = new ServerWriterThread(this.socketOutputStream, this);
        this.writerThread.start();
    }

    /** Takes the next id from the shared pool, skipping a value equal to the server's own id. */
    private void assignNewClientId() {
        int candidate = clientIdPool.incrementAndGet();
        if (candidate == getSenderId()) {
            candidate = clientIdPool.incrementAndGet();
        }
        this.clientId = candidate;
    }

    /**
     * Answers an establish request by assigning a fresh client id and sending it back
     * in a message of type 2.
     *
     * @param message the establish request; its contents are not read
     */
    public void respondToEstablish(Message message) {
        respondToEstablishWithNewClientId((byte) 2);
    }

    /**
     * Assigns a new client id, queues a single-packet response carrying that id at the
     * front of the writer queue, marks this handler established and starts the writer.
     *
     * @param b the message type byte of the response
     */
    private void respondToEstablishWithNewClientId(byte b) {
        Message response = new Message(b, getSenderId());
        assignNewClientId();
        response.setMessageId(getNextMessageId());
        response.setPacketNumber(0);
        response.setPacketStatus((byte) 2);
        // The payload is one int: the newly assigned client id.
        response.setPayloadSize(4);
        response.setPayload(new byte[4]);
        response.writeIntInPayload(0, this.clientId);
        this.writerQueue.addFirst(response);
        this.server.markEstablished(this);
        createWriterThread();
    }

    /**
     * Answers a re-establish request. If the server id in the payload is not this
     * server's id, or the server has no existing handler for the sender, the client gets
     * a new client id (response type 4). Otherwise this handler acknowledges the request
     * and takes over the existing handler's pending messages, subscriptions, sequencing
     * state and client id.
     *
     * @param message the re-establish request; payload int 0 is the server id the client knew
     */
    public void respondToReestablish(Message message) {
        if (message.readIntFromPayload(0) != getSenderId()) {
            respondToReestablishWithServerRecycle(message);
            return;
        }
        int senderId = message.getSenderId();
        ServerSocketHandler existingHandler = this.server.getExistingHandler(senderId);
        if (existingHandler == null) {
            respondToReestablishWithServerRecycle(message);
            return;
        }
        sendAck(message);
        this.writerQueue.addAll(existingHandler.unAcknowledgedMessages);
        this.writerQueue.addAll(existingHandler.writerQueue);

        // Copy the subscriptions under the same monitors every other access uses.
        // The two locks are taken one after the other, never nested.
        String[] inheritedSubjects;
        synchronized (existingHandler.subscribedSubjects) {
            inheritedSubjects = existingHandler.subscribedSubjects.toArray(new String[0]);
        }
        synchronized (this.subscribedSubjects) {
            for (String subject : inheritedSubjects) {
                this.subscribedSubjects.add(subject);
            }
        }

        this.lastClonedMessageId = existingHandler.lastClonedMessageId;
        this.lastIncompleteMessageId = existingHandler.lastIncompleteMessageId;
        this.lastPacketNumber = existingHandler.lastPacketNumber;
        // The flag belongs with lastIncompleteMessageId/lastPacketNumber: without it a
        // connection that dropped mid-message would treat the next packet of that
        // message as a retransmit and acknowledge it without broadcasting it.
        this.expectingNewMessage = existingHandler.expectingNewMessage;
        this.lastSubject = existingHandler.lastSubject;
        this.clientId = senderId;
        this.server.markEstablished(this);
        this.server.removeAborted(existingHandler);
        createWriterThread();
    }

    /** Answers a re-establish request that cannot be honoured with a new client id (type 4). */
    private void respondToReestablishWithServerRecycle(Message message) {
        respondToEstablishWithNewClientId((byte) 4);
    }

    /**
     * Passes a received message to the server's statistics and logs its header at debug level.
     *
     * @param message the message that was received
     */
    public void debugReceivedMessage(Message message) {
        this.server.gatherReceivedStats(message);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Received " + message.getPrintableHeader() + this.diagnosticMessage);
        }
    }

    /**
     * Passes an outgoing message to the server's statistics and logs its header at debug level.
     *
     * @param message the message about to be sent
     */
    public void debugSendMessage(Message message) {
        this.server.gatherSendStats(message);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Sending " + message.getPrintableHeader() + this.diagnosticMessage);
        }
    }

    /** @return the server's id, used as the sender id of messages built by this handler */
    @Override // com.gs.fw.common.mithra.notification.server.SocketHandler
    protected int getSenderId() {
        return this.server.getServerId();
    }

    /** @return the next message id, obtained from the server */
    @Override // com.gs.fw.common.mithra.notification.server.SocketHandler
    protected int getNextMessageId() {
        return this.server.getNextMessageId();
    }

    /**
     * Handles one packet of a notify message.
     *
     * <p>Packets that continue the expected sequence are broadcast through the server and
     * acknowledged. Packets that look like retransmits (an older message id, or a packet
     * number already seen) are acknowledged and dropped. Packets that skip ahead cause an
     * abort broadcast and abort this connection. A packet with status 2 ends the current
     * message, so the next packet must start a new one.
     *
     * @param message the received notify packet
     * @throws IOException declared by the original signature; presumably raised by the
     *         server broadcast or payload decoding — confirm against those methods
     */
    public void respondToNotify(Message message) throws IOException {
        if (this.expectingNewMessage) {
            if (message.getPacketNumber() != 0) {
                if (message.getMessageId() > this.lastIncompleteMessageId) {
                    this.server.broadcastAbort(message, this.lastClonedMessageId, this.lastSubject);
                    abort("message out of sequence", new IOException("Unexpected packet number " + message.getPrintableHeader() + " " + this.diagnosticMessage));
                } else {
                    ackIgnoredRetransmit(message);
                }
                return;
            }
            // Packet 0: start tracking a new message.
            this.lastIncompleteMessageId = message.getMessageId();
            this.lastSubject = message.readStringFromPayload(0);
            this.lastClonedMessageId = getNextMessageId();
            this.lastPacketNumber = 0;
            this.expectingNewMessage = false;
        } else {
            if (this.lastIncompleteMessageId != message.getMessageId()) {
                if (this.lastIncompleteMessageId > message.getMessageId()) {
                    ackIgnoredRetransmit(message);
                } else {
                    abortPacketOutOfSequence(message);
                }
                return;
            }
            if (this.lastPacketNumber + 1 != message.getPacketNumber()) {
                if (message.getPacketNumber() <= this.lastPacketNumber) {
                    // Already-seen packet of the current message: acknowledge, do not rebroadcast.
                    sendAck(message);
                } else {
                    abortPacketOutOfSequence(message);
                }
                return;
            }
            this.lastPacketNumber++;
        }
        if (message.getPacketStatus() == 2) {
            this.expectingNewMessage = true;
        }
        this.server.broadcastNotify(message, this.lastClonedMessageId, this.lastSubject);
        sendAck(message);
    }

    /** Logs a packet from an older message as a probable retransmit and acknowledges it. */
    private void ackIgnoredRetransmit(Message message) {
        this.logger.warn("Ignoring possible retransmit. Current message id: " + this.lastIncompleteMessageId + " ignored: " + message.getPrintableHeader() + " " + this.diagnosticMessage);
        sendAck(message);
    }

    /**
     * Broadcasts an abort for the message being assembled, then aborts this connection
     * because the packet does not follow the expected sequence.
     */
    private void abortPacketOutOfSequence(Message message) throws IOException {
        // The broadcast runs first; the exception text is built afterwards, as before.
        this.server.broadcastAbort(message, this.lastClonedMessageId, this.lastSubject);
        abort("packet out of sequence", new IOException("expecting packet " + (this.lastPacketNumber + 1) + " for message " + this.lastIncompleteMessageId + " but got " + message.getPrintableHeader() + " " + this.diagnosticMessage));
    }

    /**
     * Queues a notify message for this client if it subscribed to the subject.
     *
     * @param message the message to forward
     * @param str the subject of the message
     * @throws IOException declared by the signature; not thrown by the visible code
     */
    public void sendNotifyMessage(Message message, String str) throws IOException {
        queueIfSubscribed(message, str);
    }

    /**
     * Queues an abort message for this client if it subscribed to the subject.
     *
     * @param message the message to forward
     * @param str the subject of the message
     */
    public void sendAbortMessage(Message message, String str) {
        queueIfSubscribed(message, str);
    }

    /** Adds the message to the writer queue when the subject is in {@code subscribedSubjects}. */
    private void queueIfSubscribed(Message message, String str) {
        boolean subscribed;
        synchronized (this.subscribedSubjects) {
            subscribed = this.subscribedSubjects.contains(str);
        }
        if (subscribed) {
            this.writerQueue.add(message);
        }
    }

    /**
     * Registers the subjects listed in a subscribe message and acknowledges it.
     *
     * <p>Payload layout as read here: an int count at offset 0, then for each subject an
     * int at the current offset followed by the subject; the offset advances by that int
     * plus 4. Presumably the int is the subject's encoded length and
     * {@code readStringFromPayload} reads the same prefix — confirm against {@code Message}.
     *
     * @param message the subscribe request
     * @throws IOException declared by the signature; presumably from payload decoding
     */
    public void respondToSubscribe(Message message) throws IOException {
        int subjectCount = message.readIntFromPayload(0);
        int offset = 4;
        for (int n = 0; n < subjectCount; n++) {
            int encodedLength = message.readIntFromPayload(offset);
            String subject = message.readStringFromPayload(offset);
            synchronized (this.subscribedSubjects) {
                this.subscribedSubjects.add(subject);
            }
            offset += encodedLength + 4;
        }
        sendAck(message);
    }

    /**
     * Acknowledges a ping.
     *
     * @param message the ping message
     */
    public void respondToPing(Message message) {
        sendAck(message);
    }

    /**
     * Handles a shutdown request: aborts both threads, closes the socket and removes this
     * handler from the server. Unlike {@link #abort}, nothing is logged and no abort time
     * is recorded.
     *
     * @param message the shutdown request; its contents are not read
     */
    public void respondToShutdown(Message message) {
        abortThreads();
        closeSocket();
        this.readerThread = null;
        this.writerThread = null;
        this.server.removeHandler(this);
    }
}
