package org.jetlinks.core.defaults;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.jetlinks.core.device.DeviceMessageSender;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.FunctionInvokeMessageSender;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.ReadPropertyMessageSender;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.WritePropertyMessageSender;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/core/defaults/DefaultDeviceMessageSender.class */
/**
 * Default {@link DeviceMessageSender} that pushes messages for a single device through a
 * {@link DeviceOperationBroker} and then listens on the same broker for the replies.
 *
 * <p>For every outgoing message the flow is: interceptor {@code preSend} → broker {@code send} →
 * broker {@code handleReply} → reply mapping → interceptor {@code afterSent}. Failures are
 * surfaced as {@link DeviceOperationException}s.
 */
public class DefaultDeviceMessageSender implements DeviceMessageSender {
    private static final Logger log = LoggerFactory.getLogger(DefaultDeviceMessageSender.class);

    /** Placeholder used when the operator reports no connection server id. */
    private static final String NO_SERVER = "-";

    private final DeviceOperationBroker handler;
    private final DeviceOperator operator;

    /** Reply timeout in milliseconds, used when a message carries no {@code Headers.timeout} header. */
    private long defaultTimeout = TimeUnit.SECONDS.toMillis(10);

    /**
     * @param deviceOperationBroker broker used to send messages and to receive replies
     * @param deviceOperator        operator of the device this sender talks to
     */
    public DefaultDeviceMessageSender(DeviceOperationBroker deviceOperationBroker, DeviceOperator deviceOperator) {
        this.handler = deviceOperationBroker;
        this.operator = deviceOperator;
    }

    /**
     * Sends the given messages and maps every raw reply with {@link #convertReply(Object)}.
     *
     * @param publisher messages to send
     * @param <R>       expected reply type
     * @return the converted replies
     */
    @Override
    public <R extends DeviceMessageReply> Flux<R> send(Publisher<RepayableDeviceMessage<R>> publisher) {
        return send(publisher, this::convertReply);
    }

    /**
     * Converts a raw reply object into the expected reply type.
     *
     * @param obj raw reply object
     * @param <T> expected reply type; the cast is unchecked, so a mismatching concrete type is
     *            only detected later at the caller's use site
     * @return {@code obj} cast to {@code T}
     * @throws DeviceOperationException with {@link ErrorCode#SYSTEM_ERROR} when {@code obj} is
     *                                  {@code null} or not a {@link DeviceMessageReply}; or with
     *                                  the code resolved by {@code ErrorCode.of} when the reply is
     *                                  unsuccessful and its code is recognised
     */
    protected <T extends DeviceMessageReply> T convertReply(Object obj) {
        if (!(obj instanceof DeviceMessageReply)) {
            // A null value also fails the instanceof check, so it must not be dereferenced here.
            String actualType = obj == null ? "null" : String.valueOf(obj.getClass());
            throw new DeviceOperationException(
                    ErrorCode.SYSTEM_ERROR,
                    new ClassCastException("can not cast " + actualType + " to DeviceMessageReply"));
        }
        @SuppressWarnings("unchecked") // obj is known to be a DeviceMessageReply; T is chosen by the caller
        T reply = (T) obj;
        if (!reply.isSuccess()) {
            // Only codes known to ErrorCode become exceptions; other failed replies are returned as-is.
            ErrorCode.of(reply.getCode())
                    .map(DeviceOperationException::new)
                    .ifPresent(error -> {
                        throw error;
                    });
        }
        return reply;
    }

    /**
     * Adds debug logging of received replies, completion and cancellation to a reply stream.
     * When debug logging is disabled at call time the stream is returned untouched.
     *
     * @param request message the replies belong to
     * @param replies reply stream to decorate
     * @return the decorated (or original) stream
     */
    private <R extends DeviceMessage> Flux<R> logReply(DeviceMessage request, Flux<R> replies) {
        if (!log.isDebugEnabled()) {
            return replies;
        }
        return replies
                .doOnNext(reply -> log.debug("receive device[{}] message[{}]: {}",
                        this.operator.getDeviceId(), reply.getMessageId(), reply))
                .doOnComplete(() -> log.debug("complete receive device[{}] message[{}]",
                        this.operator.getDeviceId(), request.getMessageId()))
                .doOnCancel(() -> log.debug("cancel receive device[{}] message[{}]",
                        this.operator.getDeviceId(), request.getMessageId()));
    }

    /**
     * Sends the given messages one after another and emits the mapped replies.
     *
     * <p>The connection server id and the protocol's sender interceptor are resolved once per
     * subscription. When no server id is available every message fails with
     * {@link ErrorCode#CLIENT_OFFLINE} (still routed through the interceptor's {@code afterSent}).
     *
     * @param publisher messages to send
     * @param function  maps each raw reply object to the caller's reply type
     * @param <R>       reply type
     * @return the replies as returned by the interceptor's {@code afterSent}
     */
    @Override
    public <R extends DeviceMessage> Flux<R> send(Publisher<? extends DeviceMessage> publisher, Function<Object, R> function) {
        return Mono
                .zip(
                        this.operator.getConnectionServerId().defaultIfEmpty(NO_SERVER),
                        this.operator.getProtocol()
                                .flatMap(protocol -> protocol.getSenderInterceptor())
                                .defaultIfEmpty(DeviceMessageSenderInterceptor.DO_NOTING))
                .flatMapMany(serverAndInterceptor -> {
                    String serverId = serverAndInterceptor.getT1();
                    DeviceMessageSenderInterceptor interceptor = serverAndInterceptor.getT2();
                    return Flux.from(publisher)
                            .flatMap(message -> interceptor.preSend(this.operator, message))
                            // concatMap: the next message is only sent once the previous reply stream ended
                            .concatMap(message -> {
                                if (NO_SERVER.equals(serverId)) {
                                    return interceptor.afterSent(this.operator, message,
                                            Flux.<R>error(new DeviceOperationException(ErrorCode.CLIENT_OFFLINE)));
                                }
                                return interceptor.afterSent(this.operator, message,
                                        sendAndAwaitReply(serverId, message, function));
                            });
                });
    }

    /** Sends one message to the given server and, once the send is confirmed, emits its mapped replies. */
    private <R extends DeviceMessage> Flux<R> sendAndAwaitReply(String serverId, DeviceMessage message, Function<Object, R> replyMapping) {
        return this.handler
                .send(serverId, Mono.just(message))
                .defaultIfEmpty(-1)
                .flatMap(this::confirmSent)
                .thenMany(awaitReply(message, replyMapping));
    }

    /**
     * Turns the broker's send result into success or a {@link ErrorCode#CLIENT_OFFLINE} error.
     * A result of {@code 0} additionally triggers a device state check before failing;
     * {@code -1} stands for "broker returned nothing".
     */
    private Mono<Boolean> confirmSent(Integer result) {
        if (result == 0) {
            return this.operator.checkState()
                    .then(Mono.error(new DeviceOperationException(ErrorCode.CLIENT_OFFLINE)));
        }
        if (result == -1) {
            return Mono.error(new DeviceOperationException(ErrorCode.CLIENT_OFFLINE));
        }
        log.debug("send device[{}] message complete", this.operator.getDeviceId());
        return Mono.just(true);
    }

    /**
     * Builds the reply stream for a message: waits up to the message's {@code Headers.timeout}
     * (or the default timeout), maps replies, re-checks device state on
     * {@link ErrorCode#CLIENT_OFFLINE} errors and converts timeouts to {@link ErrorCode#TIME_OUT}.
     */
    private <R extends DeviceMessage> Flux<R> awaitReply(DeviceMessage message, Function<Object, R> replyMapping) {
        long timeoutMillis = message.getHeader(Headers.timeout).orElse(this.defaultTimeout);
        return this.handler
                .handleReply(message.getMessageId(), Duration.ofMillis(timeoutMillis))
                .map(replyMapping)
                .onErrorResume(DeviceOperationException.class, error -> {
                    if (error.getCode() == ErrorCode.CLIENT_OFFLINE) {
                        return this.operator.checkState().then(Mono.error(error));
                    }
                    return Mono.error(error);
                })
                .onErrorMap(TimeoutException.class, timeout -> new DeviceOperationException(ErrorCode.TIME_OUT, timeout))
                .as(replies -> logReply(message, replies));
    }

    /**
     * Creates a function-invoke sender for this device.
     *
     * @param str argument handed to {@link DefaultFunctionInvokeMessageSender} (presumably the function id)
     */
    @Override
    public FunctionInvokeMessageSender invokeFunction(String str) {
        return new DefaultFunctionInvokeMessageSender(this.operator, str);
    }

    /**
     * Creates a read-property sender for this device, pre-configured via {@code read(strArr)}.
     *
     * @param strArr arguments handed to {@code read} (presumably the property names)
     */
    @Override
    public ReadPropertyMessageSender readProperty(String... strArr) {
        return new DefaultReadPropertyMessageSender(this.operator).read(strArr);
    }

    /** Creates a write-property sender for this device. */
    @Override
    public WritePropertyMessageSender writeProperty() {
        return new DefaultWritePropertyMessageSender(this.operator);
    }

    /**
     * Sets the reply timeout used when a message carries no timeout header.
     *
     * @param j timeout in milliseconds
     */
    public void setDefaultTimeout(long j) {
        this.defaultTimeout = j;
    }

    /**
     * @return the reply timeout in milliseconds used when a message carries no timeout header
     */
    public long getDefaultTimeout() {
        return this.defaultTimeout;
    }
}
