package org.jacorb.notification.servant;

import java.util.List;
import org.jacorb.config.Configuration;
import org.jacorb.config.ConfigurationException;
import org.jacorb.notification.OfferManager;
import org.jacorb.notification.SubscriptionManager;
import org.jacorb.notification.conf.Attributes;
import org.jacorb.notification.engine.TaskProcessor;
import org.jacorb.notification.interfaces.Message;
import org.jacorb.notification.interfaces.MessageConsumer;
import org.jacorb.notification.queue.EventQueueFactory;
import org.jacorb.notification.queue.MessageQueue;
import org.jacorb.notification.queue.RWLockEventQueueDecorator;
import org.jacorb.notification.util.CollectionsWrapper;
import org.jacorb.notification.util.PropertySet;
import org.jacorb.notification.util.PropertySetAdapter;
import org.omg.CORBA.Any;
import org.omg.CORBA.NO_IMPLEMENT;
import org.omg.CORBA.ORB;
import org.omg.CORBA.Object;
import org.omg.CosNotification.DiscardPolicy;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.MaxEventsPerConsumer;
import org.omg.CosNotification.OrderPolicy;
import org.omg.CosNotification.Property;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.ObtainInfoMode;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.NotifyPublish;
import org.omg.CosNotifyComm.NotifyPublishHelper;
import org.omg.CosNotifyComm.NotifyPublishOperations;
import org.omg.CosNotifyComm.NotifySubscribeOperations;
import org.omg.PortableServer.POA;

/* loaded from: input_file:org/jacorb/notification/servant/AbstractProxySupplier.class */
public abstract class AbstractProxySupplier extends AbstractProxy implements MessageConsumer, NotifySubscribeOperations, AbstractProxySupplierMBean {
    private static final String EVENT_MESSAGE_DISCARDED = "notification.proxy.message_discarded";
    private int numberOfDiscardedMessages_;
    private MessageQueue.DiscardListener discardListener_;
    private static final Runnable EMPTY_RUNNABLE = new Runnable() { // from class: org.jacorb.notification.servant.AbstractProxySupplier.2
        @Override // java.lang.Runnable
        public void run() {
        }
    };
    private static final EventType[] EMPTY_EVENT_TYPE_ARRAY = new EventType[0];
    private static final Message[] EMPTY_MESSAGE = new Message[0];
    private final RWLockEventQueueDecorator pendingMessages_;
    private final int errorThreshold_;
    private final ConsumerAdmin consumerAdmin_;
    private final EventQueueFactory eventQueueFactory_;
    private NotifyPublishOperations proxyOfferListener_;
    private NotifyPublish offerListener_;
    private PropertySetAdapter eventQueueConfigurationChangedCB;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractProxySupplier(IAdmin iAdmin, ORB orb, POA poa, Configuration configuration, TaskProcessor taskProcessor, OfferManager offerManager, SubscriptionManager subscriptionManager, ConsumerAdmin consumerAdmin) throws ConfigurationException {
        super(iAdmin, orb, poa, configuration, taskProcessor, offerManager, subscriptionManager);
        this.numberOfDiscardedMessages_ = 0;
        this.discardListener_ = new MessageQueue.DiscardListener() { // from class: org.jacorb.notification.servant.AbstractProxySupplier.1
            private long sendTimestamp_;
            private int discardedMessagesSinceLastBroadcast_ = 1;

            @Override // org.jacorb.notification.queue.MessageQueue.DiscardListener
            public void messageDiscarded(int i) {
                AbstractProxySupplier.access$008(AbstractProxySupplier.this);
                if (System.currentTimeMillis() - this.sendTimestamp_ < 5000) {
                    this.discardedMessagesSinceLastBroadcast_++;
                    return;
                }
                AbstractProxySupplier.this.sendNotification(AbstractProxySupplier.EVENT_MESSAGE_DISCARDED, this.discardedMessagesSinceLastBroadcast_ + " Message(s) discarded. Queue Limit: " + i);
                this.sendTimestamp_ = System.currentTimeMillis();
                this.discardedMessagesSinceLastBroadcast_ = 1;
                if (AbstractProxySupplier.this.logger_.isInfoEnabled()) {
                    AbstractProxySupplier.this.logger_.info(this.discardedMessagesSinceLastBroadcast_ + " Message(s) discarded. Queue Limit: " + i);
                }
            }
        };
        this.eventQueueConfigurationChangedCB = new PropertySetAdapter() { // from class: org.jacorb.notification.servant.AbstractProxySupplier.3
            @Override // org.jacorb.notification.util.PropertySetAdapter, org.jacorb.notification.util.PropertySetListener
            public void actionPropertySetChanged(PropertySet propertySet) {
                AbstractProxySupplier.this.configureEventQueue();
            }
        };
        this.consumerAdmin_ = consumerAdmin;
        this.eventQueueFactory_ = new EventQueueFactory(configuration);
        this.errorThreshold_ = configuration.getAttributeAsInteger(Attributes.EVENTCONSUMER_ERROR_THRESHOLD, 3);
        if (this.logger_.isInfoEnabled()) {
            this.logger_.info("set Error Threshold to : " + this.errorThreshold_);
        }
        this.qosSettings_.addPropertySetListener(new String[]{OrderPolicy.value, DiscardPolicy.value, MaxEventsPerConsumer.value}, this.eventQueueConfigurationChangedCB);
        this.pendingMessages_ = new RWLockEventQueueDecorator(getMessageQueueFactory().newMessageQueue(this.qosSettings_));
        this.pendingMessages_.addDiscardListener(this.discardListener_);
        this.eventTypes_.add(EVENT_MESSAGE_DISCARDED);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public EventQueueFactory getMessageQueueFactory() {
        return this.eventQueueFactory_;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void configureEventQueue() {
        try {
            this.pendingMessages_.replaceDelegate(getMessageQueueFactory().newMessageQueue(this.qosSettings_));
        } catch (InterruptedException e) {
        }
    }

    @Override // org.jacorb.notification.servant.AbstractProxySupplierMBean
    public int getPendingMessagesCount() {
        try {
            return this.pendingMessages_.getPendingMessagesCount();
        } catch (InterruptedException e) {
            return -1;
        }
    }

    @Override // org.jacorb.notification.servant.AbstractProxySupplierMBean
    public final String getOrderPolicy() {
        return this.pendingMessages_.getOrderPolicyName();
    }

    @Override // org.jacorb.notification.servant.AbstractProxySupplierMBean
    public final String getDiscardPolicy() {
        return this.pendingMessages_.getDiscardPolicyName();
    }

    @Override // org.jacorb.notification.servant.AbstractProxySupplierMBean
    public final int getMaxEventsPerConsumer() {
        return this.qosSettings_.get(MaxEventsPerConsumer.value).extract_long();
    }

    @Override // org.jacorb.notification.servant.AbstractProxySupplierMBean
    public void setMaxEventsPerConsumer(int i) {
        Any create_any = getORB().create_any();
        create_any.insert_long(i);
        this.qosSettings_.set_qos(new Property[]{new Property(MaxEventsPerConsumer.value, create_any)});
    }

    @Override // org.jacorb.notification.servant.AbstractProxySupplierMBean
    public int getNumberOfDiscardedMessages() {
        return this.numberOfDiscardedMessages_;
    }

    public boolean hasPendingData() {
        try {
            return this.pendingMessages_.hasPendingMessages();
        } catch (InterruptedException e) {
            return false;
        }
    }

    protected void enqueue(Message message) {
        Message message2 = (Message) message.clone();
        try {
            this.pendingMessages_.enqeue(message2);
            if (this.logger_.isDebugEnabled()) {
                this.logger_.debug("enqueue " + message + " to pending Messages.");
            }
        } catch (InterruptedException e) {
            message2.dispose();
            this.logger_.info("enqueue was interrupted", (Throwable) e);
        }
    }

    public Message getMessageBlocking() throws InterruptedException {
        return this.pendingMessages_.getMessageBlocking();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Message getMessageNoBlock() {
        try {
            return this.pendingMessages_.getMessageNoBlock();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    protected Message[] getAllMessages() {
        try {
            return this.pendingMessages_.getAllMessages();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return EMPTY_MESSAGE;
        }
    }

    @Override // org.jacorb.notification.interfaces.MessageConsumer
    public void queueMessage(Message message) {
        if (this.logger_.isDebugEnabled()) {
            this.logger_.debug("queueMessage() connected=" + getConnected() + " suspended=" + isSuspended());
        }
        if (getConnected()) {
            enqueue(message);
            messageQueued();
        }
    }

    protected void messageQueued() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Message[] getUpToMessages(int i) {
        try {
            return this.pendingMessages_.getUpToMessages(i);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return EMPTY_MESSAGE;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Message[] getAtLeastMessages(int i) {
        try {
            return this.pendingMessages_.getAtLeastMessages(i);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return EMPTY_MESSAGE;
        }
    }

    public int getErrorThreshold() {
        return this.errorThreshold_;
    }

    @Override // org.jacorb.notification.servant.AbstractProxy, org.picocontainer.Disposable
    public final void dispose() {
        super.dispose();
        this.pendingMessages_.clear();
        getTaskProcessor().executeTaskAfterDelay(1000L, EMPTY_RUNNABLE);
    }

    public final ConsumerAdmin MyAdmin() {
        return this.consumerAdmin_;
    }

    @Override // org.omg.CosNotifyComm.NotifySubscribeOperations
    public final void subscription_change(EventType[] eventTypeArr, EventType[] eventTypeArr2) throws InvalidEventType {
        this.subscriptionManager_.subscription_change(eventTypeArr, eventTypeArr2);
    }

    public final EventType[] obtain_offered_types(ObtainInfoMode obtainInfoMode) {
        EventType[] eventTypeArr = EMPTY_EVENT_TYPE_ARRAY;
        switch (obtainInfoMode.value()) {
            case 0:
                eventTypeArr = this.offerManager_.obtain_offered_types();
                removeListener();
                break;
            case 1:
                registerListener();
                eventTypeArr = this.offerManager_.obtain_offered_types();
                break;
            case 2:
                removeListener();
                break;
            case 3:
                registerListener();
                break;
            default:
                throw new IllegalArgumentException("Illegal ObtainInfoMode");
        }
        return eventTypeArr;
    }

    private void registerListener() {
        final NotifyPublishOperations offerListener;
        if (this.proxyOfferListener_ != null || (offerListener = getOfferListener()) == null) {
            return;
        }
        this.proxyOfferListener_ = new NotifyPublishOperations() { // from class: org.jacorb.notification.servant.AbstractProxySupplier.4
            @Override // org.omg.CosNotifyComm.NotifyPublishOperations
            public void offer_change(EventType[] eventTypeArr, EventType[] eventTypeArr2) {
                try {
                    offerListener.offer_change(eventTypeArr, eventTypeArr2);
                } catch (NO_IMPLEMENT e) {
                    AbstractProxySupplier.this.logger_.info("disable offer_change for connected Consumer.", (Throwable) e);
                    AbstractProxySupplier.this.removeListener();
                } catch (InvalidEventType e2) {
                    AbstractProxySupplier.this.logger_.warn("invalid event type", (Throwable) e2);
                } catch (Exception e3) {
                    AbstractProxySupplier.this.logger_.warn("offer_change failed", (Throwable) e3);
                }
            }
        };
        this.offerManager_.addListener(this.proxyOfferListener_);
    }

    @Override // org.jacorb.notification.servant.AbstractProxy
    protected void removeListener() {
        if (this.proxyOfferListener_ != null) {
            this.offerManager_.removeListener(this.proxyOfferListener_);
            this.proxyOfferListener_ = null;
        }
    }

    final NotifyPublishOperations getOfferListener() {
        return this.offerListener_;
    }

    @Override // org.jacorb.notification.servant.AbstractProxy
    protected final void clientDisconnected() {
        this.offerListener_ = null;
    }

    @Override // org.jacorb.notification.servant.AbstractProxy
    public void connectClient(Object object) {
        super.connectClient(object);
        try {
            this.offerListener_ = NotifyPublishHelper.narrow(object);
            this.logger_.debug("successfully narrowed connecting Client to IF NotifyPublish");
        } catch (Exception e) {
            this.logger_.info("disable offer_change for connecting Consumer");
        }
    }

    public boolean isRetryAllowed() {
        return !isDestroyed() && getErrorCounter() < getErrorThreshold();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract long getCost();

    @Override // java.lang.Comparable
    public int compareTo(Object obj) {
        return (int) (getCost() - ((AbstractProxySupplier) obj).getCost());
    }

    @Override // org.jacorb.notification.interfaces.FilterStage
    public final boolean hasMessageConsumer() {
        return true;
    }

    @Override // org.jacorb.notification.interfaces.FilterStageSource
    public final List getSubsequentFilterStages() {
        return CollectionsWrapper.singletonList(this);
    }

    @Override // org.jacorb.notification.interfaces.FilterStage
    public final MessageConsumer getMessageConsumer() {
        return this;
    }

    @Override // org.jacorb.notification.servant.AbstractProxySupplierMBean
    public void clearPendingMessageQueue() {
        this.pendingMessages_.clear();
    }

    static /* synthetic */ int access$008(AbstractProxySupplier abstractProxySupplier) {
        int i = abstractProxySupplier.numberOfDiscardedMessages_;
        abstractProxySupplier.numberOfDiscardedMessages_ = i + 1;
        return i;
    }
}
