package org.wicketstuff.push.timer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.wicket.Application;
import org.apache.wicket.Component;
import org.apache.wicket.behavior.IBehavior;
import org.apache.wicket.util.time.Duration;
import org.apache.wicket.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wicketstuff.push.IPushChannel;
import org.wicketstuff.push.IPushChannelDisconnectedListener;
import org.wicketstuff.push.IPushEventHandler;
import org.wicketstuff.push.IPushService;

/* loaded from: input_file:WEB-INF/lib/push-timer-1.4.13.jar:org/wicketstuff/push/timer/TimerPushService.class */
/**
 * Polling ("timer") based implementation of {@link IPushService}.
 *
 * <p>One service instance is kept per {@link Application} in a weak map. For every installed
 * {@link TimerPushChannel} the service tracks a {@link PushChannelState} holding the time of the
 * last poll and the events queued since then. A channel whose last poll is older than
 * {@link #getMaxTimeLag()} is treated as disconnected: its state is dropped and all registered
 * {@link IPushChannelDisconnectedListener}s are notified. This check happens lazily in
 * {@link #isConnected(IPushChannel)} and periodically in a background cleanup task.
 *
 * <p>Thread-safety: instance lookup is synchronized, channel states live in a concurrent map,
 * each event queue is guarded by its own lock, and the configuration values read by the cleanup
 * thread are volatile.
 */
public class TimerPushService implements IPushService {
    private static final Logger LOG = LoggerFactory.getLogger(TimerPushService.class);

    /** Per-application instances; guarded by synchronizing on the map itself. */
    private static final Map<Application, TimerPushService> INSTANCES = new WeakHashMap<Application, TimerPushService>(2);

    // volatile: written by configuration code, read by request threads and the cleanup thread.
    private volatile Duration _defaultPollingInterval = Duration.seconds(2);
    private volatile Duration _maxTimeLag = Duration.seconds(10);

    private final ConcurrentMap<TimerPushChannel<?>, PushChannelState> _channelStates = new ConcurrentHashMap<TimerPushChannel<?>, PushChannelState>();
    private final Set<IPushChannelDisconnectedListener> _disconnectListeners = new CopyOnWriteArraySet<IPushChannelDisconnectedListener>();

    /**
     * Single-threaded scheduler running {@link #_cleanupTask}. The worker is a daemon thread so
     * that a service which is never explicitly shut down cannot keep the JVM alive.
     */
    private final ScheduledThreadPoolExecutor _cleanupExecutor = new ScheduledThreadPoolExecutor(1,
            new java.util.concurrent.ThreadFactory() {
                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable task) {
                    Thread thread = new Thread(task, "TimerPushService-cleanup");
                    thread.setDaemon(true);
                    return thread;
                }
            });

    /** Handle of the currently scheduled cleanup run; guarded by {@code synchronized (this)}. */
    private ScheduledFuture<?> _cleanupFuture = null;

    /** Disconnects every channel that has not been polled within the maximum time lag. */
    private final Runnable _cleanupTask = new Runnable() { // from class: org.wicketstuff.push.timer.TimerPushService.1
        @Override // java.lang.Runnable
        public void run() {
            // A task scheduled at a fixed rate is silently cancelled for good once a run throws,
            // so any unexpected failure is logged here instead of being propagated.
            try {
                TimerPushService.LOG.debug("Running timer push channel cleanup task...");
                Time now = Time.now();
                int cleaned = 0;
                for (PushChannelState state : TimerPushService.this._channelStates.values()) {
                    if (now.subtract(state.lastPolledAt).greaterThan(TimerPushService.this._maxTimeLag)) {
                        TimerPushService.this.onDisconnect(state.channel);
                        cleaned++;
                    }
                }
                TimerPushService.LOG.debug("Cleaned up {} timer push channels.", Integer.valueOf(cleaned));
            } catch (RuntimeException e) {
                TimerPushService.LOG.error("Timer push channel cleanup task failed.", (Throwable) e);
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/push-timer-1.4.13.jar:org/wicketstuff/push/timer/TimerPushService$PushChannelState.class */
    /**
     * Bookkeeping for a single connected channel: when it was last polled and which events are
     * waiting to be delivered on the next poll.
     */
    public static final class PushChannelState {
        protected final TimerPushChannel<?> channel;

        /** Time of the most recent poll; volatile because the cleanup thread reads it. */
        protected volatile Time lastPolledAt = Time.now();

        /** Events queued since the last poll; access only while holding {@link #queuedEventsLock}. */
        protected List<Object> queuedEvents = new ArrayList<Object>(2);
        protected final Object queuedEventsLock = new Object();

        protected PushChannelState(TimerPushChannel<?> timerPushChannel) {
            this.channel = timerPushChannel;
        }
    }

    /**
     * Returns the service bound to the application attached to the current thread.
     *
     * @return the service for {@code Application.get()}
     */
    public static TimerPushService get() {
        return get(Application.get());
    }

    /**
     * Returns the service bound to the given application, creating it on first use.
     *
     * @param application the application that owns the service
     * @return the (possibly newly created) service, never {@code null}
     */
    public static TimerPushService get(Application application) {
        // WeakHashMap is not thread-safe and the lookup-then-insert must be atomic, otherwise
        // concurrent requests could corrupt the map or create several services (and executors).
        synchronized (INSTANCES) {
            TimerPushService service = INSTANCES.get(application);
            if (service == null) {
                service = new TimerPushService();
                INSTANCES.put(application, service);
            }
            return service;
        }
    }

    /** Creates the service and schedules the cleanup task to run every 60 seconds. */
    private TimerPushService() {
        setCleanupInterval(Duration.seconds(60));
    }

    /**
     * Looks up the {@link TimerPushBehavior} attached to a component.
     *
     * @param component the component whose behaviors are searched
     * @return the first timer push behavior found, or {@code null} if there is none
     */
    private TimerPushBehavior _findPushBehaviour(Component component) {
        for (IBehavior behavior : component.getBehaviors()) {
            if (behavior instanceof TimerPushBehavior) {
                return (TimerPushBehavior) behavior;
            }
        }
        return null;
    }

    /** Registers a fresh state for the channel, replacing any state previously stored for it. */
    private void _onConnect(TimerPushChannel<?> timerPushChannel) {
        this._channelStates.put(timerPushChannel, new PushChannelState(timerPushChannel));
    }

    /** Registers a listener to be notified when a channel is detected as disconnected. */
    @Override // org.wicketstuff.push.IPushService
    public void addPushChannelDisconnectedListener(IPushChannelDisconnectedListener iPushChannelDisconnectedListener) {
        this._disconnectListeners.add(iPushChannelDisconnectedListener);
    }

    /** @return the polling interval used by {@link #installPushChannel(Component, IPushEventHandler)} */
    public Duration getDefaultPollingInterval() {
        return this._defaultPollingInterval;
    }

    /** @return the maximum time since the last poll before a channel counts as disconnected */
    public Duration getMaxTimeLag() {
        return this._maxTimeLag;
    }

    /**
     * Installs a push channel on the component, reusing the component's existing
     * {@link TimerPushBehavior} or adding a new one when none is attached yet.
     *
     * @param component the component that receives the behavior
     * @param iPushEventHandler handler passed on to the behavior for the new channel
     * @param duration polling interval passed on to the behavior
     * @return the newly created channel, already registered as connected
     */
    public <EventType> TimerPushChannel<EventType> installPush(Component component, IPushEventHandler<EventType> iPushEventHandler, Duration duration) {
        TimerPushBehavior behavior = _findPushBehaviour(component);
        if (behavior == null) {
            behavior = new TimerPushBehavior(duration);
            component.add(behavior);
        }
        TimerPushChannel<EventType> channel = behavior.addPushChannel(iPushEventHandler, duration);
        _onConnect(channel);
        return channel;
    }

    /** Installs a push channel using the default polling interval. */
    @Override // org.wicketstuff.push.IPushService
    public <EventType> TimerPushChannel<EventType> installPushChannel(Component component, IPushEventHandler<EventType> iPushEventHandler) {
        return installPush(component, iPushEventHandler, this._defaultPollingInterval);
    }

    /**
     * Tells whether the channel is known and has been polled recently enough.
     *
     * <p>A channel whose last poll is older than the maximum time lag is disconnected as a side
     * effect (state removed, listeners notified).
     *
     * @param iPushChannel the channel to check
     * @return {@code true} if the channel is a connected timer push channel; {@code false} if it
     *         is of another type, unknown, or has just been disconnected
     */
    @Override // org.wicketstuff.push.IPushService
    public boolean isConnected(IPushChannel<?> iPushChannel) {
        if (!(iPushChannel instanceof TimerPushChannel)) {
            LOG.warn("Unsupported push channel type {}", iPushChannel);
            return false;
        }
        PushChannelState state = this._channelStates.get(iPushChannel);
        if (state == null) {
            return false;
        }
        boolean timedOut = Time.now().subtract(state.lastPolledAt).greaterThan(this._maxTimeLag);
        if (timedOut) {
            onDisconnect(state.channel);
            return false;
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the channel's state and, if the channel was still registered, notifies every
     * disconnect listener. A listener that throws a {@link RuntimeException} is logged and does
     * not prevent the remaining listeners from being called.
     *
     * @param timerPushChannel the channel to disconnect
     */
    public void onDisconnect(TimerPushChannel<?> timerPushChannel) {
        // remove() succeeds for only one of several concurrent callers, so listeners fire once.
        if (this._channelStates.remove(timerPushChannel) == null) {
            return;
        }
        LOG.debug("Timer push channel {} disconnected.", timerPushChannel);
        for (IPushChannelDisconnectedListener listener : this._disconnectListeners) {
            try {
                listener.onDisconnect(timerPushChannel);
            } catch (RuntimeException e) {
                LOG.error("Failed to notify " + listener, (Throwable) e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records a poll for the channel and hands over all events queued since the previous poll.
     * A channel without state (for example one already cleaned up) is re-registered and gets an
     * empty result for this poll.
     *
     * @param timerPushChannel the polling channel
     * @return the queued events in publication order, or an empty list if there are none
     */
    @SuppressWarnings("unchecked") // only events published for this channel's EventType are queued
    public <EventType> List<EventType> pollEvents(TimerPushChannel<EventType> timerPushChannel) {
        PushChannelState state = this._channelStates.get(timerPushChannel);
        if (state == null) {
            LOG.debug("Reconnecting push channel {}...", timerPushChannel);
            _onConnect(timerPushChannel);
            return Collections.emptyList();
        }
        state.lastPolledAt = Time.now();
        // The emptiness check must happen under the lock: publish() mutates the list concurrently
        // and ArrayList gives no guarantees for unsynchronized reads.
        synchronized (state.queuedEventsLock) {
            if (state.queuedEvents.isEmpty()) {
                return Collections.emptyList();
            }
            List<EventType> drained = (List<EventType>) state.queuedEvents;
            state.queuedEvents = new ArrayList<Object>(2);
            return drained;
        }
    }

    /**
     * Queues an event for delivery on the channel's next poll. The event is dropped when the
     * channel is not a timer push channel (a warning is logged) or is not connected.
     *
     * @param iPushChannel the target channel
     * @param eventtype the event to queue
     */
    @Override // org.wicketstuff.push.IPushService
    public <EventType> void publish(IPushChannel<EventType> iPushChannel, EventType eventtype) {
        if (!(iPushChannel instanceof TimerPushChannel)) {
            LOG.warn("Unsupported push channel type {}", iPushChannel);
            return;
        }
        if (!isConnected(iPushChannel)) {
            return;
        }
        // The state may have been removed between the connectivity check and this lookup.
        PushChannelState state = this._channelStates.get(iPushChannel);
        if (state == null) {
            return;
        }
        synchronized (state.queuedEventsLock) {
            state.queuedEvents.add(eventtype);
        }
    }

    /** Unregisters a previously added disconnect listener. */
    @Override // org.wicketstuff.push.IPushService
    public void removePushChannelDisconnectedListener(IPushChannelDisconnectedListener iPushChannelDisconnectedListener) {
        this._disconnectListeners.remove(iPushChannelDisconnectedListener);
    }

    /**
     * Reschedules the cleanup task to run at the given fixed rate, cancelling the previous
     * schedule without interrupting a run that is already in progress.
     *
     * @param duration both the initial delay and the period between cleanup runs
     */
    public void setCleanupInterval(Duration duration) {
        long millis = duration.getMilliseconds();
        synchronized (this) {
            if (this._cleanupFuture != null) {
                this._cleanupFuture.cancel(false);
            }
            this._cleanupFuture = this._cleanupExecutor.scheduleAtFixedRate(this._cleanupTask, millis, millis, TimeUnit.MILLISECONDS);
        }
    }

    /** Sets the polling interval used for channels installed without an explicit interval. */
    public void setDefaultPollingInterval(Duration duration) {
        this._defaultPollingInterval = duration;
    }

    /** Sets the maximum time since the last poll before a channel counts as disconnected. */
    public void setMaxTimeLag(Duration duration) {
        this._maxTimeLag = duration;
    }

    /**
     * Removes the channel from the component's {@link TimerPushBehavior}; when the behavior
     * reports no remaining channels, the behavior itself is removed from the component.
     * Channels of another type are ignored with a warning.
     *
     * @param component the component the channel was installed on
     * @param iPushChannel the channel to uninstall
     */
    @Override // org.wicketstuff.push.IPushService
    public void uninstallPushChannel(Component component, IPushChannel<?> iPushChannel) {
        if (!(iPushChannel instanceof TimerPushChannel)) {
            LOG.warn("Unsupported push channel type {}", iPushChannel);
            return;
        }
        TimerPushBehavior behavior = _findPushBehaviour(component);
        if (behavior != null && behavior.removePushChannel(iPushChannel) == 0) {
            component.remove(behavior);
        }
    }
}
