package io.reacted.core.mailboxes;

import io.reacted.core.messages.Message;
import io.reacted.core.messages.reactors.DeliveryStatus;
import io.reacted.core.messages.reactors.ReActorInit;
import io.reacted.core.messages.reactors.ReActorStop;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.patterns.ObjectUtils;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/* loaded from: input_file:io/reacted/core/mailboxes/BackpressuringMbox.class */
public class BackpressuringMbox implements MailBox {
    public static final long DEFAULT_MESSAGES_REQUESTED_ON_STARTUP = 1;
    private final MailBox realMbox;
    private final AtomicReference<Set<Class<? extends Serializable>>> notDelayable;
    private final Set<? extends Serializable> outOfStreamControl;
    private final ReActorContext realMailboxOwner;
    private final long backpressuringThreshold;
    private final BlockingDeque<Message> bufferQueue = new LinkedBlockingDeque();
    private long available;

    /* loaded from: input_file:io/reacted/core/mailboxes/BackpressuringMbox$Builder.class */
    public static class Builder {
        private ReActorContext realMailboxOwner;
        private MailBox realMbox = new UnboundedMbox();
        private long backpressuringThreshold = 1;
        private long availableOnStartup = 1;
        private Set<Class<? extends Serializable>> notDelayable = Set.of(ReActorInit.class, ReActorStop.class);
        private Set<Class<? extends Serializable>> outOfStreamControl = Set.of();

        private Builder() {
        }

        @SafeVarargs
        public final Builder setOutOfStreamControl(Class<? extends Serializable>... clsArr) {
            this.outOfStreamControl = Set.of((Object[]) clsArr);
            return this;
        }

        public final Builder setBackpressuringThreshold(long j) {
            this.backpressuringThreshold = j;
            return this;
        }

        public final Builder setRealMbox(MailBox mailBox) {
            this.realMbox = mailBox;
            return this;
        }

        public final Builder setAvailableOnStartup(int i) {
            this.availableOnStartup = i;
            return this;
        }

        @SafeVarargs
        public final Builder setNonDelayable(Class<? extends Serializable>... clsArr) {
            this.notDelayable = Set.of((Object[]) clsArr);
            return this;
        }

        public final Builder setRealMailboxOwner(ReActorContext reActorContext) {
            this.realMailboxOwner = reActorContext;
            return this;
        }

        public BackpressuringMbox build() {
            return new BackpressuringMbox(this);
        }
    }

    private BackpressuringMbox(Builder builder) {
        this.outOfStreamControl = (Set) Objects.requireNonNull(builder.outOfStreamControl, "Out of Stream control set cannot be null");
        this.available = ((Long) ObjectUtils.requiredInRange(Long.valueOf(builder.availableOnStartup), 0L, Long.MAX_VALUE, IllegalArgumentException::new)).longValue();
        this.realMailboxOwner = (ReActorContext) Objects.requireNonNull(builder.realMailboxOwner, "Mailbox owner reactor cannot be null");
        this.realMbox = (MailBox) Objects.requireNonNull(builder.realMbox, "A backing mailbox must be provided");
        this.notDelayable = new AtomicReference<>((Set) ((Set) Objects.requireNonNull(builder.notDelayable, "Non delayable messages set cannot be a null")).stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toUnmodifiableSet()));
        this.backpressuringThreshold = ((Long) ObjectUtils.requiredInRange(Long.valueOf(builder.backpressuringThreshold), 1L, Long.MAX_VALUE, IllegalArgumentException::new)).longValue();
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public boolean isEmpty() {
        return this.realMbox.isEmpty();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public boolean isFull() {
        return this.realMbox.isFull();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public long getMsgNum() {
        return this.realMbox.getMsgNum();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public long getMaxSize() {
        return this.realMbox.getMaxSize();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    @Nonnull
    public Message getNextMessage() {
        return this.realMbox.getNextMessage();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.reacted.core.mailboxes.MailBox
    @Nonnull
    public DeliveryStatus deliver(Message message) {
        if (!isDelayable(message.getPayload().getClass())) {
            return this.realMbox.deliver(message);
        }
        DeliveryStatus deliveryStatus = DeliveryStatus.DELIVERED;
        synchronized (this) {
            if (isAnotherMessageAllowed() && this.bufferQueue.isEmpty()) {
                if (!this.outOfStreamControl.contains(message.getPayload().getClass())) {
                    decreaseAllowedMessages();
                }
                return this.realMbox.deliver(message);
            }
            this.bufferQueue.addLast(message);
            if (this.bufferQueue.size() >= this.backpressuringThreshold) {
                deliveryStatus = DeliveryStatus.BACKPRESSURE_REQUIRED;
            }
            return deliveryStatus;
        }
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public void request(long j) {
        boolean z = false;
        synchronized (this) {
            updateAllowedMessages(j);
            while (isAnotherMessageAllowed() && !this.bufferQueue.isEmpty()) {
                Message removeFirst = this.bufferQueue.removeFirst();
                z |= this.realMbox.deliver(removeFirst).isRescheduleRequired();
                if (!this.outOfStreamControl.contains(removeFirst.getPayload().getClass())) {
                    decreaseAllowedMessages();
                }
            }
        }
        if (z) {
            this.realMailboxOwner.reschedule();
        }
    }

    public static Optional<BackpressuringMbox> toBackpressuringMailbox(MailBox mailBox) {
        return BackpressuringMbox.class.isAssignableFrom(mailBox.getClass()) ? Optional.of((BackpressuringMbox) mailBox) : Optional.empty();
    }

    public boolean isDelayable(Class<? extends Serializable> cls) {
        return !this.notDelayable.get().contains(cls);
    }

    public BackpressuringMbox addNonDelayableTypes(Class<? extends Serializable>... clsArr) {
        return addNonDelayableTypes((Set<Class<? extends Serializable>>) Arrays.stream(clsArr).collect(Collectors.toSet()));
    }

    public BackpressuringMbox addNonDelayableTypes(Set<Class<? extends Serializable>> set) {
        Set<Class<? extends Serializable>> set2;
        do {
            set2 = this.notDelayable.get();
        } while (!this.notDelayable.compareAndSet(set2, (Set) Stream.concat(set2.stream(), ((Set) Objects.requireNonNull(set, "Non delayable types cannot be null")).stream().filter((v0) -> {
            return Objects.nonNull(v0);
        })).collect(Collectors.toUnmodifiableSet())));
        return this;
    }

    private boolean isAnotherMessageAllowed() {
        return this.available > 0;
    }

    private void decreaseAllowedMessages() {
        updateAllowedMessages(-1L);
    }

    private void updateAllowedMessages(long j) {
        this.available += j;
    }
}
