package infra.web.socket.handler;

import infra.lang.Nullable;
import infra.logging.Logger;
import infra.logging.LoggerFactory;
import infra.web.socket.BinaryMessage;
import infra.web.socket.CloseStatus;
import infra.web.socket.Message;
import infra.web.socket.PingMessage;
import infra.web.socket.PongMessage;
import infra.web.socket.TextMessage;
import infra.web.socket.WebSocketSession;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/* loaded from: input_file:infra/web/socket/handler/ConcurrentWebSocketSessionDecorator.class */
public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorator {
    private static final Logger logger = LoggerFactory.getLogger(ConcurrentWebSocketSessionDecorator.class);
    private final int sendTimeLimit;
    private final int bufferSizeLimit;
    private final OverflowStrategy overflowStrategy;

    @Nullable
    private Consumer<Message<?>> preSendCallback;
    private final LinkedBlockingQueue<Message<?>> buffer;
    private final AtomicInteger bufferSize;
    private volatile long sendStartTime;
    private volatile boolean limitExceeded;
    private volatile boolean closeInProgress;
    private final ReentrantLock flushLock;
    private final ReentrantLock closeLock;

    /* loaded from: input_file:infra/web/socket/handler/ConcurrentWebSocketSessionDecorator$OverflowStrategy.class */
    public enum OverflowStrategy {
        TERMINATE,
        DROP
    }

    public ConcurrentWebSocketSessionDecorator(WebSocketSession webSocketSession, int i, int i2) {
        this(webSocketSession, i, i2, OverflowStrategy.TERMINATE);
    }

    public ConcurrentWebSocketSessionDecorator(WebSocketSession webSocketSession, int i, int i2, OverflowStrategy overflowStrategy) {
        super(webSocketSession);
        this.buffer = new LinkedBlockingQueue<>();
        this.bufferSize = new AtomicInteger();
        this.flushLock = new ReentrantLock();
        this.closeLock = new ReentrantLock();
        this.sendTimeLimit = i;
        this.bufferSizeLimit = i2;
        this.overflowStrategy = overflowStrategy;
    }

    public int getSendTimeLimit() {
        return this.sendTimeLimit;
    }

    public int getBufferSizeLimit() {
        return this.bufferSizeLimit;
    }

    public int getBufferSize() {
        return this.bufferSize.get();
    }

    public long getTimeSinceSendStarted() {
        long j = this.sendStartTime;
        if (j > 0) {
            return System.currentTimeMillis() - j;
        }
        return 0L;
    }

    public void setMessageCallback(Consumer<Message<?>> consumer) {
        this.preSendCallback = consumer;
    }

    @Override // infra.web.socket.handler.WebSocketSessionDecorator, infra.web.socket.WebSocketSession
    public void sendText(CharSequence charSequence) throws IOException {
        sendMessage(new TextMessage(charSequence, true));
    }

    @Override // infra.web.socket.handler.WebSocketSessionDecorator, infra.web.socket.WebSocketSession
    public void sendBinary(ByteBuffer byteBuffer) throws IOException {
        sendMessage(new BinaryMessage(byteBuffer));
    }

    @Override // infra.web.socket.handler.WebSocketSessionDecorator, infra.web.socket.WebSocketSession
    public void sendPing() throws IOException {
        sendMessage(new PingMessage());
    }

    @Override // infra.web.socket.handler.WebSocketSessionDecorator, infra.web.socket.WebSocketSession
    public void sendPong() throws IOException {
        sendMessage(new PongMessage());
    }

    @Override // infra.web.socket.handler.WebSocketSessionDecorator, infra.web.socket.WebSocketSession
    public void sendMessage(Message<?> message) throws IOException {
        if (shouldNotSend()) {
            return;
        }
        this.buffer.add(message);
        this.bufferSize.addAndGet(message.getPayloadLength());
        if (this.preSendCallback != null) {
            this.preSendCallback.accept(message);
        }
        boolean isTraceEnabled = logger.isTraceEnabled();
        while (tryFlushMessageBuffer()) {
            if (this.buffer.isEmpty() || shouldNotSend()) {
                return;
            }
        }
        if (isTraceEnabled) {
            logger.trace("Another send already in progress: session id '{}':, \"in-progress\" send time {} (ms), buffer size {} bytes", new Object[]{getId(), Long.valueOf(getTimeSinceSendStarted()), Integer.valueOf(getBufferSize())});
        }
        checkSessionLimits();
    }

    private boolean shouldNotSend() {
        return this.limitExceeded || this.closeInProgress;
    }

    private boolean tryFlushMessageBuffer() throws IOException {
        if (!this.flushLock.tryLock()) {
            return false;
        }
        while (true) {
            try {
                Message<?> poll = this.buffer.poll();
                if (poll == null || shouldNotSend()) {
                    break;
                }
                this.bufferSize.addAndGet(-poll.getPayloadLength());
                this.sendStartTime = System.currentTimeMillis();
                this.delegate.sendMessage(poll);
                this.sendStartTime = 0L;
            } finally {
                this.sendStartTime = 0L;
                this.flushLock.unlock();
            }
        }
        return true;
    }

    private void checkSessionLimits() {
        Message<?> poll;
        if (shouldNotSend() || !this.closeLock.tryLock()) {
            return;
        }
        try {
            if (getTimeSinceSendStarted() > getSendTimeLimit()) {
                limitExceeded("Send time %d (ms) for session '%s' exceeded the allowed limit %d".formatted(Long.valueOf(getTimeSinceSendStarted()), getId(), Integer.valueOf(getSendTimeLimit())));
            } else if (getBufferSize() > getBufferSizeLimit()) {
                switch (this.overflowStrategy) {
                    case TERMINATE:
                        limitExceeded("Buffer size %d bytes for session '%s' exceeds the allowed limit %d".formatted(Integer.valueOf(getBufferSize()), getId(), Integer.valueOf(getBufferSizeLimit())));
                        break;
                    case DROP:
                        int i = 0;
                        while (getBufferSize() > getBufferSizeLimit() && (poll = this.buffer.poll()) != null) {
                            this.bufferSize.addAndGet(-poll.getPayloadLength());
                            i++;
                        }
                        if (logger.isDebugEnabled()) {
                            logger.debug("Dropped {} messages, buffer size: {}", Integer.valueOf(i), Integer.valueOf(getBufferSize()));
                        }
                        break;
                    default:
                        throw new IllegalStateException("Unexpected OverflowStrategy: " + this.overflowStrategy);
                }
            }
        } finally {
            this.closeLock.unlock();
        }
    }

    private void limitExceeded(String str) {
        this.limitExceeded = true;
        throw new SessionLimitExceededException(str, CloseStatus.SESSION_NOT_RELIABLE);
    }

    @Override // infra.web.socket.handler.WebSocketSessionDecorator, infra.web.socket.WebSocketSession
    public void close() throws IOException {
        close(CloseStatus.NORMAL);
    }

    @Override // infra.web.socket.handler.WebSocketSessionDecorator, infra.web.socket.WebSocketSession
    public void close(CloseStatus closeStatus) throws IOException {
        if (this.closeLock.tryLock()) {
            try {
                if (this.closeInProgress) {
                    return;
                }
                if (!CloseStatus.SESSION_NOT_RELIABLE.equals(closeStatus)) {
                    try {
                        checkSessionLimits();
                    } catch (SessionLimitExceededException e) {
                    }
                    if (this.limitExceeded) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Changing close status {} to SESSION_NOT_RELIABLE.", closeStatus);
                        }
                        closeStatus = CloseStatus.SESSION_NOT_RELIABLE;
                    }
                }
                this.closeInProgress = true;
                super.close(closeStatus);
            } finally {
                this.closeLock.unlock();
            }
        }
    }

    @Override // infra.web.socket.handler.WebSocketSessionDecorator
    public String toString() {
        return this.delegate.toString();
    }
}
