package org.minbox.framework.message.pipe.server;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.minbox.framework.message.pipe.core.Message;
import org.minbox.framework.message.pipe.core.exception.MessagePipeException;
import org.minbox.framework.message.pipe.server.config.LockNames;
import org.minbox.framework.message.pipe.server.config.MessagePipeConfiguration;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:org/minbox/framework/message/pipe/server/MessagePipe.class */
/**
 * A named message pipe backed by a Redisson blocking queue.
 *
 * <p>Messages are appended with {@link #putLast(Message)} and consumed with
 * {@link #handleFirst(Function)} (one message) or {@link #handleToLast(Function)}
 * (drain until empty). Writes and reads are each guarded by their own Redisson lock
 * (names derived from the pipe name), and the three entry points are additionally
 * serialized on this instance's monitor.
 */
public class MessagePipe {
    private static final Logger log = LoggerFactory.getLogger(MessagePipe.class);

    /** Name of this pipe; also the seed for the queue and lock names. */
    private final String name;
    /** The underlying Redisson queue holding the pending messages. */
    private final RBlockingQueue<Message> queue;
    private final RedissonClient redissonClient;
    private final String queueName;
    private final String putLockName;
    private final String takeLockName;
    /** Start time (epoch millis) of the most recent {@link #handleFirst(Function)} run. */
    private final AtomicLong lastProcessTimeMillis = new AtomicLong(System.currentTimeMillis());
    /** Set while {@link #handleToLast(Function)} is running; guarded by this instance's monitor. */
    private boolean runningHandleAll = false;
    /**
     * Set while a put is in progress and left set after every handle call; cleared when
     * {@link #putLast(Message)} finishes. {@link #handleFirst(Function)} blocks while it is set.
     * Guarded by this instance's monitor.
     */
    private boolean transfer = false;
    private final MessagePipeConfiguration configuration;

    /**
     * Creates a pipe and binds it to its Redisson queue.
     *
     * @param str                      the pipe name; must not be null or blank
     * @param redissonClient           the Redisson client used for the queue and the locks
     * @param messagePipeConfiguration the pipe configuration (codec, lock times, exception handler)
     * @throws MessagePipeException if any argument is missing
     */
    public MessagePipe(String str, RedissonClient redissonClient, MessagePipeConfiguration messagePipeConfiguration) {
        // Validate before touching any argument so callers get the intended
        // MessagePipeException instead of an incidental NullPointerException.
        if (str == null || str.trim().isEmpty()) {
            throw new MessagePipeException("The MessagePipe name is required，cannot be null.");
        }
        if (redissonClient == null) {
            throw new MessagePipeException("The RedissonClient cannot be null.");
        }
        if (messagePipeConfiguration == null) {
            throw new MessagePipeException("The MessagePipeConfiguration cannot be null.");
        }
        this.name = str;
        this.queueName = LockNames.MESSAGE_QUEUE.format(this.name);
        this.putLockName = LockNames.PUT_MESSAGE.format(this.name);
        this.takeLockName = LockNames.TAKE_MESSAGE.format(this.name);
        this.redissonClient = redissonClient;
        this.configuration = messagePipeConfiguration;
        this.queue = redissonClient.getBlockingQueue(this.queueName, messagePipeConfiguration.getCodec());
    }

    /**
     * Appends a message to the tail of the queue under the put lock.
     *
     * <p>If the put lock cannot be acquired within the configured wait time the message is
     * not written (a warning is logged). Any exception, including a rejected
     * {@code offer}, is passed to the configured exception handler together with the message.
     * On exit the {@code transfer} flag is cleared and waiting threads are notified.
     *
     * @param message the message to append
     */
    public synchronized void putLast(Message message) {
        this.transfer = true;
        RLock lock = this.redissonClient.getLock(this.putLockName);
        boolean locked = false;
        try {
            MessagePipeConfiguration.LockTime lockTime = this.configuration.getLockTime();
            locked = lock.tryLock(lockTime.getWaitTime(), lockTime.getLeaseTime(), lockTime.getTimeUnit());
            if (!locked) {
                log.warn("Message pipeline: {}, failed to acquire the put lock, the message was not written.", this.name);
            } else if (!this.queue.offer(message)) {
                throw new MessagePipeException("Unsuccessful when writing the message to the queue.");
            }
        } catch (Exception e) {
            doHandleException(e, message);
        } finally {
            // Single cleanup path: runs exactly once whether or not an exception occurred.
            this.transfer = false;
            releaseAndNotify(lock, locked);
        }
    }

    /**
     * Handles the message at the head of the queue, removing it only when the handler succeeds.
     *
     * <p>Blocks while a transfer or a drain is flagged as in progress. If the waiting thread is
     * interrupted, the interrupt status is restored and the method returns without handling
     * anything. The head message is peeked, passed to {@code function}, and polled only if the
     * function returns {@code true}; a {@code false} or {@code null} result is reported to the
     * configured exception handler and the message stays in the queue. On exit the last-process
     * timestamp is updated, the {@code transfer} flag is set and waiting threads are notified.
     *
     * @param function the message handler; returns {@code true} when the message was processed
     */
    public synchronized void handleFirst(Function<Message, Boolean> function) {
        while (this.transfer || this.runningHandleAll) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Re-entering wait() with the interrupt flag set would throw again immediately
                // and spin; stop waiting and let the caller observe the interrupt instead.
                Thread.currentThread().interrupt();
                log.error("Message pipeline: {}, interrupted while waiting to handle the first message.", this.name, e);
                return;
            }
        }
        long startMillis = System.currentTimeMillis();
        RLock lock = this.redissonClient.getLock(this.takeLockName);
        boolean locked = false;
        try {
            MessagePipeConfiguration.LockTime lockTime = this.configuration.getLockTime();
            locked = lock.tryLock(lockTime.getWaitTime(), lockTime.getLeaseTime(), lockTime.getTimeUnit());
            if (locked) {
                Message head = peek();
                if (ObjectUtils.isEmpty(head)) {
                    log.warn("Message pipeline: {}, no message to be processed was found.", this.name);
                    return;
                }
                if (!Boolean.TRUE.equals(function.apply(head))) {
                    throw new MessagePipeException("MessagePipe [" + this.name + "] , Handle message exception, message content: " + new String(head.getBody()));
                }
                poll();
            }
        } catch (Exception e) {
            doHandleException(e, null);
        } finally {
            this.lastProcessTimeMillis.set(startMillis);
            this.transfer = true;
            releaseAndNotify(lock, locked);
        }
    }

    /**
     * Handles messages from the head of the queue until it is empty or a handler call fails.
     *
     * <p>Each message is peeked, passed to {@code function}, and polled only if the function
     * returns {@code true}. A {@code false} or {@code null} result, or any exception, stops the
     * drain and is reported to the configured exception handler together with the message being
     * processed. On exit both guard flags are reset ({@code transfer} set,
     * {@code runningHandleAll} cleared) and waiting threads are notified.
     *
     * @param function the message handler; returns {@code true} when the message was processed
     */
    public synchronized void handleToLast(Function<Message, Boolean> function) {
        this.runningHandleAll = true;
        RLock lock = this.redissonClient.getLock(this.takeLockName);
        boolean locked = false;
        Message current = null;
        try {
            MessagePipeConfiguration.LockTime lockTime = this.configuration.getLockTime();
            locked = lock.tryLock(lockTime.getWaitTime(), lockTime.getLeaseTime(), lockTime.getTimeUnit());
            if (locked) {
                while (size() > 0) {
                    current = peek();
                    if (ObjectUtils.isEmpty(current)) {
                        // peek() yields null when the client is shutting down or the queue
                        // was emptied in the meantime; there is nothing left to hand over.
                        break;
                    }
                    if (!Boolean.TRUE.equals(function.apply(current))) {
                        throw new MessagePipeException("Handle message exception, message content: " + new String(current.getBody()));
                    }
                    poll();
                }
            }
        } catch (Exception e) {
            doHandleException(e, current);
        } finally {
            this.transfer = true;
            this.runningHandleAll = false;
            releaseAndNotify(lock, locked);
        }
    }

    /**
     * Returns the head of the queue without removing it.
     *
     * @return the head message, or {@code null} if the queue is empty or the client is shut
     *         down / shutting down
     */
    public Message peek() {
        if (checkClientIsShutdown()) {
            return null;
        }
        return this.queue.peek();
    }

    /**
     * Removes and returns the head of the queue.
     *
     * @return the removed message, or {@code null} if the queue is empty or the client is shut
     *         down / shutting down
     */
    public Message poll() {
        if (checkClientIsShutdown()) {
            return null;
        }
        return this.queue.poll();
    }

    /**
     * Returns the number of queued messages.
     *
     * @return the queue size, or {@code 0} if the client is shut down / shutting down
     */
    public int size() {
        if (checkClientIsShutdown()) {
            return 0;
        }
        return this.queue.size();
    }

    /**
     * @return the start time (epoch millis) recorded by the most recent
     *         {@link #handleFirst(Function)} run, or the creation time of this pipe if none ran yet
     */
    public Long getLastProcessTimeMillis() {
        return Long.valueOf(this.lastProcessTimeMillis.get());
    }

    /** Reports whether the Redisson client is shut down or in the process of shutting down. */
    private boolean checkClientIsShutdown() {
        return this.redissonClient.isShutdown() || this.redissonClient.isShuttingDown();
    }

    /**
     * Restores the interrupt status if needed and delegates to the configured exception handler.
     *
     * @param exc     the exception that occurred
     * @param message the message being processed, or {@code null} if not applicable
     */
    private void doHandleException(Exception exc, Message message) {
        if (exc instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        this.configuration.getExceptionHandler().handleException(exc, message);
    }

    /**
     * Releases the distributed lock if this thread acquired and still holds it, then wakes
     * every thread waiting on this instance's monitor. Must be called while holding the monitor.
     *
     * @param lock     the lock to release
     * @param acquired whether {@code tryLock} succeeded for the current thread
     */
    private void releaseAndNotify(RLock lock, boolean acquired) {
        try {
            // The hold check avoids unlocking a lock this thread no longer owns,
            // e.g. when the lease has already expired during processing.
            if (acquired && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        } catch (RuntimeException e) {
            log.warn("Message pipeline: {}, failed to release lock.", this.name, e);
        } finally {
            notifyAll();
        }
    }

    public String getName() {
        return this.name;
    }

    public RBlockingQueue<Message> getQueue() {
        return this.queue;
    }

    public RedissonClient getRedissonClient() {
        return this.redissonClient;
    }

    public MessagePipeConfiguration getConfiguration() {
        return this.configuration;
    }
}
