package org.minbox.framework.message.pipe.server;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.minbox.framework.message.pipe.core.Message;
import org.minbox.framework.message.pipe.core.exception.MessagePipeException;
import org.minbox.framework.message.pipe.server.config.MessagePipeConfiguration;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/minbox/framework/message/pipe/server/MessagePipe.class */
public class MessagePipe {
    private static final Logger log = LoggerFactory.getLogger(MessagePipe.class);
    private String name;
    private String queueName;
    private AtomicInteger lastMessageCount = new AtomicInteger(0);
    private AtomicLong lastProcessTimeMillis = new AtomicLong(System.currentTimeMillis());
    private RedissonClient redissonClient;
    private MessagePipeConfiguration configuration;

    public MessagePipe(String str, RedissonClient redissonClient, MessagePipeConfiguration messagePipeConfiguration) {
        this.name = str;
        this.queueName = LockNames.MESSAGE_QUEUE.format(this.name);
        this.redissonClient = redissonClient;
        this.configuration = messagePipeConfiguration;
        if (this.name == null || this.name.trim().length() == 0) {
            throw new MessagePipeException("The MessagePipe name is required，cannot be null.");
        }
        if (this.redissonClient == null) {
            throw new MessagePipeException("The RedissonClient cannot be null.");
        }
        if (this.configuration == null) {
            throw new MessagePipeException("The MessagePipeConfiguration cannot be null.");
        }
    }

    public void put(Message message) {
        RLock lock = this.redissonClient.getLock(LockNames.PUT_MESSAGE.format(this.name));
        lock.lock();
        if (Thread.currentThread().isInterrupted()) {
            return;
        }
        try {
            try {
                RBlockingQueue blockingQueue = this.redissonClient.getBlockingQueue(LockNames.MESSAGE_QUEUE.format(this.name));
                boolean offer = blockingQueue.offer(message);
                this.lastMessageCount.set(blockingQueue.size());
                if (offer) {
                } else {
                    throw new MessagePipeException("Unsuccessful when writing the message to the queue.");
                }
            } catch (Exception e) {
                this.configuration.getExceptionHandler().handleException(e, message);
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
    }

    public void lockHandleTheFirst(Function<Message, Boolean> function) {
        Message message = null;
        RLock lock = this.redissonClient.getLock(LockNames.TAKE_MESSAGE.format(this.name));
        log.debug("lock:" + lock.toString() + ",interrupted:" + Thread.currentThread().isInterrupted() + ",hold:" + lock.isHeldByCurrentThread() + ",threadId:" + Thread.currentThread().getId());
        try {
            try {
                MessagePipeConfiguration.LockTime lockTime = this.configuration.getLockTime();
                if (lock.tryLock(lockTime.getWaitTime(), lockTime.getLeaseTime(), lockTime.getTimeUnit())) {
                    log.debug("Thread：{}, acquired lock.", Long.valueOf(Thread.currentThread().getId()));
                    RBlockingQueue blockingQueue = this.redissonClient.getBlockingQueue(this.queueName);
                    this.lastMessageCount.set(blockingQueue.size());
                    message = (Message) blockingQueue.peek();
                    if (message != null ? function.apply(message).booleanValue() : false) {
                        this.lastProcessTimeMillis.set(Long.valueOf(System.currentTimeMillis()).longValue());
                        blockingQueue.poll();
                    }
                }
                if (!checkClientIsShutdown() && lock.isLocked() && lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            } catch (Exception e) {
                this.configuration.getExceptionHandler().handleException(e, message);
                if (!checkClientIsShutdown() && lock.isLocked() && lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        } catch (Throwable th) {
            if (!checkClientIsShutdown() && lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
            throw th;
        }
    }

    public Message peek() {
        Message message = null;
        if (!checkClientIsShutdown()) {
            message = (Message) this.redissonClient.getBlockingQueue(this.queueName).peek();
        }
        return message;
    }

    public Message poll() {
        Message message = null;
        if (!checkClientIsShutdown()) {
            message = (Message) this.redissonClient.getBlockingQueue(this.queueName).poll();
        }
        return message;
    }

    public int size() {
        int i = 0;
        if (!checkClientIsShutdown()) {
            i = this.redissonClient.getBlockingQueue(this.queueName).size();
            this.lastMessageCount.set(i);
        }
        return i;
    }

    public Long getLastProcessTimeMillis() {
        return Long.valueOf(this.lastProcessTimeMillis.get());
    }

    public int getLastMessageCount() {
        return this.lastMessageCount.get();
    }

    private boolean checkClientIsShutdown() {
        return this.redissonClient.isShutdown() || this.redissonClient.isShuttingDown();
    }

    public String getName() {
        return this.name;
    }

    public MessagePipeConfiguration getConfiguration() {
        return this.configuration;
    }
}
