package de.codecentric.boot.admin.server.services;

import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegisteredEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegistrationUpdatedEvent;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.retry.Retry;

/* loaded from: input_file:BOOT-INF/lib/spring-boot-admin-server-2.0.6.jar:de/codecentric/boot/admin/server/services/StatusUpdateTrigger.class */
/**
 * Triggers status updates for instances, both in reaction to events and periodically.
 *
 * <p>An update is triggered whenever an {@link InstanceRegisteredEvent} or an
 * {@link InstanceRegistrationUpdatedEvent} is received. In addition, once {@link #start()}
 * has been called, every {@code updateInterval} all instances whose last query is older than
 * {@code statusLifetime} are updated again. Both durations default to 10 seconds.
 */
public class StatusUpdateTrigger extends ResubscribingEventHandler<InstanceEvent> {
    private static final Logger log = LoggerFactory.getLogger(StatusUpdateTrigger.class);

    private final StatusUpdater statusUpdater;

    /**
     * Time of the last completed status query per instance. The map is written from
     * {@code doFinally} callbacks and iterated by the periodic check, which may run on
     * different threads, so a concurrent map is required.
     */
    private final Map<InstanceId, Instant> lastQueried = new ConcurrentHashMap<>();

    private Duration updateInterval = Duration.ofSeconds(10);
    private Duration statusLifetime = Duration.ofSeconds(10);

    /** Subscription of the periodic check; {@code null} while not started. */
    private Disposable intervalSubscription;

    /** Scheduler the periodic check is subscribed on; {@code null} while not started. */
    private Scheduler intervalScheduler;

    /**
     * Creates a new trigger.
     *
     * @param statusUpdater the updater used to perform the actual status query
     * @param publisher the source of instance events to react to
     */
    public StatusUpdateTrigger(StatusUpdater statusUpdater, Publisher<InstanceEvent> publisher) {
        super(publisher, InstanceEvent.class);
        this.statusUpdater = statusUpdater;
    }

    /**
     * Starts the event handling of the superclass and schedules the periodic status check.
     * A periodic check left over from a previous call is disposed first so it cannot leak.
     */
    @Override
    public void start() {
        super.start();
        disposeInterval();

        Scheduler scheduler = Schedulers.newSingle("status-monitor");
        this.intervalScheduler = scheduler;
        this.intervalSubscription = Flux.interval(this.updateInterval)
            .doOnSubscribe(subscription -> log.debug("Scheduled status update every {}", this.updateInterval))
            .log(log.getName(), Level.FINEST)
            .subscribeOn(scheduler)
            .flatMap(tick -> updateStatusForAllInstances())
            // Keep the periodic check alive: any error leads to a resubscription.
            .retryWhen(Retry.any()
                .retryMax(Integer.MAX_VALUE)
                .doOnRetry(retryContext -> log.error("Resubscribing after uncaught error", retryContext.exception())))
            .subscribe();
    }

    /**
     * Updates the status of the affected instance for every registration or
     * registration-update event; all other events are ignored.
     *
     * @param flux the incoming instance events
     * @return a publisher performing the status updates
     */
    @Override
    protected Publisher<?> handle(Flux<InstanceEvent> flux) {
        return flux.subscribeOn(Schedulers.newSingle("status-updater"))
            .filter(event -> event instanceof InstanceRegisteredEvent
                || event instanceof InstanceRegistrationUpdatedEvent)
            .flatMap(event -> updateStatus(event.getInstance()));
    }

    /**
     * Stops the event handling of the superclass and cancels the periodic status check,
     * including the scheduler that was created for it.
     */
    @Override
    public void stop() {
        super.stop();
        disposeInterval();
    }

    /** Disposes the periodic check and its scheduler, if present. */
    private void disposeInterval() {
        if (this.intervalSubscription != null) {
            this.intervalSubscription.dispose();
            this.intervalSubscription = null;
        }
        if (this.intervalScheduler != null) {
            this.intervalScheduler.dispose();
            this.intervalScheduler = null;
        }
    }

    /**
     * Updates the status of every known instance whose last query happened more than
     * {@code statusLifetime} ago.
     *
     * @return a {@link Mono} completing when all triggered updates have completed
     */
    protected Mono<Void> updateStatusForAllInstances() {
        log.debug("Updating status for all instances");
        Instant expiration = Instant.now().minus(this.statusLifetime);
        return Flux.fromIterable(this.lastQueried.entrySet())
            .filter(entry -> entry.getValue().isBefore(expiration))
            .map(Map.Entry::getKey)
            .flatMap(this::updateStatus)
            .then();
    }

    /**
     * Updates the status of a single instance and records the time of the query once the
     * update terminates, regardless of how it terminated.
     *
     * @param instanceId the id of the instance to update
     * @return the {@link Mono} returned by the status updater
     */
    protected Mono<Void> updateStatus(InstanceId instanceId) {
        return this.statusUpdater.updateStatus(instanceId)
            .doFinally(signalType -> this.lastQueried.put(instanceId, Instant.now()));
    }

    /**
     * Sets the period of the status check. It is read when {@link #start()} is called.
     *
     * @param duration the interval between two periodic checks
     */
    public void setUpdateInterval(Duration duration) {
        this.updateInterval = duration;
    }

    /**
     * Sets how long a queried status is considered current; instances queried longer ago
     * are updated by the next periodic check.
     *
     * @param duration the lifetime of a queried status
     */
    public void setStatusLifetime(Duration duration) {
        this.statusLifetime = duration;
    }
}
