/*
 * Decompiled with CFR 0.152.
 */
package org.forgerock.openam.notifications.integration.brokers;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import javax.inject.Inject;
import javax.inject.Named;
import org.forgerock.json.JsonValue;
import org.forgerock.openam.audit.context.AMExecutorServiceFactory;
import org.forgerock.openam.cts.CTSPersistentStore;
import org.forgerock.openam.cts.api.filter.TokenFilter;
import org.forgerock.openam.cts.api.filter.TokenFilterBuilder;
import org.forgerock.openam.cts.api.tokens.Token;
import org.forgerock.openam.cts.continuous.ChangeType;
import org.forgerock.openam.cts.continuous.ContinuousQueryListener;
import org.forgerock.openam.cts.exceptions.CoreTokenException;
import org.forgerock.openam.notifications.Consumer;
import org.forgerock.openam.notifications.NotificationBroker;
import org.forgerock.openam.notifications.Subscription;
import org.forgerock.openam.notifications.Topic;
import org.forgerock.openam.sm.datalayer.api.DataLayerException;
import org.forgerock.openam.tokens.CoreTokenField;
import org.forgerock.openam.tokens.TokenType;
import org.forgerock.openam.utils.JsonValueBuilder;
import org.forgerock.openam.utils.Time;
import org.forgerock.openam.utils.TimeUtils;
import org.forgerock.opendj.ldap.Attribute;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.util.Reject;
import org.forgerock.util.generator.IdGenerator;
import org.forgerock.util.query.QueryFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CTSNotificationBroker
implements NotificationBroker {
    private static final Logger logger = LoggerFactory.getLogger(CTSNotificationBroker.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private final NotificationBroker localBroker;
    private final CTSPersistentStore store;
    private final SessionNotificationListener listener;
    private final long tokenExpirySeconds;
    private final IdGenerator idGenerator;
    private final BlockingQueue<NotificationEntry> queue;
    private final ScheduledExecutorService executorService;
    private volatile boolean shutdown;

    @Inject
    public CTSNotificationBroker(CTSPersistentStore store, @Named(value="localBroker") NotificationBroker localBroker, @Named(value="ctsQueueSize") int queueSize, @Named(value="tokenExpirySeconds") long tokenExpirySeconds, @Named(value="publishFrequencyMilliseconds") long publishFrequencyMilliseconds, AMExecutorServiceFactory executorServiceFactory) {
        Reject.ifNull((Object)store, (String)"CTS store must not be null");
        Reject.ifNull((Object)localBroker, (String)"Notification broker must not be null");
        Reject.ifNull((Object)executorServiceFactory, (String)"Executor service factory must not be null");
        Reject.ifTrue((tokenExpirySeconds <= 0L ? 1 : 0) != 0, (String)"Token expiry must be a positive integer");
        Reject.ifTrue((publishFrequencyMilliseconds <= 0L ? 1 : 0) != 0, (String)"Publish frequency must be a positive integer");
        this.localBroker = localBroker;
        this.store = store;
        this.tokenExpirySeconds = tokenExpirySeconds;
        this.executorService = executorServiceFactory.createScheduledService(1, "CTSNotificationsBroker");
        this.idGenerator = IdGenerator.DEFAULT;
        this.listener = new SessionNotificationListener();
        this.queue = new ArrayBlockingQueue<NotificationEntry>(queueSize);
        this.executorService.scheduleAtFixedRate(new CTSPublisher(), publishFrequencyMilliseconds, publishFrequencyMilliseconds, TimeUnit.MILLISECONDS);
        try {
            store.addContinuousQueryListener((ContinuousQueryListener)this.listener, CTSNotificationBroker.getTokenFilter());
        }
        catch (CoreTokenException ctE) {
            throw new RuntimeException("Unable to register session notifications", ctE);
        }
    }

    public boolean publish(Topic topic, JsonValue notification) {
        Reject.ifNull((Object)topic, (String)"Topic must not be null");
        Reject.ifNull((Object)notification, (String)"Notification must not be null");
        if (this.shutdown) {
            logger.info("Not publishing notification as broker shutting down");
            return false;
        }
        if (!this.queue.offer(NotificationEntry.of(topic, notification))) {
            logger.info("Failed to publish notification because queue is full. Notification discarded");
            return false;
        }
        return true;
    }

    public Subscription subscribe(Consumer consumer) {
        return this.localBroker.subscribe(consumer);
    }

    public void shutdown() {
        this.shutdown = true;
        this.executorService.shutdownNow();
        this.localBroker.shutdown();
        try {
            this.store.removeContinuousQueryListener((ContinuousQueryListener)this.listener, CTSNotificationBroker.getTokenFilter());
        }
        catch (CoreTokenException ctE) {
            logger.warn("Failed to remove continuous query listener", (Throwable)ctE);
        }
    }

    private static TokenFilter getTokenFilter() {
        return new TokenFilterBuilder().returnAttribute(CoreTokenField.BLOB).withQuery(QueryFilter.equalTo((Object)CoreTokenField.TOKEN_TYPE, (Object)TokenType.NOTIFICATION)).build();
    }

    private static final class NotificationEntry {
        private final Topic topic;
        private final JsonValue notification;

        private NotificationEntry(Topic topic, JsonValue notification) {
            this.topic = topic;
            this.notification = notification;
        }

        static NotificationEntry of(Topic topic, JsonValue notification) {
            return new NotificationEntry(topic, notification);
        }
    }

    private final class CTSPublisher
    implements Runnable {
        private CTSPublisher() {
        }

        @Override
        public void run() {
            ArrayList entries = new ArrayList();
            CTSNotificationBroker.this.queue.drainTo(entries);
            if (entries.isEmpty()) {
                return;
            }
            ArrayList<Map> jsonEntries = new ArrayList<Map>(entries.size());
            for (NotificationEntry entry : entries) {
                jsonEntries.add(JsonValue.object((Map.Entry[])new Map.Entry[]{JsonValue.field((String)"topic", (Object)entry.topic.getIdentifier()), JsonValue.field((String)"content", (Object)entry.notification.getObject())}));
            }
            JsonValue entry = JsonValue.json(jsonEntries);
            try {
                Token token = new Token(CTSNotificationBroker.this.idGenerator.generate(), TokenType.NOTIFICATION);
                ByteArrayOutputStream stream = new ByteArrayOutputStream();
                DeflaterOutputStream dos = new DeflaterOutputStream(stream);
                ((OutputStream)dos).write(mapper.writeValueAsBytes(entry.getObject()));
                ((OutputStream)dos).close();
                token.setBlob(stream.toByteArray());
                long expiryTime = Time.currentTimeMillis() + TimeUnit.SECONDS.toMillis(CTSNotificationBroker.this.tokenExpirySeconds);
                Calendar expiryTimeStamp = TimeUtils.fromUnixTime((long)expiryTime, (TimeUnit)TimeUnit.MILLISECONDS);
                token.setExpiryTimestamp(expiryTimeStamp);
                CTSNotificationBroker.this.store.createAsync(token);
            }
            catch (IOException | CoreTokenException e) {
                logger.info("Failed to write notification to CTS", e);
            }
        }
    }

    private final class SessionNotificationListener
    implements ContinuousQueryListener<Attribute> {
        private SessionNotificationListener() {
        }

        public void objectChanged(String tokenId, Map<String, Attribute> changeSet, ChangeType changeType) {
            if (changeType == ChangeType.ADD) {
                try {
                    ByteString entryBlob = changeSet.get(CoreTokenField.BLOB.toString()).firstValue();
                    InflaterInputStream stream = new InflaterInputStream(new ByteArrayInputStream(entryBlob.toByteArray()));
                    JsonValue entries = JsonValueBuilder.toJsonArray((InputStream)stream);
                    for (JsonValue entry : entries) {
                        String topic = entry.get("topic").asString();
                        JsonValue content = entry.get("content");
                        CTSNotificationBroker.this.localBroker.publish(Topic.of((String)topic), content);
                    }
                }
                catch (Exception e) {
                    logger.error("Failed to publish notification to the local broker", (Throwable)e);
                }
            }
        }

        public void objectsChanged(Set<String> tokenIds) {
        }

        public void connectionLost() {
            logger.warn("Continuous query listener has lost its connection");
        }

        public void processError(DataLayerException dlE) {
            logger.error("Notification token listener error", (Throwable)dlE);
        }
    }
}

