package org.forgerock.openam.notifications.integration.brokers;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import javax.inject.Inject;
import javax.inject.Named;
import org.forgerock.json.JsonValue;
import org.forgerock.openam.audit.context.AMExecutorServiceFactory;
import org.forgerock.openam.cts.CTSPersistentStore;
import org.forgerock.openam.cts.api.filter.TokenFilter;
import org.forgerock.openam.cts.api.filter.TokenFilterBuilder;
import org.forgerock.openam.cts.api.tokens.Token;
import org.forgerock.openam.cts.continuous.ChangeType;
import org.forgerock.openam.cts.continuous.ContinuousQueryListener;
import org.forgerock.openam.cts.exceptions.CoreTokenException;
import org.forgerock.openam.notifications.Consumer;
import org.forgerock.openam.notifications.NotificationBroker;
import org.forgerock.openam.notifications.Subscription;
import org.forgerock.openam.notifications.Topic;
import org.forgerock.openam.sm.datalayer.api.DataLayerException;
import org.forgerock.openam.tokens.CoreTokenField;
import org.forgerock.openam.tokens.TokenType;
import org.forgerock.openam.utils.JsonValueBuilder;
import org.forgerock.openam.utils.Time;
import org.forgerock.openam.utils.TimeUtils;
import org.forgerock.opendj.ldap.Attribute;
import org.forgerock.util.Reject;
import org.forgerock.util.generator.IdGenerator;
import org.forgerock.util.query.QueryFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/forgerock/openam/notifications/integration/brokers/CTSNotificationBroker.class */
/**
 * A {@link NotificationBroker} that distributes notifications through the CTS.
 *
 * <p>Published notifications are placed on a bounded in-memory queue. A scheduled task periodically
 * drains that queue, serialises the drained batch as a JSON array, compresses it and writes it to
 * the CTS as the blob of a single {@link TokenType#NOTIFICATION} token. A continuous query listener
 * registered for tokens of that type decompresses the blob of every added token and republishes
 * each contained notification to the local broker. Subscriptions are delegated to the local broker.
 */
public final class CTSNotificationBroker implements NotificationBroker {
    private static final Logger logger = LoggerFactory.getLogger(CTSNotificationBroker.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private final NotificationBroker localBroker;
    private final CTSPersistentStore store;
    private final SessionNotificationListener listener;
    private final long tokenExpirySeconds;
    private final IdGenerator idGenerator;
    private final BlockingQueue<NotificationEntry> queue;
    private final ScheduledExecutorService executorService;
    private volatile boolean shutdown;

    /**
     * Periodic task that drains the queue and writes the drained notifications to the CTS as one
     * compressed token.
     */
    private final class CTSPublisher implements Runnable {

        /**
         * Drains all currently queued notifications and, if there are any, stores them in a new
         * notification token. Failures are logged and the drained batch is discarded.
         */
        @Override
        public void run() {
            ArrayList<NotificationEntry> pending = new ArrayList<>();
            queue.drainTo(pending);
            if (pending.isEmpty()) {
                return;
            }
            try {
                Token token = new Token(idGenerator.generate(), TokenType.NOTIFICATION);
                token.setBlob(serialize(pending));
                token.setExpiryTimestamp(TimeUtils.fromUnixTime(
                        Time.currentTimeMillis() + TimeUnit.SECONDS.toMillis(tokenExpirySeconds),
                        TimeUnit.MILLISECONDS));
                store.createAsync(token);
            } catch (CoreTokenException | IOException e) {
                logger.info("Failed to write notification to CTS", e);
            } catch (RuntimeException e) {
                // This task is run via scheduleAtFixedRate: an exception escaping run() would
                // suppress every subsequent execution, silently stopping all publication to CTS.
                logger.error("Unexpected error while writing notifications to CTS", e);
            }
        }

        /**
         * Converts the batch into a JSON array of {@code {"topic": ..., "content": ...}} objects
         * and returns its deflate-compressed serialised form.
         *
         * @param entries the notifications to serialise
         * @return the compressed JSON bytes
         * @throws IOException if serialisation or compression fails
         */
        private byte[] serialize(ArrayList<NotificationEntry> entries) throws IOException {
            ArrayList<Object> batch = new ArrayList<>(entries.size());
            for (NotificationEntry entry : entries) {
                batch.add(JsonValue.object(
                        JsonValue.field("topic", entry.topic.getIdentifier()),
                        JsonValue.field("content", entry.notification.getObject())));
            }
            byte[] serialized = mapper.writeValueAsBytes(JsonValue.json(batch).getObject());
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            // Closing the deflater stream both flushes the remaining compressed data into the
            // buffer and releases the deflater, including when the write fails.
            try (DeflaterOutputStream deflater = new DeflaterOutputStream(buffer)) {
                deflater.write(serialized);
            }
            return buffer.toByteArray();
        }
    }

    /** Immutable pairing of a topic with the notification published to it. */
    private static final class NotificationEntry {
        private final Topic topic;
        private final JsonValue notification;

        private NotificationEntry(Topic topic, JsonValue jsonValue) {
            this.topic = topic;
            this.notification = jsonValue;
        }

        /**
         * Creates an entry for the given topic and notification.
         *
         * @param topic the topic the notification was published to
         * @param jsonValue the notification content
         * @return a new entry
         */
        static NotificationEntry of(Topic topic, JsonValue jsonValue) {
            return new NotificationEntry(topic, jsonValue);
        }
    }

    /**
     * Continuous query listener that forwards the notifications contained in newly added
     * notification tokens to the local broker.
     */
    private final class SessionNotificationListener implements ContinuousQueryListener<Attribute> {

        /**
         * Handles a token change. Only {@link ChangeType#ADD} is acted upon: the token blob is
         * decompressed, parsed as a JSON array, and each element's {@code content} is published to
         * the local broker under the topic named by its {@code topic} field. Any failure is logged
         * and the remainder of the batch is skipped.
         */
        @Override
        public void objectChanged(String str, Map<String, Attribute> map, ChangeType changeType) {
            if (changeType != ChangeType.ADD) {
                return;
            }
            try {
                byte[] blob = map.get(CoreTokenField.BLOB.toString()).firstValue().toByteArray();
                // try-with-resources releases the inflater even if parsing fails.
                try (InflaterInputStream inflated = new InflaterInputStream(new ByteArrayInputStream(blob))) {
                    for (JsonValue entry : JsonValueBuilder.toJsonArray(inflated)) {
                        String topicId = entry.get("topic").asString();
                        localBroker.publish(Topic.of(topicId), entry.get("content"));
                    }
                }
            } catch (Exception e) {
                logger.error("Failed to publish notification to the local broker", e);
            }
        }

        /** Bulk change notifications are ignored. */
        @Override
        public void objectsChanged(Set<String> set) {
        }

        /** Logs a warning that the continuous query connection was lost. */
        @Override
        public void connectionLost() {
            logger.warn("Continuous query listener has lost its connection");
        }

        /** Logs an error reported by the continuous query. */
        @Override
        public void processError(DataLayerException dataLayerException) {
            logger.error("Notification token listener error", dataLayerException);
        }
    }

    /**
     * Creates the broker, schedules the periodic CTS publisher and registers the continuous query
     * listener for notification tokens.
     *
     * @param cTSPersistentStore the CTS store tokens are written to and listened on; must not be null
     * @param notificationBroker the local broker that receives notifications read back from the CTS
     *     and serves subscriptions; must not be null
     * @param i the capacity of the queue of notifications awaiting publication; must be positive
     * @param j the lifetime, in seconds, given to each notification token; must be positive
     * @param j2 the initial delay and period, in milliseconds, of the publisher task; must be positive
     * @param aMExecutorServiceFactory the factory used to create the scheduler; must not be null
     * @throws NullPointerException if a required argument is null
     * @throws IllegalArgumentException if a numeric argument is not positive
     * @throws RuntimeException if the continuous query listener cannot be registered
     */
    @Inject
    public CTSNotificationBroker(CTSPersistentStore cTSPersistentStore, @Named("localBroker") NotificationBroker notificationBroker, @Named("ctsQueueSize") int i, @Named("tokenExpirySeconds") long j, @Named("publishFrequencyMilliseconds") long j2, AMExecutorServiceFactory aMExecutorServiceFactory) {
        Reject.ifNull(cTSPersistentStore, "CTS store must not be null");
        Reject.ifNull(notificationBroker, "Notification broker must not be null");
        Reject.ifNull(aMExecutorServiceFactory, "Executor service factory must not be null");
        // Validate the queue size before any resource is created; otherwise ArrayBlockingQueue
        // would reject it only after the executor service already exists.
        Reject.ifTrue(i <= 0, "Queue size must be a positive integer");
        Reject.ifTrue(j <= 0, "Token expiry must be a positive integer");
        Reject.ifTrue(j2 <= 0, "Publish frequency must be a positive integer");
        this.localBroker = notificationBroker;
        this.store = cTSPersistentStore;
        this.tokenExpirySeconds = j;
        this.executorService = aMExecutorServiceFactory.createScheduledService(1, "CTSNotificationsBroker");
        this.idGenerator = IdGenerator.DEFAULT;
        this.listener = new SessionNotificationListener();
        this.queue = new ArrayBlockingQueue<>(i);
        this.executorService.scheduleAtFixedRate(new CTSPublisher(), j2, j2, TimeUnit.MILLISECONDS);
        try {
            cTSPersistentStore.addContinuousQueryListener(this.listener, getTokenFilter());
        } catch (CoreTokenException e) {
            // Construction is failing: stop the already scheduled publisher so it is not left
            // running on behalf of a broker instance nobody can reach.
            this.executorService.shutdownNow();
            throw new RuntimeException("Unable to register session notifications", e);
        }
    }

    /**
     * Queues a notification for publication to the CTS.
     *
     * @param topic the topic to publish to; must not be null
     * @param jsonValue the notification content; must not be null
     * @return {@code true} if the notification was queued; {@code false} if the broker is shutting
     *     down or the queue is full, in which case the notification is discarded
     */
    @Override
    public boolean publish(Topic topic, JsonValue jsonValue) {
        Reject.ifNull(topic, "Topic must not be null");
        Reject.ifNull(jsonValue, "Notification must not be null");
        if (this.shutdown) {
            logger.info("Not publishing notification as broker shutting down");
            return false;
        }
        if (this.queue.offer(NotificationEntry.of(topic, jsonValue))) {
            return true;
        }
        logger.info("Failed to publish notification because queue is full. Notification discarded");
        return false;
    }

    /**
     * Subscribes the consumer with the local broker.
     *
     * @param consumer the consumer to subscribe
     * @return the subscription returned by the local broker
     */
    @Override
    public Subscription subscribe(Consumer consumer) {
        return this.localBroker.subscribe(consumer);
    }

    /**
     * Stops accepting notifications, stops the publisher task, shuts down the local broker and
     * removes the continuous query listener. A failure to remove the listener is logged.
     */
    @Override
    public void shutdown() {
        this.shutdown = true;
        this.executorService.shutdownNow();
        this.localBroker.shutdown();
        try {
            this.store.removeContinuousQueryListener(this.listener, getTokenFilter());
        } catch (CoreTokenException e) {
            logger.warn("Failed to remove continuous query listener", e);
        }
    }

    /**
     * Builds the filter used for the continuous query: tokens whose type is
     * {@link TokenType#NOTIFICATION}, returning only the blob attribute.
     */
    private static TokenFilter getTokenFilter() {
        return new TokenFilterBuilder()
                .returnAttribute(CoreTokenField.BLOB)
                .withQuery(QueryFilter.equalTo(CoreTokenField.TOKEN_TYPE, TokenType.NOTIFICATION))
                .build();
    }
}
