package org.jetlinks.core.device;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;

/* loaded from: input_file:org/jetlinks/core/device/StandaloneDeviceMessageBroker.class */
public class StandaloneDeviceMessageBroker implements DeviceOperationBroker, MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(StandaloneDeviceMessageBroker.class);
    private FluxProcessor<Message, Message> messageEmitterProcessor;
    private Map<String, FluxProcessor<DeviceMessageReply, DeviceMessageReply>> replyProcessor;
    private Map<String, AtomicInteger> partCache;
    private ReplyFailureHandler replyFailureHandler;
    private Map<String, Function<Publisher<String>, Flux<DeviceStateInfo>>> stateHandler;

    public StandaloneDeviceMessageBroker() {
        this(EmitterProcessor.create(false));
    }

    public StandaloneDeviceMessageBroker(FluxProcessor<Message, Message> fluxProcessor) {
        this.replyProcessor = new ConcurrentHashMap();
        this.partCache = new ConcurrentHashMap();
        this.replyFailureHandler = (th, deviceMessageReply) -> {
            log.warn("unhandled reply message:{}", deviceMessageReply, th);
        };
        this.stateHandler = new ConcurrentHashMap();
        this.messageEmitterProcessor = fluxProcessor;
    }

    @Override // org.jetlinks.core.server.MessageHandler
    public Flux<Message> handleSendToDeviceMessage(String str) {
        return this.messageEmitterProcessor.map(Function.identity());
    }

    @Override // org.jetlinks.core.server.MessageHandler
    public void handleGetDeviceState(String str, Function<Publisher<String>, Flux<DeviceStateInfo>> function) {
        this.stateHandler.put(str, function);
    }

    @Override // org.jetlinks.core.device.DeviceOperationBroker
    public Flux<DeviceStateInfo> getDeviceState(String str, Collection<String> collection) {
        return Mono.justOrEmpty(this.stateHandler.get(str)).flatMapMany(function -> {
            return (Flux) function.apply(Flux.fromIterable(collection));
        });
    }

    @Override // org.jetlinks.core.server.MessageHandler
    public Mono<Boolean> reply(DeviceMessageReply deviceMessageReply) {
        return Mono.defer(() -> {
            String messageId = deviceMessageReply.getMessageId();
            String str = (String) deviceMessageReply.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (str == null) {
                FluxProcessor<DeviceMessageReply, DeviceMessageReply> fluxProcessor = this.replyProcessor.get(messageId);
                if (fluxProcessor == null || fluxProcessor.isDisposed()) {
                    this.replyProcessor.remove(messageId);
                    this.replyFailureHandler.handle(new NullPointerException("no reply handler"), deviceMessageReply);
                    return Mono.just(false);
                }
                fluxProcessor.onNext(deviceMessageReply);
                fluxProcessor.onComplete();
                return Mono.just(true);
            }
            FluxProcessor<DeviceMessageReply, DeviceMessageReply> orDefault = this.replyProcessor.getOrDefault(str, this.replyProcessor.get(messageId));
            if (orDefault == null || orDefault.isDisposed()) {
                this.replyFailureHandler.handle(new NullPointerException("no reply handler"), deviceMessageReply);
                this.replyProcessor.remove(str);
                return Mono.just(false);
            }
            int intValue = ((Integer) deviceMessageReply.getHeader(Headers.fragmentNumber).orElse(1)).intValue();
            AtomicInteger computeIfAbsent = this.partCache.computeIfAbsent(str, str2 -> {
                return new AtomicInteger(intValue);
            });
            orDefault.onNext(deviceMessageReply);
            if (computeIfAbsent.decrementAndGet() <= 0) {
                orDefault.onComplete();
                this.replyProcessor.remove(str);
            }
            return Mono.just(true);
        }).doOnError(th -> {
            this.replyFailureHandler.handle(th, deviceMessageReply);
        });
    }

    @Override // org.jetlinks.core.device.DeviceOperationBroker
    public Flux<DeviceMessageReply> handleReply(String str, Duration duration) {
        return this.replyProcessor.computeIfAbsent(str, str2 -> {
            return UnicastProcessor.create();
        }).timeout(duration, Mono.error(() -> {
            return new DeviceOperationException(ErrorCode.TIME_OUT);
        })).doFinally(signalType -> {
            this.replyProcessor.remove(str);
        });
    }

    @Override // org.jetlinks.core.device.DeviceOperationBroker
    public Mono<Integer> send(String str, Publisher<? extends Message> publisher) {
        if (!this.messageEmitterProcessor.hasDownstreams()) {
            return Mono.just(0);
        }
        Flux from = Flux.from(publisher);
        FluxProcessor<Message, Message> fluxProcessor = this.messageEmitterProcessor;
        fluxProcessor.getClass();
        return from.doOnNext((v1) -> {
            r1.onNext(v1);
        }).then(Mono.just(Integer.valueOf(Long.valueOf(this.messageEmitterProcessor.downstreamCount()).intValue())));
    }

    @Override // org.jetlinks.core.device.DeviceOperationBroker
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> publisher) {
        return Mono.just(0);
    }

    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }
}
