package org.opencastproject.index.service.message;

import java.io.Serializable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.opencastproject.index.service.impl.index.AbstractSearchIndex;
import org.opencastproject.message.broker.api.BaseMessage;
import org.opencastproject.message.broker.api.MessageReceiver;
import org.opencastproject.message.broker.api.MessageSender;
import org.opencastproject.message.broker.api.index.IndexRecreateObject;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.User;
import org.opencastproject.util.OsgiUtil;
import org.opencastproject.util.data.Effect2;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opencastproject/index/service/message/BaseMessageReceiverImpl.class */
public abstract class BaseMessageReceiverImpl<T extends Serializable> {
    private static final String DESTINATION_ID_KEY = "destinationId";
    private static final Logger logger = LoggerFactory.getLogger(BaseMessageReceiverImpl.class);
    private SecurityService securityService;
    private MessageSender messageSender;
    private MessageReceiver messageReceiver;
    private BaseMessageReceiverImpl<T>.MessageWatcher messageWatcher;
    private AbstractSearchIndex index;
    private MessageReceiverLockService lockService;
    private String destinationId;
    private MessageSender.DestinationType destinationType;
    private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    private final Effect2<Serializable, String> execute = new Effect2<Serializable, String>() { // from class: org.opencastproject.index.service.message.BaseMessageReceiverImpl.1
        /* JADX INFO: Access modifiers changed from: protected */
        public void run(Serializable serializable, String str) {
            BaseMessageReceiverImpl.this.execute(serializable);
        }
    };

    /* loaded from: input_file:org/opencastproject/index/service/message/BaseMessageReceiverImpl$MessageWatcher.class */
    private class MessageWatcher implements Runnable {
        private FutureTask<Serializable> future;
        private final String clazzName;
        private final MessageReceiverLockService lockService;
        private final Logger logger = LoggerFactory.getLogger(MessageWatcher.class);
        private boolean listening = true;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        MessageWatcher(MessageReceiverLockService messageReceiverLockService) {
            this.clazzName = BaseMessageReceiverImpl.this.getClass().getName();
            this.lockService = messageReceiverLockService;
        }

        public void stopListening() {
            this.listening = false;
            this.future.cancel(true);
        }

        @Override // java.lang.Runnable
        public void run() {
            this.logger.info("Starting to listen for {} Messages for {}", this.clazzName, BaseMessageReceiverImpl.this.destinationId);
            while (this.listening) {
                this.future = BaseMessageReceiverImpl.this.messageReceiver.receiveSerializable(BaseMessageReceiverImpl.this.getDestinationId(), BaseMessageReceiverImpl.this.getDestinationType());
                this.executor.execute(this.future);
                try {
                    BaseMessage baseMessage = this.future.get();
                    if (baseMessage == null) {
                        BaseMessageReceiverImpl.this.securityService.setOrganization((Organization) null);
                        BaseMessageReceiverImpl.this.securityService.setUser((User) null);
                    } else {
                        BaseMessageReceiverImpl.this.securityService.setOrganization(baseMessage.getOrganization());
                        BaseMessageReceiverImpl.this.securityService.setUser(baseMessage.getUser());
                        if (baseMessage.getObject() instanceof IndexRecreateObject) {
                            IndexRecreateObject object = baseMessage.getObject();
                            if (IndexRecreateObject.Status.End.equals(object.getStatus())) {
                                BaseMessageReceiverImpl.this.messageSender.sendObjectMessage("INDEX_RESPONSE.QUEUE", MessageSender.DestinationType.Queue, IndexRecreateObject.end(object.getIndexName(), object.getService()));
                            }
                        } else {
                            this.lockService.synchronize(baseMessage.getId().get(), BaseMessageReceiverImpl.this.execute.curry(baseMessage.getObject()).toFn());
                        }
                    }
                } catch (CancellationException e) {
                    this.logger.trace("Listening for messages {} has been cancelled.", this.clazzName);
                } catch (InterruptedException e2) {
                    this.logger.error("Problem while getting {} message events {}", this.clazzName, ExceptionUtils.getStackTrace(e2));
                } catch (ExecutionException e3) {
                    this.logger.error("Problem while getting {} message events {}", this.clazzName, ExceptionUtils.getStackTrace(e3));
                } catch (Throwable th) {
                    this.logger.error("Problem while getting {} message events {}", this.clazzName, ExceptionUtils.getStackTrace(th));
                } finally {
                    BaseMessageReceiverImpl.this.securityService.setOrganization((Organization) null);
                    BaseMessageReceiverImpl.this.securityService.setUser((User) null);
                }
            }
            this.logger.info("Stopping listening for {} Messages", this.clazzName);
        }
    }

    public BaseMessageReceiverImpl(MessageSender.DestinationType destinationType) {
        this.destinationType = destinationType;
    }

    public void activate(ComponentContext componentContext) {
        logger.info("Activating {}", getClass().getName());
        this.destinationId = OsgiUtil.getComponentContextProperty(componentContext, DESTINATION_ID_KEY);
        logger.info("The {} for this message receiver is '{}'", DESTINATION_ID_KEY, this.destinationId);
        this.messageWatcher = new MessageWatcher(this.lockService);
        this.singleThreadExecutor.execute(this.messageWatcher);
    }

    public void deactivate(ComponentContext componentContext) {
        logger.info("Deactivating {}", getClass().getName());
        if (this.messageWatcher != null) {
            this.messageWatcher.stopListening();
        }
        this.singleThreadExecutor.shutdown();
    }

    protected abstract void execute(T t);

    protected String getDestinationId() {
        return this.destinationId;
    }

    protected MessageSender.DestinationType getDestinationType() {
        return this.destinationType;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractSearchIndex getSearchIndex() {
        return this.index;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SecurityService getSecurityService() {
        return this.securityService;
    }

    public void setSecurityService(SecurityService securityService) {
        this.securityService = securityService;
    }

    public void setMessageSender(MessageSender messageSender) {
        this.messageSender = messageSender;
    }

    public void setMessageReceiver(MessageReceiver messageReceiver) {
        this.messageReceiver = messageReceiver;
    }

    public void setSearchIndex(AbstractSearchIndex abstractSearchIndex) {
        this.index = abstractSearchIndex;
    }

    public void setMessageReceiverLockService(MessageReceiverLockService messageReceiverLockService) {
        this.lockService = messageReceiverLockService;
    }
}
